package grpc-async

  1. Overview
  2. Docs

Source file connection.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
open! Core
open! Async

let grpc_recv_streaming body buffer_push =
  let request_buffer = Grpc.Buffer.v () in
  let on_eof () = Async.Pipe.close buffer_push in
  let rec on_read buffer ~off ~len =
    Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:buffer
      ~dst:request_buffer ~length:len;
    Grpc.Message.extract_all
      (Async.Pipe.write_without_pushback buffer_push)
      request_buffer;
    H2.Body.Reader.schedule_read body ~on_read ~on_eof
  in
  H2.Body.Reader.schedule_read body ~on_read ~on_eof

let grpc_send_streaming_client body encoder_stream =
  let%map () =
    Async.Pipe.iter encoder_stream ~f:(fun encoder ->
        let payload = Grpc.Message.make encoder in
        H2.Body.Writer.write_string body payload;
        return ())
  in
  H2.Body.Writer.close body

let grpc_send_streaming request encoder_stream status_mvar =
  let body =
    H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request
      (H2.Response.create
         ~headers:
           (H2.Headers.of_list [ ("content-type", "application/grpc+proto") ])
         `OK)
  in
  let%bind () =
    Async.Pipe.iter encoder_stream ~f:(fun input ->
        let payload = Grpc.Message.make input in
        H2.Body.Writer.write_string body payload;
        H2.Body.Writer.flush body (fun () -> ());
        return ())
  in
  let%map status = Async.Mvar.take status_mvar in
  H2.Reqd.schedule_trailers request
    (H2.Headers.of_list
       ([
          ( "grpc-status",
            string_of_int (Grpc.Status.int_of_code (Grpc.Status.code status)) );
        ]
       @
       match Grpc.Status.message status with
       | None -> []
       | Some message -> [ ("grpc-message", message) ]));
  H2.Body.Writer.close body
OCaml

Innovation. Community. Security.