package eio

Source file sync.ml

(* A lock-free synchronous channel with cancellation, using Cells.

   Producers and consumers are paired off and then the producer transfers its
   value to the consumer. This is effectively a bounded queue with a capacity
   of zero.

   Both producers and consumers can cancel while waiting.

   There is an atomic int ([balance]), plus two queues ([consumers] and
   [producers]) made using Cells. When [balance] is positive, it is the number
   of producers waiting with values that no one is yet responsible for
   resuming. When negative, it is the (negative) number of waiting consumers
   that no one is responsible for resuming.

   To put an item:

   1. The producer increments [balance].
   2. If it was negative, the producer resumes one waiting consumer on the [consumers] queue.
      Otherwise, it suspends itself on the [producers] queue.

   To take an item:

   1. The consumer decrements [balance].
   2. If it was positive, the consumer resumes one waiting producer on the [producers] queue.
      Otherwise, it suspends itself on the [consumers] queue.

   Therefore, we never try to resume on a queue unless another party has
   started the process of suspending on it.

   The system will not become idle while a client is responsible for resuming
   something. Therefore, when idle:

   - If [balance <= 0] then there are no waiting producers.
   - If [balance >= 0] then there are no waiting consumers.
   - So, we never have waiting consumers and producers at the same time.

   As usual with Cells, either party may get to the new cell first. Whichever party
   arrives first writes a callback, which the other party will then call when they arrive.

   Note on terminology:

   - The "suspender" of a cell is the party that incremented the queue's suspend index,
     and the "resumer" of a cell is the party that incremented the resume index.

   - Whether "suspending" or "resuming" a cell, you may still have to suspend
     your fiber and resume it later.

   States

   There are four cell states:

   - [In_transition] indicates that the cell is still being initialised, or might be
     getting cancelled. Either way, the suspending party is actively working to
     change the cell's state.

   - [Item] indicates that the producer is ready to provide an item.

   - [Slot] indicates that the consumer is ready to receive an item.

   - [Finished] indicates that the cell is no longer being used (the value has
     been consumed or the cell has finished being cancelled).

   The possible sequences of states on the [producers] queue are:

   In_transition -C> Slot -P> Finished    (consumer arrives first)
                 `P> Item -C> Finished    (producer arrives first)
                          `P> In_transition -P> Finished   (producer cancels)
                                            `C> Slot -P> Finished   (cancellation interrupted)

   Only the producer can cancel here. For the [consumers] queue it's the
   opposite - the consumer can cancel its [Slot].

   Cancellation

   Note that there are two kinds of cancellation here:

   1. A cancelled cell is not considered part of its queue. Anyone seeing one
      (due to a race) will skip over it and use the next cell.

   2. After a consumer and producer have been paired off (and the cell removed
      from its queue), the consumer callback may reject the value. If this
      happens, the producer must start all over again to find another consumer.

   Whenever a consumer sets its callback to reject values, it should then start
   the process of cancelling its cell (if acting as a suspender) so that the
   cell can be GC'd.

   A consumer can only cancel its cell when it's on the [consumers] queue.
   If it's on [producers], it knows a wake-up will be coming shortly anyway.
   A consumer cancels its cell as follows:

   1. The consumer sets its cell in [consumers] to [In_transition].
   2. It increments [balance] (from a negative value). It is now committed to cancelling.
   3. It sets its cell to [Finished].

   (1) will fail if the cell got resumed first. In that case the consumer just
   rejects the cancellation attempt.

   (2) will fail if [balance >= 0]. In that case the consumer has not cancelled,
   and is about to be resumed instead. It tries to return to the [Slot] state.
   If that fails, the cell now contains an Item and the consumer takes it.

   (3) will fail if a producer arrived after the consumer committed to cancelling.
   In that case, the consumer passes the Item on to the next consumer (there
   must be another one, since both the consumer and producer incremented
   [balance] from a negative value).

   Cancelling a producer is very similar to cancelling a consumer, just with the
   [producers] queue and decrementing the balance from a positive value.

   Non-blocking take

   To perform a non-blocking take:

   1. The consumer decrements [balance] from a positive number.
   2. The consumer takes the next resume cell from [producers].
   3. The consumer takes the [Item] from the cell, setting it to [Finished].

   (1) will fail if there are no unassigned items available.
   In that case [take_nonblocking] returns [None], as there are no items waiting.

   (3) will fail if the producer is initialising or cancelling. In either case,
   the consumer sets the cell to a [Slot] with a dummy callback that rejects
   all values, and returns [None] immediately.

   The exchange

   Once a producer and consumer have been paired off (and so their cell is now Finished),
   the producer's value is passed to the consumer's callback. If the consumer accepts it,
   then both fibers are resumed. If not, the producer starts again (incrementing [balance]
   again) and waits for another consumer.

   The above has not been formally verified (exercise for reader!). *)
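
(* Illustration only, not part of this module. The put and take steps described
   above can be sketched as a minimal single-threaded model of the [balance]
   bookkeeping alone, assuming no queues, fibers or cancellation. The names
   [action], [model_put] and [model_take] are hypothetical and do not exist
   in Eio.

```ocaml
(* Hypothetical model: only the [balance] counter, nothing else. *)
type action =
  | Resume_peer    (* a peer is waiting (or about to); we must resume it *)
  | Suspend_self   (* nobody to pair with yet; we wait on our own queue *)

(* A producer increments the balance and looks at the previous value. *)
let model_put balance =
  if Atomic.fetch_and_add balance 1 < 0 then Resume_peer else Suspend_self

(* A consumer decrements the balance and looks at the previous value. *)
let model_take balance =
  if Atomic.fetch_and_add balance (-1) > 0 then Resume_peer else Suspend_self

(* Two consumers arrive, then three producers, then one more consumer. *)
let run_balance_model () =
  let balance = Atomic.make 0 in
  let a = model_take balance in   (* 0 -> -1 *)
  let b = model_take balance in   (* -1 -> -2 *)
  let c = model_put balance in    (* -2 -> -1 *)
  let d = model_put balance in    (* -1 -> 0 *)
  let e = model_put balance in    (* 0 -> 1 *)
  let f = model_take balance in   (* 1 -> 0 *)
  ([a; b; c; d; e; f], Atomic.get balance)
```

   The sign test uses the value returned by [Atomic.fetch_and_add], which is the
   balance before the update, matching the wording of the numbered steps. *)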

(* Import these directly because we copy this file for the dscheck tests. *)
module Fiber_context = Eio__core.Private.Fiber_context
module Suspend = Eio__core.Private.Suspend
module Cancel = Eio__core.Cancel

type 'a item = {
  v : 'a;
  kp : (bool, exn) result -> unit;      (* [Ok false] means consumer refused the item; retry. *)
  cancel : [
    | `Resuming                         (* In the process of resuming, so can't cancel. *)
    | `Suspended of (unit -> bool)      (* Call this function to attempt to leave the queue. *)
    | `Cancelled of exn                 (* Already cancelled. *)
  ] Atomic.t;
}

type 'a cell =
  | In_transition
  | Slot of ('a -> bool)
  | Item of 'a item
  | Finished

module Cell = struct
  type 'a t = 'a cell

  let init = In_transition

  let segment_order = 2

  let dump f = function
    | In_transition -> Fmt.string f "In_transition"
    | Slot _ -> Fmt.string f "Slot"
    | Item _ -> Fmt.string f "Item"
    | Finished -> Fmt.string f "Finished"
end

module Q = Cells.Make(Cell)

type 'a t = {
  balance : int Atomic.t;
  consumers : 'a Q.t;
  producers : 'a Q.t;
}

type 'a loc =
  | Short of 'a Cell.t Atomic.t                 (* Acting as resumer of cell *)
  | Long of ('a Q.segment * 'a Cell.t Atomic.t) (* Acting as suspender of cell; can cancel *)

let dump f t =
  Fmt.pf f "@[<v2>Sync (balance=%d)@,@[<v2>Consumers:@,%a@]@,@[<v2>Producers:@,%a@]@]"
    (Atomic.get t.balance)
    Q.dump t.consumers
    Q.dump t.producers

(* Give [item] to consumer [kc]. [item]'s cell is now Finished. *)
let exchange item kc = item.kp (Ok (kc item.v))
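
(* Illustration only, not part of this module. A small sketch of how [exchange]
   reports the consumer's decision back to the producer, assuming a cut-down
   item record without the [cancel] field. [demo_item], [demo_exchange] and
   [run_exchange_demo] are hypothetical names.

```ocaml
(* Cut-down copy of the item record; the real one also has a [cancel] field. *)
type 'a demo_item = {
  v : 'a;
  kp : (bool, exn) result -> unit;
}

(* Same shape as [exchange]: the consumer's verdict becomes the producer's result. *)
let demo_exchange item kc = item.kp (Ok (kc item.v))

(* Offer one value to an accepting consumer and then to a rejecting one,
   and return what the producer callback saw, in order. *)
let run_exchange_demo () =
  let log = ref [] in
  let item = {
    v = "hello";
    kp = (function
        | Ok true -> log := "accepted" :: !log
        | Ok false -> log := "retry" :: !log
        | Error _ -> log := "cancelled" :: !log);
  } in
  demo_exchange item (fun _ -> true);
  demo_exchange item (fun _ -> false);
  List.rev !log
```

   In the real module the [Ok false] case makes the producer rejoin the
   channel rather than just recording the outcome. *)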

(* Add [value] to [cell].
   If the cell is in transition, place [value] there and let the other party handle it later.
   If the peer's value is already present, do the exchange.
   If the peer cancelled the cell then try the next one on the given resume queue (if we're adding
   to a suspend queue then it can't be cancelled, because the caller controls cancellation).
   This is only used when our fiber is already suspended,
   since we can't create [value] before we have the continuation. *)
let rec add_to_cell queue value cell =
  match Atomic.get cell, value with
  | Finished, _ -> add_to_cell queue value (Q.next_resume queue)     (* Cancelled - skip *)
  | (Slot kc   as old), Item item
  | (Item item as old), Slot kc ->
    if Atomic.compare_and_set cell old Finished then exchange item kc
    else add_to_cell queue value cell
  | In_transition, _ ->
    if Atomic.compare_and_set cell In_transition value then ()
    else add_to_cell queue value cell
  | (Slot _ | Item _), _ -> assert false
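
(* Illustration only, not part of this module. The rule that whichever party
   arrives first writes its value and the second performs the exchange can be
   sketched on a single cell. This is a simplified model under stated
   assumptions: there is no queue, so a finished (cancelled) cell is treated as
   an error rather than skipped, and the producer callback takes a plain [bool].
   [model_cell], [model_add] and [run_cell_model] are hypothetical names.

```ocaml
type 'a model_cell =
  | In_transition
  | Slot of ('a -> bool)           (* consumer callback *)
  | Item of 'a * (bool -> unit)    (* value and producer callback *)
  | Finished

let rec model_add cell value =
  match Atomic.get cell, value with
  | In_transition, _ ->
    (* First to arrive: publish our value for the peer to find. *)
    if not (Atomic.compare_and_set cell In_transition value) then model_add cell value
  | (Slot kc as old), Item (v, kp)
  | (Item (v, kp) as old), Slot kc ->
    (* Second to arrive: claim the cell, then do the exchange. *)
    if Atomic.compare_and_set cell old Finished then kp (kc v)
    else model_add cell value
  | _ -> invalid_arg "model_add: unexpected cell state"

(* The consumer arrives first, then the producer delivers 42. *)
let run_cell_model () =
  let received = ref None and accepted = ref None in
  let cell = Atomic.make In_transition in
  model_add cell (Slot (fun v -> received := Some v; true));
  model_add cell (Item (42, fun ok -> accepted := Some ok));
  let finished = match Atomic.get cell with Finished -> true | _ -> false in
  (!received, !accepted, finished)
```

   Swapping the two [model_add] calls gives the same result, because the
   second caller always finds the peer's value and completes the exchange. *)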

(* Cancelling *)

let rec decr_balance_if_positive t =
  let cur = Atomic.get t.balance in
  if cur > 0 then (
    if Atomic.compare_and_set t.balance cur (cur - 1) then true
    else decr_balance_if_positive t
  ) else false

let rec incr_balance_if_negative t =
  let cur = Atomic.get t.balance in
  if cur < 0 then (
    if Atomic.compare_and_set t.balance cur (cur + 1) then true
    else incr_balance_if_negative t
  ) else false
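
(* Illustration only, not part of this module. Both helpers above follow the
   same conditional compare-and-set retry loop. A generic sketch of that
   pattern, using only [Stdlib.Atomic], might look like this;
   [update_balance_if] and [run_update_demo] are hypothetical names.

```ocaml
(* Add [delta] to [balance] only while [cond] holds for the current value.
   Retries if another domain changed the counter between the read and the CAS. *)
let rec update_balance_if balance ~cond ~delta =
  let cur = Atomic.get balance in
  if cond cur then (
    if Atomic.compare_and_set balance cur (cur + delta) then true
    else update_balance_if balance ~cond ~delta
  ) else false

(* Decrement a counter starting at 2 three times, but never below zero. *)
let run_update_demo () =
  let b = Atomic.make 2 in
  let positive n = n > 0 in
  let r1 = update_balance_if b ~cond:positive ~delta:(-1) in
  let r2 = update_balance_if b ~cond:positive ~delta:(-1) in
  let r3 = update_balance_if b ~cond:positive ~delta:(-1) in
  (r1, r2, r3, Atomic.get b)
```

   A plain [Atomic.fetch_and_add] would not do here, since it always applies
   the update, whereas cancellation must leave the balance untouched when the
   sign test fails. *)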

(* Cancel [cell] on our suspend queue.
   This function works for both consumers and producers, as we can tell from
   the value what our role is (and if there isn't a value, we're finished anyway).
   Neither party will try to cancel before writing its own value.
   Returns [true] if the caller cancelled successfully,
   or [false] if it must wait (as it's being resumed). *)
let cancel t (segment, cell) =
  let cancel2 update_balance ~old =
    if Atomic.compare_and_set cell old In_transition then (
      if update_balance t then (
        (* At this point, we are committed to cancelling. *)
        begin match Atomic.exchange cell Finished with
          | Finished -> assert false
          | In_transition -> Q.cancel_cell segment
          | Item request -> add_to_cell t.consumers (Item request) (Q.next_resume t.consumers)
          | Slot kc      -> add_to_cell t.producers (Slot kc)      (Q.next_resume t.producers)
        end;
        true
      ) else (
        (* We decided not to cancel. We know a resume is coming. *)
        if Atomic.compare_and_set cell In_transition old then false
        else (
          match old, Atomic.get cell with
          | Slot kc, Item request
          | Item request, Slot kc ->
            Atomic.set cell Finished;
            exchange request kc;
            false
          | _ -> assert false
        )
      )
    ) else false          (* The peer resumed us first *)
  in
  match Atomic.get cell with
  | Finished -> false     (* The peer resumed us first *)
  | Slot _ as old -> cancel2 incr_balance_if_negative ~old      (* We are a consumer *)
  | Item _ as old -> cancel2 decr_balance_if_positive ~old      (* We are a producer *)
  | In_transition ->
    (* Either we're initialising the cell, in which case we haven't told the
       application how to cancel this location yet, or we're already
       cancelling, but cancelling twice isn't permitted. *)
    assert false
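
(* Illustration only, not part of this module. The three cancellation steps for
   a consumer can be sketched in a single-threaded model. It deliberately omits
   the branches where a peer writes into the cell concurrently, so it uses
   [Atomic.set] where the real code needs [Atomic.exchange] or a second
   compare-and-set. [model_state], [incr_if_negative], [model_cancel] and
   [run_cancel_model] are hypothetical names.

```ocaml
(* [Waiting] stands in for [Slot _]. *)
type model_state = Waiting | In_transition | Finished

let rec incr_if_negative balance =
  let cur = Atomic.get balance in
  cur < 0 &&
  (Atomic.compare_and_set balance cur (cur + 1) || incr_if_negative balance)

let model_cancel balance cell =
  if Atomic.compare_and_set cell Waiting In_transition then (   (* step 1 *)
    if incr_if_negative balance then (                          (* step 2: committed *)
      Atomic.set cell Finished;                                 (* step 3 *)
      true
    ) else (
      (* A producer has already claimed us: go back to waiting for it. *)
      Atomic.set cell Waiting;
      false
    )
  ) else false   (* the peer resumed us first *)

(* Case 1: nobody has claimed the consumer (balance = -1), so it cancels.
   Case 2: a producer already raised the balance to 0, so it must wait. *)
let run_cancel_model () =
  let b1 = Atomic.make (-1) and c1 = Atomic.make Waiting in
  let r1 = model_cancel b1 c1 in
  let b2 = Atomic.make 0 and c2 = Atomic.make Waiting in
  let r2 = model_cancel b2 c2 in
  ((r1, Atomic.get b1, Atomic.get c1 = Finished),
   (r2, Atomic.get b2, Atomic.get c2 = Waiting))
```

   The model returns [true] only when the balance update succeeds, which is the
   point at which the real [cancel] is committed to cancelling. *)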

(* A producer can't cancel if it is resuming on the [consumers] queue, and will instead
   just wait for the slot in that case, which will arrive soon. However, after getting
   a slot the producer may be rejected and be asked to start again on the [producers] queue,
   so we need to remember that we were cancelled to prevent that. It's also possible that
   we're already restarting but haven't got around to updating [request.cancel] yet; we'll
   notice the new [`Cancelled] state when we do. *)
let cancel_put request ex =
  match Atomic.exchange request.cancel (`Cancelled ex) with
  | `Cancelled _ -> failwith "Already cancelled!"
  | `Resuming -> false  (* Cancellation fails for now, but we remember we wanted to cancel. *)
  | `Suspended cancel -> cancel ()
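
(* Illustration only, not part of this module. The way [cancel_put] records the
   cancellation before deciding whether it succeeded can be sketched with the
   cancel state alone. [cancel_state], [model_cancel_put] and [is_cancelled] are
   hypothetical names; the state type mirrors the [cancel] field of the item
   record.

```ocaml
type cancel_state = [
  | `Resuming
  | `Suspended of (unit -> bool)
  | `Cancelled of exn
]

(* Swap in [`Cancelled ex] unconditionally, then act on the previous state. *)
let model_cancel_put (state : cancel_state Atomic.t) ex =
  match Atomic.exchange state (`Cancelled ex) with
  | `Cancelled _ -> failwith "Already cancelled!"
  | `Resuming -> false               (* can't cancel now, but it is remembered *)
  | `Suspended cancel -> cancel ()   (* try to leave the queue *)

let is_cancelled state =
  match Atomic.get state with
  | `Cancelled _ -> true
  | _ -> false

(* While resuming, cancellation fails but is still recorded;
   while suspended, the stored cancel function decides the outcome. *)
let run_cancel_put_model () =
  let s1 : cancel_state Atomic.t = Atomic.make `Resuming in
  let r1 = model_cancel_put s1 Exit in
  let s2 : cancel_state Atomic.t = Atomic.make (`Suspended (fun () -> true)) in
  let r2 = model_cancel_put s2 Exit in
  (r1, is_cancelled s1, r2, is_cancelled s2)
```

   Because the exchange happens first, a producer that later restarts can see
   the [`Cancelled] state even if this attempt returned [false]. *)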

(* Putting. *)

(* Like [add_to_cell], but we haven't created our value yet as we haven't suspended the fiber. *)
let rec producer_resume_cell t ~success ~in_transition cell =
  match Atomic.get (cell : _ Cell.t Atomic.t) with
  | Item _ -> assert false
  | In_transition -> in_transition cell
  | Finished -> producer_resume_cell t ~success ~in_transition (Q.next_resume t.consumers)
  | Slot k as old ->
    if Atomic.compare_and_set cell old Finished then success k
    else producer_resume_cell t ~success ~in_transition cell

(* This is essentially the main [put] function, but parameterised so it can be shared with
   the rejoin-after-rejection case. *)
let producer_join (t : _ t) ~success ~suspend =
  let old = Atomic.fetch_and_add t.balance (+1) in
  if old < 0 then (
    let cell = Q.next_resume t.consumers in
    producer_resume_cell t cell
      ~success
      ~in_transition:(fun cell -> suspend (Short cell))
  ) else (
    suspend (Long (Q.next_suspend t.producers))
  )

(* Called when a consumer took our value but then rejected it.
   We start the put operation again, except that our fiber is already suspended
   so no need to do that again. We're probably running in the consumer's domain
   (unless the consumer provided their callback while we were cancelling). *)
let put_already_suspended t request =
  producer_join t
    ~success:(exchange request)
    ~suspend:(fun loc ->
        let Short cell | Long (_, cell) = loc in
        add_to_cell t.consumers (Item request) cell;
        let rec aux () =
          match Atomic.get request.cancel, loc with
          | (`Suspended _ | `Resuming as prev), Long loc ->
            (* We might be suspended for a while. Update the cancel function with the new location. *)
            let cancel_fn () = cancel t loc in
            if not (Atomic.compare_and_set request.cancel prev (`Suspended cancel_fn)) then aux ()
          | `Cancelled ex, Long loc ->
            (* We got cancelled after the peer removed our cell and before we updated the
               cancel function with the new location, or we were cancelled while doing a
               (non-cancellable) resume. Deal with it now. *)
            if cancel t loc then request.kp (Error ex);
            (* else we got resumed first *)
          | _, Short _ ->
            (* We can't cancel while in the process of resuming a cell on the [consumers] queue.
               We could set [cancel] to [`Resuming] here, but there's no need: [cancel_put] sets
               [request.cancel] to [`Cancelled] before calling the old cancel function, which
               will then find that the old cell is finished and fail, as required. *)
            ()
        in aux ()
      )

(* We tried to [put] and no slot was immediately available.
   Suspend the fiber and use the continuation to finish initialising the cell.
   Note that we may be suspending the fiber even when using the "resume" queue,
   if the consumer is still in the process of writing its slot. *)
let put_suspend t v loc =
  Suspend.enter_unchecked @@ fun ctx enqueue ->
  let cancel =
    match loc with
    | Short _ -> `Resuming      (* Can't cancel this *)
    | Long loc -> `Suspended (fun () -> cancel t loc)
  in
  let rec item = {
    v;
    cancel = Atomic.make cancel;
    kp = function
      | Error _ as e -> enqueue e                 (* Cancelled by [put_already_suspended]. *)
      | Ok true -> enqueue (Ok ())                (* Success! *)
      | Ok false -> put_already_suspended t item  (* Consumer rejected value. Restart. *)
  } in
  let Short cell | Long (_, cell) = loc in
  add_to_cell t.consumers (Item item) cell;
  (* Set up the cancel handler in either case because we might change queues later: *)
  match Fiber_context.get_error ctx with
  | Some ex ->
    if cancel_put item ex then enqueue (Error ex);
    (* else being resumed *)
  | None ->
    Fiber_context.set_cancel_fn ctx (fun ex ->
        if cancel_put item ex then enqueue (Error ex)
        (* else being resumed *)
      )

let rec put (t : _ t) v =
  producer_join t
    ~success:(fun kc -> if kc v then () else put t v)
    ~suspend:(put_suspend t v)

(* Taking. *)

(* Mirror of [producer_resume_cell]. *)
let rec consumer_resume_cell t ~success ~in_transition cell =
  match Atomic.get (cell : _ Cell.t Atomic.t) with
  | Slot _ -> assert false
  | In_transition -> in_transition cell
  | Finished -> consumer_resume_cell t ~success ~in_transition (Q.next_resume t.producers)
  | Item req as old ->
    if Atomic.compare_and_set cell old Finished then success req
    else consumer_resume_cell t ~success ~in_transition cell

let take_suspend t loc =
  Suspend.enter_unchecked @@ fun ctx enqueue ->
  let Short cell | Long (_, cell) = loc in
  let kc v = enqueue (Ok v); true in
  add_to_cell t.producers (Slot kc) cell;
  match loc with
  | Short _ -> ()
  | Long loc ->
    match Fiber_context.get_error ctx with
    | Some ex ->
      if cancel t loc then enqueue (Error ex);
      (* else being resumed *)
    | None ->
      Fiber_context.set_cancel_fn ctx (fun ex ->
          if cancel t loc then enqueue (Error ex)
          (* else being resumed *)
        )

let take (t : _ t) =
  let old = Atomic.fetch_and_add t.balance (-1) in
  if old > 0 then (
    let cell = Q.next_resume t.producers in
    consumer_resume_cell t cell
      ~success:(fun item -> item.kp (Ok true); item.v)
      ~in_transition:(fun cell -> take_suspend t (Short cell))
  ) else (
    take_suspend t (Long (Q.next_suspend t.consumers))
  )

let reject = Slot (fun _ -> false)

let take_nonblocking (t : _ t) =
  if decr_balance_if_positive t then (
    let rec aux cell =
      consumer_resume_cell t cell
        ~success:(fun item ->
            item.kp (Ok true);          (* Always accept the item *)
            Some item.v
          )
        ~in_transition:(fun cell ->
            (* Our producer is still in the process of writing its [Item], but
               we're non-blocking and can't wait. We're always acting as the
               resumer, so we can't cancel the cell. Instead, we provide a
               consumer callback that always rejects.
               todo: could spin for a bit here first - the Item will probably arrive soon,
               and that would avoid making the producer start again. *)
            Domain.cpu_relax ();        (* Brief wait to encourage producer to finish *)
            if Atomic.compare_and_set cell In_transition reject then None
            else aux cell
          )
    in aux (Q.next_resume t.producers)
  ) else None   (* No waiting producers for us *)

(* Creation and status. *)

let create () =
  {
    consumers = Q.make ();
    producers = Q.make ();
    balance = Atomic.make 0;
  }

let balance t = Atomic.get t.balance