package async_smtp
SMTP client and server
Install
Dune Dependency
Authors
Maintainers
Sources
v0.17.0.tar.gz
sha256=c416027c2537e22129f7049bf03ec3f867557d47b194d7e91d72c399fe656b27
doc/src/async_smtp/message.ml.html
Source file message.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
module Stable = struct open Core.Core_stable open Email_message.Email_message_stable open Async_smtp_types.Async_smtp_types_stable module Time = Time_float_unix.Stable module Unstable_mail_log = Mail_log module Mail_log = Mail_log.Stable module Retry_interval = Smtp_envelope.Retry_interval module Quarantine_reason = Quarantine_reason.Stable module Id = struct module V1 = struct include String.V1 let to_string t = t let of_string t = t let%expect_test _ = print_endline [%bin_digest: t]; [%expect {| d9a8da25d5656b016fb4dbdc2e4197fb |}] ;; end end module Status = struct module V1 = struct type t = [ `Send_now | `Send_at of Time.V1.t | `Sending | `Frozen | `Removed | `Quarantined of Quarantine_reason.V1.t | `Delivered ] [@@deriving sexp, bin_io] let%expect_test _ = print_endline [%bin_digest: t]; [%expect {| 424465fabd3656a7dfa206491ab934af |}] ;; end end module V1 = struct type t = { spool_dir : string ; id : Id.V1.t ; flows : Mail_log.Flows.V1.t [@default Unstable_mail_log.Flows.none] ; parent_id : Smtp_envelope.Id.V1.t ; spool_date : Time.V1.t ; next_hop_choices : [ `Inet of Host_and_port.V1.t ] list ; mutable retry_intervals : Retry_interval.V2.t list ; mutable remaining_recipients : Email_address.V1.t list ; mutable failed_recipients : Email_address.V1.t list ; mutable relay_attempts : (Time.V1.t * Error.V1.t) list ; mutable status : Status.V1.t ; mutable envelope_info : Smtp_envelope.Info.V1.t } [@@deriving sexp, bin_io] let%expect_test _ = print_endline [%bin_digest: t]; [%expect {| 49b13cef6568275307ed409f67857b25 |}] ;; end module V2 = struct type t = { spool_dir : string ; id : Id.V1.t ; flows : Mail_log.Flows.V1.t [@default Unstable_mail_log.Flows.none] ; parent_id : Smtp_envelope.Id.V1.t ; spool_date : Time.V1.t ; next_hop_choices : [ `Inet of Host_and_port.V1.t ] list ; mutable retry_intervals : Retry_interval.V2.t list ; mutable remaining_recipients : Email_address.V1.t list ; mutable failed_recipients : Email_address.V1.t list ; mutable relay_attempts : (Time.V1.t * Error.V1.t) list ; mutable status : Status.V1.t ; mutable envelope_info : Smtp_envelope.Info.V2.t } [@@deriving sexp, bin_io] let of_v1 (v1 : V1.t) = { spool_dir = v1.spool_dir ; id = v1.id ; flows = v1.flows ; parent_id = v1.parent_id ; spool_date = v1.spool_date ; next_hop_choices = v1.next_hop_choices ; retry_intervals = v1.retry_intervals ; remaining_recipients = v1.remaining_recipients ; failed_recipients = v1.failed_recipients ; relay_attempts = v1.relay_attempts ; status = v1.status ; envelope_info = Smtp_envelope.Info.V2.of_v1 v1.envelope_info } ;; let%expect_test _ = print_endline [%bin_digest: t]; [%expect {| 586728abcc44ce512a1d7ef9abb4f35b |}] ;; end module V3 = struct type t = { spool_dir : string ; id : Id.V1.t ; flows : Mail_log.Flows.V1.t [@default Unstable_mail_log.Flows.none] ; parent_id : Smtp_envelope.Id.V1.t ; spool_date : Time.V1.t ; next_hop_choices : Host_and_port.V1.t list ; mutable retry_intervals : Retry_interval.V2.t list ; mutable remaining_recipients : Email_address.V1.t list ; mutable failed_recipients : Email_address.V1.t list ; mutable relay_attempts : (Time.V1.t * Error.V1.t) list ; mutable status : Status.V1.t ; mutable envelope_info : Smtp_envelope.Info.V2.t } [@@deriving sexp, bin_io] let of_v2 (v2 : V2.t) = { spool_dir = v2.spool_dir ; id = v2.id ; flows = v2.flows ; parent_id = v2.parent_id ; spool_date = v2.spool_date ; next_hop_choices = Core.List.map v2.next_hop_choices ~f:(fun (`Inet i) -> i) ; retry_intervals = v2.retry_intervals ; remaining_recipients = v2.remaining_recipients ; failed_recipients = v2.failed_recipients ; relay_attempts = v2.relay_attempts ; status = v2.status ; envelope_info = v2.envelope_info } ;; let of_v1 v1 = of_v2 (V2.of_v1 v1) let%expect_test _ = print_endline [%bin_digest: t]; [%expect {| 7c91581c5678eddbae053a4a79d731a0 |}] ;; end end open Core open Async open Async_smtp_types module Time = Time_float_unix (* Includes parent id and an incrementing counter. *) module Id = struct include String let counter = ref 0 let create ~original_msg = let parent_id = Smtp_envelope.id original_msg in let t = sprintf !"%{Smtp_envelope.Id}-%s" parent_id (Smtp_envelope.Id.urlbase64_encode_float ~length:6 (!counter |> Int.to_float) |> Smtp_envelope.Id.to_string) in incr counter; t ;; end module Status = Stable.Status.V1 module Queue = struct type t = | Active | Frozen | Removed | Quarantine [@@deriving sexp, enumerate, compare] let to_string = function | Active -> "active" | Frozen -> "frozen" | Removed -> "removed" | Quarantine -> "quarantine" ;; let to_dirname = to_string let of_status status = match status with | `Frozen -> Some Frozen | `Send_now | `Send_at _ | `Sending -> Some Active | `Removed -> Some Removed | `Quarantined _ -> Some Quarantine | `Delivered -> None ;; let of_status' status = match of_status status with | Some queue -> Ok queue | None -> Or_error.error_s [%message "Specified status not associated with a queue" (status : Status.t)] ;; end module Data = struct type t = Email.t let map_headers headers ~encode_or_decode = let f = match encode_or_decode with | `Encode -> fun s -> Dot_escaping.encode_line_string s |> String_monoid.to_string | `Decode -> Dot_escaping.decode_line_string in Email_headers.map' ~normalize:`None headers ~f:(fun ~name ~value -> f name, value) ;; let map_raw_content_bstr body ~encode_or_decode = let eol, f = match encode_or_decode with | `Encode -> "\r\n", Dot_escaping.encode_line_bigstring | `Decode -> "\n", fun s -> Dot_escaping.decode_line_bigstring s |> String_monoid.of_bigstring in (* Most likely, the output buffer will be the same length as the input buffer. Give ourselves some leeway to avoid having to resize. *) let buffer = Bigbuffer.create (Bigstring_shared.length body + 100) in let add_transformed_line line = String_monoid.output_bigbuffer (f (Bigstring_shared.to_bigstring line)) buffer in let rec loop seq = match Sequence.hd seq with | None -> () | Some line -> add_transformed_line line; (match Sequence.tl seq with | None -> () | Some tail -> (* Peek the sequence so we don't add an eol marker for the last line. *) if Option.is_some (Sequence.hd tail) then Bigbuffer.add_string buffer eol; loop tail) in loop (Bigstring_shared.lines_seq ~include_empty_last_line:() body); Bigstring_shared.of_bigbuffer_volatile buffer ;; let map_raw_content raw_content ~encode_or_decode = Option.map (Email.Raw_content.Expert.to_bigstring_shared_option raw_content) ~f:(map_raw_content_bstr ~encode_or_decode) |> Email.Raw_content.Expert.of_bigstring_shared_option ;; let map_email t ~encode_or_decode = Email.create ~headers:(map_headers (Email.headers t) ~encode_or_decode) ~raw_content:(map_raw_content (Email.raw_content t) ~encode_or_decode) ;; let to_email = map_email ~encode_or_decode:`Decode let of_email = map_email ~encode_or_decode:`Encode let load path = Deferred.Or_error.try_with ~run:`Schedule ~rest:`Log (fun () -> let%bind contents = Reader.file_contents path in return (Email.of_string contents)) ;; let save ?temp_file t path = Deferred.Or_error.try_with ~run:`Schedule ~rest:`Log (fun () -> Email.save ?temp_file ~fsync:true ~eol_except_raw_content:`CRLF t path) ;; end (* A value of type t should only be modified via [On_disk_spool]. This guarantees that all changes are properly flushed to disk. *) type t = Stable.V3.t = { spool_dir : string ; id : Id.t ; flows : Mail_log.Flows.t ; parent_id : Smtp_envelope.Id.t ; spool_date : Time.t ; next_hop_choices : Host_and_port.t list ; mutable retry_intervals : Smtp_envelope.Retry_interval.t list ; mutable remaining_recipients : Email_address.Stable.V1.t list ; mutable failed_recipients : Email_address.Stable.V1.t list ; mutable relay_attempts : (Time.t * Error.t) list ; mutable status : Status.t ; mutable envelope_info : Smtp_envelope.Info.t } [@@deriving fields ~getters, sexp_of] (* type alias to make code more readable below *) type meta = t [@@deriving sexp_of] let compare t1 t2 = Sexp.compare (sexp_of_t t1) (sexp_of_t t2) let status t = match t.status with | `Send_at time when Time.(time < now ()) -> `Send_now | status -> status ;; let time_on_spool t = Time.diff (Time.now ()) t.spool_date let last_relay_attempt t = List.hd t.relay_attempts let set_status t x = t.status <- x let set_remaining_recipients t x = t.remaining_recipients <- x let set_failed_recipients t x = t.failed_recipients <- x let set_retry_intervals t x = t.retry_intervals <- x let add_retry_intervals t x = t.retry_intervals <- x @ t.retry_intervals let add_relay_attempt t x = t.relay_attempts <- x :: t.relay_attempts let move_failed_recipients_to_remaining_recipients t = t.remaining_recipients <- t.remaining_recipients @ t.failed_recipients; t.failed_recipients <- [] ;; let of_envelope_batch envelope_batch ~gen_id ~spool_dir ~spool_date ~failed_recipients ~relay_attempts ~parent_id ~status ~flows = let email_body = Smtp_envelope.Routed.Batch.email_body envelope_batch in (* We make sure to only map the email body once. *) let data_raw_content = Data.map_raw_content email_body ~encode_or_decode:`Encode in Deferred.Or_error.List.map ~how:`Sequential (Smtp_envelope.Routed.Batch.envelopes envelope_batch) ~f:(fun envelope -> let headers = Smtp_envelope.Bodiless.Routed.headers envelope |> Data.map_headers ~encode_or_decode:`Encode in let envelope_info = Smtp_envelope.Bodiless.Routed.envelope_info envelope in let data = Email.create ~headers ~raw_content:data_raw_content in let next_hop_choices = Smtp_envelope.Bodiless.Routed.next_hop_choices envelope in let retry_intervals = Smtp_envelope.Bodiless.Routed.retry_intervals envelope in let remaining_recipients = Smtp_envelope.Bodiless.Routed.recipients envelope in gen_id () >>|? fun id -> ( { spool_dir ; id ; flows ; parent_id ; spool_date ; next_hop_choices ; retry_intervals ; remaining_recipients ; failed_recipients ; relay_attempts ; status ; envelope_info } , data , Smtp_envelope.Routed.of_bodiless envelope email_body )) ;; module On_disk = struct module Metadata = struct module T = struct include Stable.V3 let t_of_sexp sexp = try t_of_sexp sexp with | error_from_v3 -> (try Stable.V2.t_of_sexp sexp |> Stable.V3.of_v2 with | error_from_v2 -> (try Stable.V1.t_of_sexp sexp |> Stable.V3.of_v1 with | error_from_v1 -> raise_s [%message "[On_disk.Metadata.t_of_sexp]" (error_from_v3 : exn) (error_from_v2 : exn) (error_from_v1 : exn)])) ;; end include T include Sexpable.To_stringable (T) end module Data = Data module Queue = Queue module Name_generator = struct module Unique_name = Id type t = Smtp_envelope.t let next original_msg ~attempt:_ = Id.create ~original_msg end module Throttle = struct (* Don't hit the max open files system limit *) let t = Throttle.create ~continue_on_error:true ~max_concurrent_jobs:400 let enqueue f = Throttle.enqueue t f end end module On_disk_spool = Multispool.Make (On_disk)
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>