package capnp-rpc-lwt

  1. Overview
  2. Docs

Source file service.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
open Capnp_core
open Lwt.Infix

module Log = Capnp_rpc.Debug.Log

module Response = Response
module RO_array = Capnp_rpc.RO_array

(* A response promise whose payload type is not tracked: an alias for
   [Core_types.struct_ref]. *)
type abstract_response_promise = Core_types.struct_ref

(* A type with no definition in this file. It stands in for the struct type
   in [abstract Schema.reader_t] (see [abstract_method_t] and [local]). *)
type abstract

(* An untyped method implementation. It receives a reader for the call's
   parameter struct and a [unit -> unit] callback, and returns the response
   promise. In [local], the callback supplied releases the request payload. *)
type abstract_method_t =
  abstract Schema.reader_t -> (unit -> unit) -> abstract_response_promise

(* Same representation as [abstract_response_promise]; ['a] is a phantom
   parameter (it does not appear on the right-hand side). *)
type 'a response_promise = abstract_response_promise
(* Same shape as [abstract_method_t], but with the first argument's type
   exposed as ['a]. ['b] is a phantom parameter.
   NOTE(review): presumably ['b] names the result type - confirm in the .mli. *)
type ('a, 'b) method_t = 'a -> (unit -> unit) -> Core_types.struct_ref

(* Re-export of [Capnp.RPC.Registry.pp_method]. Within this file it is used
   with the [%a] directive to print an [(interface_id, method_id)] pair in
   log and error messages. *)
let pp_method = Capnp.RPC.Registry.pp_method

(* The object interface that [local] expects from a service implementation. *)
class type generic = object
  (* Returns the implementation to run for method [method_id] of interface
     [interface_id]. [local] calls this once per incoming call. *)
  method dispatch : interface_id:Stdint.Uint64.t -> method_id:int -> abstract_method_t
  (* Called by the [local] wrapper's own [release], right after
     [super#release]. *)
  method release : unit
  (* Prints a description of the service. [local]'s [pp] uses it as the first
     part of its output. *)
  method pp : Format.formatter -> unit
end

(* [local s] wraps the service implementation [s] as a [Core_types.cap].

   Each incoming call is decoded to find its interface and method IDs, the
   matching implementation is obtained from [s#dispatch], and that
   implementation is run on the call's parameter struct. Its result is used
   to resolve [results]. If the implementation raises, the request payload is
   released, a warning is logged and [results] is resolved with an
   "Internal error" error instead. *)
let local (s:#generic) =
  object (_ : Core_types.cap)
    inherit Core_types.service as super

    (* Prints the wrapped service's description, then the ref-count in
       parentheses. *)
    method! pp f = Fmt.pf f "%t(%t)" s#pp super#pp_refcount

    (* Run the inherited release first, then let the wrapped service release. *)
    method! private release =
      super#release;
      s#release

    method call results msg =
      let open Schema.Reader in
      let request = Msg.Request.readable msg in
      let interface_id = Call.interface_id_get request in
      let method_id = Call.method_id_get request in
      (* The pair identifying the method; only used for log/error messages. *)
      let target = (interface_id, method_id) in
      Log.debug (fun f -> f "Invoking local method %a" pp_method target);
      let payload = Call.params_get request in
      let handler : abstract_method_t = s#dispatch ~interface_id ~method_id in
      (* Handed to the implementation so it can release the request payload. *)
      let release_params () = Core_types.Request_payload.release msg in
      let contents : abstract Schema.reader_t =
        Schema.ReaderOps.cast_struct
          (Schema.ReaderOps.deref_opt_struct_pointer
             (Payload.content_get payload))
      in
      (* Only exceptions from running the implementation are handled here;
         anything raised by [results#resolve] propagates to the caller. *)
      match handler contents release_params with
      | response -> results#resolve response
      | exception ex ->
        release_params ();
        Log.warn (fun f ->
            f "Uncaught exception handling %a: %a" pp_method target Fmt.exn ex);
        let err =
          Capnp_rpc.Error.exn "Internal error from %a" pp_method target
        in
        Core_types.resolve_payload results (Error err)
  end

(* Finish building the response message [resp] and return it as the result
   (the simple case, as opposed to returning some other value). *)
let return resp =
  resp |> Response.finish |> Core_types.return

(* Return a freshly created empty response. *)
let return_empty () =
  Response.create_empty () |> return

(* A convenient way to implement a simple blocking local function, where
   pipelining is not supported (messages sent to the result promise will be
   queued up at this host until it returns).

   [fn ()] is run in the background. The promise returned here is resolved
   when [fn]'s result arrives:
   - [Ok resp] resolves it with the finished response;
   - [Error (`Capnp e)] resolves it with the error [e];
   - an exception is logged as a warning and the promise is resolved with an
     "Internal error" exception. *)
let return_lwt fn =
  let result, resolver = Local_struct_promise.make () in
  let on_result = function
    | Ok resp -> Core_types.resolve_ok resolver (Response.finish resp)
    | Error (`Capnp e) -> Core_types.resolve_payload resolver (Error e)
  in
  let on_exception ex =
    Log.warn (fun f -> f "Uncaught exception: %a" Fmt.exn ex);
    Core_types.resolve_exn resolver (Capnp_rpc.Exception.v "Internal error");
    Lwt.return_unit
  in
  (* [Lwt.catch] wraps both the call to [fn] and [on_result], so an exception
     from either ends up in [on_exception]. *)
  Lwt.async (fun () ->
      Lwt.catch (fun () -> Lwt.map on_result (fn ())) on_exception);
  result

(* Re-export of [Core_types.fail].
   NOTE(review): presumably the immediate (non-Lwt) counterpart of [fail_lwt]
   - confirm against [Core_types]. *)
let fail = Core_types.fail

(* [fail_lwt ?ty fmt ...] formats a message using [fmt] and its arguments,
   builds a [Capnp_rpc.Exception.v ?ty] from it, and returns an Lwt result
   that has failed with [`Capnp (`Exception ex)]. *)
let fail_lwt ?ty fmt =
  (* Continuation run by [Fmt.kstr] once the message has been formatted. *)
  let reject msg =
    let ex = Capnp_rpc.Exception.v ?ty msg in
    Lwt_result.fail (`Capnp (`Exception ex))
  in
  Fmt.kstr reject fmt
OCaml

Innovation. Community. Security.