package grpc-async
An Async implementation of gRPC
Install
Dune Dependency
Authors
Maintainers
Sources
0.1.0.tar.gz
md5=62c8b2b2dea48f779dcc216ff9213723
sha512=ef4b89f080590fb68dce9b1fe3072fc4201a8d043c2a5d5c5024b3c4f9887c974fc9356479a63b305d1c50ac77a996e99a10705daafede7046a713dc3261e94d
doc/src/grpc-async/client.ml.html
Source file client.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
open! Core
open! Async

(** Alias for the response callback type used by H2 client connections. *)
type response_handler = H2.Client_connection.response_handler

(** Type of the function [call] uses to start a request. It receives the
    request and a [response_handler] (plus two optional settings) and yields
    the writer for the request body. *)
type do_request =
  ?flush_headers_immediately:bool ->
  ?trailers_handler:(H2.Headers.t -> unit) ->
  H2.Request.t ->
  response_handler:response_handler ->
  H2.Body.Writer.t

(** [make_request ~scheme ~service ~rpc ~headers] builds a [`POST] request
    whose target is ["/" ^ service ^ "/" ^ rpc]. *)
let make_request ~scheme ~service ~rpc ~headers =
  let path = String.concat [ "/"; service; "/"; rpc ] in
  H2.Request.create ~scheme `POST path ~headers

(** Headers used by [call] when the caller does not supply any. *)
let default_headers =
  H2.Headers.of_list
    [ ("te", "trailers"); ("content-type", "application/grpc+proto") ]

(** [call ~service ~rpc ?scheme ~handler ~do_request ?headers ()] issues one
    request through [do_request] and runs [handler] on the request body writer
    and a deferred response body reader.

    The result is [Ok (handler_result, status)] when the response status is
    [`OK], and [Error (Grpc.Status.v Grpc.Status.Unknown)] for any other
    response status. [status] is built from the first parsable ["grpc-status"]
    value seen in the response headers or trailers; if none has been recorded
    by the time the outcome is known, a [Grpc.Status.Unknown] status with the
    message ["Server did not return grpc-status"] is used instead.

    [scheme] defaults to ["https"] and [headers] to [default_headers]. *)
let call ~service ~rpc ?(scheme = "https") ~handler ~do_request
    ?(headers = default_headers) () =
  let request = make_request ~service ~rpc ~scheme ~headers in
  let read_body_ivar = Ivar.create () in
  let handler_res_ivar = Ivar.create () in
  let out_ivar = Ivar.create () in
  let trailers_status_ivar = Ivar.create () in
  (* Records the gRPC status carried by [hdrs], if any. Only the first
     successfully parsed status is kept. *)
  let trailers_handler hdrs =
    let parsed_code =
      Option.bind (H2.Headers.get hdrs "grpc-status") ~f:(fun raw ->
          Option.bind (int_of_string_opt raw) ~f:Grpc.Status.code_of_int)
    in
    match parsed_code with
    | None -> ()
    | Some code ->
        (* A status is not expected to arrive twice; if it does, the later one
           is silently ignored. *)
        if Ivar.is_empty trailers_status_ivar then begin
          let message = H2.Headers.get hdrs "grpc-message" in
          Ivar.fill trailers_status_ivar (Grpc.Status.v ?message code)
        end
  in
  let response_handler (response : H2.Response.t) (body : H2.Body.Reader.t) =
    (* Hand the body reader to [handler], which is waiting on this ivar. *)
    Ivar.fill read_body_ivar body;
    (match response.status with
    | `OK ->
        (* The outcome becomes available once [handler] has finished. *)
        don't_wait_for
          (let%map handler_res = Ivar.read handler_res_ivar in
           Ivar.fill out_ivar (Ok handler_res))
    | _ -> Ivar.fill out_ivar (Error (Grpc.Status.v Grpc.Status.Unknown)));
    (* The response headers themselves may already carry a grpc-status. *)
    trailers_handler response.headers
  in
  let flush_headers_immediately = None in
  let write_body : H2.Body.Writer.t =
    do_request ?flush_headers_immediately
      ?trailers_handler:(Some trailers_handler) request ~response_handler
  in
  don't_wait_for
    (let%map handler_res = handler write_body (Ivar.read read_body_ivar) in
     Ivar.fill handler_res_ivar handler_res);
  let%bind out = Ivar.read out_ivar in
  let%map trailers_status =
    match Ivar.peek trailers_status_ivar with
    | Some status -> return status
    | None ->
        (* No grpc-status showed up in either the headers or the trailers. *)
        return
          (Grpc.Status.v ~message:"Server did not return grpc-status"
             Grpc.Status.Unknown)
  in
  match out with
  | Ok handler_res -> Ok (handler_res, trailers_status)
  | Error _ as failure -> failure

module Rpc = struct
  (** A function taking the request body writer and the (deferred) response
      body reader, and producing a deferred result. *)
  type 'a handler =
    H2.Body.Writer.t -> H2.Body.Reader.t Deferred.t -> 'a Deferred.t

  (** [bidirectional_streaming ~handler write_body read_body] creates one pipe
      for outgoing values and one for incoming values, wires them to
      [write_body] and [read_body] through
      [Connection.grpc_send_streaming_client] and
      [Connection.grpc_recv_streaming] (both defined outside this file), and
      runs [handler] on the outgoing writer and the incoming reader. Once
      [handler] finishes, both pipes are closed if they are still open and the
      handler's result is returned. *)
  let bidirectional_streaming ~handler write_body read_body =
    let close_if_open writer =
      if not (Pipe.is_closed writer) then Pipe.close writer
    in
    let decoder_r, decoder_w = Pipe.create () in
    (* Receiving can only start once the response body reader is available. *)
    upon read_body (fun body ->
        Connection.grpc_recv_streaming body decoder_w);
    let encoder_r, encoder_w = Pipe.create () in
    don't_wait_for (Connection.grpc_send_streaming_client write_body encoder_r);
    let%map result = handler encoder_w decoder_r in
    close_if_open encoder_w;
    close_if_open decoder_w;
    result

  (** [client_streaming ~handler write_body read_body] is
      [bidirectional_streaming] where [handler] only receives the outgoing
      writer; the incoming reader is not passed to it. *)
  let client_streaming ~handler write_body read_body =
    let outgoing_only encoder_w _decoder_r = handler encoder_w in
    bidirectional_streaming ~handler:outgoing_only write_body read_body

  (** [server_streaming ~handler ~encoded_request write_body read_body] writes
      [encoded_request] to the outgoing pipe, closes that pipe, and then runs
      [handler] on the incoming reader. *)
  let server_streaming ~handler ~encoded_request write_body read_body =
    let send_once_then_receive encoder_w decoder_r =
      Pipe.write_without_pushback encoder_w encoded_request;
      Pipe.close encoder_w;
      handler decoder_r
    in
    bidirectional_streaming ~handler:send_once_then_receive write_body
      read_body

  (** [unary ~handler ~encoded_request write_body read_body] writes
      [encoded_request] to the outgoing pipe, closes that pipe, reads a single
      value from the incoming pipe, and calls [handler] with [Some value], or
      with [None] if the incoming pipe reached end-of-file first. *)
  let unary ~handler ~encoded_request write_body read_body =
    let send_once_then_read_one encoder_w decoder_r =
      Pipe.write_without_pushback encoder_w encoded_request;
      Pipe.close encoder_w;
      Pipe.read decoder_r >>= function
      | `Eof -> handler None
      | `Ok reply -> handler (Some reply)
    in
    bidirectional_streaming ~handler:send_once_then_read_one write_body
      read_body
end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>