package async_rpc_kernel

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file transport.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
open Core
open Async_kernel

module Header = struct
  let length = 8
  let unsafe_get_payload_length buf ~pos = Bigstring.unsafe_get_int64_le_exn buf ~pos

  let unsafe_set_payload_length buf ~pos payload_len =
    Bigstring.unsafe_set_int64_le buf ~pos payload_len
  ;;
end

module Handler_result = Transport_intf.Handler_result

module Reader = struct
  module type S = Transport_intf.Reader

  type t = T : (module S with type t = 'a) * 'a -> t

  let pack m t = T (m, t)

  (* We put type annotations to be sure the type is not a function type, i.e. to avoid
     creating closures *)
  let sexp_of_t (T ((module M), t)) : Sexp.t = M.sexp_of_t t
  let close (T ((module M), t)) : unit Deferred.t = M.close t
  let is_closed (T ((module M), t)) : bool = M.is_closed t
  let bytes_read (T ((module M), t)) : Int63.t = M.bytes_read t

  let read_forever (T ((module M), t)) ~on_message ~on_end_of_batch : _ Deferred.t =
    M.read_forever t ~on_message ~on_end_of_batch
  ;;

  let read_one_message_bin_prot t (bin_reader : _ Bin_prot.Type_class.reader) =
    read_forever
      t
      ~on_message:(fun buf ~pos ~len ->
        let pos_ref = ref pos in
        let x = bin_reader.read buf ~pos_ref in
        if !pos_ref <> pos + len
        then
          failwithf
            "message length (%d) did not match expected length (%d)"
            (!pos_ref - pos)
            len
            ()
        else Stop x)
      ~on_end_of_batch:ignore
  ;;
end

module Send_result = Transport_intf.Send_result

module Writer = struct
  module type S = Transport_intf.Writer

  type 'a writer =
    { impl : (module S with type t = 'a)
    ; t : 'a
    (* We cache the result of [stopped] because it is often the [Deferred.any] of several
       other deferreds and we want [can_send] to be simple. *)
    ; stopped : unit Deferred.t
    }

  type t = T : 'a writer -> t

  let pack (type a) (module M : S with type t = a) t =
    T { impl = (module M); t; stopped = M.stopped t }
  ;;

  let sexp_of_t (T { impl = (module M); t; _ }) : Sexp.t = M.sexp_of_t t
  let close (T { impl = (module M); t; _ }) : unit Deferred.t = M.close t
  let is_closed (T { impl = (module M); t; _ }) : bool = M.is_closed t
  let monitor (T { impl = (module M); t; _ }) : Monitor.t = M.monitor t
  let bytes_to_write (T { impl = (module M); t; _ }) : int = M.bytes_to_write t
  let bytes_written (T { impl = (module M); t; _ }) : Int63.t = M.bytes_written t
  let flushed (T { impl = (module M); t; _ }) : unit Deferred.t = M.flushed t

  let ready_to_write (T { impl = (module M); t; _ }) : unit Deferred.t =
    M.ready_to_write t
  ;;

  let send_bin_prot (T { impl = (module M); t; _ }) bin_writer x : _ Send_result.t =
    M.send_bin_prot t bin_writer x
  ;;

  let send_bin_prot_and_bigstring
        (T { impl = (module M); t; _ })
        bin_writer
        x
        ~buf
        ~pos
        ~len
    : _ Send_result.t
    =
    M.send_bin_prot_and_bigstring t bin_writer x ~buf ~pos ~len
  ;;

  let send_bin_prot_and_bigstring_non_copying
        (T { impl = (module M); t; _ })
        bin_writer
        x
        ~buf
        ~pos
        ~len
    : _ Send_result.t
    =
    M.send_bin_prot_and_bigstring_non_copying t bin_writer x ~buf ~pos ~len
  ;;

  let stopped (T { stopped; _ }) = stopped

  let can_send (T { impl = (module M); t; stopped }) =
    not (M.is_closed t || Deferred.is_determined stopped)
  ;;

  let transfer t ?(max_num_values_per_read = 1_000) pipe f =
    let consumer =
      Pipe.add_consumer pipe ~downstream_flushed:(fun () ->
        let%map () = flushed t in
        `Ok)
    in
    let end_of_pipe =
      Deferred.create (fun ivar ->
        let rec iter () =
          if can_send t
          then (
            match
              Pipe.read_now' pipe ~consumer ~max_queue_length:max_num_values_per_read
            with
            | `Ok q ->
              Queue.iter q ~f;
              Pipe.Consumer.values_sent_downstream consumer;
              ready_to_write t >>> iter
            | `Nothing_available ->
              Pipe.values_available pipe >>> fun (`Ok | `Eof) -> iter ()
            | `Eof -> Ivar.fill ivar ())
        in
        iter ())
    in
    let%map () = Deferred.any [ end_of_pipe; stopped t ] in
    Pipe.close_read pipe
  ;;
end

type t =
  { reader : Reader.t
  ; writer : Writer.t
  }
[@@deriving sexp_of]

let close t =
  let%bind () = Writer.close t.writer in
  Reader.close t.reader
;;
OCaml

Innovation. Community. Security.