package paf

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file paf.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
module type RUNTIME = sig
  type t

  val next_read_operation : t -> [ `Read | `Yield | `Close ]
  (** [next_read_connection t] returns a value describing the next operation
      that the caller should conduit on behalf of the connection. *)

  val read : t -> Bigstringaf.t -> off:int -> len:int -> int
  (** [read t bigstring ~off ~len] reads bytes of input from the provided range
      of [bigstring] an returns the number of bytes consumed by the connection.
      {!read} should be called after {!next_read_operation} returns a [`Read]
      value an additional input is available for the connection to consume. *)

  val read_eof : t -> Bigstringaf.t -> off:int -> len:int -> int
  (** [read_eof t bigstring ~off ~len] reads bytes of input from the provided
      range of [bigstring] and returns the number of bytes consumed by the
      connection. {!read_eof} should be called after {!next_read_operation}
      returns a [`Read] and an EOF has been received from the communication
      channel. The connection will attempt to consume any buffered input and
      then shutdown the HTTP parser for the connection. *)

  val yield_reader : t -> (unit -> unit) -> unit
  (** [yield_reader t continue] registers with the connection to call [continue]
      when reading should resume. {!yield_reader} should be called after
      {!next_read_operation} returns a [`Yield] value. *)

  val next_write_operation :
    t -> [ `Write of Bigstringaf.t Faraday.iovec list | `Yield | `Close of int ]
  (** [next_write_operation t] returns a value describing the next operation
      that the caller should conduct on behalf the connection. *)

  val report_write_result : t -> [ `Ok of int | `Closed ] -> unit
  (** [report_write_result t result] reports the result of the latest write
      attempt to the connection. {!report_write_result} should be called after a
      call to {!next_write_operation} that returns a [`Write buffer] value.

      - [`Ok n] indicates that the caller successfully wrote [n] bytes of output
        from the buffer that the caller was provided by {!next_write_operation}
        that returns a [`Write buffer] value.
      - [`Closed] indicates that the output destination will no longer accept
        bytes from the write processor. *)

  val yield_writer : t -> (unit -> unit) -> unit
  (** [yield_writer t continue] registers with the connection to call [continue]
      when writing should resume. {!yield_writer} should be called after
      {!next_write_operation} returns a [`Yield] value. *)

  val report_exn : t -> exn -> unit
  (** [report_exn t exn] reports that an error [exn] has been caught and that it
      has been attributed to [t]. Calling this function will switch [t] into an
      error state. Depending on the tate [t] is transitioning from, it may call
      its error handler before terminating the connection. *)

  val is_closed : t -> bool
  (** [is_closed t] is [true] if both the read and write processors have been
      shutdown. When this is the case {!next_read_operation} will return
      [`Close _] and {!next_write_operation} will return a [`Write _] until all
      buffered output has been flushed, at which point it will return [`Close]. *)

  val shutdown : t -> unit
  (** [shutdown t] asks to shutdown the connection. *)
end

type 'conn runtime = (module RUNTIME with type t = 'conn)

exception Flow of string
exception Flow_write of string

let src = Logs.Src.create "paf-flow"

module Log_flow = (val Logs.src_log src : Logs.LOG)

module Make (Flow : Mirage_flow.S) = struct
  type flow = {
    flow : Flow.flow;
    queue : (char, Bigarray.int8_unsigned_elt) Ke.Rke.t;
    mutable rd_closed : bool;
    mutable wr_closed : bool;
  }

  let create flow =
    let queue = Ke.Rke.create ~capacity:0x1000 Bigarray.char in
    Lwt.return { flow; queue; rd_closed = false; wr_closed = false }

  let safely_close flow =
    if flow.rd_closed && flow.wr_closed
    then (
      Log_flow.debug (fun m -> m "Close the connection.") ;
      Flow.close flow.flow)
    else Lwt.return ()

  let blit src src_off dst dst_off len =
    let dst = Cstruct.of_bigarray ~off:dst_off ~len dst in
    Cstruct.blit src src_off dst 0 len

  open Lwt.Infix

  type eof = [ `Eof ]

  let recv flow ~report_error ~report_closed ~read ~read_eof =
    Ke.Rke.compress flow.queue ;
    Flow.read flow.flow >>= function
    | (Error _ | Ok #eof) as v ->
        flow.rd_closed <- true ;
        safely_close flow >>= fun () ->
        let _shift =
          match
            Ke.Rke.compress flow.queue ;
            Ke.Rke.N.peek flow.queue
          with
          | [] -> read_eof Bigstringaf.empty ~off:0 ~len:0
          | [ slice ] -> read_eof slice ~off:0 ~len:(Bigstringaf.length slice)
          | _ -> assert false
          (* XXX(dinosaure): impossible due to [compress]. *) in
        (match v with
        | Ok `Eof -> report_closed ()
        | Error err -> report_error err) ;
        Lwt.return `Closed
    | Ok (`Data v) ->
        let len = Cstruct.length v in
        Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.length ~off:0 ~len v ;
        let[@warning "-8"] (slice :: _) = Ke.Rke.N.peek flow.queue in
        let shift = read slice ~off:0 ~len:(Bigstringaf.length slice) in
        Ke.Rke.N.shift_exn flow.queue shift ;
        Lwt.return `Continue

  let writev ~report_error flow iovecs =
    let iovecs =
      List.map
        (fun { Faraday.buffer; off; len } ->
          Cstruct.to_string (Cstruct.of_bigarray buffer ~off ~len) ~off:0 ~len)
        iovecs in
    let iovecs = List.map Cstruct.of_string iovecs in
    (* XXX(dinosaure): the copy is needed:
       1) [Mirage_flow.S] explicitly says that [write] takes the ownership on
          the given [Cstruct.t]
       2) [ocaml-h2] wants to keep the ownership on given [Faraday.iovec]s

       To protect one from the other, copying is necessary. *)
    Log_flow.debug (fun m ->
        m "Start to write %d byte(s)."
          (List.fold_left (fun acc cs -> Cstruct.length cs + acc) 0 iovecs)) ;
    Flow.writev flow.flow iovecs >>= function
    | Ok () ->
        Lwt.return
          (`Ok
            (List.fold_left (fun acc cs -> acc + Cstruct.length cs) 0 iovecs))
    | Error err ->
        Log_flow.err (fun m ->
            m "Got an error when we wrote something: %a." Flow.pp_write_error
              err) ;
        report_error err ;
        flow.wr_closed <- true ;
        safely_close flow >>= fun () -> Lwt.return `Closed

  let send ~report_error flow iovecs =
    if flow.wr_closed
    then safely_close flow >>= fun () -> Lwt.return `Closed
    else writev ~report_error flow iovecs

  let close flow =
    match (flow.rd_closed, flow.wr_closed) with
    | true, true -> Lwt.return_unit
    | _ ->
        flow.rd_closed <- true ;
        flow.wr_closed <- true ;
        Flow.close flow.flow
end

let src = Logs.Src.create "paf-server"

module Log_server = (val Logs.src_log src : Logs.LOG)

module Server (Flow : Mirage_flow.S) (Runtime : RUNTIME) : sig
  val server : Runtime.t -> Flow.flow -> unit Lwt.t
end = struct
  module Easy_flow = Make (Flow)
  open Lwt.Infix

  let to_flow_exception err : exn = Flow (Fmt.str "%a" Flow.pp_error err)

  let to_flow_write_exception err : exn =
    Flow_write (Fmt.str "%a" Flow.pp_write_error err)

  let server connection flow =
    Easy_flow.create flow >>= fun flow ->
    let rd_exit, notify_rd_exit = Lwt.wait () in
    let wr_exit, notify_wr_exit = Lwt.wait () in

    let rec rd_fiber () =
      let report_error err =
        Runtime.report_exn connection (to_flow_exception err) in
      let rec go () =
        Log_server.debug (fun m -> m "Compute next read operation.") ;
        match Runtime.next_read_operation connection with
        | `Read ->
            Log_server.debug (fun m -> m "next read operation: `read") ;
            Easy_flow.recv flow ~report_error ~report_closed:ignore
              ~read:(Runtime.read connection)
              ~read_eof:(Runtime.read_eof connection)
            >>= fun _ -> Lwt.pause () >>= go
        | `Yield ->
            Log_server.debug (fun m -> m "next read operation: `yield") ;
            Runtime.yield_reader connection rd_fiber ;
            Lwt.pause ()
        | `Close ->
            Log_server.debug (fun m -> m "next read operation: `close") ;
            Lwt.wakeup_later notify_rd_exit () ;
            Flow.shutdown flow.flow `read in
      Lwt.async @@ fun () ->
      Lwt.catch go (fun exn ->
          Runtime.report_exn connection exn ;
          Lwt.return_unit) in
    let rec wr_fiber () =
      let report_error err =
        Runtime.report_exn connection (to_flow_write_exception err) in
      let rec go () =
        Log_server.debug (fun m -> m "Compute next write operation.") ;
        match Runtime.next_write_operation connection with
        | `Write iovecs ->
            Log_server.debug (fun m -> m "next write operation: `write") ;
            Easy_flow.send ~report_error flow iovecs >>= fun res ->
            Runtime.report_write_result connection res ;
            Lwt.pause () >>= go
        | `Yield ->
            Log_server.debug (fun m -> m "next write operation: `yield") ;
            Runtime.yield_writer connection wr_fiber ;
            Lwt.pause ()
        | `Close _ ->
            Log_server.debug (fun m -> m "next write operation: `close") ;
            Lwt.wakeup_later notify_wr_exit () ;
            Flow.shutdown flow.flow `write in
      Lwt.async @@ fun () ->
      Lwt.catch go (fun exn ->
          (* Runtime.report_write_result connection `Closed ; *)
          Runtime.report_exn connection exn ;
          Lwt.return_unit) in
    rd_fiber () ;
    wr_fiber () ;
    Lwt.join [ rd_exit; wr_exit ] >>= fun () ->
    Log_server.debug (fun m -> m "End of transmission.") ;
    Easy_flow.close flow
end

let src = Logs.Src.create "paf-client"

module Log_client = (val Logs.src_log src : Logs.LOG)

module Client (Flow : Mirage_flow.S) (Runtime : RUNTIME) : sig
  val run : Runtime.t -> Flow.flow -> unit Lwt.t
end = struct
  open Lwt.Infix
  module Easy_flow = Make (Flow)

  let to_flow_exception err : exn = Flow (Fmt.str "%a" Flow.pp_error err)

  let to_flow_write_exception err : exn =
    Flow_write (Fmt.str "%a" Flow.pp_write_error err)

  let run connection flow =
    Easy_flow.create flow >>= fun flow ->
    let rd_exit, notify_rd_exit = Lwt.wait () in
    let wr_exit, notify_wr_exit = Lwt.wait () in

    let rec rd_fiber () =
      let report_error err =
        Runtime.report_exn connection (to_flow_exception err) in
      let rec go () =
        match Runtime.next_read_operation connection with
        | `Read ->
            Log_client.debug (fun m -> m "next read operation: `read") ;
            Easy_flow.recv flow ~report_error ~report_closed:ignore
              ~read:(Runtime.read connection)
              ~read_eof:(Runtime.read_eof connection)
            >>= fun _ -> Lwt.pause () >>= go
        | `Yield ->
            Log_client.debug (fun m -> m "next read operation: `yield") ;
            Runtime.yield_reader connection rd_fiber ;
            Lwt.pause ()
        | `Close ->
            Log_client.debug (fun m -> m "next read operation: `close.") ;
            Lwt.wakeup_later notify_rd_exit () ;
            flow.Easy_flow.rd_closed <- true ;
            Easy_flow.safely_close flow in
      Lwt.async @@ fun () ->
      Lwt.catch go (fun exn ->
          Runtime.report_exn connection exn ;
          Lwt.return_unit) in
    let rec wr_fiber () =
      let report_error err =
        Runtime.report_exn connection (to_flow_write_exception err) in
      let rec go () =
        match Runtime.next_write_operation connection with
        | `Write iovecs ->
            Log_client.debug (fun m -> m "next write operation: `write.") ;
            Easy_flow.send ~report_error flow iovecs >>= fun res ->
            Runtime.report_write_result connection res ;
            Lwt.pause () >>= go
        | `Yield ->
            Log_client.debug (fun m -> m "next write operation: `yield.") ;
            Runtime.yield_writer connection wr_fiber ;
            Lwt.pause ()
        | `Close _ ->
            Log_client.debug (fun m -> m "next write operation: `close.") ;
            Lwt.wakeup_later notify_wr_exit () ;
            flow.Easy_flow.wr_closed <- true ;
            Easy_flow.safely_close flow in
      Lwt.async @@ fun () ->
      Lwt.catch go (fun exn ->
          Runtime.report_exn connection exn ;
          Lwt.return ()) in
    wr_fiber () ;
    rd_fiber () ;
    Lwt.join [ rd_exit; wr_exit ] >>= fun () ->
    Log_client.debug (fun m -> m "End of transmission.") ;
    Easy_flow.close flow
end

type impl = Runtime : 'conn runtime * 'conn -> impl

type 't service =
  | Service : {
      accept : 't -> ('socket, ([> `Closed ] as 'error)) result Lwt.t;
      handshake : 'socket -> ('flow, ([> `Closed ] as 'error)) result Lwt.t;
      connection : 'flow -> (Mimic.flow * impl, 'error) result Lwt.t;
      close : 't -> unit Lwt.t;
    }
      -> 't service

and ('t, 'socket, 'flow, 'error) posix = {
  accept : 't -> ('socket, 'error) result Lwt.t;
  handshake : 'socket -> ('flow, 'error) result Lwt.t;
  close : 't -> unit Lwt.t;
}
  constraint 'error = [> `Closed ]

let service connection handshake accept close =
  Service { accept; connection; handshake; close }

open Lwt.Infix

let serve_when_ready :
    type t socket flow.
    (t, socket, flow, _) posix ->
    ?stop:Lwt_switch.t ->
    handler:(flow -> unit Lwt.t) ->
    t ->
    [ `Initialized of unit Lwt.t ] =
 fun service ?stop ~handler t ->
  let { accept; handshake; close } = service in
  `Initialized
    (let switched_off =
       let t, u = Lwt.wait () in
       Lwt_switch.add_hook stop (fun () ->
           Lwt.wakeup_later u (Ok `Stopped) ;
           Lwt.return_unit) ;
       t in
     let rec loop () =
       accept t >>= function
       | Ok socket ->
           Lwt.async (fun () ->
               handshake socket >>= function
               | Ok flow -> handler flow
               | Error `Closed ->
                   Logs.info (fun m -> m "Connection closed by peer") ;
                   Lwt.return ()
               | Error _err ->
                   Logs.err (fun m ->
                       m "Got an error from a TCP/IP connection.") ;
                   Lwt.return ()) ;
           loop ()
       | Error `Closed -> Lwt.return_error `Closed
       | Error _ -> Lwt.pause () >>= loop in
     let stop_result =
       Lwt.pick [ switched_off; loop () ] >>= function
       | Ok `Stopped -> close t >>= fun () -> Lwt.return_ok ()
       | Error _ as err -> close t >>= fun () -> Lwt.return err in
     stop_result >>= function Ok () | Error `Closed -> Lwt.return_unit)

let server : type t. t runtime -> t -> Mimic.flow -> unit Lwt.t =
 fun (module Runtime) conn flow ->
  let module Server = Server (Mimic) (Runtime) in
  Server.server conn flow

let serve ?stop service t =
  let (Service { accept; handshake; connection; close }) = service in
  let handler flow =
    connection flow >>= function
    | Ok (flow, Runtime (runtime, conn)) -> server runtime conn flow
    | Error _ -> Lwt.return_unit in
  serve_when_ready ?stop ~handler { accept; handshake; close } t

let run : type t. t runtime -> t -> Mimic.flow -> unit Lwt.t =
 fun (module Runtime) conn flow ->
  let module Client = Client (Mimic) (Runtime) in
  Client.run conn flow
OCaml

Innovation. Community. Security.