package async_smtp

  1. Overview
  2. Docs
SMTP client and server

Install

Dune Dependency

Authors

Maintainers

Sources

v0.17.0.tar.gz
sha256=c416027c2537e22129f7049bf03ec3f867557d47b194d7e91d72c399fe656b27

doc/src/async_smtp/client.ml.html

Source file client.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
open Core
open Async
open Async_smtp_types
module Config = Client_config
include Client_raw
module Log = Mail_log

let with_reset t ~log ~flows ~component ~f =
  let component = component @ [ "reset" ] in
  let reset t =
    send_receive t ~log ~flows ~component ~here:[%here] Smtp_command.Reset
    >>=? function
    | `Bsmtp -> return (Ok ())
    | `Received { Smtp_reply.code = `Ok_completed_250; _ } -> return (Ok ())
    | `Received reject ->
      return (Or_error.errorf !"Unexpected response to RSET: %{Smtp_reply}" reject)
  in
  let%bind result =
    Deferred.Or_error.try_with_join ~run:`Schedule ~rest:`Log (fun () -> f t)
  in
  let%map (_ : unit Or_error.t) = reset t in
  result
;;

module Envelope_status = struct
  type envelope_id = string [@@deriving sexp]

  type rejected_recipients = (Email_address.Stable.V1.t * Smtp_reply.t) list
  [@@deriving sexp]

  type ok = envelope_id * rejected_recipients [@@deriving sexp]

  type err =
    [ `Rejected_sender of Smtp_reply.t
    | `Rejected_all_recipients of rejected_recipients
    | `Rejected_sender_and_recipients of Smtp_reply.t * rejected_recipients
    | `Rejected_body of Smtp_reply.t * rejected_recipients
    ]
  [@@deriving sexp]

  type t = (ok, err) Result.t [@@deriving sexp]

  let rejected_recipients_to_string ~ok ~err rejected_recipients =
    if not (List.is_empty rejected_recipients)
    then
      rejected_recipients
      |> List.map ~f:(fun (email, reject) ->
           sprintf !"%{Email_address} (%{Smtp_reply})" email reject)
      |> String.concat ~sep:", "
      |> sprintf "%s %s" err
    else ok
  ;;

  let to_string = function
    | Ok (envelope_id, rejected_recipients) ->
      sprintf
        "Envelope accepted (%s)%s"
        envelope_id
        (rejected_recipients_to_string
           ~ok:""
           ~err:" but rejected recipients: "
           rejected_recipients)
    | Error (`Rejected_sender reject) -> sprintf !"Rejected sender (%{Smtp_reply})" reject
    | Error (`Rejected_all_recipients rejected_recipients) ->
      rejected_recipients_to_string
        ~ok:"No Recipients"
        ~err:"All recipients rejected: "
        rejected_recipients
    | Error (`Rejected_sender_and_recipients (reject, rejected_recipients)) ->
      sprintf
        !"Rejected combination of Sender and Recipients (%{Smtp_reply})%s"
        reject
        (rejected_recipients_to_string
           ~ok:""
           ~err:" and rejected recipients: "
           rejected_recipients)
    | Error (`Rejected_body (reject, rejected_recipients)) ->
      sprintf
        !"Rejected envelope (%{Smtp_reply})%s"
        reject
        (rejected_recipients_to_string
           ~ok:""
           ~err:" and rejected recipients: "
           rejected_recipients)
  ;;

  let ok_or_error ~allow_rejected_recipients = function
    | Ok (envelope_id, rejected_recipients)
      when allow_rejected_recipients || List.is_empty rejected_recipients ->
      Ok envelope_id
    | error -> Error (Error.of_thunk (fun () -> to_string error))
  ;;

  let ok_exn ~allow_rejected_recipients = function
    | Ok (envelope_id, rejected_recipients)
      when allow_rejected_recipients || List.is_empty rejected_recipients -> envelope_id
    | error -> failwith (to_string error)
  ;;
end

(* Better names welcome. *)
module Smtp_monad = struct
  type 'a t = ('a, Envelope_status.err) Result.t Or_error.t Deferred.t

  let ( >>= ) (a : 'a t) f =
    a
    >>=? function
    | Error err -> Deferred.Or_error.return (Error err)
    | Ok a -> f a
  ;;
end

let ( >>=?? ) = Smtp_monad.( >>= )

let flush_writer_with_timeout ~timeout ~writer =
  match%map Clock.with_timeout timeout (Writer.flushed writer) with
  | `Timeout ->
    Or_error.errorf !"Timeout %{Time_float.Span} waiting for data to flush" timeout
  | `Result () -> Ok ()
;;

module Expert = struct
  let send_envelope t ~log ?flows ?(component = []) ~send_data envelope_info =
    let flows =
      match flows with
      | None -> Log.Flows.create `Outbound_envelope
      | Some flows -> flows
    in
    let component = component @ [ "send-envelope" ] in
    with_reset t ~log ~flows ~component ~f:(fun t ->
      Log.info
        log
        (lazy
          (Log.Message.create
             ~here:[%here]
             ~flows
             ~component
             ?remote_address:(remote_address t)
             ?remote_ip_address:(remote_ip_address t)
             ?local_ip_address:(local_ip_address t)
             ~session_marker:`Sending
             "sending"));
      let command =
        Smtp_command.Sender
          (Smtp_envelope.Sender.to_string_with_arguments
             ( Smtp_envelope.Info.sender envelope_info
             , Smtp_envelope.Info.sender_args envelope_info ))
      in
      send_receive
        t
        ~log
        ~flows
        ~component:(component @ [ "sender" ])
        ~here:[%here]
        command
      >>=? (function
             | `Bsmtp -> return (Ok (Ok ()))
             | `Received { Smtp_reply.code = `Ok_completed_250; _ } -> return (Ok (Ok ()))
             | `Received reply ->
               Log.info
                 log
                 (lazy
                   (Log.Message.create
                      ~here:[%here]
                      ~flows
                      ~component:(component @ [ "sender" ])
                      ~sender:(`Sender (Smtp_envelope.Info.sender envelope_info))
                      ~command
                      ~reply
                      "send rejected"));
               return (Ok (Error (`Rejected_sender reply))))
      >>=?? fun () ->
      Deferred.Or_error.List.map
        ~how:`Sequential
        (Smtp_envelope.Info.recipients envelope_info)
        ~f:(fun recipient ->
        let command = Smtp_command.Recipient (recipient |> Email_address.to_string) in
        send_receive
          t
          ~log
          ~flows
          ~component:(component @ [ "recipient" ])
          ~here:[%here]
          command
        >>|? function
        | `Bsmtp -> First recipient
        | `Received { Smtp_reply.code = `Ok_completed_250; _ } -> First recipient
        | `Received reply ->
          Log.info
            log
            (lazy
              (Log.Message.create
                 ~here:[%here]
                 ~flows
                 ~component:(component @ [ "recipient" ])
                 ~recipients:[ `Email recipient ]
                 ~command
                 ~reply
                 "send rejected"));
          Second (recipient, reply))
      >>|? List.partition_map ~f:Fn.id
      >>|? (function
             | [], rejected_recipients ->
               Error (`Rejected_all_recipients rejected_recipients)
             | accepted_recipients, rejected_recipients ->
               Ok (accepted_recipients, rejected_recipients))
      >>=?? fun (accepted_recipients, rejected_recipients) ->
      let command = Smtp_command.Data in
      send_receive t ~log ~flows ~component:(component @ [ "data" ]) ~here:[%here] command
      >>=? (function
             | `Bsmtp -> return (Ok (Ok ()))
             | `Received { Smtp_reply.code = `Start_mail_input_354; _ } ->
               return (Ok (Ok ()))
             | `Received reply ->
               Log.info
                 log
                 (lazy
                   (Log.Message.create
                      ~here:[%here]
                      ~flows
                      ~component:(component @ [ "data" ])
                      ~command
                      ~reply
                      "send rejected"));
               return
                 (Ok
                    (Error (`Rejected_sender_and_recipients (reply, rejected_recipients)))))
      >>=?? fun () ->
      Deferred.Or_error.try_with_join ~run:`Schedule ~rest:`Log (fun () ->
        Log.debug
          log
          (lazy
            (Log.Message.create
               ~here:[%here]
               ~flows
               ~component:(component @ [ "data" ])
               "starting transmitting body"));
        send_data t
        >>=? fun () ->
        let writer = writer t in
        Writer.write writer "\r\n";
        Writer.write writer ".";
        Writer.write writer "\r\n";
        flush_writer_with_timeout
          ~timeout:(Config.send_receive_timeout (config t))
          ~writer
        >>=? fun () ->
        Log.debug
          log
          (lazy
            (Log.Message.create
               ~here:[%here]
               ~flows
               ~component:(component @ [ "data" ])
               "finishing transmitting body"));
        receive
          t
          ~timeout:(Config.final_ok_timeout (config t))
          ~log
          ~flows
          ~component
          ~here:[%here]
        >>|? function
        | `Bsmtp ->
          Log.info
            log
            (lazy
              (Log.Message.create
                 ~here:[%here]
                 ~flows
                 ~component
                 ~recipients:(List.map accepted_recipients ~f:(fun e -> `Email e))
                 "delivered"));
          Ok ("bsmtp", rejected_recipients)
        | `Received { Smtp_reply.code = `Ok_completed_250; raw_message } ->
          let remote_id = String.concat ~sep:"\n" raw_message in
          Log.info
            log
            (lazy
              (Log.Message.create
                 ~here:[%here]
                 ~flows
                 ~component
                 ~sender:(`Sender (Smtp_envelope.Info.sender envelope_info))
                 ~recipients:(List.map accepted_recipients ~f:(fun e -> `Email e))
                 ?remote_address:(remote_address t)
                 ?remote_ip_address:(remote_ip_address t)
                 ?local_ip_address:(local_ip_address t)
                 ~tags:[ "remote-id", remote_id ]
                 "sent"));
          Ok (remote_id, rejected_recipients)
        | `Received reply ->
          Log.info
            log
            (lazy
              (Log.Message.create
                 ~here:[%here]
                 ~flows
                 ~component
                 ~recipients:(List.map accepted_recipients ~f:(fun e -> `Email e))
                 ~reply
                 "send rejected"));
          Error (`Rejected_body (reply, rejected_recipients))))
  ;;
end

let send_data_via_reader_writer t ~email =
  let block_length = ref 0 in
  (* We will send at most [max_block_length + <max line length> + 1]
     bytes per block. *)
  let max_block_length = 16 * 1024 in
  let timeout = Config.send_receive_timeout (config t) in
  let writer = writer t in
  Email.to_string email
  |> String.split ~on:'\n'
  |> List.map ~f:(String.rstrip ~drop:(Char.equal '\r'))
  |> fun lines ->
  let num_lines = List.length lines in
  Deferred.Or_error.List.iteri lines ~how:`Sequential ~f:(fun i line ->
    (if !block_length >= max_block_length
     then (
       block_length := 0;
       flush_writer_with_timeout ~timeout ~writer)
     else Deferred.Or_error.ok_unit)
    >>|? fun () ->
    let encoded = Dot_escaping.encode_line_string line in
    String_monoid.output_unix encoded writer;
    block_length := !block_length + String_monoid.length encoded;
    if not (i = num_lines - 1) then Writer.write writer "\r\n")
;;

let send_envelope t ~log ?flows ?component envelope =
  Expert.send_envelope
    t
    ~log
    ?flows
    ?component
    ~send_data:(send_data_via_reader_writer ~email:(Smtp_envelope.email envelope))
    (Smtp_envelope.info envelope)
;;

module For_test = struct
  let with_
    ?(config = Config.default)
    ?(credentials = Credentials.anon)
    ~log
    ?(flows = Log.Flows.none)
    ?(component = [])
    ?(emulate_tls = false)
    ?local_ip_address
    ?remote_ip_address
    ~remote_address
    reader
    writer
    ~f
    =
    create
      ~remote_address
      ?local_ip_address
      ?remote_ip_address
      ~flows
      ~emulate_tls_for_test:emulate_tls
      reader
      writer
      config
    (* Flow already attatched to the session *)
    |> with_session ~log ~component ~credentials ~f
  ;;
end

module Tcp = struct
  let with_address
    ?buffer_age_limit
    ?interrupt
    ?reader_buffer_size
    ?writer_buffer_size
    ?timeout
    ?time_source
    ?(config = Config.default)
    ?(credentials = Credentials.anon)
    ~log
    ?(flows = Log.Flows.none)
    ?(component = [])
    ?remote_address
    socket_address
    ~f
    =
    let remote_address =
      match remote_address with
      | None -> Socket.Address.Inet.to_host_and_port socket_address
      | Some remote_address -> remote_address
    in
    let flows = Log.Flows.extend flows `Client_session in
    let component = component @ [ "smtp-client" ] in
    let f socket reader writer =
      let inet_address = function
        | `Unix _ -> None
        | `Inet _ as i -> Some i
      in
      let local_ip_address = inet_address (Socket.getsockname socket) in
      let remote_ip_address = inet_address (Socket.getpeername socket) in
      Log.debug
        log
        (lazy
          (Log.Message.create
             ~here:[%here]
             ~flows
             ~component:(component @ [ "tcp" ])
             ~remote_address
             ?local_ip_address
             ?remote_ip_address
             "connection established"));
      create
        ~remote_address
        ?local_ip_address
        ?remote_ip_address
        ~flows
        ~emulate_tls_for_test:false
        reader
        writer
        config
      (* Flow already attatched to the session *)
      |> with_session ~log ~component ~credentials ~f
    in
    Deferred.Or_error.try_with ~run:`Schedule ~rest:`Log (fun () ->
      Tcp.with_connection
        ?buffer_age_limit
        ?interrupt
        ?reader_buffer_size
        ?writer_buffer_size
        ?timeout
        ?time_source
        (Tcp.Where_to_connect.of_inet_address socket_address)
        f)
  ;;

  let resolve_addresses smtp_server =
    match Unix.Inet_addr.of_string smtp_server with
    | inet -> Deferred.Or_error.return [ inet ]
    | exception _not_an_ip ->
      (match%bind
         Deferred.Or_error.try_with ~run:`Schedule ~rest:`Log (fun () ->
           Unix.Host.getbyname smtp_server)
       with
       | Error error ->
         Deferred.Or_error.error_s
           [%message
             "Failed to resolve hostname" (smtp_server : string) (error : Error.t)]
       | Ok None ->
         Deferred.Or_error.error_s
           [%message "Failed to resolve hostname" (smtp_server : string) "Not Found"]
       | Ok (Some host_info) ->
         Array.to_list host_info.addresses |> List.permute |> Deferred.Or_error.return)
  ;;

  let with_
    ?buffer_age_limit
    ?interrupt
    ?reader_buffer_size
    ?writer_buffer_size
    ?timeout
    ?time_source
    ?config
    ?credentials
    ~log
    ?flows
    ?component
    smtp_server
    ~f
    =
    let remote_address = smtp_server in
    let host, port = Host_and_port.tuple smtp_server in
    match%bind resolve_addresses host with
    | Error _ as error -> return error
    | Ok addrs ->
      let rec loop ~errors = function
        | [] ->
          Deferred.Or_error.error_s
            [%message
              "Failed to connect"
                (smtp_server : Host_and_port.t)
                (errors : (Unix.Inet_addr.t * Error.t) list)]
        | inet :: more_addrs ->
          (match%bind
             with_address
               ?buffer_age_limit
               ?interrupt
               ?reader_buffer_size
               ?writer_buffer_size
               ?timeout
               ?time_source
               ?config
               ?credentials
               ~log
               ?flows
               ?component
               ~remote_address
               (Socket.Address.Inet.create inet ~port)
               ~f
           with
           | Ok res -> return res
           | Error error -> loop ~errors:((inet, error) :: errors) more_addrs)
      in
      loop ~errors:[] addrs
  ;;
end

(* BSMTP writing *)
module Bsmtp = struct
  let config =
    { Config.tls = []
    ; greeting = Some "bsmtp"
    ; send_receive_timeout = `This (Time_float.Span.of_sec 5.)
    ; final_ok_timeout = `This (Time_float.Span.of_sec 5.)
    }
  ;;

  let bsmtp_log =
    Lazy.map Async.Log.Global.log ~f:(fun log ->
      Log.adjust_log_levels ~remap_info_to:`Debug log)
  ;;

  let with_ ?(skip_prelude_and_prologue = false) writer ~log ~component ~f =
    create_bsmtp writer config
    |> fun t ->
    if skip_prelude_and_prologue
    then f t
    else do_helo t ~log ~component >>=? fun () -> f t
  ;;

  let write
    ?skip_prelude_and_prologue
    ?(log = Lazy.force bsmtp_log)
    ?flows
    ?(component = [ "bsmtp"; "writer" ])
    writer
    envelopes
    =
    with_ ?skip_prelude_and_prologue writer ~log ~component ~f:(fun client ->
      Deferred.Or_error.try_with ~run:`Schedule ~rest:`Log (fun () ->
        Pipe.iter envelopes ~f:(fun envelope ->
          (* Flow already attatched to the session *)
          send_envelope client ~log ?flows ~component envelope
          >>| Or_error.ok_exn
          >>| Envelope_status.ok_exn ~allow_rejected_recipients:false
          >>| (ignore : string -> unit))))
  ;;

  let to_string ?skip_prelude_and_prologue ?log ?flows ?component envelopes =
    let open Deferred.Or_error.Let_syntax in
    let%bind `Reader reader, `Writer writer =
      Deferred.Or_error.try_with (fun () ->
        Unix.pipe (Info.create_s [%message "Async_smtp.Client.Bsmtp.to_string"]))
    in
    let reader = Reader.create reader in
    let writer = Writer.create writer in
    let%map () =
      let%bind () =
        write
          ?skip_prelude_and_prologue
          ?log
          ?flows
          ?component
          writer
          (Pipe.of_list envelopes)
      in
      let%bind () = Deferred.Or_error.try_with (fun () -> Writer.flushed writer) in
      let%bind () = Deferred.Or_error.try_with (fun () -> Writer.close writer) in
      return ()
    and string = Deferred.Or_error.try_with (fun () -> Reader.contents reader) in
    string
  ;;
end
OCaml

Innovation. Community. Security.