package obatcher

  1. Overview
  2. Docs
A Framework for building Batched Concurrent Data Structures

Install

Dune Dependency

Authors

Maintainers

Sources

obatcher-1.1.tbz
sha256=2ee8f97a1e4a55899f8fdc48aa422e553d6a4d256e71b59e4257448beaf27dd3
sha512=61d0645dc5bd6955f3e663f133d27d9c8c61081e24bc8d88e73f86380432e783fa50bc4d980a9b17ccb949f6af9b90ef834f379ec9171b692745f05d9a34c0f9

doc/src/obatcher/obatcher.ml.html

Source file obatcher.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
(** Interface of a batched service: a data structure whose operations
    are executed in explicit batches rather than one at a time. *)
module type Service = sig
  type t
  (** Internal state of the underlying data structure. *)

  type cfg
  (** Configuration accepted by {!init}. *)

  type 'a op
  (** An operation on the service that produces a result of type ['a]. *)

  type wrapped_op = Mk : 'a op * 'a Picos.Computation.t -> wrapped_op
  (** An operation paired with the Picos computation used to deliver its
      result.  The existential hides the result type so that operations
      with different result types can be collected into one batch. *)

  val init : ?cfg:cfg -> unit -> t
  (** [init ?cfg ()] creates a fresh instance of the service. *)

  val run : t -> wrapped_op array -> unit
  (** [run t batch] executes every operation in [batch] against [t];
      implementations are expected to complete each wrapped computation. *)
end

(** [Make (S)] wraps the batched service [S] with an implicit-batching
    front end: concurrent callers submit single operations via {!exec}
    and one of them is elected to run the accumulated batch. *)
module Make (S : Service) = struct
  type t = {
    internal : S.t;
    (* [running] is true while some fiber is executing a batch; it acts
       as a mutual-exclusion flag for batch execution. *)
    running : bool Atomic.t;
    (* Thread-safe container of operations awaiting the next batch. *)
    container : S.wrapped_op Ts_container.t;
  }

  (** [init ?cfg ()] creates a batcher around a fresh [S.init ?cfg ()]. *)
  let init ?cfg () =
    {
      internal = S.init ?cfg ();
      running = Atomic.make false;
      container = Ts_container.create ();
    }

  (* TODO: This currently does busy polling by rescheduling fibers to
     run at a later time; ideally we should do something like a backoff
     wait instead. *)

  (** [exec t op] submits [op] and blocks the calling fiber until its
      result is available.  The fiber that wins the CAS on [t.running]
      becomes the batch runner for all currently queued operations. *)
  let exec t op =
    let open Picos in
    let comp = Computation.create () in
    let op_set = S.Mk (op, comp) in
    Ts_container.add t.container op_set;
    (* Spin until our own operation has been completed by some batch. *)
    while Computation.peek comp = None do
      if
        Ts_container.size t.container > 0
        && Atomic.compare_and_set t.running false true
      then
        (* We won the race: this fiber runs the batch.  [Fun.protect]
           guarantees the [running] flag is cleared even if [S.run]
           raises; without it an exception would leave the flag stuck
           at [true] and every fiber would yield forever. *)
        Fun.protect
          ~finally:(fun () -> Atomic.set t.running false)
          (fun () ->
            let batch = Ts_container.get t.container in
            S.run t.internal batch)
      else
        (* A batch is being processed elsewhere; yield and retry. *)
        Fiber.yield ()
    done;
    Computation.await comp

  (** [get_internal t] exposes the wrapped service state directly. *)
  let get_internal t = t.internal
end

(** Polymorphic variant of {!Service}: the service state and its
    operations are additionally parameterised by an element type ['a]. *)
module type Service_Poly = sig
  type 'a t
  (** Internal state of the service, holding elements of type ['a]. *)

  type cfg
  (** Configuration accepted by {!init}. *)

  type ('a, 'b) op
  (** An operation over an ['a t] producing a result of type ['b]. *)

  type 'a wrapped_op =
    | Mk : ('a, 'b) op * 'b Picos.Computation.t -> 'a wrapped_op
  (** An operation paired with the Picos computation used to deliver its
      result; the existential hides the result type ['b] so a batch can
      mix operations with different result types. *)

  val init : ?cfg:cfg -> unit -> 'a t
  (** [init ?cfg ()] creates a fresh instance of the service. *)

  val run : 'a t -> 'a wrapped_op array -> unit
  (** [run t batch] executes every operation in [batch] against [t];
      implementations are expected to complete each wrapped computation. *)
end

(** [Make_Poly (S)] is the polymorphic analogue of [Make]: it wraps the
    batched service [S] with an implicit-batching front end where
    concurrent callers submit single operations via {!exec}. *)
module Make_Poly (S : Service_Poly) = struct
  type 'a t = {
    internal : 'a S.t;
    (* [running] is true while some fiber is executing a batch; it acts
       as a mutual-exclusion flag for batch execution. *)
    running : bool Atomic.t;
    (* Thread-safe container of operations awaiting the next batch. *)
    container : 'a S.wrapped_op Ts_container.t;
  }

  (** [init ?cfg ()] creates a batcher around a fresh [S.init ?cfg ()]. *)
  let init ?cfg () =
    {
      internal = S.init ?cfg ();
      running = Atomic.make false;
      container = Ts_container.create ();
    }

  (* TODO: This currently does busy polling by rescheduling fibers to
     run at a later time; ideally we should do something like a backoff
     wait instead. *)

  (** [exec t op] submits [op] and blocks the calling fiber until its
      result is available.  The fiber that wins the CAS on [t.running]
      becomes the batch runner for all currently queued operations. *)
  let exec t op =
    let open Picos in
    let comp = Computation.create () in
    let op_set = S.Mk (op, comp) in
    Ts_container.add t.container op_set;
    (* Spin until our own operation has been completed by some batch. *)
    while Computation.peek comp = None do
      if
        Ts_container.size t.container > 0
        && Atomic.compare_and_set t.running false true
      then
        (* We won the race: this fiber runs the batch.  [Fun.protect]
           guarantees the [running] flag is cleared even if [S.run]
           raises; without it an exception would leave the flag stuck
           at [true] and every fiber would yield forever. *)
        Fun.protect
          ~finally:(fun () -> Atomic.set t.running false)
          (fun () ->
            let batch = Ts_container.get t.container in
            S.run t.internal batch)
      else
        (* A batch is being processed elsewhere; yield and retry. *)
        Fiber.yield ()
    done;
    Computation.await comp

  (** [get_internal t] exposes the wrapped service state directly. *)
  let get_internal t = t.internal
end
OCaml

Innovation. Community. Security.