package grpc-async

  1. Overview
  2. Docs
An Async implementation of gRPC

Install

Dune Dependency

Authors

Maintainers

Sources

0.1.0.tar.gz
md5=62c8b2b2dea48f779dcc216ff9213723
sha512=ef4b89f080590fb68dce9b1fe3072fc4201a8d043c2a5d5c5024b3c4f9887c974fc9356479a63b305d1c50ac77a996e99a10705daafede7046a713dc3261e94d

doc/src/grpc-async/client.ml.html

Source file client.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
open! Core
open! Async

type response_handler = H2.Client_connection.response_handler

type do_request =
  ?flush_headers_immediately:bool ->
  ?trailers_handler:(H2.Headers.t -> unit) ->
  H2.Request.t ->
  response_handler:response_handler ->
  H2.Body.Writer.t

let make_request ~scheme ~service ~rpc ~headers =
  let request =
    H2.Request.create ~scheme `POST ("/" ^ service ^ "/" ^ rpc) ~headers
  in
  request

let default_headers =
  H2.Headers.of_list
    [ ("te", "trailers"); ("content-type", "application/grpc+proto") ]

let call ~service ~rpc ?(scheme = "https") ~handler ~do_request
    ?(headers = default_headers) () =
  let request = make_request ~service ~rpc ~scheme ~headers in
  let read_body_ivar = Ivar.create () in
  let handler_res_ivar = Ivar.create () in
  let out_ivar = Ivar.create () in
  let trailers_status_ivar = Ivar.create () in
  let trailers_handler headers =
    let code =
      match H2.Headers.get headers "grpc-status" with
      | None -> None
      | Some s -> (
          match int_of_string_opt s with
          | None -> None
          | Some i -> Grpc.Status.code_of_int i)
    in
    match code with
    | None -> ()
    | Some code -> (
        match Ivar.is_empty trailers_status_ivar with
        | true ->
            let message = H2.Headers.get headers "grpc-message" in
            let status = Grpc.Status.v ?message code in
            Ivar.fill trailers_status_ivar status
        | _ -> (* This should never happen, but just in case. *) ())
  in
  let response_handler (response : H2.Response.t) (body : H2.Body.Reader.t) =
    Ivar.fill read_body_ivar body;
    don't_wait_for
      (match response.status with
      | `OK ->
          let%bind handler_res = Ivar.read handler_res_ivar in
          Ivar.fill out_ivar (Ok handler_res);
          return ()
      | _ ->
          Ivar.fill out_ivar (Error (Grpc.Status.v Grpc.Status.Unknown));
          return ());
    trailers_handler response.headers
  in
  let flush_headers_immediately = None in
  let write_body : H2.Body.Writer.t =
    do_request ?flush_headers_immediately
      ?trailers_handler:(Some trailers_handler) request ~response_handler
  in
  don't_wait_for
    (let%bind handler_res = handler write_body (Ivar.read read_body_ivar) in
     Ivar.fill handler_res_ivar handler_res;
     return ());
  let%bind out = Ivar.read out_ivar in
  let%bind trailers_status =
    (* In case no grpc-status appears in headers or trailers. *)
    if Ivar.is_full trailers_status_ivar then Ivar.read trailers_status_ivar
    else
      return
        (Grpc.Status.v ~message:"Server did not return grpc-status"
           Grpc.Status.Unknown)
  in
  match out with
  | Error _ as e -> return e
  | Ok out -> return (Ok (out, trailers_status))

module Rpc = struct
  type 'a handler =
    H2.Body.Writer.t -> H2.Body.Reader.t Deferred.t -> 'a Deferred.t

  let bidirectional_streaming ~handler write_body read_body =
    let decoder_r, decoder_w = Async.Pipe.create () in
    don't_wait_for
      (let%map read_body = read_body in
       Connection.grpc_recv_streaming read_body decoder_w);
    let encoder_r, encoder_w = Async.Pipe.create () in
    don't_wait_for (Connection.grpc_send_streaming_client write_body encoder_r);
    let%bind out = handler encoder_w decoder_r in
    if not (Pipe.is_closed encoder_w) then Pipe.close encoder_w;
    if not (Pipe.is_closed decoder_w) then Pipe.close decoder_w;
    return out

  let client_streaming ~handler write_body read_body =
    bidirectional_streaming
      ~handler:(fun encoder_w _decoder_r -> handler encoder_w)
      write_body read_body

  let server_streaming ~handler ~encoded_request write_body read_body =
    bidirectional_streaming
      ~handler:(fun encoder_w decoder_r ->
        Async.Pipe.write_without_pushback encoder_w encoded_request;
        Async.Pipe.close encoder_w;
        handler decoder_r)
      write_body read_body

  let unary ~handler ~encoded_request write_body read_body =
    bidirectional_streaming
      ~handler:(fun encoder_w decoder_r ->
        Async.Pipe.write_without_pushback encoder_w encoded_request;
        Async.Pipe.close encoder_w;
        match%bind Async.Pipe.read decoder_r with
        | `Eof -> handler None
        | `Ok a -> handler (Some a))
      write_body read_body
end
OCaml

Innovation. Community. Security.