Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
connection.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
open! Core
open! Async

(** [grpc_recv_streaming body buffer_push] starts reading [body] and forwards
    what it receives to the pipe writer [buffer_push].

    Every chunk delivered by [H2.Body.Reader.schedule_read] is appended to a
    buffer private to this call; [Grpc.Message.extract_all] is then run over
    that buffer and each item it yields is written to [buffer_push]. A new
    read is scheduled after every chunk. When the body reports end of input,
    [buffer_push] is closed. *)
let grpc_recv_streaming body buffer_push =
  let request_buffer = Grpc.Buffer.v () in
  let on_eof () = Async.Pipe.close buffer_push in
  (* The consumer may close the pipe before the peer has finished sending.
     [Pipe.write_without_pushback] raises on a closed pipe, and that
     exception would escape from inside the read callback. Use the
     [_if_open] variant instead: late messages are dropped and the body
     keeps being read. *)
  let push message =
    Async.Pipe.write_without_pushback_if_open buffer_push message
  in
  let rec on_read buffer ~off ~len =
    Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:buffer
      ~dst:request_buffer ~length:len;
    Grpc.Message.extract_all push request_buffer;
    H2.Body.Reader.schedule_read body ~on_read ~on_eof
  in
  H2.Body.Reader.schedule_read body ~on_read ~on_eof

(** [grpc_send_streaming_client body encoder_stream] writes every value read
    from the pipe [encoder_stream] to [body], after passing it through
    [Grpc.Message.make], and closes [body] once the pipe is exhausted.

    The returned deferred is determined after [body] has been closed. *)
let grpc_send_streaming_client body encoder_stream =
  let send encoder =
    let payload = Grpc.Message.make encoder in
    H2.Body.Writer.write_string body payload;
    (* Flush after every message, exactly as [grpc_send_streaming] does, so
       both senders behave the same way.
       NOTE(review): presumably this is what hands the written bytes to the
       connection promptly; confirm against the H2 version in use. *)
    H2.Body.Writer.flush body (fun () -> ());
    return ()
  in
  let%map () = Async.Pipe.iter encoder_stream ~f:send in
  H2.Body.Writer.close body

(** [grpc_send_streaming request encoder_stream status_mvar] answers [request]
    with a streaming [`OK] response whose [content-type] is
    [application/grpc+proto], with headers flushed immediately.

    Each value read from [encoder_stream] is passed through
    [Grpc.Message.make], written to the response body and flushed. Once the
    pipe is exhausted, a status is taken from [status_mvar] and sent as
    trailers: [grpc-status] always, [grpc-message] only when the status
    carries a message. The response body is closed last. *)
let grpc_send_streaming request encoder_stream status_mvar =
  let response =
    H2.Response.create
      ~headers:
        (H2.Headers.of_list [ ("content-type", "application/grpc+proto") ])
      `OK
  in
  let body =
    H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request
      response
  in
  let send input =
    let payload = Grpc.Message.make input in
    H2.Body.Writer.write_string body payload;
    H2.Body.Writer.flush body (fun () -> ());
    return ()
  in
  let%bind () = Async.Pipe.iter encoder_stream ~f:send in
  let%map status = Async.Mvar.take status_mvar in
  let status_trailer =
    ( "grpc-status",
      string_of_int (Grpc.Status.int_of_code (Grpc.Status.code status)) )
  in
  (* The message trailer is optional: it is omitted when the status has no
     message. *)
  let message_trailer =
    match Grpc.Status.message status with
    | None -> []
    | Some message -> [ ("grpc-message", message) ]
  in
  H2.Reqd.schedule_trailers request
    (H2.Headers.of_list (status_trailer :: message_trailer));
  H2.Body.Writer.close body