package streamable

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file pipe_rpc_intf.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
(** A Streamable.Pipe_rpc is just like a Pipe_rpc except the updates are
    streamed out gradually rather than sent in one big [bin_io] blob. *)

open! Core
open! Async_kernel
open! Import

(** Static description of one streamable pipe RPC. This is the signature of
    the argument [X] taken by the [Make] functor in [Pipe_rpc] below. *)
module type S = sig
  (* [name] and [version] presumably identify the RPC in the same way as for
     an ordinary [Rpc.Pipe_rpc].
     NOTE(review): confirm against the implementation. *)
  val name    : string
  val version : int

  (* [query] must come with [bin_io] serialization. [response] is the element
     type of the pipe a caller gets back from [dispatch] once [Make] has been
     applied (see [rpc : (X.query, X.response) t]). No [bin_io] is required
     for [response] here; presumably [Response] below supplies its streamable
     serialization -- NOTE(review): confirm against [Main.S_rpc]. *)
  type query [@@deriving bin_io]
  type response

  (** Operations on [response] required by [Main.S_rpc]. The only part of that
      signature visible from this file is [Intermediate.Part.t], which is used
      by [implement'] in [Pipe_rpc.Make]. *)
  module Response : Main.S_rpc with type t = response

  (** NOTE(review): presumably forwarded as the [client_pushes_back] setting
      of the underlying [Rpc.Pipe_rpc]; this file does not show how it is
      used, so confirm in the implementation. *)
  val client_pushes_back : bool
end

(** The interface of the streamable pipe-RPC module: an abstract RPC type [t],
    functions to dispatch and implement such RPCs, and a [Make] functor that
    builds one from an [S]. *)
module type Pipe_rpc = sig
  (** Re-export of the top-level [S] so that it is reachable through this
      module type. *)
  module type S = S

  (** A streamable pipe RPC taking queries of type ['q] and producing a pipe
      of responses of type ['r]. *)
  type ('q, 'r) t

  (** [description t] returns the [Rpc.Description.t] of [t]. *)
  val description : _ t -> Rpc.Description.t

  (** [dispatch t conn query] sends [query] over [conn]. On success the result
      is a pipe reader yielding values of the response type; failure is
      reported through the [Or_error] layer of the result.

      NOTE(review): [?metadata] is an optional [Rpc_metadata.t]; how it is
      attached to the request is not visible in this file. *)
  val dispatch
    :  ?metadata:Rpc_metadata.t
    -> ('q, 'r) t
    -> Rpc.Connection.t
    -> 'q
    -> 'r Pipe.Reader.t Deferred.Or_error.t

  (** [implement t f] builds a server-side [Rpc.Implementation.t] for [t] from
      [f]. [f] receives the connection state and the query, and returns either
      an error or a pipe reader of responses. *)
  val implement
    :  ?on_exception:Rpc.On_exception.t (** default: [On_exception.continue] *)
    -> ('q, 'r) t
    -> ('conn_state -> 'q -> 'r Pipe.Reader.t Deferred.Or_error.t)
    -> 'conn_state Rpc.Implementation.t

  (* Both functions return a [Bin_prot.Shape.t] for the given RPC.
     NOTE(review): presumably these are the shapes of the query and response
     as sent on the wire; for the response, whether this is the shape of the
     streamed parts or of the whole value is not visible here -- confirm. *)
  val bin_query_shape    : _ t -> Bin_prot.Shape.t
  val bin_response_shape : _ t -> Bin_prot.Shape.t

  (** [Make (X)] defines the RPC described by [X]. *)
  module Make (X : S) : sig
    (** The RPC value for [X], usable with [dispatch] and [implement]. *)
    val rpc : (X.query, X.response) t

    (** [implement'] is like [implement rpc] except that it allows the server
        to control the conversion from [response]s to parts.

        The server function returns a pipe whose elements are themselves pipes
        of [X.Response.Intermediate.Part.t]. NOTE(review): presumably each
        inner pipe carries the parts of one response -- confirm against the
        implementation. *)
    val implement'
      :  ?on_exception:Rpc.On_exception.t (** default: [On_exception.continue] *)
      -> ('conn_state
          -> X.query
          -> X.Response.Intermediate.Part.t Pipe.Reader.t Pipe.Reader.t
               Deferred.Or_error.t)
      -> 'conn_state Rpc.Implementation.t
  end
end
OCaml

Innovation. Community. Security.