Source file sync.ml
(* A lock-free synchronous channel with cancellation, using Cells.

Producers and consumers are paired off and then the producer transfers its
value to the consumer. This is effectively a bounded queue with a capacity
of zero.

Both producers and consumers can cancel while waiting.

There is an atomic int ([balance]), plus two queues ([consumers] and
[producers]) made using Cells. When [balance] is positive, it is the number
of producers waiting with values that no one is yet responsible for
resuming. When negative, it is the (negative) number of waiting consumers
that no one is responsible for resuming.

To put an item:

1. The producer increments [balance].
2. If it was negative, the producer resumes one waiting consumer on the [consumers] queue.
   Otherwise, it suspends itself on the [producers] queue.

To take an item:

1. The consumer decrements [balance].
2. If it was positive, the consumer resumes one waiting producer on the [producers] queue.
   Otherwise, it suspends itself on the [consumers] queue.

Therefore, we never try to resume on a queue unless another party has
started the process of suspending on it.
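
A minimal sketch of the balance step on its own. It assumes a plain [int Atomic.t] in place of the [Balance] module defined below, so closing is ignored. The names [action], [put_step] and [take_step] are hypothetical. The real code goes on to pick a cell from the chosen queue.

```ocaml
(* Hypothetical sketch: which queue each party uses after updating [balance]. *)
type action =
  | Resume_peer   (* a peer is waiting; take a cell from its queue *)
  | Suspend_self  (* nobody is waiting; add ourselves to our own queue *)

(* A producer increments the balance; a negative old value means a consumer is waiting. *)
let put_step balance =
  let old = Atomic.fetch_and_add balance 1 in
  if old < 0 then Resume_peer else Suspend_self

(* A consumer decrements the balance; a positive old value means a producer is waiting. *)
let take_step balance =
  let old = Atomic.fetch_and_add balance (-1) in
  if old > 0 then Resume_peer else Suspend_self

let () =
  let balance = Atomic.make 0 in
  assert (take_step balance = Suspend_self);  (* consumer waits: balance is now -1 *)
  assert (put_step balance = Resume_peer);    (* producer pairs with it: balance is 0 *)
  assert (put_step balance = Suspend_self);   (* producer waits: balance is 1 *)
  assert (take_step balance = Resume_peer);   (* consumer pairs with it: balance is 0 *)
  assert (Atomic.get balance = 0)
```

Each party decides by looking only at the old value returned by its own atomic update. This is why exactly one party becomes responsible for each resume.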

The system will not become idle while a client is responsible for resuming
something. Therefore, when idle:

- If [balance <= 0] then there are no waiting producers.
- If [balance >= 0] then there are no waiting consumers.
- So, we never have waiting consumers and producers at the same time.

As usual with Cells, either party may get to the new cell first. Whichever party
arrives first writes a callback, which the other party will then call when they arrive.
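
To illustrate the first-arrival rule in isolation, here is a toy one-shot rendezvous cell. It is not the real [Cells] API. The constructors [Empty], [Value], [Waiting] and [Done] and the functions [produce] and [consume] are all hypothetical. The first party stores its half with a compare-and-set. The second party sees that half and completes the exchange itself.

```ocaml
(* Hypothetical one-shot cell: whoever arrives first leaves something for the other. *)
type cell =
  | Empty
  | Value of int               (* producer arrived first and left its value *)
  | Waiting of (int -> unit)   (* consumer arrived first and left a callback *)
  | Done

let rec produce (cell : cell Atomic.t) v =
  match Atomic.get cell with
  | Empty ->
    if not (Atomic.compare_and_set cell Empty (Value v)) then produce cell v
  | Waiting k as old ->
    (* The consumer got here first: call its callback ourselves. *)
    if Atomic.compare_and_set cell old Done then k v else produce cell v
  | Value _ | Done -> assert false   (* one producer per cell *)

let rec consume (cell : cell Atomic.t) k =
  match Atomic.get cell with
  | Empty ->
    if not (Atomic.compare_and_set cell Empty (Waiting k)) then consume cell k
  | Value v as old ->
    (* The producer got here first: take its value now. *)
    if Atomic.compare_and_set cell old Done then k v else consume cell k
  | Waiting _ | Done -> assert false   (* one consumer per cell *)

let () =
  let got = ref 0 in
  let c1 = Atomic.make Empty in
  produce c1 1;                    (* producer first: leaves [Value 1] *)
  consume c1 (fun v -> got := v);  (* consumer finds the value *)
  assert (!got = 1);
  let c2 = Atomic.make Empty in
  consume c2 (fun v -> got := v);  (* consumer first: leaves a callback *)
  produce c2 2;                    (* producer calls the callback *)
  assert (!got = 2)
```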

Note on terminology:

- The "suspender" of a cell is the party that incremented the queue's suspend index,
  and the "resumer" of a cell is the party that incremented the resume index.

- Whether "suspending" or "resuming" a cell, you may still have to suspend
  your fiber and resume it later.
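
A toy model of the two indices follows. It assumes a fixed-size array of cells, whereas the real [Cells] module uses a linked list of segments and also supports cancelling cells. The record [queue] and the functions [make], [next_suspend] and [next_resume] are hypothetical stand-ins. They show that the n-th suspender and the n-th resumer are handed the same cell, whichever of them arrives first.

```ocaml
(* Hypothetical fixed-size stand-in for a Cells queue. *)
type 'a queue = {
  cells : 'a Atomic.t array;
  suspend : int Atomic.t;  (* incremented by each suspender *)
  resume : int Atomic.t;   (* incremented by each resumer *)
}

let make size init =
  { cells = Array.init size (fun _ -> Atomic.make init);
    suspend = Atomic.make 0;
    resume = Atomic.make 0 }

(* Each call claims the next index; the cell at that index is shared with the peer. *)
let next_suspend q = q.cells.(Atomic.fetch_and_add q.suspend 1)
let next_resume q = q.cells.(Atomic.fetch_and_add q.resume 1)

let () =
  let q = make 4 0 in
  let r = next_resume q in      (* the resumer happens to arrive first *)
  let s = next_suspend q in
  assert (r == s);              (* both parties were handed cell 0 *)
  assert (next_suspend q == next_resume q)  (* and the next pair shares cell 1 *)
```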

States

There are four cell states:

- [In_transition] indicates that the cell is still being initialised, or might be
  getting cancelled. Either way, the suspending party is actively working to
  change the cell's state.

- [Item] indicates that the producer is ready to provide an item.

- [Slot] indicates that the consumer is ready to receive an item.

- [Finished] indicates that the cell is no longer being used (the value has
  been consumed or the cell has finished being cancelled).

The possible sequences of states on the [producers] queue are:

In_transition -C> Slot -P> Finished  (consumer arrives first)
              `P> Item -C> Finished  (producer arrives first)
                       `P> In_transition -P> Finished  (producer cancels)
                                         `C> Slot -P> Finished  (cancellation interrupted)

Only the producer can cancel here. For the [consumers] queue it's the
opposite - the consumer can cancel its [Slot].
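
The diagram can be encoded as a table of allowed moves. This is a hypothetical checker over payload-free copies of the four states, since the real [cell] type carries callbacks and items. The names [allowed] and [valid_path] are illustrative only.

```ocaml
(* Payload-free copies of the four cell states. *)
type state = In_transition | Slot | Item | Finished

(* Which party performs a transition: P(roducer) or C(onsumer). *)
type party = P | C

(* Transitions allowed on the [producers] queue, read off the diagram.
   This per-step check is slightly looser than the diagram: for example,
   it would also accept a direct In_transition -P> Finished as a first step. *)
let allowed ~from ~by ~into =
  match from, by, into with
  | In_transition, C, Slot -> true      (* consumer writes its slot *)
  | In_transition, P, Item -> true      (* producer writes its item *)
  | Slot, P, Finished -> true           (* producer takes the slot *)
  | Item, C, Finished -> true           (* consumer takes the item *)
  | Item, P, In_transition -> true      (* producer starts cancelling *)
  | In_transition, P, Finished -> true  (* producer finishes cancelling *)
  | _ -> false

(* A path starts at [In_transition] and must end at [Finished]. *)
let valid_path steps =
  let rec go from = function
    | [] -> from = Finished
    | (by, into) :: rest -> allowed ~from ~by ~into && go into rest
  in
  go In_transition steps

let () =
  assert (valid_path [ (C, Slot); (P, Finished) ]);  (* consumer arrives first *)
  assert (valid_path [ (P, Item); (C, Finished) ]);  (* producer arrives first *)
  assert (valid_path [ (P, Item); (P, In_transition); (P, Finished) ]);  (* producer cancels *)
  assert (valid_path [ (P, Item); (P, In_transition); (C, Slot); (P, Finished) ]);  (* cancellation interrupted *)
  assert (not (valid_path [ (P, Item); (C, In_transition) ]))  (* consumers cannot cancel here *)
```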

Cancellation

Note that there are two kinds of cancellation here:

1. A cancelled cell is not considered part of its queue. Anyone seeing one
   (due to a race) will skip over it and use the next cell.

2. After a consumer and producer have been paired off (and the cell removed
   from its queue), the consumer callback may reject the value. If this
   happens, the producer must start all over again to find another consumer.

Whenever a consumer sets its callback to reject values, it should then start
the process of cancelling its cell (if acting as a suspender) so that the
cell can be GC'd.

A consumer can only cancel its cell when it's on the [consumers] queue.
If it's on [producers], it knows a wake-up will be coming shortly anyway.

A consumer cancels its cell as follows:

1. The consumer sets its cell in [consumers] to [In_transition].
2. It increments [balance] (from a negative value). It is now committed to cancelling.
3. It sets its cell to [Finished].

(1) will fail if the cell got resumed first. In that case the consumer just
rejects the cancellation attempt.

(2) will fail if [balance >= 0]. In that case the consumer has not cancelled,
and is about to be resumed instead. It tries to return to the [Slot] state.
If that fails, the cell now contains an Item and the consumer takes it.

(3) will fail if a producer arrived after the consumer committed to cancelling.
In that case, the consumer passes the Item on to the next consumer (there
must be another one, since both the consumer and producer incremented
[balance] from a negative value).
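
Below is a single-cell sketch of the three steps and their failure cases. It assumes nullary stand-ins for the cell states and a plain [int Atomic.t] for [balance]. The real code uses [Balance.incr_if_negative], and it forwards any item it finds to the next cell on the queue. The names [cancel_consumer], [outcome] and the [race] hook are hypothetical. The hook only lets a single-threaded test replay the interleavings described above.

```ocaml
(* Hypothetical sketch of the three-step consumer cancellation on one cell. *)
type state = In_transition | Slot | Item | Finished

type outcome =
  | Resumed_first    (* step 1 failed: a producer already took our slot *)
  | Cancelled        (* all three steps succeeded *)
  | Pass_item_on     (* step 3 failed: a producer wrote an item we must forward *)
  | Will_be_resumed  (* step 2 failed and we restored our slot *)
  | Took_item        (* step 2 failed and a producer's item was already there *)

(* Increment [balance] only while it is negative. *)
let rec incr_if_negative balance =
  let x = Atomic.get balance in
  x < 0 && (Atomic.compare_and_set balance x (x + 1) || incr_if_negative balance)

(* [race n] runs after step [n], so tests can replay interleavings by hand. *)
let cancel_consumer ?(race = fun (_ : int) -> ()) balance cell =
  (* Step 1: claim our cell by moving it from [Slot] to [In_transition]. *)
  if not (Atomic.compare_and_set cell Slot In_transition) then Resumed_first
  else begin
    race 1;
    (* Step 2: increment [balance], which must still be negative. *)
    if incr_if_negative balance then begin
      race 2;
      (* Step 3: we are committed, so mark the cell [Finished]. *)
      match Atomic.exchange cell Finished with
      | In_transition -> Cancelled
      | Item -> Pass_item_on
      | Slot | Finished -> assert false
    end
    else if Atomic.compare_and_set cell In_transition Slot then Will_be_resumed
    else begin
      (* A producer replaced [In_transition] with its [Item]: take it. *)
      assert (Atomic.get cell = Item);
      Atomic.set cell Finished;
      Took_item
    end
  end

let () =
  (* No producer around: the cancellation goes through. *)
  let balance = Atomic.make (-1) and cell = Atomic.make Slot in
  assert (cancel_consumer balance cell = Cancelled);
  assert (Atomic.get balance = 0 && Atomic.get cell = Finished);
  (* A producer incremented [balance] first, so a resume is on its way. *)
  let balance = Atomic.make 0 and cell = Atomic.make Slot in
  assert (cancel_consumer balance cell = Will_be_resumed);
  assert (Atomic.get cell = Slot)
```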

Cancelling a producer is very similar to cancelling a consumer, just with the
[producers] queue and decrementing the balance from a positive value.

Non-blocking take

To perform a non-blocking take:

1. The consumer decrements [balance] from a positive number.
2. The consumer takes the next resume cell from [producers].
3. The consumer takes the [Item] from the cell, setting it to [Finished].

(1) will fail if there are no unassigned items available, in which case
[take_nonblocking] returns [Error `Would_block]. If the stream has been closed,
it returns [Error `Closed] instead.

(3) will fail if the producer is initialising or cancelling. In either case,
the consumer sets the cell to a [Slot] whose dummy callback rejects
all values, and returns [Error `Would_block] immediately.
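
The following single-cell model sketches the non-blocking path. It assumes a plain [int Atomic.t] balance and a hypothetical [Reject] marker in place of the real rejecting [Slot] callback. The names [decr_if_positive] and [take_nonblocking] here are local stand-ins, not the definitions below.

```ocaml
(* Hypothetical single-cell model of [take_nonblocking]. *)
type cell = In_transition | Item of int | Reject | Finished

(* Decrement [balance] only while it is positive (step 1). *)
let rec decr_if_positive balance =
  let x = Atomic.get balance in
  x > 0 && (Atomic.compare_and_set balance x (x - 1) || decr_if_positive balance)

let take_nonblocking balance (cell : cell Atomic.t) =
  if not (decr_if_positive balance) then Error `Would_block  (* nothing unassigned *)
  else
    let rec aux () =
      match Atomic.get cell with
      | Item v as old ->
        (* Step 3: take the item and retire the cell. *)
        if Atomic.compare_and_set cell old Finished then Ok v else aux ()
      | In_transition ->
        (* The producer is still writing or cancelling: leave a rejecting marker. *)
        if Atomic.compare_and_set cell In_transition Reject then Error `Would_block
        else aux ()
      | Reject | Finished ->
        (* The real code skips cancelled cells; this one-cell model has no next cell. *)
        assert false
    in
    aux ()

let () =
  assert (take_nonblocking (Atomic.make 0) (Atomic.make In_transition) = Error `Would_block);
  let balance = Atomic.make 1 and cell = Atomic.make (Item 42) in
  assert (take_nonblocking balance cell = Ok 42);
  assert (Atomic.get balance = 0 && Atomic.get cell = Finished)
```

Leaving a rejecting marker instead of waiting keeps the call non-blocking. The cost is that the producer is rejected and has to start its put again.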

Close

The LSB of the balance atomic is used to indicate that the stream has been closed.
When closed, the balance is always zero and no new consumers or producers can be added.
The closing thread is responsible for cancelling all pre-existing users.
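
Here is a standalone copy of the bit encoding used by the [Balance] module below, with a worked trace. The counter lives in the high bits and the closed flag in bit 0. Every update adds an even amount ([diff lsl 1]), so the flag survives any adds that race with [close].

```ocaml
(* Standalone copy of the encoding: counter in the high bits, closed flag in bit 0. *)
let closed = 1
let counter x = x asr 1
let is_closed x = (x land 1) <> 0

let value x = if is_closed x then Error `Closed else Ok (counter x)

(* Adding [diff lsl 1] (always even) changes the counter without touching the flag. *)
let fetch_and_add t diff = value (Atomic.fetch_and_add t (diff lsl 1))

(* Closing replaces the whole word with [closed], i.e. counter 0 with the flag set. *)
let close t = value (Atomic.exchange t closed)

let () =
  let t = Atomic.make 0 in
  assert (fetch_and_add t (-1) = Ok 0);       (* one consumer waiting *)
  assert (fetch_and_add t (-1) = Ok (-1));    (* two consumers waiting *)
  assert (Atomic.get t = -4);                 (* counter -2 is stored as -4 *)
  assert (close t = Ok (-2));                 (* close reports the old balance *)
  assert (fetch_and_add t 1 = Error `Closed)  (* late callers see the flag *)
```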

The exchange

Once a producer and consumer have been paired off (and so their cell is now Finished),
the producer's value is passed to the consumer's callback. If the consumer accepts it,
then both fibers are resumed. If not, the producer starts again (incrementing [balance]
again) and waits for another consumer.
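
A sketch of the accept-or-retry loop follows. It simplifies by assuming the remaining consumers are given up front as a list of callbacks. In the real code, the producer instead goes back through [balance] and the queues. [producer_result] mirrors the type defined below without [Failed], and [offer] is hypothetical.

```ocaml
(* Hypothetical sketch: offer a value to consumers until one accepts it. *)
type producer_result = Sent | Rejected

(* A consumer callback returns [true] to accept the value, [false] to reject it. *)
let exchange v (kc : int -> bool) = if kc v then Sent else Rejected

(* Returns the number of consumers tried, or [None] if all rejected. In the real
   code a rejected producer rejoins via [balance] rather than walking a list. *)
let offer v consumers =
  let rec go tried = function
    | [] -> None
    | kc :: rest ->
      match exchange v kc with
      | Sent -> Some (tried + 1)
      | Rejected -> go (tried + 1) rest
  in
  go 0 consumers

let () =
  let received = ref None in
  let rejecting _ = false in  (* like the dummy callback left by a non-blocking take *)
  let accepting v = received := Some v; true in
  assert (offer 7 [ rejecting; accepting ] = Some 2);
  assert (!received = Some 7)
```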

The above has not been formally verified (exercise for reader!). *)

(* Import these directly because we copy this file for the dscheck tests. *)
module Fiber_context = Eio__core.Private.Fiber_context
module Suspend = Eio__core.Private.Suspend
module Cancel = Eio__core.Cancel

type producer_result =
  | Sent                (* Consumer accepted item. *)
  | Rejected            (* Consumer rejected the item. Retry. *)
  | Failed of exn       (* Cancelled or closed. *)

type 'a item = {
  v : ('a, [`Closed]) result;
  kp : producer_result -> unit;
  cancel : [
    | `Resuming                     (* In the process of resuming, so can't cancel. *)
    | `Suspended of (unit -> bool)  (* Call this function to attempt to leave the queue. *)
    | `Cancelled of exn             (* Already cancelled. *)
  ] Atomic.t;
}

type 'a cell =
  | In_transition
  | Slot of (('a, [`Closed]) result -> bool)
  | Item of 'a item
  | Finished

module Cell = struct
  type 'a t = 'a cell

  let init = In_transition

  let segment_order = 2

  let dump f = function
    | In_transition -> Fmt.string f "In_transition"
    | Slot _ -> Fmt.string f "Slot"
    | Item _ -> Fmt.string f "Item"
    | Finished -> Fmt.string f "Finished"
end

module Q = Cells.Make(Cell)

type update_result =
  | Updated
  | Update_refused
  | Balance_closed

module Balance : sig
  type t

  val make : unit -> t

  val close : t -> (int, [> `Closed]) result
  (* Mark as closed and return the previous state. *)

  val get : t -> (int, [> `Closed]) result
  (** [get t] is the number of items available (if non-negative) or the
      number of consumers waiting for an item. *)

  val fetch_and_add : t -> int -> (int, [> `Closed]) result
  (** [fetch_and_add t diff] increases the value by [diff] and returns the old value. *)

  val incr_if_negative : t -> update_result
  val decr_if_positive : t -> update_result

  val pp : t Fmt.t
end = struct
  type t = int Atomic.t

  let closed = 1

  let counter x = x asr 1
  let is_closed x = (x land 1) <> 0

  let value x =
    if is_closed x then Error `Closed
    else Ok (counter x)

  let fetch_and_add x diff =
    value (Atomic.fetch_and_add x (diff lsl 1))

  let rec decr_if_positive t =
    let x = Atomic.get t in
    if is_closed x then Balance_closed
    else if counter x > 0 then (
      if Atomic.compare_and_set t x (x - 2) then Updated
      else decr_if_positive t
    ) else Update_refused

  let rec incr_if_negative t =
    let x = Atomic.get t in
    if is_closed x then Balance_closed
    else if counter x < 0 then (
      if Atomic.compare_and_set t x (x + 2) then Updated
      else incr_if_negative t
    ) else Update_refused

  let make () = Atomic.make 0

  let close t =
    value (Atomic.exchange t closed)

  let get t = value (Atomic.get t)

  let pp f t =
    match get t with
    | Ok x -> Fmt.int f x
    | Error `Closed -> Fmt.string f "(closed)"
end

type 'a t = {
  balance : Balance.t;
  consumers : 'a Q.t;
  producers : 'a Q.t;
}

type 'a loc =
  | Short of 'a Cell.t Atomic.t                  (* Acting as resumer of cell *)
  | Long of ('a Q.segment * 'a Cell.t Atomic.t)  (* Acting as suspender of cell; can cancel *)

let dump f t =
  Fmt.pf f "@[<v2>Sync (balance=%a)@,@[<v2>Consumers:@,%a@]@,@[<v2>Producers:@,%a@]@]"
    Balance.pp t.balance
    Q.dump t.consumers
    Q.dump t.producers

(* Give [item] to consumer [kc]. [item]'s cell is now Finished. *)
let exchange item kc = item.kp (if kc item.v then Sent else Rejected)

(* Add [value] to [cell].
   If the cell is in transition, place [value] there and let the other party handle it later.
   If the peer's value is already present, do the exchange.
   If the peer cancelled the cell then try the next one on the given resume queue (if we're adding
   to a suspend queue then it can't be cancelled, because the caller controls cancellation).
   This is only used when our fiber is already suspended,
   since we can't create [value] before we have the continuation. *)
let rec add_to_cell queue value cell =
  match Atomic.get cell, value with
  | Finished, _ -> add_to_cell queue value (Q.next_resume queue)    (* Cancelled - skip *)
  | (Slot kc as old), Item item
  | (Item item as old), Slot kc ->
    if Atomic.compare_and_set cell old Finished then exchange item kc
    else add_to_cell queue value cell
  | In_transition, _ ->
    if Atomic.compare_and_set cell In_transition value then ()
    else add_to_cell queue value cell
  | (Slot _ | Item _), _ -> assert false

(* Cancelling *)

(* Cancel [cell] on our suspend queue.
   This function works for both consumers and producers, as we can tell from
   the value what our role is (and if there isn't a value, we're finished anyway).
   Neither party will try to cancel before writing its own value.
   Returns [true] if the caller cancelled successfully,
   or [false] if it must wait (as it's being resumed). *)
let cancel t (segment, cell) =
  let cancel2 update_balance ~old =
    if Atomic.compare_and_set cell old In_transition then (
      match update_balance t.balance with
      | Updated ->
        (* At this point, we are committed to cancelling. *)
        begin match Atomic.exchange cell Finished with
          | Finished -> assert false
          | In_transition -> Q.cancel_cell segment
          | Item request -> add_to_cell t.consumers (Item request) (Q.next_resume t.consumers)
          | Slot kc -> add_to_cell t.producers (Slot kc) (Q.next_resume t.producers)
        end;
        true
      | Update_refused | Balance_closed ->
        (* We decided not to cancel. We know a resume is coming. *)
        if Atomic.compare_and_set cell In_transition old then false
        else (
          match old, Atomic.get cell with
          | Slot kc, Item request
          | Item request, Slot kc ->
            Atomic.set cell Finished;
            exchange request kc;
            false
          | _ -> assert false
        )
    ) else false          (* The peer resumed us first *)
  in
  match Atomic.get cell with
  | Finished -> false     (* The peer resumed us first *)
  | Slot _ as old -> cancel2 Balance.incr_if_negative ~old   (* We are a consumer *)
  | Item _ as old -> cancel2 Balance.decr_if_positive ~old   (* We are a producer *)
  | In_transition ->
    (* Either we're initialising the cell, in which case we haven't told the
       application how to cancel this location yet, or we're already
       cancelling, but cancelling twice isn't permitted. *)
    assert false

(* A producer can't cancel if it is resuming on the [consumers] queue, and will instead
   just wait for the slot in that case, which will arrive soon. However, after getting
   a slot the producer may be rejected and be asked to start again on the [producers] queue,
   so we need to remember that we were cancelled to prevent that. It's also possible that
   we're already restarting but haven't got around to updating [request.cancel] yet; we'll
   notice the new [`Cancelled] state when we do. *)
let cancel_put request ex =
  match Atomic.exchange request.cancel (`Cancelled ex) with
  | `Cancelled _ -> failwith "Already cancelled!"
  | `Resuming -> false    (* Cancellation fails for now, but we remember we wanted to cancel. *)
  | `Suspended cancel -> cancel ()

(* Putting. *)

(* Like [add_to_cell], but we haven't created our value yet as we haven't suspended the fiber. *)
let rec producer_resume_cell t ~success ~in_transition cell =
  match Atomic.get (cell : _ Cell.t Atomic.t) with
  | Item _ -> assert false
  | In_transition -> in_transition cell
  | Finished -> producer_resume_cell t ~success ~in_transition (Q.next_resume t.consumers)
  | Slot k as old ->
    if Atomic.compare_and_set cell old Finished then success k
    else producer_resume_cell t ~success ~in_transition cell

(* This is essentially the main [put] function, but parameterised so it can be shared with
   the rejoin-after-rejection case. *)
let producer_join (t : _ t) ~success ~suspend ~closed =
  match Balance.fetch_and_add t.balance (+1) with
  | Error `Closed -> closed ()
  | Ok old ->
    if old < 0 then (
      let cell = Q.next_resume t.consumers in
      producer_resume_cell t cell
        ~success
        ~in_transition:(fun cell -> suspend (Short cell))
    ) else (
      suspend (Long (Q.next_suspend t.producers))
    )

let put_closed_err = Invalid_argument "Stream closed"

(* Called when a consumer took our value but then rejected it.
   We start the put operation again, except that our fiber is already suspended
   so no need to do that again. We're probably running in the consumer's domain
   (unless the consumer provided their callback while we were cancelling). *)
let put_already_suspended t request =
  producer_join t
    ~success:(exchange request)
    ~closed:(fun () -> request.kp (Failed put_closed_err))
    ~suspend:(fun loc ->
        let Short cell | Long (_, cell) = loc in
        add_to_cell t.consumers (Item request) cell;
        let rec aux () =
          match Atomic.get request.cancel, loc with
          | (`Suspended _ | `Resuming as prev), Long loc ->
            (* We might be suspended for a while. Update the cancel function with the new location. *)
            let cancel_fn () = cancel t loc in
            if not (Atomic.compare_and_set request.cancel prev (`Suspended cancel_fn)) then aux ()
          | `Cancelled ex, Long loc ->
            (* We got cancelled after the peer removed our cell and before we updated the
               cancel function with the new location, or we were cancelled while doing a
               (non-cancellable) resume. Deal with it now. *)
            if cancel t loc then request.kp (Failed ex);
            (* else we got resumed first *)
          | _, Short _ ->
            (* We can't cancel while in the process of resuming a cell on the [consumers] queue.
               We could set [cancel] to [`Resuming] here, but there's no need as trying to use the
               old cancel function will find the old cell is cancelled and set [request.cancel]
               to [`Cancelled], as required. *)
            ()
        in aux ()
      )

(* We tried to [put] and no slot was immediately available.
   Suspend the fiber and use the continuation to finish initialising the cell.
   Note that we may be suspending the fiber even when using the "resume" queue,
   if the consumer is still in the process of writing its slot. *)
let put_suspend t v loc =
  Suspend.enter_unchecked "Sync.put" @@ fun ctx enqueue ->
  let cancel =
    match loc with
    | Short _ -> `Resuming  (* Can't cancel this *)
    | Long loc -> `Suspended (fun () -> cancel t loc)
  in
  let rec item = {
    v = Ok v;
    cancel = Atomic.make cancel;
    kp = function
      | Failed e -> enqueue (Error e)
      | Sent -> enqueue (Ok ())                    (* Success! *)
      | Rejected -> put_already_suspended t item   (* Consumer rejected value. Restart. *)
  } in
  let Short cell | Long (_, cell) = loc in
  add_to_cell t.consumers (Item item) cell;
  (* Set up the cancel handler in either case because we might change queues later: *)
  match Fiber_context.get_error ctx with
  | Some ex ->
    if cancel_put item ex then enqueue (Error ex);
    (* else being resumed *)
  | None ->
    Fiber_context.set_cancel_fn ctx (fun ex ->
        if cancel_put item ex then enqueue (Error ex)
        (* else being resumed *)
      )

let rec put (t : _ t) v =
  producer_join t
    ~success:(fun kc -> if kc (Ok v) then () else put t v)
    ~suspend:(put_suspend t v)
    ~closed:(fun () -> raise put_closed_err)

(* Taking. *)

(* Mirror of [producer_resume_cell]. *)
let rec consumer_resume_cell t ~success ~in_transition cell =
  match Atomic.get (cell : _ Cell.t Atomic.t) with
  | Slot _ -> assert false
  | In_transition -> in_transition cell
  | Finished -> consumer_resume_cell t ~success ~in_transition (Q.next_resume t.producers)
  | Item req as old ->
    if Atomic.compare_and_set cell old Finished then success req
    else consumer_resume_cell t ~success ~in_transition cell

let take_suspend t loc =
  Suspend.enter_unchecked "Sync.take" @@ fun ctx enqueue ->
  let Short cell | Long (_, cell) = loc in
  let kc v = enqueue (Ok v); true in
  add_to_cell t.producers (Slot kc) cell;
  match loc with
  | Short _ -> ()
  | Long loc ->
    match Fiber_context.get_error ctx with
    | Some ex ->
      if cancel t loc then enqueue (Error ex);
      (* else being resumed *)
    | None ->
      Fiber_context.set_cancel_fn ctx (fun ex ->
          if cancel t loc then enqueue (Error ex)
          (* else being resumed *)
        )

let take (t : _ t) =
  match Balance.fetch_and_add t.balance (-1) with
  | Error `Closed as e -> e
  | Ok old ->
    if old > 0 then (
      let cell = Q.next_resume t.producers in
      consumer_resume_cell t cell
        ~success:(fun item -> item.kp Sent; item.v)
        ~in_transition:(fun cell -> take_suspend t (Short cell))
    ) else (
      take_suspend t (Long (Q.next_suspend t.consumers))
    )

let take t =
  (take t : (_, [`Closed]) result :> (_, [> `Closed]) result)

let reject = Slot (fun _ -> false)

let take_nonblocking (t : _ t) =
  match Balance.decr_if_positive t.balance with
  | Balance_closed -> Error `Closed
  | Update_refused -> Error `Would_block        (* No waiting producers for us *)
  | Updated ->
    let rec aux cell =
      consumer_resume_cell t cell
        ~success:(fun item ->
            item.kp Sent;       (* Always accept the item *)
            (item.v :> (_, [`Closed | `Would_block]) result)
          )
        ~in_transition:(fun cell ->
            (* Our producer is still in the process of writing its [Item], but
               we're non-blocking and can't wait. We're always acting as the
               resumer, so we can't cancel the cell. Instead, we provide a
               consumer callback that always rejects.
               todo: could spin for a bit here first - the Item will probably arrive soon,
               and that would avoid making the producer start again. *)
            Domain.cpu_relax ();        (* Brief wait to encourage producer to finish *)
            if Atomic.compare_and_set cell In_transition reject then Error `Would_block
            else aux cell
          )
    in aux (Q.next_resume t.producers)

let take_nonblocking t =
  (take_nonblocking t : (_, [`Would_block | `Closed]) result :> (_, [> `Would_block | `Closed]) result)

(* Creation and status. *)

let create () =
  {
    consumers = Q.make ();
    producers = Q.make ();
    balance = Balance.make ();
  }

let close t =
  match Balance.close t.balance with
  | Error `Closed -> ()
  | Ok old ->
    if old > 0 then (
      (* Reject each waiting producer. They will try to restart and then discover the stream is closed. *)
      for _ = 1 to old do
        (* [cell] comes from [producers], so cancelled cells must be skipped on that queue too. *)
        let cell = Q.next_resume t.producers in
        add_to_cell t.producers reject cell;
      done
    ) else (
      let reject_consumer = Item {
          v = Error `Closed;
          kp = ignore;
          cancel = Atomic.make `Resuming;
        } in
      (* Reject each waiting consumer. *)
      for _ = 1 to -old do
        let cell = Q.next_resume t.consumers in
        add_to_cell t.consumers reject_consumer cell
      done
    )

let balance t = Balance.get t.balance