Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file async_bus.ml
open! Core_kernel
open! Async_kernel
open! Import
open! Bus

(* [pipe1_exn t here] subscribes to the one-argument bus [t] and returns the read end of
   a freshly created pipe.  Every value delivered to the subscriber is written to the
   pipe without pushback, provided the pipe is still open.  The subscription's
   [on_close] closes the pipe, and once the pipe is closed the subscription is
   removed from [t]. *)
let pipe1_exn (t : ('a -> unit, _) t) here =
  let reader, writer = Pipe.create () in
  let forward value = Pipe.write_without_pushback_if_open writer value in
  let subscription =
    subscribe_exn t here ~f:forward ~on_close:(fun () -> Pipe.close writer)
  in
  upon (Pipe.closed writer) (fun () -> unsubscribe t subscription);
  reader
;;

(* A GADT witness relating, for arities one through four: the bus callback type (first
   parameter), the type of the function [f] given to [first_exn] (second parameter),
   and the result type that [f] optionally produces (third parameter). *)
module First_arity = struct
  type (_, _, _) t =
    | Arity1 : ('a -> unit, 'a -> 'r option, 'r) t
    | Arity2 : ('a -> 'b -> unit, 'a -> 'b -> 'r option, 'r) t
    | Arity3 : ('a -> 'b -> 'c -> unit, 'a -> 'b -> 'c -> 'r option, 'r) t
    | Arity4 : ('a -> 'b -> 'c -> 'd -> unit, 'a -> 'b -> 'c -> 'd -> 'r option, 'r) t
  [@@deriving sexp_of]
end

(* [first_exn ?stop t here first_arity ~f] subscribes to [t] and returns a deferred that
   is filled with [r] the first time [f] returns [Some r] on a bus event; the
   subscription is then removed.  Events for which [f] returns [None] are ignored.

   If [stop] is supplied, the subscription is removed when [stop] becomes determined,
   and [f] is not called for any event that arrives once [stop] is determined.

   Exceptions reported through the subscription's [on_callback_raise] are sent to the
   monitor that was current when the subscription was set up. *)
let first_exn (type c f r) ?stop t here (first_arity : (c, f, r) First_arity.t) ~(f : f) =
  let module A = First_arity in
  Deferred.create (fun result ->
    (* Filled in at the very end, after [subscribe_exn] returns; everything that
       dereferences it runs only after that point. *)
    let subscription : c Bus.Subscriber.t option ref = ref None in
    let unsubscribe_self () = Bus.unsubscribe t (Option.value_exn !subscription) in
    let finish (outcome : r option) : unit =
      match outcome with
      | None -> ()
      | Some value ->
        Ivar.fill result value;
        unsubscribe_self ()
    in
    (* [can_finish] is kept apart from [finish] because it has to be consulted before
       [f] is invoked: [f] must not run at all once [stop] is determined. *)
    let can_finish : unit -> bool =
      match stop with
      | None -> fun () -> true
      | Some stop ->
        upon stop unsubscribe_self;
        fun () -> not (Deferred.is_determined stop)
    in
    (* Runs [apply_f] (a delayed application of [f] to the event's arguments) only when
       finishing is still permitted, then hands its result to [finish]. *)
    let attempt (apply_f : unit -> r option) : unit =
      if can_finish () then finish (apply_f ())
    in
    let callback : c =
      match first_arity with
      | A.Arity1 -> fun a -> attempt (fun () -> f a)
      | A.Arity2 -> fun a1 a2 -> attempt (fun () -> f a1 a2)
      | A.Arity3 -> fun a1 a2 a3 -> attempt (fun () -> f a1 a2 a3)
      | A.Arity4 -> fun a1 a2 a3 a4 -> attempt (fun () -> f a1 a2 a3 a4)
    in
    (* Capture the monitor now so that later callback errors go to it. *)
    let monitor = Monitor.current () in
    let on_callback_raise error = Monitor.send_exn monitor (Error.to_exn error) in
    subscription := Some (Bus.subscribe_exn t here ~on_callback_raise ~f:callback))
;;