package picos

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file condition.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
open Picos

(* Internal state of a condition variable: either no waiters at all, or a
   queue of the triggers of waiters split into two lists.

   [head] holds the triggers that are signaled first, in order.  [tail] holds
   later arrivals with the most recently added trigger first, so it is
   reversed before being consumed.  Every site in this file that builds a
   [Queue] puts at least one trigger in it and uses [Empty] otherwise. *)
type state = Empty | Queue of { head : Trigger.t list; tail : Trigger.t list }

(* A condition variable is an atomic cell holding the current [state]. *)
type t = state Atomic.t

(* [create ?padded ()] makes a new condition variable with no waiters.  When
   [padded] is [true] (default [false]) the freshly made atomic is passed
   through [Multicore_magic.copy_as_padded] and that copy is returned. *)
let create ?(padded = false) () =
  let condition = Atomic.make Empty in
  match padded with
  | true -> Multicore_magic.copy_as_padded condition
  | false -> condition

(* [broadcast t] wakes every waiter currently queued on [t].

   The state is first read without writing so that an already empty
   condition is left untouched.  Otherwise the whole queue is swapped out
   for [Empty] in one step and its triggers are signaled: first [head] in
   order, then [tail] reversed. *)
let broadcast t =
  match Atomic.get t with
  | Empty -> ()
  | Queue _ -> begin
      match Atomic.exchange t Empty with
      | Empty -> ()
      | Queue { head; tail } ->
          List.iter Trigger.signal head;
          List.iter Trigger.signal (List.rev tail)
    end

(* We try to avoid starvation of signal by making it so that when, at the start
   of signal or wait, the head is empty, the tail is reversed into the head.
   This way both signal and wait attempt O(1) and O(n) operations at the same
   time. *)

(* [signal t backoff] removes the first queued trigger from [t], if any, and
   signals it.

   The replacement state is computed from a snapshot and installed with a
   compare-and-set; on failure the whole operation is retried after one
   backoff step.  When [head] is empty the [tail] is reversed and becomes
   the new [head], minus the trigger being signaled. *)
let rec signal t backoff =
  match Atomic.get t with
  | Empty -> ()
  | Queue { head = trigger :: head; tail } as before ->
      let after =
        if head == [] && tail == [] then Empty else Queue { head; tail }
      in
      signal_cas t backoff before after trigger
  | Queue { head = []; tail } as before -> begin
      match List.rev tail with
      | trigger :: head ->
          let after =
            if head == [] then Empty else Queue { head; tail = [] }
          in
          signal_cas t backoff before after trigger
      | [] ->
          (* No site in this file builds a [Queue] with both lists empty. *)
          failwith "impossible"
    end

(* [signal_cas t backoff before after trigger] tries to replace the snapshot
   [before] with [after].  The trigger is signaled only once the swap has
   succeeded; otherwise [signal] is restarted with an increased backoff. *)
and signal_cas t backoff before after trigger =
  match Atomic.compare_and_set t before after with
  | true -> Trigger.signal trigger
  | false -> signal t (Backoff.once backoff)

(* [signal t] shadows the recursive worker of the same name defined above and
   starts it with [Backoff.default]. *)
let signal t = signal t Backoff.default

(* [cleanup backoff trigger t] is run by a waiter that has been canceled.

   It tries to remove [trigger] from the queue of [t], looking in [head]
   first and then in [tail].  If [trigger] is no longer queued, the next
   waiter is signaled instead, so that each signal wakes up at least one
   non-canceled waiter if possible.  A failed compare-and-set restarts the
   whole procedure after one backoff step. *)
let rec cleanup backoff trigger t =
  match Atomic.get t with
  | Empty -> ()
  | Queue { head = []; tail } as before -> begin
      (* Only [tail] can contain the trigger. *)
      match List_ext.drop_first_or_not_found trigger tail with
      | exception Not_found -> signal t
      | [] -> cleanup_cas backoff trigger t before Empty
      | tail ->
          cleanup_cas backoff trigger t before (Queue { head = []; tail })
    end
  | Queue { head; tail } as before -> begin
      match List_ext.drop_first_or_not_found trigger head with
      | head ->
          let after =
            if head == [] && tail == [] then Empty else Queue { head; tail }
          in
          cleanup_cas backoff trigger t before after
      | exception Not_found -> begin
          (* Not in [head]; [head] stays as it was, so the queue remains
             non-empty whatever is left of [tail]. *)
          match List_ext.drop_first_or_not_found trigger tail with
          | tail -> cleanup_cas backoff trigger t before (Queue { head; tail })
          | exception Not_found -> signal t
        end
    end

(* [cleanup_cas backoff trigger t before after] tries to install [after] in
   place of the snapshot [before]; on failure [cleanup] is retried with an
   increased backoff. *)
and cleanup_cas backoff trigger t before after =
  match Atomic.compare_and_set t before after with
  | true -> ()
  | false -> cleanup (Backoff.once backoff) trigger t

(* [wait t mutex trigger fiber backoff] enqueues [trigger] on [t], releases
   [mutex], and awaits the trigger.

   Enqueueing is a compare-and-set that is retried with backoff on failure.
   If [head] is empty at that point the [tail] is reversed into [head] with
   the new trigger placed last; otherwise the trigger is pushed onto [tail].

   After the await, [mutex] is reacquired with the fiber's forbid flag set
   to [true], and the previous flag value is restored afterwards.  If the
   await returned an exception, the trigger is cleaned up from the queue
   and the exception is re-raised with its backtrace. *)
let rec wait t mutex trigger fiber backoff =
  let before = Atomic.get t in
  let after =
    match before with
    | Empty -> Queue { head = [ trigger ]; tail = [] }
    | Queue { head = []; tail } ->
        Queue { head = List.rev_append tail [ trigger ]; tail = [] }
    | Queue { head; tail } -> Queue { head; tail = trigger :: tail }
  in
  if not (Atomic.compare_and_set t before after) then
    wait t mutex trigger fiber (Backoff.once backoff)
  else begin
    Mutex.unlock_as (Fiber.Maybe.of_fiber fiber) mutex Backoff.default;
    let outcome = Trigger.await trigger in
    let forbid = Fiber.exchange fiber ~forbid:true in
    Mutex.lock_as (Fiber.Maybe.of_fiber fiber) mutex Backoff.default;
    Fiber.set fiber ~forbid;
    match outcome with
    | Some exn_bt ->
        cleanup Backoff.default trigger t;
        Exn_bt.raise exn_bt
    | None -> ()
  end

(* [wait t mutex] shadows the recursive worker of the same name defined
   above.  It obtains the current fiber, creates a fresh trigger for this
   wait, and starts the worker with [Backoff.default]. *)
let wait t mutex =
  let fiber = Fiber.current () in
  let trigger = Trigger.create () in
  wait t mutex trigger fiber Backoff.default
OCaml

Innovation. Community. Security.