package ocluster-api

  1. Overview
  2. Docs
Cap'n Proto API for OCluster

Install

Dune Dependency

Authors

Maintainers

Sources

ocluster-0.2.1.tbz
sha256=3b88db5ad1edfaf3295bb145c64d5afc6fb7271ac20f69054eb91860dd4a5dff
sha512=88b885c2556b822f7970f4ef9ffd6402ff6cc21cf9aa9ac10a13992e144d23398a99ed1dc71ff09f47c026222c6cfd009af316a45e425d5f058c2f522ebb909d

doc/src/ocluster-api/worker.ml.html

Source file worker.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
open Lwt.Infix
open Capnp_rpc_lwt

(** [promise >>!= continue] is [Lwt_result.bind promise continue]: infix
    sequencing for promises that carry a [result]. *)
let ( >>!= ) promise continue = Lwt_result.bind promise continue

(** An extra metric as handed back to callers of {!additional_metrics}.
    Both fields are copied verbatim from the [Metric] struct in the reply. *)
type additional_metric = {
  content_type : string;
  (** Value of the reply's content-type field (presumably a MIME type such as
      the format of [data] -- TODO confirm against the schema). *)
  data : string;
  (** Value of the reply's data field: the metric payload itself. *)
}

(** [local ?additional_metric ~metrics ~self_update ()] builds a local Worker
    service (via [Raw.Service.Worker.local]) whose three methods are backed by
    the given callbacks.

    - [metrics] is called with [`Agent] or [`Host] and must resolve to
      [Ok (content_type, data)] or [Error (`Msg _)]. A request whose source is
      neither of those is answered with ["Unknown metrics source"] without
      calling [metrics].
    - [additional_metric] is called with the request's [source] parameter and
      must resolve to [Ok (Some (content_type, data))], [Ok None] (answered as
      "not reported") or [Error (`Msg _)]. The default always resolves to
      [Ok None].
    - [self_update] is called with [()] and must resolve to [Ok ()] (answered
      with an empty response) or [Error (`Msg _)].

    In every method an [Error (`Msg msg)] from the callback is returned to the
    caller as a Cap'n Proto exception carrying [msg]. *)
let local ?(additional_metric = (fun _ -> Lwt.return (Ok None))) ~metrics ~self_update () =
  let module X = Raw.Service.Worker in
  (* Shared by all three methods: wrap a callback's error message in the
     [`Capnp] error form that [Service.return_lwt] expects. *)
  let capnp_failure msg = Error (`Capnp (Capnp_rpc.Error.exn "%s" msg)) in
  X.local @@ object
    inherit X.service

    method metrics_impl params release_param_caps =
      let open X.Metrics in
      (* Read everything we need from the params before releasing them. *)
      let source = Params.source_get params in
      release_param_caps ();
      let response, results = Service.Response.create Results.init_pointer in
      (* Copy a successful sample into the reply, or forward the failure. *)
      let fill_reply = function
        | Error (`Msg msg) -> capnp_failure msg
        | Ok (content_type, data) ->
          Results.content_type_set results content_type;
          Results.data_set results data;
          Ok response
      in
      let collect kind =
        Service.return_lwt (fun () -> metrics kind >|= fill_reply)
      in
      match source with
      | Agent -> collect `Agent
      | Host -> collect `Host
      | Undefined _ -> Service.fail "Unknown metrics source"

    method additional_metric_impl params release_param_caps =
      let open X.AdditionalMetric in
      let module M = Raw.Builder.Metric in
      let module A = Raw.Builder.AdditionalMetric in
      (* Read everything we need from the params before releasing them. *)
      let source = Params.source_get params in
      release_param_caps ();
      let response, results = Service.Response.create Results.init_pointer in
      (* Store the union value [am] in the reply and hand the reply back. *)
      let reply_with am =
        ignore (Results.metric_set_builder results am);
        Ok response
      in
      let fill_reply = function
        | Error (`Msg msg) -> capnp_failure msg
        | Ok None ->
          (* Nothing to report: select the "not reported" union member. *)
          let am = A.init_root () in
          ignore (A.not_reported_set am);
          reply_with am
        | Ok (Some (content_type, data)) ->
          let b = M.init_root () in
          M.content_type_set b content_type;
          M.data_set b data;
          let am = A.init_root () in
          ignore (A.metric_set_builder am b);
          reply_with am
      in
      Service.return_lwt (fun () -> additional_metric source >|= fill_reply)

    method self_update_impl _params release_param_caps =
      release_param_caps ();
      Service.return_lwt (fun () ->
        self_update () >|= function
        | Ok () -> Ok (Service.Response.create_empty ())
        | Error (`Msg m) -> capnp_failure m)
  end

(* From here on [X] is the generated client-side Worker module; the [X] used
   inside [local] above is a separate, local alias for the service side. *)
module X = Raw.Client.Worker

(** A capability referring to a Worker; the functions below invoke its
    methods. *)
type t = X.t Capability.t

(** [metrics t ~source] calls the worker's [Metrics] method, asking for the
    [`Agent] or [`Host] metrics.

    @return a promise for [Ok (content_type, data)] as read from the reply, or
    the error reported by [Capability.call_for_value]. *)
let metrics t ~source =
  let open X.Metrics in
  let module S = Raw.Builder.Worker.MetricsSource in
  let request, params = Capability.Request.create Params.init_pointer in
  (* Translate the polymorphic variant into the generated enum value. *)
  let wire_source =
    match source with
    | `Host -> S.Host
    | `Agent -> S.Agent
  in
  Params.source_set params wire_source;
  Capability.call_for_value t method_id request >>!= fun results ->
  let content_type = Results.content_type_get results in
  let data = Results.data_get results in
  Lwt_result.return (content_type, data)

(** [additional_metrics ~extra t] calls the worker's [AdditionalMetric] method
    with [extra] as the request's [source] parameter.

    @return a promise for
    - [Ok (Some m)] when the reply carries a metric,
    - [Ok None] when the worker answered "not reported",
    - [Error (`Capnp _)] with message ["Undefined <n>"] when the reply uses a
      union member this code does not know, or
    - the error reported by [Capability.call_for_value]. *)
let additional_metrics ~extra t =
  let open X.AdditionalMetric in
  let module R = Raw.Reader.AdditionalMetric in
  let module M = Raw.Reader.Metric in
  let request, params = Capability.Request.create Params.init_pointer in
  Params.source_set params extra;
  Capability.call_for_value t method_id request >>!= fun result ->
  (* Decode the union in the reply into a plain [result], then wrap it once. *)
  let decoded =
    match R.get (Results.metric_get result) with
    | R.NotReported -> Ok None
    | R.Undefined i -> Error (`Capnp (Capnp_rpc.Error.exn "Undefined %i" i))
    | R.Metric m ->
      let content_type = M.content_type_get m in
      let data = M.data_get m in
      Ok (Some { content_type; data })
  in
  Lwt.return decoded

(** [self_update t] calls the worker's [SelfUpdate] method (no arguments).

    The expected outcome is that the call fails with a [`Disconnected]
    exception, which is reported as [Ok ()] (presumably because a worker that
    updates itself drops the connection -- confirm with the worker
    implementation). Any other error is returned unchanged.

    If the call actually returns successfully, the handler raises [Failure]
    from inside the [>|=] callback. *)
let self_update t =
  let open X.SelfUpdate in
  let request = Capability.Request.create_no_args () in
  let outcome = Capability.call_for_unit t method_id request in
  (* The match stays inline as the operand of [>|=] so that the record pattern
     on [ty] is checked against the already-known result type. *)
  outcome >|= function
  | Error (`Capnp (`Exception {ty = `Disconnected; _})) -> Ok ()
  | Error _ as e -> e
  | Ok () -> failwith "update reported success, but should have failed with a disconnection error!"
OCaml

Innovation. Community. Security.