package vcaml

  1. Overview
  2. Docs

Source file msgpack_rpc.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
open Core_kernel
open Async

(* Modulus applied to message ids before they are put on the wire:
   2 raised to the power 32. *)
let bitsize = Int.( ** ) 2 32
;;

(** A notification received from the remote peer. Values of this type are what
    subscribers obtain from the notification pipe. *)
type event =
  { method_name : string (** Method name carried by the notification message. *)
  ; params : Msgpack.t list (** Elements of the notification's parameter array. *)
  }
[@@deriving sexp]

(** Transport abstraction required by the RPC functor: any handle from which an
    Async reader and an Async writer can be obtained. *)
module type Connection = sig
  type t

  (* Channel from which incoming messages are parsed. *)
  val reader : t -> Async.Reader.t
  (* Channel to which outgoing requests and responses are written. *)
  val writer : t -> Async.Writer.t
end

(** Interface of a MessagePack-RPC endpoint built over some connection type. *)
module type S = sig
  (* Underlying transport handle accepted by [connect]. *)
  type conn
  (* A connected endpoint, as returned by [connect]. *)
  type t

  (** [subscribe t here] returns a pipe delivering the notification [event]s
      received on [t]. [here] is a source position, typically [[%here]]. *)
  val subscribe : t -> Source_code_position.t -> event Pipe.Reader.t

  (** [call t ~method_name ~parameters] sends a request to the peer and returns
      a deferred that becomes [Ok result] or [Error err] once the matching
      response arrives.

      NOTE(review): [parameters] is embedded verbatim as the params element of
      the request; the protocol presumably expects a [Msgpack.Array] here --
      confirm against callers. *)
  val call
    :  t
    -> method_name:string
    -> parameters:Msgpack.t
    -> (Msgpack.t, Msgpack.t) Deferred.Result.t

  (** [connect conn] wraps [conn] into an endpoint and starts processing the
      messages that arrive on it. *)
  val connect : conn -> t

  (** [register_method ~name ~f] installs [f] as the handler for incoming
      requests whose method is [name]. Returns an error if [name] is already
      registered. Note that registration takes no [t]: it is not tied to a
      particular connection. *)
  val register_method
    :  name:string
    -> f:(Msgpack.t list -> Msgpack.t Or_error.t)
    -> unit Or_error.t
end

(** [Make (M) ()] builds a MessagePack-RPC endpoint over the transport [M].

    The functor is generative: every application gets its own id generator,
    pending-request table and method registry. Within one application these are
    shared by all connections created through [connect]. *)
module Make (M : Connection) () = struct
  type conn = M.t

  (* A connection paired with the read-only end of its notification bus. *)
  type t = M.t * (event -> unit) Bus.Read_only.t

  module Id_factory = Unique_id.Int63 ()

  (* Requests that were sent but not answered yet, keyed by the message id that
     was actually put on the wire. *)
  let pending_requests : (Msgpack.t, Msgpack.t) Result.t Ivar.t Int.Table.t =
    Int.Table.create ()
  ;;

  (* Partial application: the remaining argument is the subscriber's source
     position, and the result is a pipe of the events written to the bus. *)
  let subscribe ((_, notifications_bus) : t) = Async_bus.pipe1_exn notifications_bus

  (* Handlers for requests initiated by the peer, keyed by method name. *)
  let synchronous_callbacks : (Msgpack.t list -> Msgpack.t Or_error.t) String.Table.t =
    String.Table.create ()
  ;;

  (* Registers [f] under [name]; fails if a handler with that name exists. *)
  let register_method ~name ~f =
    match Hashtbl.add synchronous_callbacks ~key:name ~data:f with
    | `Ok -> Ok ()
    | `Duplicate -> Or_error.errorf "duplicate method name %s" name
  ;;

  (* Parses messages from [conn] until the parser stops and dispatches each one:
     - responses (type 1) resolve the matching entry of [pending_requests];
     - notifications (type 2) are published on [notifications_bus];
     - requests (type 0) are answered using [synchronous_callbacks].
     Any other message is logged and skipped.

     NOTE(review): when the loop ends, requests still present in
     [pending_requests] are never resolved. The table is shared by all
     connections of this functor instance, so they cannot simply all be failed
     here -- confirm whether callers need a per-connection failure signal. *)
  let event_loop conn notifications_bus =
    (* Completes the pending request registered under [msgid], if any. *)
    let resolve msgid outcome =
      match Hashtbl.find pending_requests msgid with
      | None -> Log.Global.error "Unknown message ID: %d" msgid
      | Some box -> Ivar.fill box outcome
    in
    (* Response message carrying [message] in the error slot and no result. *)
    let error_response msgid message =
      Msgpack.Array [ Msgpack.Integer 1; Integer msgid; String message; Nil ]
    in
    let handle_message = function
      | Msgpack.Array [ Integer 1; Integer msgid; Nil; result ] ->
        resolve msgid (Ok result);
        return ()
      | Msgpack.Array [ Integer 1; Integer msgid; err; Nil ] ->
        resolve msgid (Error err);
        return ()
      | Msgpack.Array [ Integer 2; String method_name; Array params ] ->
        Bus.write notifications_bus { method_name; params };
        return ()
      | Msgpack.Array [ Integer 0; Integer msgid; String method_name; Array params ] ->
        let resp =
          match Hashtbl.find synchronous_callbacks method_name with
          | None -> error_response msgid (sprintf "no method %s" method_name)
          | Some f ->
            (* [try_with_join] also turns an exception raised by [f] into an
               error response instead of letting it escape the loop. *)
            (match Or_error.try_with_join (fun () -> f params) with
             | Ok r -> Msgpack.Array [ Msgpack.Integer 1; Integer msgid; Nil; r ]
             | Error e ->
               error_response msgid (e |> [%sexp_of: Error.t] |> Sexp.to_string))
        in
        Async.Writer.write (M.writer conn) (Msgpack.string_of_t_exn resp);
        return ()
      | msg ->
        Log.Global.error !"unexpected msgpack response: %{sexp:Msgpack.t}\n" msg;
        return ()
    in
    match%bind
      Angstrom_async.parse_many
        Msgpack.Internal.Parser.msg
        handle_message
        (M.reader conn)
    with
    | Ok () -> return ()
    | Error s ->
      Log.Global.error "Unable to parse messagepack-rpc response: %s" s;
      return ()
  ;;

  (* Creates the ivar that will receive the response for [msg_id] and records it
     in [pending_requests]. [msg_id] must be the id sent on the wire, because
     that is the id the response will carry. *)
  let register msg_id =
    let box = Ivar.create () in
    Hashtbl.set pending_requests ~key:msg_id ~data:box;
    box, msg_id
  ;;

  (* Waits for the response registered by [register], then drops the table
     entry so the id can be reused. *)
  let wait_for_response (box, msg_id) =
    let%bind result = Ivar.read box in
    Hashtbl.remove pending_requests msg_id;
    return result
  ;;

  (* Creates the notification bus for [conn], starts the event loop in the
     background and returns the endpoint handle. *)
  let connect conn =
    let notifications_bus =
      Bus.create
        [%here]
        Arity1
        ~on_subscription_after_first_write:Allow
        (* Per the original author, a raising subscriber "should be impossible". *)
        ~on_callback_raise:Error.raise
    in
    don't_wait_for (event_loop conn notifications_bus);
    conn, Bus.read_only notifications_bus
  ;;

  (* Sends a request for [method_name] with [parameters] over [conn] and returns
     a deferred holding the peer's answer: [Ok result] or [Error err]. *)
  let call (conn, _) ~method_name ~parameters =
    let open Msgpack in
    (* Wrap the id once and use the wrapped value both as the table key and on
       the wire. Previously the table was keyed by the unwrapped id while the
       message carried [id mod bitsize], so after the counter passed [bitsize]
       responses could no longer be matched to their requests. *)
    let msg_id = (Id_factory.create () |> Id_factory.to_int_exn) mod bitsize in
    let query_msg =
      Array [ Integer 0; Integer msg_id; String method_name; parameters ]
    in
    (* Register before writing so the response cannot arrive unannounced. *)
    let result_box = register msg_id in
    let query = Msgpack.string_of_t_exn query_msg in
    Async.Writer.write (M.writer conn) query;
    wait_for_response result_box
  ;;
end
OCaml

Innovation. Community. Security.