package trace

  1. Overview
  2. Docs

Source file b_queue.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
module A = Trace_core.Internal_.Atomic_

(** A queue whose consumer can block until elements arrive.

    Elements are stored in an {!Mpsc_bag}; the mutex and condition variable
    are only used to put the consumer to sleep and to wake it up again.
    NOTE(review): presumably intended for several producers and a single
    consumer, going by the name [Mpsc_bag] and the singular
    [consumer_waiting] flag -- confirm against callers. *)
type 'a t = {
  mutex: Mutex.t;
      (** Held while [closed] is written and around every wait/broadcast on
          [cond]. *)
  cond: Condition.t;
      (** Waited on by [pop_all]; broadcast by [push] and [close]. *)
  q: 'a Mpsc_bag.t;  (** Storage for the pushed elements. *)
  mutable closed: bool;
      (** Set to [true] by [close] (under [mutex]). It is also read without
          the mutex in [push] and [pop_all]. *)
  consumer_waiting: bool A.t;
      (** Set to [true] by [pop_all] before it may block, and back to [false]
          afterwards. [push] reads it to decide whether a wakeup is needed. *)
}

(** Raised by [push] when it observes that the queue is closed, and by
    [pop_all] when the queue is closed and no elements are available. *)
exception Closed

(** [create ()] builds a new queue: fresh mutex and condition variable, a
    freshly created bag, not closed, and no consumer flagged as waiting. *)
let create () : _ t =
  let mutex = Mutex.create () in
  let cond = Condition.create () in
  let q = Mpsc_bag.create () in
  let consumer_waiting = A.make false in
  { mutex; cond; q; closed = false; consumer_waiting }

(** [close self] marks the queue as closed and wakes every thread blocked on
    its condition variable. Calling it on an already closed queue changes
    nothing. *)
let close (self : _ t) =
  let { mutex; cond; _ } = self in
  Mutex.lock mutex;
  let already_closed = self.closed in
  if not already_closed then (
    self.closed <- true;
    (* waiters must wake up so they can observe [closed] and fail *)
    Condition.broadcast cond
  );
  Mutex.unlock mutex

(** [push self x] adds [x] to the queue and, if the consumer has flagged
    itself as waiting, wakes it up.

    @raise Closed if the queue is observed to be closed, either before or
    right after [x] is added. In the second case [x] has already been added
    to the underlying bag when the exception is raised. *)
let push (self : _ t) x : unit =
  (* first check, done without taking the mutex *)
  if self.closed then raise Closed;
  Mpsc_bag.add self.q x;
  (* check again: [close] may have run while [x] was being added *)
  if self.closed then raise Closed;
  (* the mutex is only taken when the consumer announced it may be asleep *)
  if A.get self.consumer_waiting then (
    (* wakeup consumer *)
    Mutex.lock self.mutex;
    Condition.broadcast self.cond;
    Mutex.unlock self.mutex
  )

(** [pop_all self] returns the elements currently held by the queue, blocking
    on the condition variable while there are none.

    Elements still present are returned even if the queue has been closed.

    @raise Closed if the queue is closed and no elements are available. *)
let rec pop_all (self : 'a t) : 'a list =
  (* fast path: take whatever is there without touching the mutex *)
  match Mpsc_bag.pop_all self.q with
  | Some l -> l
  | None ->
    if self.closed then raise Closed;
    Mutex.lock self.mutex;
    A.set self.consumer_waiting true;
    (* Check again: a producer might have pushed an element since we last
       checked. If we still find nothing, then because this check comes
       after [consumer_waiting := true], any producer arriving later will
       see the flag and know to wake us up. *)
    (match Mpsc_bag.pop_all self.q with
    | Some l ->
      (* no need to sleep after all: clear the flag before returning *)
      A.set self.consumer_waiting false;
      Mutex.unlock self.mutex;
      l
    | None ->
      (* [closed] is re-read while holding the mutex, which [close] also
         holds when setting it *)
      if self.closed then (
        Mutex.unlock self.mutex;
        raise Closed
      );
      (* releases [mutex] while sleeping and holds it again on return *)
      Condition.wait self.cond self.mutex;
      A.set self.consumer_waiting false;
      Mutex.unlock self.mutex;
      (* start over: the wakeup may come from [push] or from [close], and
         the retry re-checks both the bag and [closed] *)
      pop_all self)
OCaml

Innovation. Community. Security.