Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file scheduler.ml
open Core
open Lwt.Infix
open Bistro_internals

module W = Bistro_internals.Workflow

type error = [
  | `Msg of string
]

module Table = String.Table

type 'a thread = 'a Eval_thread.t

(* Binds [x] then [y] and returns both results as a pair. *)
let lwt_both x y =
  x >>= fun x ->
  y >>= fun y ->
  Lwt.return (x, y)

(* Collector for cached results.

   The collector records which element (workflow or singularity image)
   is used by which workflow, and keeps a use count per element. Each
   time a workflow is tagged as built, the count of every element it
   depends on is decremented; an element whose count reaches 0 and that
   is not in the protected set gets its cache path removed. *)
module Gc : sig
  type t
  val create : Db.t -> (Logger.event -> unit) -> t
  val register : t -> ?target:_ W.t -> _ W.t -> unit Lwt.t
  val tag_workflow_as_built : t -> _ W.t -> unit
  val uses_singularity_image : t -> _ W.t -> Command.container_image -> unit
  val stop : t -> unit Lwt.t
  (* val fold_deps :
   *   t ->
   *   init:'a ->
   *   f:('a -> Workflow.any -> Workflow.any -> 'a) ->
   *   'a *)
  type state = {
    deps : (Workflow.any * Workflow.any) list ;
    protected : Workflow.any list ;
  }
  val state : t -> state
end
=
struct
  (* Elements tracked by the collector. Comparison, equality and
     hashing all go through the string identifier of the element. *)
  module Elt = struct
    type t =
      | Workflow : W.any -> t
      | Singularity_image : Command.container_image -> t

    let workflow w = Workflow (W.Any w)
    let singularity_image x = Singularity_image x

    let id = function
      | Workflow w -> W.Any.id w
      | Singularity_image i -> Db.container_image_identifier i

    let compare x y =
      String.compare (id x) (id y)

    let equal x y =
      String.equal (id x) (id y)

    let hash x = Hashtbl.hash (id x)

    (* Location of the element in the database [db]. *)
    let path db = function
      | Workflow w -> Db.cache db (W.Any.id w)
      | Singularity_image i -> Db.singularity_image db i
  end

  module S = Caml.Set.Make(Elt)

  (* Hash table keyed by elements, with helpers for adjacency sets and
     use counters. *)
  module T = struct
    include Caml.Hashtbl.Make(Elt)

    (* Replaces the binding of [key] by [f] applied to the current
       binding, or to [default] when [key] is unbound. *)
    let update t ~key ~default ~f =
      let data = match find t key with
        | d -> d
        | exception Caml.Not_found -> default
      in
      replace t key (f data)

    let adj_add t u v =
      update t ~key:u ~default:S.empty ~f:(S.add v)

    let adj_find t u =
      match find t u with
      | x -> x
      | exception Caml.Not_found -> S.empty

    let incr_count t u =
      update t ~key:u ~default:0 ~f:succ

    (* Decrements the counter of [u] and returns the new value. The
       counter must already exist (assertion failure otherwise). *)
    let decr_count t u =
      let n = match find t u with
        | n -> n - 1
        | exception Caml.Not_found -> assert false
      in
      replace t u n ;
      n
  end

  (* Messages handled by the [main] loop. *)
  type msg =
    | Built : Elt.t -> msg
    | Stop : msg

  type t = {
    db : Db.t ;
    log : Logger.event -> unit ;
    depends_on : S.t T.t ;
    is_used_by : S.t T.t ;
    counts : int T.t ;
    mutable protected : S.t ;
    inbox : msg Lwt_queue.t ;
    end_listener : unit Lwt.u ;
    _end_ : unit Lwt.t ;
  }

  (* Posts a [Stop] message and returns the promise that [main] wakes
     up when it processes that message. *)
  let stop x =
    Lwt_queue.push x.inbox Stop ;
    x._end_

  (* Decrements the use count of [x]; when it drops to 0 and [x] is not
     protected, logs the collection and removes the path of [x]. *)
  let update_counts_and_collect gc x =
    let n = T.decr_count gc.counts x in
    if n = 0 then (
      if not (S.mem x gc.protected) then (
        gc.log (
          match x with
          | Workflow (W.Any w) ->
            Logger.Workflow_collected w
          | Singularity_image i ->
            Logger.Singularity_image_collected i
        ) ;
        Misc.remove_if_exists (Elt.path gc.db x)
      )
      else Lwt.return ()
    )
    else Lwt.return ()

  (* Message loop: processes [Built] notifications until [Stop]. *)
  let rec main gc =
    Lwt_queue.pop gc.inbox >>= function
    | Built x ->
      T.adj_find gc.depends_on x
      |> S.elements
      |> List.map ~f:(update_counts_and_collect gc)
      |> Lwt.join >>= fun () ->
      main gc
    | Stop ->
      Lwt.wakeup gc.end_listener () ;
      Lwt.return ()

  (* Builds a collector and starts its message loop with [Lwt.async]. *)
  let create db log =
    let inbox = Lwt_queue.create () in
    let counts = T.create 253 in
    let _end_, end_listener = Lwt.wait () in
    let gc = {
      db ;
      log ;
      depends_on = T.create 253 ;
      is_used_by = T.create 253 ;
      counts ;
      protected = S.empty ;
      inbox ;
      _end_ ;
      end_listener ;
    }
    in
    Lwt.async (fun () -> main gc) ;
    gc

  let tag_workflow_as_built gc w =
    Lwt_queue.push gc.inbox (Built (Elt.workflow w))

  (* Records that [u] uses [v]. Without a user ([u = None]), [v] is
     added to the protected set instead. *)
  let uses gc u v =
    match u with
    | None ->
      gc.protected <- S.add (Elt.workflow v) gc.protected
    | Some u ->
      let u = Elt.workflow u
      and v = Elt.workflow v in
      T.adj_add gc.depends_on u v ;
      T.adj_add gc.is_used_by v u ;
      T.incr_count gc.counts v

  let uses_singularity_image gc u v =
    let u = Elt.workflow u
    and v = Elt.singularity_image v in
    T.adj_add gc.depends_on u v ;
    T.adj_add gc.is_used_by v u ;
    T.incr_count gc.counts v

  (* Walks workflow [w]. Plugin and shell steps are recorded as used by
     [target]; their own dependencies are then visited with the step as
     new target, unless [stop_register] holds for the step. *)
  let rec register : type u v. t -> ?target:u W.t -> v W.t -> unit Lwt.t = fun gc ?target w ->
    match w with
    | Pure _ -> Lwt.return ()
    | App app ->
      lwt_both (register gc ?target app.f) (register gc ?target app.x) >|= ignore
    | Both both ->
      lwt_both (register gc ?target both.fst) (register gc ?target both.snd) >|= ignore
    | List l ->
      List.map ~f:(register ?target gc) l.elts
      |> Lwt.join
    | Eval_path x -> register gc ?target x.workflow
    | Spawn x ->
      Lwt_list.iter_p (register_any gc ?target) x.deps
    | List_nth l ->
      register gc ?target l.elts
    | Input _ -> Lwt.return ()
    | Select x -> register gc ?target x.dir
    | Plugin { task = Value_plugin v ; _ } ->
      uses gc target w ;
      if stop_register gc w then Lwt.return ()
      else register gc ~target:w v
    | Plugin { task = Path_plugin p ; _ } ->
      uses gc target w ;
      if stop_register gc w then Lwt.return ()
      else register gc ~target:w p
    | Shell s ->
      uses gc target w ;
      if stop_register gc w then Lwt.return ()
      else Lwt_list.iter_p (register_any gc ~target:w) s.deps
    | Glob g -> register gc ?target g.dir

  and register_any : type u. t -> ?target:u W.t -> W.any -> unit Lwt.t = fun gc ?target (Workflow.Any w) ->
    register gc ?target w

  (* True when [w] already has an entry in [depends_on] or is in the
     cache of the database. *)
  and stop_register : type u. t -> u W.t -> bool = fun gc w ->
    let u = Elt.workflow w in
    T.mem gc.depends_on u
    || Db.is_in_cache gc.db (W.Any w)

  let register gc ?target w =
    register gc ?target w

  (* let fold_deps gc ~init ~f =
   *   T.fold
   *     (fun u deps acc -> S.fold (fun v acc -> f acc u v) deps acc)
   *     gc.depends_on
   *     init *)

  type state = {
    deps : (Workflow.any * Workflow.any) list ;
    protected : Workflow.any list ;
  }

  (* Snapshot of the workflow-to-workflow dependency edges and of the
     protected workflows; singularity images are left out. *)
  let state gc = {
    deps =
      T.to_seq gc.depends_on
      |> Seq.flat_map (fun (u, s) ->
          match u with
          | Elt.Workflow w_u ->
            Seq.filter_map
              (function
                | Elt.Workflow w_v -> Some (w_u, w_v)
                | Singularity_image _ -> None)
              (S.to_seq s)
          | Singularity_image _ -> Seq.empty
        )
      |> Caml.List.of_seq ;
    protected =
      S.to_seq gc.protected
      |> Seq.filter_map (function Elt.Workflow w -> Some w | Singularity_image _ -> None)
      |> Caml.List.of_seq ;
  }
end

(* Optional collector: every operation is a no-op on [None]. *)
module Maybe_gc : sig
  type t = Gc.t option
  val register : t -> ?target:_ W.t -> _ W.t -> unit Lwt.t
  val uses_singularity_image : t -> _ W.t -> Command.container_image -> unit
  val tag_workflow_as_built : t -> _ W.t -> unit
  val stop : t -> unit Lwt.t
end
=
struct
  type t = Gc.t option

  let register o ?target w = match o with
    | Some gc -> Gc.register gc ?target w
    | None -> Lwt.return ()

  let uses_singularity_image o w s = match o with
    | Some gc -> Gc.uses_singularity_image gc w s
    | None -> ()

  let tag_workflow_as_built o w = match o with
    | Some gc -> Gc.tag_workflow_as_built gc w
    | None -> ()

  let stop = function
    | Some gc -> Gc.stop gc
    | None -> Lwt.return ()
end

(* One-shot signal implemented as an Lwt promise/resolver pair. *)
module Synchro : sig
  type 'a t
  val create : unit -> 'a t
  val signal : 'a t -> 'a -> unit
  val wait : 'a t -> 'a Lwt.t
end
=
struct
  type 'a t = ('a Lwt.t * 'a Lwt.u)
  let create () = Lwt.wait ()
  let signal (_, u) x = Lwt.wakeup u x
  let wait = fst
end

(* Operations a scheduler needs from an execution backend. *)
module type Backend = sig
  open Bistro_internals

  type t
  type token

  val run_shell_command :
    t ->
    token ->
    Shell_command.t ->
    (int * bool) Lwt.t

  val eval :
    t ->
    token ->
    ('a -> unit) ->
    'a ->
    (unit, string) Lwt_result.t

  val build_trace :
    t ->
    _ Workflow.t ->
    Allocator.request ->
    (token -> Allocator.resource -> Task_result.t Eval_thread.t) ->
    Execution_trace.t Eval_thread.t

  val stop : t -> unit Lwt.t
end

module Make(Backend : Backend) = struct
  type t = {
    start : unit Synchro.t ;
    _end_ : unit Synchro.t ;
    mutable closed : bool ;
    db : Db.t ;
    logger : Logger.t ;
    allowed_containers : [`Docker | `Singularity] list ;
    traces : Execution_trace.t thread Table.t ;
    gc : Gc.t option ;
    backend : Backend.t ;
    allocator : Allocator.t ;
  }

  (* Builds a scheduler on top of [backend] and [db]. A collector is
     only created when [collect] is true. *)
  let create
      ?(allowed_containers = [`Docker])
      ?(loggers = [])
      ?(collect = false)
      backend db =
    let allocator = Allocator.create ~np:1 ~mem:0 in
    let logger = Logger.tee loggers in
    {
      start = Synchro.create () ;
      _end_ = Synchro.create () ;
      closed = false ;
      db ;
      allowed_containers ;
      traces = String.Table.create () ;
      logger ;
      gc = (
        if collect then
          let gc_log event = logger#event db (Unix.gettimeofday ()) event in
          Some (Gc.create db gc_log)
        else None
      ) ;
      backend ;
      allocator ;
    }

  let gc_state sched = Option.map ~f:Gc.state sched.gc

  (* let log ?(time = Unix.gettimeofday ()) sched event =
   *   sched.logger#event sched.db time event *)

  (* Task result for an input: [pass] tells whether [path] exists. *)
  let perform_input ~path ~id =
    let pass = Sys.file_exists path = `Yes in
    (* (
     *   if pass then Misc.cp path (Db.cache sched.db id)
     *   else Lwt.return ()
     * ) >>= fun () -> *)
    Eval_thread.return (Task_result.Input { id ; pass ; path })

  (* Task result for a selection: [pass] tells whether [sel] exists
     under the path of [dir]. *)
  let perform_select ~db ~id ~dir ~sel =
    let p = Filename.concat (Db.path db dir) (Path.to_string sel) in
    let pass = Sys.file_exists p = `Yes in
    Eval_thread.return (Task_result.Select {
        id ;
        pass ;
        dir_path = Db.path db dir ;
        sel ;
      })

  (* Exit code 0 with an existing destination is a success, exit code 0
     without destination is a missing output, anything else a failure. *)
  let step_outcome ~exit_code ~dest_exists =
    match exit_code, dest_exists with
      0, true -> `Succeeded
    | 0, false -> `Missing_output
    | _ -> `Failed

  (* Runs shell command [cmd] through the backend. On success the
     destination is moved to the cache and the temporary directory is
     removed; otherwise both are left in place. *)
  let perform_shell { backend ; allowed_containers ; db ; _ } token (Allocator.Resource { np ; mem }) ~id ~descr cmd =
    let env =
      Execution_env.make
        ~allowed_containers
        ~db
        ~np ~mem ~id
    in
    let cmd = Shell_command.make env cmd in
    Backend.run_shell_command backend token cmd >>= fun (exit_code, dest_exists) ->
    let cache_dest = Db.cache db id in
    let outcome = step_outcome ~exit_code ~dest_exists in
    Misc.(
      if outcome = `Succeeded then
        mv env.dest cache_dest >>= fun () ->
        remove_if_exists env.tmp_dir
      else
        Lwt.return ()
    ) >>= fun () ->
    Eval_thread.return (Task_result.Shell {
        outcome ;
        id ;
        descr ;
        exit_code ;
        cmd = Shell_command.text cmd ;
        file_dumps = Shell_command.file_dumps cmd ;
        cache = if outcome = `Succeeded then Some cache_dest else None ;
        stdout = env.stdout ;
        stderr = env.stderr ;
      })

  (* Turns workflow [w] into a closure computing its value without
     Lwt. Sub-evaluators are prepared once, when this function is
     called; the returned closure does the actual evaluation. *)
  let rec blocking_evaluator
    : type s. Db.t -> s Workflow.t -> (unit -> s)
    = fun db w ->
      match w with
      | Workflow.Pure { value ; _ } -> fun () -> value
      | Workflow.App { f ; x ; _ } ->
        let f = blocking_evaluator db f in
        let x = blocking_evaluator db x in
        fun () -> (f ()) (x ())
      | Workflow.Both { fst ; snd ; _ } ->
        let fst = blocking_evaluator db fst in
        let snd = blocking_evaluator db snd in
        fun () -> (fst (), snd ())
      | Workflow.Eval_path x ->
        let f = blocking_evaluator db x.workflow in
        fun () -> Db.path db (f ())
      | Workflow.Select s ->
        let dir = blocking_evaluator db s.dir in
        fun () -> Workflow.cd (dir ()) s.sel
      | Workflow.Input { path ; _ } -> fun () -> Workflow.FS_path (Misc.absolutize path)
      | Workflow.Plugin { id ; task = Value_plugin _ ; _ } ->
        fun () -> (Misc.load_value (Db.cache db id))
      | Workflow.Plugin { id ; task = Path_plugin _ ; _ } -> fun () -> Workflow.Cache_id id
      | Workflow.Spawn s ->
        let elts = blocking_evaluator db s.elts in
        fun () ->
          let elts = elts () in
          List.init (List.length elts) ~f:(fun i -> blocking_evaluator db (s.f (Workflow.list_nth s.elts i)) ())
      | Workflow.Shell s -> fun () -> Workflow.Cache_id s.id
      | Workflow.List l ->
        let l = List.map l.elts ~f:(blocking_evaluator db) in
        fun () -> List.map l ~f:(fun f -> f ())
      | Workflow.List_nth l ->
        let elts = blocking_evaluator db l.elts in
        fun () ->
          let elts = elts () in
          List.nth_exn elts l.index
      | Workflow.Glob { dir ; type_selection ; pattern ; id = _ } ->
        let dir = blocking_evaluator db dir in (* FIXME: maybe cache this function? *)
        fun () ->
          let dir_path = dir () in
          (
            match Misc.glob ~type_selection ~pattern (Db.path db dir_path) with
            | Error (`Msg s) -> failwithf "glob error: %s" s ()
            | Ok xs -> List.map xs ~f:(fun fn -> Workflow.FS_path fn)
          )

  (* Evaluates a value plugin through the backend and saves the result
     in the cache. A backend error is reported as a [`Failed] outcome,
     not as an error of the returned thread. *)
  let perform_plugin { backend ; db ; _ } token (Allocator.Resource _) ~id ~descr workflow =
    let evaluator = blocking_evaluator db workflow in
    Backend.eval backend token (fun () ->
        let y = evaluator () () in
        Misc.save_value ~data:y (Db.cache db id)
      ) ()
    >|= function
    | Ok () -> Ok (Task_result.Plugin { id ; outcome = `Succeeded ; msg = None ; descr })
    | Error msg -> Ok (Task_result.Plugin { id ; outcome = `Failed ; msg = Some msg ; descr })

  (* Evaluates a path plugin through the backend: the plugin function
     receives [env.dest] as argument. When the destination exists
     afterwards it is moved to the cache and the temporary directory is
     removed. *)
  let perform_path_plugin { db ; backend ; _ } token (Allocator.Resource { mem ; np }) ~id ~descr workflow =
    let evaluator = blocking_evaluator db workflow in
    let env =
      Execution_env.make
        ~allowed_containers:[]
        ~db
        ~np ~mem ~id
    in
    let cache_dest = Db.cache db id in
    Misc.remove_if_exists env.tmp_dir >>= fun () ->
    Unix.mkdir_p env.tmp ;
    Backend.eval backend token (Fn.flip evaluator env.dest) () >>= function
    | Ok () ->
      let outcome =
        if Sys.file_exists env.dest = `Yes then `Succeeded
        else `Missing_output
      in
      Misc.(
        if outcome = `Succeeded then
          mv env.dest cache_dest >>= fun () ->
          remove_if_exists env.tmp_dir
        else
          Lwt.return ()
      ) >>= fun () ->
      Lwt_result.return (Task_result.Plugin { id ; outcome ; msg = None ; descr })
    | Error msg ->
      Lwt_result.return (Task_result.Plugin { id ; outcome = `Failed ; msg = Some msg ; descr })

  (* Lwt counterpart of [blocking_evaluator]: computes the value of [w]
     directly. Value plugins are read from the cache, path plugins and
     shell steps are mapped to their cache identifier. *)
  let rec shallow_eval : type s. t -> s W.t -> s Lwt.t = fun sched w ->
    match w with
    | W.Pure { value ; _ } -> Lwt.return value
    | W.App { f ; x ; _ } ->
      lwt_both (shallow_eval sched f) (shallow_eval sched x) >>= fun (f, x) ->
      let y = f x in
      Lwt.return y
    | W.Both { fst ; snd ; _ } ->
      lwt_both (shallow_eval sched fst) (shallow_eval sched snd) >>= fun (fst, snd) ->
      Lwt.return (fst, snd)
    | W.Eval_path w ->
      shallow_eval sched w.workflow >|= Db.path sched.db
    | W.Select s ->
      shallow_eval sched s.dir >>= fun dir ->
      Lwt.return (W.cd dir s.sel)
    | W.Input { path ; _ } -> Lwt.return (W.FS_path (Misc.absolutize path))
    | W.Plugin { id ; task = Value_plugin _ ; _ } ->
      Lwt.return (Misc.load_value (Db.cache sched.db id)) (* FIXME: blocking call *)
    | W.Spawn s -> (* FIXME: much room for improvement *)
      shallow_eval sched s.elts >>= fun elts ->
      let targets = List.init (List.length elts) ~f:(fun i -> s.f (W.list_nth s.elts i)) in
      Lwt_list.map_p (shallow_eval sched) targets
    | W.Plugin { id ; task = Path_plugin _ ; _ } -> Lwt.return (W.Cache_id id)
    | W.Shell s -> Lwt.return (W.Cache_id s.id)
    | W.List l ->
      Lwt_list.map_p (shallow_eval sched) l.elts
    | W.List_nth l ->
      shallow_eval sched l.elts >>= fun elts ->
      Lwt.return (List.nth_exn elts l.index)
    | W.Glob { dir ; type_selection ; pattern ; id = _ } ->
      shallow_eval sched dir >>= fun p ->
      Db.path sched.db p |> fun dir_path ->
      (
        match Misc.glob ~type_selection ~pattern dir_path with
        | Error (`Msg s) -> Lwt.fail (Failure (sprintf "glob error: %s" s))
        | Ok xs -> Lwt.return @@ List.map xs ~f:(fun fn -> W.FS_path fn)
      )

  (* Replaces the workflow tokens of a command by their values. *)
  and shallow_eval_command sched =
    let list xs = Lwt_list.map_p (shallow_eval_command sched) xs in
    let open Command in
    function
    | Simple_command cmd ->
      shallow_eval_template sched cmd >|= fun cmd ->
      Simple_command cmd
    | And_list xs ->
      list xs >|= fun xs -> And_list xs
    | Or_list xs ->
      list xs >|= fun xs -> Or_list xs
    | Pipe_list xs ->
      list xs >|= fun xs -> Pipe_list xs
    | Within_container (env, cmd) ->
      shallow_eval_command sched cmd >|= fun cmd ->
      Within_container (env, cmd)

  and shallow_eval_template sched toks =
    Lwt_list.map_p (shallow_eval_token sched) toks

  and shallow_eval_token sched =
    let open Template in
    function
    | D (Workflow.Path_token w) ->
      shallow_eval sched w >|= fun p -> D (Execution_env.Path p)
    | D (Workflow.Path_list_token { elts ; quote ; sep }) ->
      shallow_eval sched elts >|= fun elts -> D (Execution_env.Path_list { elts ; quote ; sep })
    | D (Workflow.String_token w) ->
      shallow_eval sched w >|= fun p -> D (Execution_env.String p)
    | F f ->
      shallow_eval_template sched f >|= fun t -> F t
    | DEST | TMP | NP | MEM | S _ as tok -> Lwt.return tok

  (* Returns the trace thread stored under [id] in [sched.traces],
     creating it with [build_trace] on first request. The resulting
     thread fails when the trace is errored. *)
  let register_build sched ~id ~build_trace =
    let open Eval_thread.Infix in
    (
      match Table.find sched.traces id with
      | None ->
        let trace = build_trace () in
        Table.set sched.traces ~key:id ~data:trace ;
        trace
      | Some trace -> trace
    ) >>= fun trace ->
    if Execution_trace.is_errored trace then
      Eval_thread.fail1 trace
    else
      Lwt_result.return trace

  (* Number of processors requested by a workflow: only plugin and
     shell steps ask for any. *)
  let np_requirement
    : type s. s Workflow.t -> int
    = function
      | Pure _ -> 0
      | App _ -> 0
      | Spawn _ -> 0
      | Both _ -> 0
      | Eval_path _ -> 0
      | Input _ -> 0
      | Select _ -> 0
      | List _ -> 0
      | List_nth _ -> 0
      | Glob _ -> 0
      | Plugin x -> x.np
      | Shell x -> x.np

  (* Memory request of a step: 100 when unspecified, otherwise the
     value of the given workflow. *)
  let opt_mem_requirement sched = function
    | None -> Lwt.return 100
    | Some mem -> shallow_eval sched mem

  let mem_requirement
    : type u. t -> u Workflow.t -> int Lwt.t
    = fun sched -> function
      | Pure _ -> Lwt.return 0
      | App _ -> Lwt.return 0
      | Spawn _ -> Lwt.return 0
      | Both _ -> Lwt.return 0
      | Eval_path _ -> Lwt.return 0
      | Input _ -> Lwt.return 0
      | Select _ -> Lwt.return 0
      | List _ -> Lwt.return 0
      | List_nth _ -> Lwt.return 0
      | Glob _ -> Lwt.return 0
      | Plugin x -> opt_mem_requirement sched x.mem
      | Shell x -> opt_mem_requirement sched x.mem

  (* Computes the resource request of [w] and delegates to the
     backend. *)
  let build_trace sched w perform =
    mem_requirement sched w >>= fun mem ->
    let requirement = Allocator.Request { np = np_requirement w ; mem } in
    Backend.build_trace sched.backend w requirement perform

  (* Skips [f] when the cache path of [id] already exists. *)
  let cached_build sched ~id ~f =
    if Sys.file_exists (Db.cache sched.db id) = `Yes
    then Eval_thread.return (Execution_trace.Done_already { id })
    else f ()

  let signal_trace_to_gc sched w t =
    if not (Execution_trace.is_errored t) then (
      Maybe_gc.tag_workflow_as_built sched.gc w
    )

  (* Registers the build of a cached step: unless already in cache,
     runs [deps] first, then [perform] through [build_trace], and
     notifies the collector when the resulting trace is not errored. *)
  let schedule_cached_workflow sched ~id w ~deps ~perform =
    let open Eval_thread.Infix in
    register_build sched ~id ~build_trace:(fun () ->
        cached_build sched ~id ~f:(fun () ->
            deps () >>= fun () ->
            build_trace sched w perform >|= fun trace_or_error ->
            (
              match trace_or_error with
              | Ok trace -> signal_trace_to_gc sched w trace
              | Error _ -> ()
            ) ;
            trace_or_error
          )
      )
    |> Eval_thread.ignore

  (* Registers the fetch of a singularity image, skipped when the image
     file already exists. *)
  let schedule_container_image_fetch sched img =
    let id = Db.container_image_identifier img in
    let ready = Unix.gettimeofday () in
    register_build sched ~id ~build_trace:(fun () ->
        let dest = Db.singularity_image sched.db img in
        if Sys.file_exists dest = `Yes then
          Eval_thread.return (Execution_trace.Done_already { id })
        else (
          let req = Allocator.Request { np = 1 ; mem = 0 } in
          Allocator.request sched.allocator req >>= function
          | Ok resource ->
            let start = Unix.gettimeofday () in
            (* log ~time:start sched (Logger.Workflow_started (w, resource)) ;
            *)
            Singularity.fetch_image img dest >>= fun outcome ->
            let _end_ = Unix.gettimeofday () in
            Allocator.release sched.allocator resource ;
            Eval_thread.return @@ Execution_trace.Run {
              ready ;
              start ;
              _end_ ;
              outcome = Task_result.Container_image_fetch { id ; outcome } ;
            }
          | Error _ -> assert false (* should never happen, we're asking so little here! *)
        )
      )
    |> Eval_thread.ignore

  (* Declares to the collector the images used by shell step [w], then
     schedules their fetch. *)
  let schedule_shell_container_image_fetch sched w cmd =
    let images = Execution_env.images_for_singularity sched.allowed_containers cmd in
    List.iter images ~f:(Maybe_gc.uses_singularity_image sched.gc w) ;
    Eval_thread.join images ~f:(schedule_container_image_fetch sched)

  (* Schedules everything needed to compute [w]. [target] is the step
     on behalf of which [w] is built; it is passed to the collector
     when spawned workflows are registered. *)
  let rec build
    : type u v. t -> ?target:v W.t -> u W.t -> unit thread
    = fun sched ?target w ->
      let open Eval_thread.Infix in
      match w with
      | W.Pure _ -> Eval_thread.return ()
      | W.App { x ; f ; _ } ->
        Eval_thread.both (build sched ?target x) (build sched ?target f)
        >>| ignore
      | W.Both { fst ; snd ; _ } ->
        Eval_thread.both (build sched ?target fst) (build sched ?target snd)
        >>| ignore
      | W.Eval_path { workflow ; _ } -> build sched ?target workflow
      | List_nth l -> build sched ?target l.elts
      | Glob g -> build sched ?target g.dir
      | W.Spawn { elts ; f ; _ } ->
        build sched ?target elts >>= fun () ->
        shallow_eval sched elts >> fun elts_value ->
        let n = List.length elts_value in
        let targets = List.init n ~f:(fun i -> f (W.list_nth elts i)) in
        Lwt_list.iter_p (Maybe_gc.register ?target sched.gc) targets >> fun () ->
        Eval_thread.join ~f:(build ?target sched) targets
      | W.Input { id ; path ; _ } ->
        register_build sched ~id ~build_trace:(fun () ->
            build_trace sched w (fun _ _ -> perform_input ~id ~path)
          )
        |> Eval_thread.ignore
      | W.Select { id ; dir ; sel ; _ } ->
        build sched ?target dir >>= fun () ->
        shallow_eval sched dir >> fun dir ->
        register_build sched ~id ~build_trace:(fun () ->
            build_trace sched w (fun _ _ -> perform_select ~db:sched.db ~id ~dir ~sel)
          )
        |> Eval_thread.ignore
      | W.Plugin { task = Value_plugin workflow ; id ; descr ; _ } ->
        schedule_cached_workflow sched ~id w
          ~deps:(fun () -> build sched ~target:w workflow)
          ~perform:(fun token resource -> perform_plugin sched token resource ~id ~descr workflow)
      | W.Plugin { id ; task = Path_plugin workflow ; descr ; _ } ->
        schedule_cached_workflow sched ~id w
          ~deps:(fun () -> build sched ~target:w workflow)
          ~perform:(fun token resource -> perform_path_plugin sched token resource ~id ~descr workflow)
      | W.Shell { id ; task ; descr ; deps ; _ } ->
        schedule_cached_workflow sched ~id w
          ~deps:Eval_thread.(fun () ->
              join2
                (schedule_shell_container_image_fetch sched w task)
                (join deps ~f:(fun (W.Any x) -> build sched ~target:w x))
            )
          ~perform:(fun token resource ->
              shallow_eval_command sched task >> fun cmd ->
              perform_shell sched token resource ~id ~descr cmd >>= fun r ->
              Eval_thread.return r
            )
      | List l ->
        Eval_thread.join l.elts ~f:(build ?target sched)

  (* Releases the evaluations waiting in [eval]. *)
  let start sched =
    Synchro.signal sched.start ()

  (* Builds [target] and returns its value, or the list of errored
     traces. Raises [Failure] immediately when the scheduler has been
     stopped; otherwise nothing happens before [start] is called. *)
  let eval sched target =
    if sched.closed then failwith "Scheduler is closed" ;
    let target = Bistro.Private.reveal target in
    Synchro.wait sched.start >>= fun () ->
    Maybe_gc.register sched.gc target >>= fun () ->
    build sched target
    >>= (fun r -> Maybe_gc.stop sched.gc >|= fun () -> r) (* FIXME: is this the right moment?
                                                             what if eval is called several times? *)
    |> Fn.flip Lwt_result.bind Lwt.(fun () -> shallow_eval sched target >|= Result.return)
    |> Lwt_result.map_err Execution_trace.Set.elements

  (* Concatenates the error reports of [traces] into a string. *)
  let error_report { db ; _ } traces =
    let buf = Buffer.create 1024 in
    List.iter traces ~f:(fun trace ->
        Execution_trace.error_report trace db buf
      ) ;
    Buffer.contents buf

  (* Same as [eval], but errors are raised as a [Failure] carrying the
     error report. *)
  let eval_exn sched w =
    eval sched w >|= function
    | Ok r -> r
    | Error errors -> failwith (error_report sched errors)

  (* Stops the collector, the logger and the backend, in that order.
     The scheduler is marked as closed first, so that the guard at the
     top of [eval] rejects any evaluation requested afterwards. *)
  let stop sched =
    sched.closed <- true ;
    Maybe_gc.stop sched.gc >>= fun () ->
    sched.logger#stop >>= fun () ->
    Backend.stop sched.backend
end

include Make(Local_backend)

(* Scheduler running on a freshly created local backend. *)
let create ?np ?mem ?allowed_containers ?loggers ?collect db =
  let backend = Local_backend.create ?np ?mem ?loggers db in
  create ?allowed_containers ?loggers ?collect backend db

(* Initializes the database at [db_path], creates and starts a
   scheduler, and runs the evaluation of [w] in [Lwt_main.run]. *)
let simple_eval_exn ?np ?mem ?allowed_containers ?loggers ?collect ?(db_path = "_bistro") w =
  let db = Db.init_exn db_path in
  let sched = create ?np ?mem ?allowed_containers ?loggers ?collect db in
  let thread = eval_exn sched w in
  start sched ;
  Lwt_main.run thread