Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file monitor.ml
open Core_kernel
open Import
open Deferred_std
module Deferred = Deferred1
module Scheduler = Scheduler1
module Stream = Tail.Stream
module Monitor = Monitor0
include Monitor

type monitor = t [@@deriving sexp_of]

(* Checks the one field-level invariant enforced here: [next_error] must be an empty
   ivar.  All other fields are accepted as-is. *)
let invariant t =
  Invariant.invariant [%here] t [%sexp_of: t] (fun () ->
    let check f = Invariant.check_field t f in
    Fields.iter
      ~name:ignore
      ~here:ignore
      ~id:ignore
      ~parent:ignore
      ~next_error:(check (fun next_error -> assert (Ivar.is_empty next_error)))
      ~handlers_for_all_errors:ignore
      ~tails_for_all_errors:ignore
      ~has_seen_error:ignore
      ~is_detached:ignore)
;;

(* The execution context the scheduler is currently running in. *)
let current_execution_context () = Scheduler.(current_execution_context (t ()))

(* The monitor of the current execution context. *)
let current () = Execution_context.monitor (current_execution_context ())

(* Number of [parent] links followed from [t] until a monitor with no parent is
   reached. *)
let depth t =
  let rec loop t n =
    match t.parent with
    | None -> n
    | Some t -> loop t (n + 1)
  in
  loop t 0
;;

type 'a with_optional_monitor_name =
  ?here:Source_code_position.t -> ?info:Info.t -> ?name:string -> 'a

(* Marks [t] as detached; [send_exn] stops walking up the parent chain at a detached
   monitor. *)
let detach t = t.is_detached <- true

(* State of a handler installed by [detach_and_iter_errors]. *)
type handler_state =
  | Uninitialized
  | Running of (Execution_context.t * (exn -> unit)) Bag.Elt.t
  | Terminated

(* Detaches [t] and registers [f] in [t.handlers_for_all_errors], paired with the
   execution context that is current at the time of this call.  If [f] raises, the
   handler is marked [Terminated], removed from the bag, and the exception is re-raised;
   a terminated handler ignores later errors. *)
let detach_and_iter_errors t ~f =
  detach t;
  let scheduler = Scheduler.t () in
  let execution_context = Scheduler.current_execution_context scheduler in
  let handler_state_ref = ref Uninitialized in
  let run_f exn =
    match !handler_state_ref with
    | Uninitialized -> assert false
    | Terminated -> ()
    | Running bag_elt ->
      (try f exn with
       | inner_exn ->
         handler_state_ref := Terminated;
         Bag.remove t.handlers_for_all_errors bag_elt;
         (* [run_f] always runs in [execution_context]. Hence, [raise inner_exn] sends
            [inner_exn] to [execution_context]'s monitor, i.e. the monitor in effect when
            [detach_and_iter_errors] was called. *)
         raise inner_exn)
  in
  handler_state_ref
  := Running (Bag.add t.handlers_for_all_errors (execution_context, run_f))
;;

(* Detaches [t], adds a fresh tail to [t.tails_for_all_errors], and returns the stream
   collected from that tail. *)
let detach_and_get_error_stream t =
  detach t;
  let tail = Tail.create () in
  t.tails_for_all_errors <- tail :: t.tails_for_all_errors;
  Tail.collect tail
;;

(* A deferred reading [t]'s current [next_error] ivar. *)
let get_next_error t = Ivar.read t.next_error

(* Detaches [t], then behaves like [get_next_error]. *)
let detach_and_get_next_error t =
  detach t;
  get_next_error t
;;

(* Creates a monitor whose parent is the current monitor. *)
let create ?here ?info ?name () =
  let parent = current () in
  create_with_parent ?here ?info ?name (Some parent)
;;

(* Payload carried by [Error_]: the underlying exception together with the backtrace
   information and the monitor recorded by [send_exn]. *)
module Exn_for_monitor = struct
  type t =
    { exn : exn
    ; backtrace : Backtrace.t option
    ; backtrace_history : Backtrace.t list
    ; monitor : Monitor.t
    }

  (* Trims frames from both ends of a backtrace given as a list of lines:
     - at the front, a leading "Raised at import0.ml" frame plus up to two following
       "Called from error.ml" frames;
     - at the back, a trailing "job_queue.ml" frame, a second "job_queue.ml" frame before
       it, and then one "deferred0.ml" / "deferred1.ml" / "monitor.ml" frame before
       those. *)
  let backtrace_truncation_heuristics =
    let job_queue = "Called from file \"job_queue.ml\"" in
    let deferred0 = "Called from file \"deferred0.ml\"" in
    let deferred1 = "Called from file \"deferred1.ml\"" in
    let monitor = "Called from file \"monitor.ml\"" in
    let import0 = "Raised at file \"import0.ml\"" in
    let error = "Called from file \"error.ml\"" in
    fun traces ->
      (* ../test/test_try_with_error_display.ml makes sure this stays up-to-date. *)
      let traces =
        match traces with
        | t1 :: rest when String.is_prefix t1 ~prefix:import0 ->
          (match rest with
           | t2 :: rest when String.is_prefix t2 ~prefix:error ->
             (match rest with
              | t3 :: rest when String.is_prefix t3 ~prefix:error -> rest
              | _ -> rest)
           | _ -> rest)
        | _ -> traces
      in
      match List.rev traces with
      | t1 :: rest when String.is_prefix t1 ~prefix:job_queue ->
        (match rest with
         | t2 :: rest when String.is_prefix t2 ~prefix:job_queue ->
           (match rest with
            | t2 :: rest
              when String.is_prefix t2 ~prefix:deferred0 (* bind *)
                   || String.is_prefix t2 ~prefix:deferred1 (* map *)
                   || String.is_prefix t2 ~prefix:monitor (* try_with *) ->
              List.rev rest
            | _ -> List.rev rest)
         | _ -> List.rev rest)
      | _ -> traces
  ;;

  (* Renders the exception, followed (when non-empty) by the truncated backtrace with a
     "Caught by monitor ..." line appended, and (when non-empty) the backtrace history. *)
  let sexp_of_t { exn; backtrace; backtrace_history; monitor } =
    let monitor =
      let name =
        match Info.to_string_hum monitor.name with
        | "" -> None
        | s -> Some s
      in
      let pos =
        match monitor.here with
        | None -> None
        | Some here ->
          (* We display the full filename, whereas backtraces only have basenames, but
             perhaps that's what should change. *)
          let column = here.pos_cnum - here.pos_bol in
          Some
            (sprintf
               "file %S, line %d, characters %d-%d"
               here.pos_fname
               here.pos_lnum
               column
               column)
      in
      match pos, name with
      | None, None -> []
      | Some pos, None -> [ sprintf "Caught by monitor at %s" pos ]
      | None, Some name -> [ sprintf "Caught by monitor %s" name ]
      | Some pos, Some name -> [ sprintf "Caught by monitor %s at %s" name pos ]
    in
    let backtrace =
      let backtrace =
        match backtrace with
        | None -> []
        | Some backtrace -> Backtrace.to_string_list backtrace
      in
      backtrace_truncation_heuristics backtrace @ monitor
    in
    let list_if_not_empty = function
      | [] -> None
      | _ :: _ as l -> Some l
    in
    [%sexp
      (exn : exn)
    , (list_if_not_empty backtrace : string list sexp_option)
    , `backtrace_history
        (list_if_not_empty backtrace_history : Backtrace.t list sexp_option)]
  ;;
end

(* The exception that [send_exn] delivers: the original exception wrapped with monitor
   and backtrace information. *)
exception Error_ of Exn_for_monitor.t

(* Registers the sexp converter for [Error_]. *)
let () =
  Sexplib.Conv.Exn_converter.add [%extension_constructor Error_] (function
    | Error_ t -> [%sexp "monitor.ml.Error" :: (t : Exn_for_monitor.t)]
    | _ ->
      (* Reaching this branch indicates a bug in sexplib. *)
      assert false)
;;

(* Unwraps an [Error_] to the exception it carries; any other exception is returned
   unchanged. *)
let extract_exn exn =
  match exn with
  | Error_ error -> error.exn
  | exn -> exn
;;

(* Sends [exn] to monitor [t].  Unless [exn] is already an [Error_], it is first wrapped
   in one, recording [t], the requested backtrace ([`Get] takes the most recent exception
   backtrace, [`This b] uses [b]) and the current execution context's backtrace history.
   The error is then delivered to [t] and, while monitors are not detached, to each
   successive parent: the monitor's [next_error] is filled and replaced by a fresh ivar;
   a detached monitor has its handlers enqueued and its tails extended, which ends the
   walk; a non-detached monitor with no parent reports the exception to the scheduler as
   uncaught. *)
let send_exn t ?backtrace exn =
  let exn =
    match exn with
    | Error_ _ -> exn
    | _ ->
      let backtrace =
        match backtrace with
        | None -> None
        | Some `Get -> Some (Backtrace.Exn.most_recent ())
        | Some (`This b) -> Some b
      in
      let backtrace_history = (current_execution_context ()).backtrace_history in
      Error_ { Exn_for_monitor.exn; backtrace; backtrace_history; monitor = t }
  in
  if Debug.monitor_send_exn
  then Debug.log "Monitor.send_exn" (t, exn) [%sexp_of: t * exn];
  t.has_seen_error <- true;
  let scheduler = Scheduler.t () in
  let rec loop t =
    Ivar.fill t.next_error exn;
    t.next_error <- Ivar.create ();
    if t.is_detached
    then (
      if Debug.monitor_send_exn
      then
        Debug.log
          "Monitor.send_exn found listening monitor"
          (t, exn)
          [%sexp_of: t * exn];
      Bag.iter t.handlers_for_all_errors ~f:(fun (execution_context, f) ->
        Scheduler.enqueue scheduler execution_context f exn);
      List.iter t.tails_for_all_errors ~f:(fun tail -> Tail.extend tail exn))
    else (
      match t.parent with
      | Some t' -> loop t'
      | None ->
        (* Do not change this branch to print the exception or to exit. Having the
           scheduler raise an uncaught exception is the necessary behavior for programs
           that call [Scheduler.go] and want to handle it. *)
        Scheduler.(got_uncaught_exn (t ())) exn (!Async_kernel_config.task_id ()))
  in
  loop t
;;

module Exported_for_scheduler = struct
  (* Runs [f] with [context] as the execution context.  If [f] raises, the exception is
     sent to [context]'s monitor (with the most recent backtrace) and [Error ()] is
     returned. *)
  let within_context context f =
    Scheduler.(with_execution_context (t ()))
      context
      ~f:(fun () ->
        match Result.try_with f with
        | Ok x -> Ok x
        | Error exn ->
          send_exn (Execution_context.monitor context) exn ~backtrace:`Get;
          Error ())
  ;;

  type 'a with_options = ?monitor:t -> ?priority:Priority.t -> 'a

  (* [within_context] in a context like the current one, with [monitor] and [priority]
     overridden when supplied. *)
  let within_gen ?monitor ?priority f =
    let tmp_context =
      Execution_context.create_like (current_execution_context ()) ?monitor ?priority
    in
    within_context tmp_context f
  ;;

  (* Like [within_gen] for a deferred-returning [f]; yields [Deferred.never ()] if [f]
     raised. *)
  let within' ?monitor ?priority f =
    match within_gen ?monitor ?priority f with
    | Error () -> Deferred.never ()
    | Ok d -> d
  ;;

  (* Like [within_gen], returning [None] if [f] raised. *)
  let within_v ?monitor ?priority f =
    match within_gen ?monitor ?priority f with
    | Error () -> None
    | Ok x -> Some x
  ;;

  (* Like [within_gen] for a unit-returning [f]; returns unit whether or not [f]
     raised. *)
  let within ?monitor ?priority f =
    match within_gen ?monitor ?priority f with
    | Error () -> ()
    | Ok () -> ()
  ;;

  (* Enqueues [work x] on the scheduler in a context like the current one, with [monitor]
     and [priority] overridden when supplied. *)
  let schedule_with_data ?monitor ?priority work x =
    let scheduler = Scheduler.t () in
    Scheduler.enqueue
      scheduler
      (Execution_context.create_like
         (Scheduler.current_execution_context scheduler)
         ?monitor
         ?priority)
      work
      x
  ;;

  (* [schedule_with_data] specialised to a [unit -> unit] job. *)
  let schedule ?monitor ?priority work = schedule_with_data ?monitor ?priority work ()

  (* Schedules a deferred-returning [work] and returns a deferred that is filled with
     [work]'s result. *)
  let schedule' =
    (* For performance, we use [schedule_with_data] with a closed function, and inline
       [Deferred.create]. *)
    let upon_work_fill_i (work, i) = upon (work ()) (fun a -> Ivar.fill i a) in
    fun ?monitor ?priority work ->
      let i = Ivar.create () in
      schedule_with_data ?monitor ?priority upon_work_fill_i (work, i);
      Ivar.read i
  ;;

  (* Captures the execution context current at the time of this call and returns a staged
     function that enqueues [f a] in that context. *)
  let preserve_execution_context f =
    let scheduler = Scheduler.t () in
    let execution_context = Scheduler.current_execution_context scheduler in
    stage (fun a -> Scheduler.enqueue scheduler execution_context f a)
  ;;

  (* Like [preserve_execution_context] for a deferred-returning [f]: the staged function
     returns a deferred that is filled with the result of [f a]. *)
  let preserve_execution_context' f =
    let scheduler = Scheduler.t () in
    let execution_context = Scheduler.current_execution_context scheduler in
    let call_and_fill (f, a, i) = upon (f a) (fun r -> Ivar.fill i r) in
    stage (fun a ->
      Deferred.create (fun i ->
        Scheduler.enqueue scheduler execution_context call_and_fill (f, a, i)))
  ;;
end

open Exported_for_scheduler

(* Applies [f] to each element of [stream].  The wait for the following element is set up
   before [f] is called on the current one. *)
let stream_iter stream ~f =
  let rec loop stream =
    Stream.next stream
    >>> function
    | Nil -> ()
    | Cons (v, stream) ->
      loop stream;
      f v
  in
  loop stream
;;

(* An ['a Ok_and_exns.t] represents the output of a computation running in a detached
   monitor. *)
module Ok_and_exns = struct
  type 'a t =
    { ok : 'a Deferred.t
    ; exns : exn Stream.t
    }
  [@@deriving fields, sexp_of]

  (* Runs [f] (immediately for [`Now], via the scheduler for [`Schedule]) inside a fresh
     parentless, detached monitor and returns its result together with the monitor's
     error stream. *)
  let create ?here ?info ?name ~run f =
    (* We call [create_with_parent None] because [monitor] does not need a parent. It
       does not because we call [detach_and_get_error_stream monitor] and deal with the
       errors explicitly, thus [send_exn] would never propagate an exn past [monitor]. *)
    let monitor = create_with_parent ?here ?info ?name None in
    let exns = detach_and_get_error_stream monitor in
    let ok =
      match run with
      | `Now -> within' ~monitor f
      | `Schedule -> schedule' ~monitor f
    in
    { ok; exns }
  ;;
end

(* If [result_filler] is still empty, fills it with [result] and hands [exns] to
   [handle_exns_after_result]; otherwise does nothing. *)
let fill_result_and_handle_background_errors
      result_filler
      result
      exns
      handle_exns_after_result
  =
  if Ivar_filler.is_empty result_filler
  then (
    Ivar_filler.fill result_filler result;
    handle_exns_after_result exns)
;;

module Expert = struct
  (* Handler used by [try_with] for [~rest:`Log].  The initial value raises, reporting
     that the reference was never set. *)
  let try_with_log_exn : (exn -> unit) ref =
    ref (fun exn ->
      raise_s [%message "failed to set [Monitor.Expert.try_with_log_exn]" (exn : Exn.t)])
  ;;
end

(* Builds the handler [try_with] applies to the exceptions remaining on the error stream
   once the result is determined: [`Log] uses [!Expert.try_with_log_exn], [`Raise] sends
   the exception to the monitor that is current when the handler is built, and [`Call f]
   runs [f] within that monitor. *)
let make_handle_exn rest =
  match rest with
  | `Log ->
    (* We are careful to not close over current context, which is not needed. *)
    !Expert.try_with_log_exn
  | `Raise ->
    let parent = current () in
    fun exn -> send_exn parent exn ?backtrace:None
  | `Call f ->
    let parent = current () in
    fun exn -> within ~monitor:parent (fun () -> f exn)
;;

(* Runs [f] in a fresh detached monitor and returns [Ok] of its result or [Error] of the
   first exception sent to that monitor, whichever comes first.  With [~extract_exn:true]
   that first exception is unwrapped via [extract_exn].  Exceptions remaining on the
   stream are handled according to [rest]. *)
let try_with
      ?here
      ?info
      ?(name = "")
      ?extract_exn:(do_extract_exn = false)
      ?(run = `Schedule)
      ?(rest = `Log)
      f
  =
  let { Ok_and_exns.ok; exns } = Ok_and_exns.create ?here ?info ~name ~run f in
  let handle_exn = make_handle_exn rest in
  let handle_exns_after_result exns = stream_iter exns ~f:handle_exn in
  (* We run [within' ~monitor:main] to avoid holding on to references to the evaluation
     context in which [try_with] was called. This avoids a space leak when a chain of
     [try_with]'s are run each nested within the previous one. Without the [within'], the
     error handling for the innermost [try_with] would keep alive the entire chain. *)
  within' ~monitor:main (fun () ->
    if Deferred.is_determined ok
    then (
      handle_exns_after_result exns;
      return (Ok (Deferred.value_exn ok)))
    else (
      let result_filler, result = Ivar_filler.create () in
      upon ok (fun res ->
        fill_result_and_handle_background_errors
          result_filler
          (Ok res)
          exns
          handle_exns_after_result);
      upon (Stream.next exns) (function
        | Nil -> assert false
        | Cons (exn, exns) ->
          let exn = if do_extract_exn then extract_exn exn else exn in
          fill_result_and_handle_background_errors
            result_filler
            (Error exn)
            exns
            handle_exns_after_result);
      result))
;;

(* [try_with ~run:`Now ~rest:`Log] with the result converted to an [Or_error.t]. *)
let try_with_or_error ?here ?info ?(name = "try_with_or_error") ?extract_exn f =
  try_with f ?here ?info ~name ?extract_exn ~run:`Now ~rest:`Log
  >>| Or_error.of_exn_result
;;

(* [try_with_or_error] for an [f] that itself returns an [Or_error.t]; the two error
   layers are joined. *)
let try_with_join_or_error ?here ?info ?(name = "try_with_join_or_error") ?extract_exn f =
  try_with_or_error f ?here ?info ~name ?extract_exn >>| Or_error.join
;;

(* Runs [f] under [try_with], then runs [finally] under [try_with] regardless of the
   outcome.  Returns [f]'s result when both succeed; raises the failing exception when
   exactly one fails; raises a combined "Async finally" error when both fail. *)
let protect ?here ?info ?(name = "Monitor.protect") ?extract_exn ?run f ~finally =
  let%bind r = try_with ?extract_exn ?here ?info ?run ~name f in
  let%map fr = try_with ~extract_exn:false ?here ?info ~name:"finally" finally in
  match r, fr with
  | Error exn, Error finally_exn ->
    raise_s [%message "Async finally" (exn : exn) (finally_exn : exn)]
  | Error e, Ok () | Ok _, Error e -> raise e
  | Ok r, Ok () -> r
;;

(* Runs [f] immediately in a fresh detached monitor, applies [handler] to every exception
   sent to that monitor, and returns [f]'s result. *)
let handle_errors ?here ?info ?name f handler =
  let { Ok_and_exns.ok; exns } = Ok_and_exns.create ?here ?info ?name ~run:`Now f in
  stream_iter exns ~f:handler;
  ok
;;

(* Runs [f] immediately in a fresh detached monitor and returns the stream of exceptions
   sent to that monitor. *)
let catch_stream ?here ?info ?name f =
  let { Ok_and_exns.exns; _ } =
    Ok_and_exns.create ?here ?info ?name ~run:`Now (fun () ->
      f ();
      return ())
  in
  exns
;;

(* The first exception on [catch_stream]'s stream.  Raises if the stream turns out to be
   empty. *)
let catch ?here ?info ?name f =
  match%map Stream.next (catch_stream ?here ?info ?name f) with
  | Cons (x, _) -> x
  | Nil -> raise_s [%message "Monitor.catch got unexpected empty stream"]
;;

(* [catch] with the exception converted to an [Error.t]. *)
let catch_error ?here ?info ?name f = catch ?here ?info ?name f >>| Error.of_exn