package domainslib

  1. Overview
  2. Docs

Source file chan.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
(* mutex_condvar will be used per domain; so multiple fibers or
   systhreads may share a mutex_condvar variable *)
type mutex_condvar = {
  mutex: Mutex.t;
  condition: Condition.t
}

type waiting_notified =
  | Waiting
  | Notified

type 'a contents =
  | Empty of {receivers: ('a option ref * mutex_condvar) Fun_queue.t}
  | NotEmpty of {senders: ('a * waiting_notified ref * mutex_condvar) Fun_queue.t; messages: 'a Fun_queue.t}

type 'a t = {
  buffer_size: int option;
  contents: 'a contents Atomic.t
}

let mutex_condvar_key =
  Domain.DLS.new_key (fun () ->
    let m = Mutex.create () in
    let c = Condition.create () in
    {mutex=m; condition=c})

let make_bounded n =
  if n < 0 then raise (Invalid_argument "Chan.make_bounded") ;
  {buffer_size= Some n;
   contents = Atomic.make (Empty {receivers= Fun_queue.empty; })}

let make_unbounded () =
  {buffer_size= None;
   contents = Atomic.make (Empty {receivers= Fun_queue.empty})}

(* [send'] is shared by both the blocking and polling versions. Returns a
 * boolean indicating whether the send was successful. Hence, it always returns
 * [true] if [polling] is [false]. *)
let rec send' {buffer_size; contents} v ~polling =
  let open Fun_queue in
  let old_contents = Atomic.get contents in
  match old_contents with
  | Empty {receivers} -> begin
    (* The channel is empty (no senders) *)
    match pop receivers with
    | None ->
        (* The channel is empty (no senders) and no waiting receivers *)
        if buffer_size = Some 0 then
          (* The channel is empty (no senders), no waiting receivers, and
            * buffer size is 0 *)
          begin if not polling then begin
            (* The channel is empty (no senders), no waiting receivers,
              * buffer size is 0 and we're not polling *)
            let mc = Domain.DLS.get mutex_condvar_key in
            let cond_slot = ref Waiting in
            let new_contents =
              NotEmpty
                {messages= empty; senders= push empty (v, cond_slot, mc)}
            in
            if Atomic.compare_and_set contents old_contents new_contents
            then begin
              Mutex.lock mc.mutex;
              while !cond_slot = Waiting do
                Condition.wait mc.condition mc.mutex
              done;
              Mutex.unlock mc.mutex;
              true
            end else send' {buffer_size; contents} v ~polling
          end else
            (* The channel is empty (no senders), no waiting receivers,
              * buffer size is 0 and we're polling *)
            false
          end
        else
          (* The channel is empty (no senders), no waiting receivers, and
            * the buffer size is non-zero *)
          let new_contents =
            NotEmpty {messages= push empty v; senders= empty}
          in
          if Atomic.compare_and_set contents old_contents new_contents
          then true
          else send' {buffer_size; contents} v ~polling
    | Some ((r, mc), receivers') ->
        (* The channel is empty (no senders) and there are waiting receivers
         * *)
        let new_contents = Empty {receivers= receivers'} in
        if Atomic.compare_and_set contents old_contents new_contents
        then begin
          r := Some v;
          Mutex.lock mc.mutex;
          Mutex.unlock mc.mutex;
          Condition.broadcast mc.condition;
          true
         end else send' {buffer_size; contents} v ~polling
  end
  | NotEmpty {senders; messages} ->
      (* The channel is not empty *)
      if buffer_size = Some (length messages) then
        (* The channel is not empty, and the buffer is full *)
        begin if not polling then
          (* The channel is not empty, the buffer is full and we're not
            * polling *)
          let cond_slot = ref Waiting in
          let mc = Domain.DLS.get mutex_condvar_key in
          let new_contents =
            NotEmpty {senders= push senders (v, cond_slot, mc); messages}
          in
          if Atomic.compare_and_set contents old_contents new_contents then begin
            Mutex.lock mc.mutex;
            while !cond_slot = Waiting do
              Condition.wait mc.condition mc.mutex;
            done;
            Mutex.unlock mc.mutex;
            true
          end else send' {buffer_size; contents} v ~polling
        else
          (* The channel is not empty, the buffer is full and we're
            * polling *)
          false
        end
      else
        (* The channel is not empty, and the buffer is not full *)
        let new_contents =
          NotEmpty {messages= push messages v; senders}
        in
        if Atomic.compare_and_set contents old_contents new_contents
        then true
        else send' {buffer_size; contents} v ~polling

let send c v =
  let r = send' c v ~polling:false in
  assert r

let send_poll c v = send' c v ~polling:true

(* [recv'] is shared by both the blocking and polling versions. Returns a an
 * optional value indicating whether the receive was successful. Hence, it
 * always returns [Some v] if [polling] is [false]. *)
let rec recv' {buffer_size; contents} ~polling =
  let open Fun_queue in
  let old_contents = Atomic.get contents in
  match old_contents with
  | Empty {receivers} ->
      (* The channel is empty (no senders) *)
      if not polling then begin
        (* The channel is empty (no senders), and we're not polling *)
        let msg_slot = ref None in
        let mc = Domain.DLS.get mutex_condvar_key in
        let new_contents =
          Empty {receivers= push receivers (msg_slot, mc)}
        in
        if Atomic.compare_and_set contents old_contents new_contents then
        begin
          Mutex.lock mc.mutex;
          while !msg_slot = None do
            Condition.wait mc.condition mc.mutex;
          done;
          Mutex.unlock mc.mutex;
          !msg_slot
        end else recv' {buffer_size; contents} ~polling
      end else
        (* The channel is empty (no senders), and we're polling *)
        None
  | NotEmpty {senders; messages} ->
      (* The channel is not empty *)
      match (pop messages, pop senders) with
      | None, None ->
          (* The channel is not empty, but no senders or messages *)
          failwith "Chan.recv: Impossible - channel state"
      | Some (m, messages'), None ->
          (* The channel is not empty, there is a message and no
            * waiting senders *)
          let new_contents =
            if length messages' = 0 then
              Empty {receivers = empty}
            else
              NotEmpty {messages= messages'; senders}
          in
          if Atomic.compare_and_set contents old_contents new_contents
          then Some m
          else recv' {buffer_size; contents} ~polling
      | None, Some ((m, c, mc), senders') ->
          (* The channel is not empty, there are no messages, and there
            * is a waiting sender. This is only possible is the buffer
            * size is 0. *)
          assert (buffer_size = Some 0) ;
          let new_contents =
            if length senders' = 0 then
              Empty {receivers = empty}
            else
              NotEmpty {messages; senders= senders'}
          in
          if Atomic.compare_and_set contents old_contents new_contents
          then begin
            c := Notified;
            Mutex.lock mc.mutex;
            Mutex.unlock mc.mutex;
            Condition.broadcast mc.condition;
            Some m
          end else recv' {buffer_size; contents} ~polling
      | Some (m, messages'), Some ((ms, sc, mc), senders') ->
          (* The channel is not empty, there is a message, and there is a
            * waiting sender. *)
          let new_contents =
            NotEmpty {messages= push messages' ms; senders= senders'}
          in
          if Atomic.compare_and_set contents old_contents new_contents
          then begin
            sc := Notified;
            Mutex.lock mc.mutex;
            Mutex.unlock mc.mutex;
            Condition.broadcast mc.condition;
            Some m
          end else recv' {buffer_size; contents} ~polling

let recv c =
  match recv' c ~polling:false with
  | None -> failwith "Chan.recv: impossible - no message"
  | Some m -> m

let recv_poll c =
  match Atomic.get c.contents with
  | Empty _ -> None
  | _ -> recv' c ~polling:true
OCaml

Innovation. Community. Security.