(* Package: async_durable
   Source file: durable_pipe_rpc.ml *)

open Core_kernel
open Async_kernel
open Async_rpc_kernel

(** Events carried by the pipes that the [create*] functions in this file
    build.

    The constructors line up one-to-one with the cases of
    [Durable_state_rpc.Update.t] that [filter_map_update] keeps; that type's
    [State] case has no counterpart here. ['response] is the payload of
    [Update] and ['error] is the payload of [Rpc_error]. *)
module Update = struct
  type ('response, 'error) t =
    | Attempting_new_connection
    (** No payload. Presumably signals that a (re)connection attempt is
        starting -- confirm against [Durable_state_rpc]. *)
    | Connection_success of Rpc.Pipe_rpc.Metadata.t
    (** Carries the [Rpc.Pipe_rpc.Metadata.t] forwarded from the underlying
        [Durable_state_rpc.Update.Connection_success]. *)
    | Lost_connection
    (** No payload. Presumably signals that an established connection went
        away -- confirm against [Durable_state_rpc]. *)
    | Failed_to_connect of Error.t
    (** Carries the [Error.t] forwarded from the underlying update. *)
    | Rpc_error of 'error
    (** Carries the RPC-specific error value forwarded from the underlying
        update. *)
    | Update of 'response
    (** Carries one response value forwarded from the underlying update. *)
end

(** Converts a [Durable_state_rpc.Update.t] into the matching [Update.t].

    Every case is carried over under the same constructor name with its
    payload untouched, except [State ()], which maps to [None] so that
    [Pipe.filter_map] drops it from the resulting pipe. *)
let filter_map_update update =
  let module Src = Durable_state_rpc.Update in
  let module Dst = Update in
  match update with
  (* The only case without a counterpart in [Update.t]. *)
  | Src.State () -> None
  | Src.Update response -> Some (Dst.Update response)
  | Src.Rpc_error rpc_error -> Some (Dst.Rpc_error rpc_error)
  | Src.Failed_to_connect connect_error -> Some (Dst.Failed_to_connect connect_error)
  | Src.Lost_connection -> Some Dst.Lost_connection
  | Src.Connection_success pipe_metadata -> Some (Dst.Connection_success pipe_metadata)
  | Src.Attempting_new_connection -> Some Dst.Attempting_new_connection
;;

(** [create connection rpc ~query ~resubscribe_delay] dispatches [rpc] with
    [query] through [Durable_state_rpc.Expert.create] and returns the
    resulting pipe translated to [Update.t] events.

    [connection] and [resubscribe_delay] are handed to
    [Durable_state_rpc.Expert.create] unchanged. *)
let create connection rpc ~query ~resubscribe_delay =
  (* [Durable_state_rpc.Expert.create] wants a state alongside the pipe; a
     pipe RPC has none, so [()] stands in for it. *)
  let add_unit_state (reader, metadata) = (), reader, metadata in
  let dispatch conn =
    let dispatched = Rpc.Pipe_rpc.dispatch rpc conn query in
    dispatched >>|? Result.map ~f:add_unit_state
  in
  let updates =
    Durable_state_rpc.Expert.create connection ~dispatch ~resubscribe_delay
  in
  (* Drops the [State ()] events and converts the rest. *)
  Pipe.filter_map updates ~f:filter_map_update
;;

(** [create_versioned connection rpc_module ~query ~resubscribe_delay] is the
    counterpart of [create] for a first-class
    [Versioned_rpc.Both_convert.Pipe_rpc.S] module: it dispatches [query]
    with that module's [dispatch_multi] through
    [Durable_state_rpc.Expert.create] and returns the resulting pipe
    translated to [Update.t] events.

    The locally abstract types [query], [response] and [error] tie the
    module's caller-side types to the type of [query] and of the result. *)
let create_versioned
      (type query response error)
      connection
      rpc_module
      ~(query : query)
      ~resubscribe_delay
  =
  let module Versioned_pipe_rpc =
    (val rpc_module : Versioned_rpc.Both_convert.Pipe_rpc.S
     with type caller_query = query
      and type caller_response = response
      and type caller_error = error)
  in
  (* [Durable_state_rpc.Expert.create] wants a state alongside the pipe; a
     pipe RPC has none, so [()] stands in for it. *)
  let add_unit_state (reader, metadata) = (), reader, metadata in
  let dispatch conn =
    let dispatched = Versioned_pipe_rpc.dispatch_multi conn query in
    dispatched >>|? Result.map ~f:add_unit_state
  in
  let updates =
    Durable_state_rpc.Expert.create connection ~dispatch ~resubscribe_delay
  in
  (* Drops the [State ()] events and converts the rest. *)
  Pipe.filter_map updates ~f:filter_map_update
;;

(** [create_versioned' connection rpc_module ~query ~resubscribe_delay] is the
    counterpart of [create] for a first-class
    [Versioned_rpc.Caller_converts.Pipe_rpc.S] module: it dispatches [query]
    with that module's [dispatch_multi] through
    [Durable_state_rpc.Expert.create] and returns the resulting pipe
    translated to [Update.t] events.

    The locally abstract types [query], [response] and [error] tie the
    module's types to the type of [query] and of the result. *)
let create_versioned'
      (type query response error)
      connection
      rpc_module
      ~(query : query)
      ~resubscribe_delay
  =
  let module Versioned_pipe_rpc =
    (val rpc_module : Versioned_rpc.Caller_converts.Pipe_rpc.S
     with type query = query
      and type response = response
      and type error = error)
  in
  (* [Durable_state_rpc.Expert.create] wants a state alongside the pipe; a
     pipe RPC has none, so [()] stands in for it. *)
  let add_unit_state (reader, metadata) = (), reader, metadata in
  let dispatch conn =
    let dispatched = Versioned_pipe_rpc.dispatch_multi conn query in
    dispatched >>|? Result.map ~f:add_unit_state
  in
  let updates =
    Durable_state_rpc.Expert.create connection ~dispatch ~resubscribe_delay
  in
  (* Drops the [State ()] events and converts the rest. *)
  Pipe.filter_map updates ~f:filter_map_update
;;

(** [create_or_fail connection rpc ~query ~resubscribe_delay] dispatches
    [rpc] with [query] through [Durable_state_rpc.Expert.create_or_fail].

    The pipe is not returned directly: it comes back inside the deferred,
    nested result produced by [create_or_fail], with its contents translated
    to [Update.t] events. [connection] and [resubscribe_delay] are handed to
    [create_or_fail] unchanged. *)
let create_or_fail connection rpc ~query ~resubscribe_delay =
  (* [Durable_state_rpc.Expert.create_or_fail] wants a state alongside the
     pipe; a pipe RPC has none, so [()] stands in for it. *)
  let add_unit_state (reader, metadata) = (), reader, metadata in
  let dispatch conn =
    let dispatched = Rpc.Pipe_rpc.dispatch rpc conn query in
    dispatched >>|? Result.map ~f:add_unit_state
  in
  (* Drops the [State ()] events and converts the rest. *)
  let to_pipe_updates updates = Pipe.filter_map updates ~f:filter_map_update in
  Durable_state_rpc.Expert.create_or_fail connection ~dispatch ~resubscribe_delay
  >>|? Result.map ~f:to_pipe_updates
;;

(** [create_or_fail_versioned connection rpc_module ~query ~resubscribe_delay]
    is the counterpart of [create_or_fail] for a first-class
    [Versioned_rpc.Both_convert.Pipe_rpc.S] module: it dispatches [query]
    with that module's [dispatch_multi] through
    [Durable_state_rpc.Expert.create_or_fail].

    The pipe comes back inside the deferred, nested result produced by
    [create_or_fail], with its contents translated to [Update.t] events. The
    locally abstract types [query], [response] and [error] tie the module's
    caller-side types to the type of [query] and of the result. *)
let create_or_fail_versioned
      (type query response error)
      connection
      rpc_module
      ~(query : query)
      ~resubscribe_delay
  =
  let module Versioned_pipe_rpc =
    (val rpc_module : Versioned_rpc.Both_convert.Pipe_rpc.S
     with type caller_query = query
      and type caller_response = response
      and type caller_error = error)
  in
  (* [Durable_state_rpc.Expert.create_or_fail] wants a state alongside the
     pipe; a pipe RPC has none, so [()] stands in for it. *)
  let add_unit_state (reader, metadata) = (), reader, metadata in
  let dispatch conn =
    let dispatched = Versioned_pipe_rpc.dispatch_multi conn query in
    dispatched >>|? Result.map ~f:add_unit_state
  in
  (* Drops the [State ()] events and converts the rest. *)
  let to_pipe_updates updates = Pipe.filter_map updates ~f:filter_map_update in
  Durable_state_rpc.Expert.create_or_fail connection ~dispatch ~resubscribe_delay
  >>|? Result.map ~f:to_pipe_updates
;;

(** [create_or_fail_versioned' connection rpc_module ~query ~resubscribe_delay]
    is the counterpart of [create_or_fail] for a first-class
    [Versioned_rpc.Caller_converts.Pipe_rpc.S] module: it dispatches [query]
    with that module's [dispatch_multi] through
    [Durable_state_rpc.Expert.create_or_fail].

    The pipe comes back inside the deferred, nested result produced by
    [create_or_fail], with its contents translated to [Update.t] events. The
    locally abstract types [query], [response] and [error] tie the module's
    types to the type of [query] and of the result. *)
let create_or_fail_versioned'
      (type query response error)
      connection
      rpc_module
      ~(query : query)
      ~resubscribe_delay
  =
  let module Versioned_pipe_rpc =
    (val rpc_module : Versioned_rpc.Caller_converts.Pipe_rpc.S
     with type query = query
      and type response = response
      and type error = error)
  in
  (* [Durable_state_rpc.Expert.create_or_fail] wants a state alongside the
     pipe; a pipe RPC has none, so [()] stands in for it. *)
  let add_unit_state (reader, metadata) = (), reader, metadata in
  let dispatch conn =
    let dispatched = Versioned_pipe_rpc.dispatch_multi conn query in
    dispatched >>|? Result.map ~f:add_unit_state
  in
  (* Drops the [State ()] events and converts the rest. *)
  let to_pipe_updates updates = Pipe.filter_map updates ~f:filter_map_update in
  Durable_state_rpc.Expert.create_or_fail connection ~dispatch ~resubscribe_delay
  >>|? Result.map ~f:to_pipe_updates
;;
(* OCaml -- Innovation. Community. Security. *)