package shuttle

  1. Overview
  2. Docs

Source file tcp_channel.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
open! Core
open! Async

let close_channels reader writer =
  let%bind () = Output_channel.close writer in
  Input_channel.close reader
;;

let collect_errors writer fn =
  let monitor = Output_channel.monitor writer in
  Monitor.detach monitor;
  choose
    [ choice (Monitor.get_next_error monitor) (fun e -> Error e)
    ; choice (Monitor.try_with ~run:`Now ~rest:`Log fn) Fn.id
    ]
;;

let listen
  ?max_connections
  ?max_accepts_per_batch
  ?backlog
  ?socket
  ?max_buffer_size
  ?buf_len
  ?write_timeout
  ?time_source
  ~on_handler_error
  where_to_listen
  handler
  =
  Tcp.Server.create_sock
    ?max_connections
    ?max_accepts_per_batch
    ?backlog
    ?socket
    ?time_source
    ~on_handler_error
    where_to_listen
    (fun addr socket ->
    let fd = Socket.fd socket in
    let input_channel = Input_channel.create ?max_buffer_size ?buf_len ?time_source fd in
    let output_channel =
      Output_channel.create ?max_buffer_size ?buf_len ?write_timeout ?time_source fd
    in
    let%bind res =
      Deferred.any
        [ collect_errors output_channel (fun () ->
            handler addr input_channel output_channel)
        ; Output_channel.remote_closed output_channel |> Deferred.ok
        ]
    in
    let%bind () = close_channels input_channel output_channel in
    match res with
    | Ok () -> Deferred.unit
    | Error exn -> raise exn)
;;

let listen_inet
  ?max_connections
  ?max_accepts_per_batch
  ?backlog
  ?socket
  ?max_buffer_size
  ?buf_len
  ?write_timeout
  ?time_source
  ~on_handler_error
  where_to_listen
  handler
  =
  Tcp.Server.create_sock_inet
    ?max_connections
    ?max_accepts_per_batch
    ?backlog
    ?socket
    ?time_source
    ~on_handler_error
    where_to_listen
    (fun addr socket ->
    let fd = Socket.fd socket in
    let input_channel = Input_channel.create ?max_buffer_size ?buf_len ?time_source fd in
    let output_channel =
      Output_channel.create ?max_buffer_size ?buf_len ?write_timeout ?time_source fd
    in
    let%bind res =
      Deferred.any
        [ collect_errors output_channel (fun () ->
            handler addr input_channel output_channel)
        ; Output_channel.remote_closed output_channel |> Deferred.ok
        ]
    in
    let%bind () = close_channels input_channel output_channel in
    match res with
    | Ok () -> Deferred.unit
    | Error exn -> raise exn)
;;

let with_connection
  ?interrupt
  ?connect_timeout
  ?max_buffer_size
  ?buf_len
  ?write_timeout
  ?time_source
  where_to_connect
  f
  =
  let%bind socket =
    Tcp.connect_sock ?interrupt ?timeout:connect_timeout ?time_source where_to_connect
  in
  let fd = Socket.fd socket in
  let input_channel = Input_channel.create ?max_buffer_size ?buf_len ?time_source fd in
  let output_channel =
    Output_channel.create ?max_buffer_size ?buf_len ?time_source ?write_timeout fd
  in
  let res = collect_errors output_channel (fun () -> f input_channel output_channel) in
  let%bind () =
    Deferred.any_unit
      [ (res >>| fun _ -> ())
      ; Output_channel.close_finished output_channel
      ; Input_channel.closed input_channel
      ]
  in
  let%bind () = close_channels input_channel output_channel in
  match%map res with
  | Ok v -> v
  | Error exn ->
    Exn.reraise exn "Shuttle.Connection: Unhandled exception in TCP client connection"
;;

let connect
  ?interrupt
  ?connect_timeout
  ?max_buffer_size
  ?buf_len
  ?write_timeout
  ?time_source
  where_to_connect
  =
  let%map socket =
    Tcp.connect_sock ?interrupt ?timeout:connect_timeout ?time_source where_to_connect
  in
  let fd = Socket.fd socket in
  let reader = Input_channel.create ?max_buffer_size ?buf_len ?time_source fd in
  let writer =
    Output_channel.create ?max_buffer_size ?buf_len ?time_source ?write_timeout fd
  in
  reader, writer
;;
OCaml

Innovation. Community. Security.