package ocluster-api

  1. Overview
  2. Docs
Cap'n Proto API for OCluster

Install

Dune Dependency

Authors

Maintainers

Sources

ocluster-0.2.1.tbz
sha256=3b88db5ad1edfaf3295bb145c64d5afc6fb7271ac20f69054eb91860dd4a5dff
sha512=88b885c2556b822f7970f4ef9ffd6402ff6cc21cf9aa9ac10a13992e144d23398a99ed1dc71ff09f47c026222c6cfd009af316a45e425d5f058c2f522ebb909d

doc/src/ocluster-api/pool_admin.ml.html

Source file pool_admin.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
open Lwt.Infix
open Capnp_rpc_lwt

type worker_info = {
  name : string;
  active : bool;
  connected : bool;
}

let pp_worker_info f { name; active; connected } =
  let notes = if active then [] else ["paused"] in
  let notes = if connected then notes else "disconnected" :: notes in
  if notes = [] then
    Fmt.string f name
  else
    Fmt.pf f "%s (@[<h>%a@])" name Fmt.(list ~sep:comma string) notes

let local ~show ~workers ~worker ~set_active ~drain ~update ~forget ~set_rate =
  let module X = Raw.Service.PoolAdmin in
  X.local @@ object
    inherit X.service

    method show_impl _params release_param_caps =
      let open X.Show in
      release_param_caps ();
      let response, results = Service.Response.create Results.init_pointer in
      Results.state_set results (show ());
      Service.return response

    method workers_impl _params release_param_caps =
      let open X.Workers in
      release_param_caps ();
      let response, results = Service.Response.create Results.init_pointer in
      let items = workers () in
      let arr = Results.workers_init results (List.length items) in
      items |> List.iteri (fun i { name; active; connected } ->
          let slot = Capnp.Array.get arr i in
          let module B = Raw.Builder.WorkerInfo in
          B.name_set slot name;
          B.active_set slot active;
          B.connected_set slot connected
        );
      Service.return response

    method set_active_impl params release_param_caps =
      let open X.SetActive in
      let worker = Params.worker_get params in
      let active = Params.active_get params in
      let auto_create = Params.auto_create_get params in
      release_param_caps ();
      match set_active ~auto_create worker active with
      | Ok () -> Service.return_empty ()
      | Error `Unknown_worker -> Service.fail "Unknown worker"

    method worker_impl params release_param_caps =
      let open X.Worker in
      let name = Params.worker_get params in
      release_param_caps ();
      match worker name with
      | None -> Service.fail "Unknown worker"
      | Some cap ->
        let response, results = Service.Response.create Results.init_pointer in
        Results.worker_set results (Some cap);
        Capability.dec_ref cap;
        Service.return response

    method update_impl params release_param_caps =
      let open X.Update in
      let name = Params.worker_get params in
      let progress = Params.progress_get params in
      release_param_caps ();
      Service.return_lwt @@ fun () ->
      Lwt.finalize
        (fun () -> update ?progress name)
        (fun () -> Option.iter Capability.dec_ref progress; Lwt.return_unit)

    method drain_impl params release_param_caps =
      let open X.Drain in
      let name = Params.worker_get params in
      let progress = Params.progress_get params in
      release_param_caps ();
      Service.return_lwt @@ fun () ->
      Lwt.finalize
        (fun () -> drain ?progress name)
        (fun () -> Option.iter Capability.dec_ref progress; Lwt.return_unit)

    method forget_impl params release_param_caps =
      let open X.Forget in
      let name = Params.worker_get params in
      release_param_caps ();
      forget name

    method set_rate_impl params release_param_caps =
      let open X.SetRate in
      let client_id = Params.id_get params in
      let rate = Params.rate_get params in
      release_param_caps ();
      match set_rate ~client_id rate with
      | Ok () -> Service.return_empty ()
      | Error `No_such_user -> Service.fail "No such user"
  end

module X = Raw.Client.PoolAdmin

type t = X.t Capability.t

let show t =
  let open X.Show in
  let request = Capability.Request.create_no_args () in
  Capability.call_for_value_exn t method_id request >|= Results.state_get

let workers t =
  let open X.Workers in
  let request = Capability.Request.create_no_args () in
  Capability.call_for_value_exn t method_id request >|= fun results ->
  let module R = Raw.Reader.WorkerInfo in
  Results.workers_get_list results |> List.map @@ fun worker ->
  let name = R.name_get worker in
  let active = R.active_get worker in
  let connected = R.connected_get worker in
  { name; active; connected }

let worker t worker =
  let open X.Worker in
  let request, params = Capability.Request.create Params.init_pointer in
  Params.worker_set params worker;
  Capability.call_for_caps t method_id request Results.worker_get_pipelined

let set_active ?(auto_create=false) t worker active =
  let open X.SetActive in
  let request, params = Capability.Request.create Params.init_pointer in
  Params.worker_set params worker;
  Params.active_set params active;
  Params.auto_create_set params auto_create;
  Capability.call_for_unit_exn t method_id request

let drain ?progress t worker =
  let open X.Drain in
  let request, params = Capability.Request.create Params.init_pointer in
  Params.worker_set params worker;
  Params.progress_set params progress;
  Capability.call_for_unit_exn t method_id request

let update ?progress t worker =
  let open X.Update in
  let request, params = Capability.Request.create Params.init_pointer in
  Params.worker_set params worker;
  Params.progress_set params progress;
  Capability.call_for_unit t method_id request

let forget t worker =
  let open X.Forget in
  let request, params = Capability.Request.create Params.init_pointer in
  Params.worker_set params worker;
  Capability.call_for_unit t method_id request

let set_rate t ~client_id rate =
  let open X.SetRate in
  let request, params = Capability.Request.create Params.init_pointer in
  Params.id_set params client_id;
  Params.rate_set params rate;
  Capability.call_for_unit_exn t method_id request
OCaml

Innovation. Community. Security.