package moonpool

  1. Overview
  2. Docs

Source file ws_deque_.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
module A = Atomic_

(* terminology:

   - bottom: the end where the owning thread pushes and pops.
     Only that one thread may do so.
   - top: the end where work stealing happens (it holds the older values).
     The [top] index only ever grows.

   Elements are always added at the bottom end. *)

(** Fixed-capacity ring buffer holding [2 ^ log_size] slots.

    Indices are arbitrary integers; they are reduced modulo the capacity
    with a bit mask, so an ever-growing counter can be used directly as an
    index. *)
module CA : sig
  type 'a t

  val create : dummy:'a -> unit -> 'a t
  (** [create ~dummy ()] allocates a buffer whose slots all hold [dummy]. *)

  val size : 'a t -> int
  (** Number of slots. This is the same constant for every buffer. *)

  val get : 'a t -> int -> 'a
  (** [get a i] reads the slot that index [i] wraps around to. *)

  val set : 'a t -> int -> 'a -> unit
  (** [set a i x] stores [x] in the slot that index [i] wraps around to. *)
end = struct
  (** Base-2 logarithm of the capacity: the array has 256 slots. *)
  let log_size = 8

  let capacity = 1 lsl log_size

  (* [capacity] is a power of two, so [i land mask] is always a valid slot,
     including for negative [i]. This is what makes the unchecked array
     accesses below safe. *)
  let mask = capacity - 1

  type 'a t = { arr: 'a array } [@@unboxed]

  let create ~dummy () : _ t = { arr = Array.make capacity dummy }
  let[@inline] size (_ : _ t) : int = capacity

  let[@inline] get ({ arr } : 'a t) (i : int) : 'a =
    Array.unsafe_get arr (i land mask)

  let[@inline] set ({ arr } : 'a t) (i : int) (x : 'a) : unit =
    Array.unsafe_set arr (i land mask) x
end

(** A work-stealing deque backed by a fixed-size circular array.

    [top] and [bottom] are counters used as indices into [arr] (which wraps
    them around); the number of stored elements is [bottom - top]. *)
type 'a t = {
  top: int A.t;
      (** Where we steal: index of the oldest element. It is only ever
          advanced, by compare-and-set, in [pop] and [steal]. *)
  bottom: int A.t;
      (** Where we push/pop from the owning thread: index of the next free
          slot. Written by [push] and [pop] only. *)
  mutable top_cached: int;
      (** Last value of [top] read by [push] or [pop]. It lets [push] skip
          the atomic read of [top] when the deque is clearly not full.
          [steal] never touches it. *)
  arr: 'a CA.t;  (** The circular array holding the elements *)
}

(** [create ~dummy ()] builds an empty deque. Both counters start at 0 and
    every slot of the underlying circular array initially holds [dummy]. *)
let create ~dummy () : _ t =
  let top_counter = A.make 0 in
  (* The ring is allocated between the two counters on purpose: the intent
     is to keep [bottom] far from [top] in memory, to avoid false sharing. *)
  let ring = CA.create ~dummy () in
  let bottom_counter = A.make 0 in
  { top = top_counter; bottom = bottom_counter; top_cached = 0; arr = ring }

(** [size self] is the number of elements currently in the deque, computed
    as [bottom - top] and clamped so that it is never negative.

    The two counters are read one after the other, not as a single atomic
    snapshot, so the result is only an estimate when other threads are
    pushing, popping or stealing concurrently. *)
let[@inline] size (self : _ t) : int =
  (* Sequence the two atomic reads explicitly. As operands of [-] their
     evaluation order would be unspecified. [top] is read first, then
     [bottom], which is the same order [steal] uses. *)
  let t = A.get self.top in
  let b = A.get self.bottom in
  let n = b - t in
  (* [pop] lowers [bottom] before it looks at [top], so [bottom] can
     transiently be below [top]; report that as empty. *)
  if n < 0 then 0 else n

(** Signals, inside [push], that the deque has no room left. [push] catches
    it and turns it into a [false] result. *)
exception Full

(** [push self x] adds [x] at the bottom end of the deque.

    Must only be called from the owning thread (the only one allowed to
    push and pop at the bottom).

    @return [true] if [x] was stored, [false] if the deque was full, that
    is, if it already held [CA.size self.arr - 1] elements. *)
let push (self : 'a t) (x : 'a) : bool =
  let limit = CA.size self.arr - 1 in
  let b = A.get self.bottom in
  match
    (* Section 2.3: [top] only grows, so the cached value gives an
       over-approximation of the size. The real [top] is read only when
       that approximation says the deque might be full. *)
    if b - self.top_cached >= limit then (
      (* reading the actual [top] might entail contention with stealers *)
      let t = A.get self.top in
      self.top_cached <- t;
      if b - t >= limit then (* really full *) raise_notrace Full
    )
  with
  | () ->
    (* write the slot first, then publish it by advancing [bottom] *)
    CA.set self.arr b x;
    A.set self.bottom (b + 1);
    true
  | exception Full -> false

(** [pop self] removes and returns the most recently pushed element, taken
    from the bottom end of the deque.

    Must only be called from the owning thread (the only one allowed to
    push and pop at the bottom).

    @return [Some x] if an element was taken; [None] if the deque was empty
    or if a stealer won the race for the last remaining element. *)
let pop (self : 'a t) : 'a option =
  let b = A.get self.bottom in
  let b = b - 1 in
  (* Publish the lowered [bottom] before looking at [top]: stealers compute
     the size as [bottom - top], so any stealer that reads [bottom] from now
     on sees one element fewer. *)
  A.set self.bottom b;

  let t = A.get self.top in
  (* we just paid for an up-to-date read of [top]: remember it for [push] *)
  self.top_cached <- t;

  let size = b - t in
  if size < 0 then (
    (* the deque was already empty *)
    (* reset to basic empty state *)
    A.set self.bottom t;
    None
  ) else if size > 0 then (
    (* at least one other element remains after this one *)
    (* can pop without modifying [top] *)
    let x = CA.get self.arr b in
    Some x
  ) else (
    assert (size = 0);
    (* there was exactly one slot, so we might be racing against stealers
       to update [self.top] *)
    if A.compare_and_set self.top t (t + 1) then (
      (* we won: the last element is ours *)
      let x = CA.get self.arr b in
      (* [top] is now [t + 1]; setting [bottom] to the same value leaves
         the deque empty *)
      A.set self.bottom (t + 1);
      Some x
    ) else (
      (* the compare-and-set failed, so [top] is no longer [t]: a stealer
         took the element. Leave [bottom] at [t + 1] as in the other case. *)
      A.set self.bottom (t + 1);
      None
    )
  )

(** [steal self] tries to take the oldest element, from the top end of the
    deque. Unlike push and pop, it is meant to be called from threads other
    than the owner.

    @return [Some x] if an element was taken; [None] if the deque looked
    empty, or if the compare-and-set on [top] failed because [top] had
    already moved. No retry is attempted in that case. *)
let steal (self : 'a t) : 'a option =
  (* [top_cached] is deliberately left alone here: it belongs to the owning
     thread, and we are in another one. *)
  let top_seen = A.get self.top in
  let bottom_seen = A.get self.bottom in
  if bottom_seen - top_seen > 0 then (
    (* Read the candidate before claiming it. It is only returned if the
       compare-and-set confirms that [top] has not moved in the meantime. *)
    let candidate = CA.get self.arr top_seen in
    let claimed = A.compare_and_set self.top top_seen (top_seen + 1) in
    if claimed then Some candidate else None
  ) else
    None
OCaml

Innovation. Community. Security.