Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file bistro_multinode.ml
open Core_kernel
open Bistro_engine
open Lwt.Infix

(** A unit of work handed by the server to a client: either an OCaml
    closure or a shell command, tagged with the id of the workflow it
    belongs to. *)
type job =
  | Plugin of {
      workflow_id : string ;
      f : unit -> unit ;
    }
  | Shell_command of {
      workflow_id : string ;
      cmd : Shell_command.t ;
    }

(** Identifier the server hands to a client when it subscribes. *)
type client_id = Client_id of string

(** Requests a client can send to the server. The type parameter is the
    type of the value the server answers with. *)
type _ api_request =
  | Subscript : { np : int ; mem : int } -> client_id api_request
  | Get_job : { client_id : string } -> job option api_request
  | Plugin_result : {
      client_id : string ;
      workflow_id : string ;
      result : (unit, string) Result.t ;
    } -> unit api_request
  | Shell_command_result : {
      client_id : string ;
      workflow_id : string ;
      result : int * bool ;
    } -> unit api_request

module Client = struct
  (** Resources offered by this client and the address of the server. *)
  type t = {
    np : int ;
    mem : int ;
    hostname : string ;
    port : int ;
  }

  (** Opens a connection to the server, runs [f] on the channel pair. *)
  let with_connection { hostname ; port ; _ } ~f =
    Lwt_io.with_connection
      Unix.(ADDR_INET (inet_addr_of_string hostname, port))
      f

  (** Sends one request over a fresh connection and reads back the
      answer. One connection is used per request. *)
  let send_request x (msg : 'a api_request) : 'a Lwt.t =
    with_connection x ~f:(fun (ic, oc) ->
        Lwt_io.write_value oc msg >>= fun () ->
        Lwt_io.flush oc >>= fun () ->
        Lwt_io.read_value ic
      )

  (** Client main loop: subscribes to the server, then repeatedly asks
      for a job and runs it in the background, reporting the result
      back. The loop ends when the server answers [None] to [Get_job]. *)
  let main ~np ~mem ~hostname ~port () =
    (* NOTE(review): the [--mem] flag is documented in GB; presumably
       this converts to MB — confirm the unit Allocator expects. *)
    let mem = mem * 1024 in
    let client = { np ; mem ; hostname ; port } in
    (* Nothing in this file ever fills [stop_var]; the [`Stop] branch
       below is therefore never taken from here. *)
    let stop_var = Lwt_mvar.create_empty () in
    send_request client (Subscript { np ; mem }) >>= fun (Client_id client_id) ->
    let job_thread = function
      | Plugin { workflow_id ; f } ->
        Local_backend.eval () () f () >>= fun result ->
        send_request client (Plugin_result { client_id ; workflow_id ; result })
      | Shell_command { workflow_id ; cmd } ->
        Shell_command.run cmd >>= fun result ->
        send_request client (Shell_command_result { client_id ; workflow_id ; result })
    in
    let rec loop () =
      Lwt.pick [
        (send_request client (Get_job { client_id }) >|= fun x -> `New_job x) ;
        (Lwt_mvar.take stop_var >|= fun () -> `Stop) ;
      ]
      >>= function
      | `New_job None
      | `Stop -> Lwt.return ()
      | `New_job (Some job) ->
        (* Run the job without waiting for it, so that several jobs can
           be in flight at once. *)
        Lwt.async (fun () -> job_thread job) ;
        loop ()
    in
    loop ()

  (** Command-line entry point for the client. *)
  let command =
    let open Command.Let_syntax in
    Command.basic ~summary:"Bistro client" [%map_open
      let np = flag "--np" (required int) ~doc:"INT Number of available cores"
      and mem = flag "--mem" (required int) ~doc:"INT Available memory (in GB)"
      and hostname = flag "--hostname" (required string) ~doc:"ADDR Bistro server address"
      and port = flag "--port" (required int) ~doc:"INT Bistro server port"
      in
      fun () ->
        main ~np ~mem ~hostname ~port ()
        |> Lwt_main.run
    ]
end

module Server = struct
  module Backend = struct
    (** A job together with the resolver that must be woken up with the
        job's result once a client reports it. *)
    type job_waiter =
      | Waiting_shell_command of {
          workflow_id : string ;
          cmd : Shell_command.t ;
          waiter : (int * bool) Lwt.u ;
        }
      | Waiting_plugin of {
          workflow_id : string ;
          f : unit -> unit ;
          waiter : (unit, string) result Lwt.u ;
        }

    (** Server-side view of a subscribed client. *)
    type worker = Worker of {
        id : string ;
        np : int ;
        mem : int ;
        mutable available_resource : Allocator.resource ;
        pending_jobs : job_waiter Lwt_queue.t ; (* queued, not yet fetched *)
        running_jobs : job_waiter String.Table.t ; (* fetched, keyed by workflow id *)
      }

    (** Tracks the free resources of every worker and the list of
        resource requests that could not be served yet. *)
    module Worker_allocator = struct
      type t = {
        mutable available : Allocator.resource String.Table.t ;
        mutable waiters : ((int * int) * (string * Allocator.resource) Lwt.u) list ;
      }

      let create () = {
        available = String.Table.create () ;
        waiters = [] ;
      }

      (** Returns some binding of [table] satisfying [f], if any. A
          local exception is used to leave the fold at the first hit. *)
      let search (type s) (table : s String.Table.t) ~f =
        let module M = struct exception Found of string * s end in
        try
          String.Table.fold table ~init:() ~f:(fun ~key ~data () ->
              if f ~key ~data then raise (M.Found (key, data))
            ) ;
          None
        with M.Found (k, v) -> Some (k, v)

      (** Tries to serve each pending request, in list order, from a
          worker with enough free cores and memory. Served requests
          are woken up and dropped from the waiting list. *)
      let allocation_pass pool =
        let remaining_waiters =
          List.filter_map pool.waiters ~f:(fun ((np, mem), u as elt) ->
              let allocation_attempt =
                search pool.available ~f:(fun ~key:_ ~data:(Resource curr) ->
                    curr.np >= np && curr.mem >= mem
                  )
              in
              match allocation_attempt with
              | None -> Some elt
              | Some (worker_id, (Resource curr)) ->
                String.Table.set pool.available ~key:worker_id
                  ~data:(Resource { np = curr.np - np ; mem = curr.mem - mem }) ;
                Lwt.wakeup u (worker_id, Resource { np ; mem }) ;
                None
            )
        in
        pool.waiters <- remaining_waiters

      (** Registers a resource request and returns a promise resolved
          with the chosen worker id and the granted resource. Waiters
          are kept sorted by decreasing [(np, mem)]. *)
      let request pool (Allocator.Request { np ; mem }) =
        let t, u = Lwt.wait () in
        let waiters =
          ((np, mem), u) :: pool.waiters
          |> List.sort ~compare:(fun (x, _) (y, _) -> compare y x)
        in
        pool.waiters <- waiters ;
        allocation_pass pool ;
        t

      (** Declares a new worker with all of its resources free.
          Fails if a worker with the same id is already registered. *)
      let add_worker pool (Worker { id ; np ; mem ; _ }) =
        match String.Table.add pool.available ~key:id ~data:(Allocator.Resource { np ; mem }) with
        | `Ok -> allocation_pass pool
        | `Duplicate -> failwith "A worker has been added twice"

      (** Gives resources back to [worker_id], then retries pending
          requests: without this second step a request queued while
          all workers were busy would only be reconsidered on the next
          [request] or [add_worker] call. *)
      let release pool worker_id (Allocator.Resource { np ; mem }) =
        String.Table.update pool.available worker_id ~f:(function
            | None -> failwith "Tried to release resources of inexistent worker"
            | Some (Resource r) -> Resource { np = r.np + np ; mem = r.mem + mem }
          ) ;
        allocation_pass pool
    end

    (** Identifies where a workflow step has been scheduled. *)
    type token = {
      worker_id : string ;
      workflow_id : string ;
    }

    type state = {
      workers : worker String.Table.t ;
      alloc : Worker_allocator.t ;
    }

    type event = [
      | `Stop
      | `New_worker
    ]

    type t = {
      server : Lwt_io.server ;
      state : state ;
      events : event Lwt_react.event ;
      send_event : event -> unit ;
      stop_signal : unit Lwt_condition.t ;
      server_stop : unit Lwt.t ;
      logger : Logger.t ;
      db : Db.t ;
    }

    (** Generates worker ids ["w1"], ["w2"], ... from a private counter. *)
    let new_id =
      let c = ref 0 in
      fun () -> incr c ; sprintf "w%d" !c

    let workflow_id_of_job_waiter = function
      | Waiting_plugin wp -> wp.workflow_id
      | Waiting_shell_command wsc -> wsc.workflow_id

    (** Strips the resolver from a waiter to obtain the job sent to a
        client. *)
    let job_of_job_waiter = function
      | Waiting_plugin { f ; workflow_id ; _ } ->
        Plugin { f ; workflow_id }
      | Waiting_shell_command { cmd ; workflow_id ; _ } ->
        Shell_command { cmd ; workflow_id }

    let create_worker ~np ~mem id =
      Worker {
        id ; np ; mem ;
        available_resource = Allocator.Resource { np ; mem } ;
        pending_jobs = Lwt_queue.create () ;
        running_jobs = String.Table.create () ;
      }

    let create_state () = {
      workers = String.Table.create () ;
      alloc = Worker_allocator.create () ;
    }

    (** Computes the answer to a single client request.

        Raises (through [find_exn]) when a result is reported for an
        unknown client or for a workflow that is not recorded as
        running on that client. *)
    let server_api
      : type s. (Logger.event -> unit) -> stop_signal:unit Lwt_condition.t -> state -> s api_request -> s Lwt.t
      = fun log ~stop_signal state msg ->
        match msg with
        | Subscript { np ; mem } ->
          let id = new_id () in
          let w = create_worker ~np ~mem id in
          String.Table.set state.workers ~key:id ~data:w ;
          Worker_allocator.add_worker state.alloc w ;
          log (Logger.Debug (sprintf "new worker %s" id)) ;
          Lwt.return (Client_id id)

        | Get_job { client_id } -> (
            match String.Table.find state.workers client_id with
            | None -> Lwt.return None
            | Some (Worker worker) ->
              (* Block until either a job is queued for this worker or
                 the server is being stopped. *)
              Lwt.choose [
                (Lwt_queue.pop worker.pending_jobs >|= fun x -> `Job x) ;
                (Lwt_condition.wait stop_signal >|= fun () -> `Stop) ;
              ]
              >>= function
              | `Job wp ->
                let workflow_id = workflow_id_of_job_waiter wp in
                String.Table.set worker.running_jobs ~key:workflow_id ~data:wp ;
                Lwt.return (Some (job_of_job_waiter wp))
              | `Stop -> Lwt.return None
          )

        | Plugin_result r ->
          let Worker worker = String.Table.find_exn state.workers r.client_id in
          let job_waiter = String.Table.find_exn worker.running_jobs r.workflow_id in
          (* The job is done: forget it so the table does not grow
             forever and a duplicate report cannot reach [Lwt.wakeup]
             on an already resolved promise. *)
          String.Table.remove worker.running_jobs r.workflow_id ;
          Lwt.return (
            match job_waiter with
            | Waiting_plugin wp -> Lwt.wakeup wp.waiter r.result
            | Waiting_shell_command _ -> assert false (* should never happen *)
          )

        | Shell_command_result r ->
          let Worker worker = String.Table.find_exn state.workers r.client_id in
          let job_waiter = String.Table.find_exn worker.running_jobs r.workflow_id in
          (* Same bookkeeping as for [Plugin_result]. *)
          String.Table.remove worker.running_jobs r.workflow_id ;
          Lwt.return (
            match job_waiter with
            | Waiting_plugin _ -> assert false (* should never happen *)
            | Waiting_shell_command wp -> Lwt.wakeup wp.waiter r.result
          )

    (** Connection handler: reads one request, writes the answer, then
        closes both channels. The [Closures] flag is needed because a
        [Plugin] job carries a function value. *)
    let server_handler log ~stop_signal state _ (ic, oc) =
      Lwt_io.read_value ic >>= fun msg ->
      server_api log ~stop_signal state msg >>= fun res ->
      Lwt_io.write_value oc res ~flags:[Closures] >>= fun () ->
      Lwt_io.flush oc >>= fun () ->
      Lwt_io.close ic >>= fun () ->
      Lwt_io.close oc

    (** Starts a server listening on [port] at the first address
        returned for the local host name. *)
    let create ?(loggers = []) ~port db =
      Lwt_unix.gethostname () >>= fun hostname ->
      Lwt_unix.gethostbyname hostname >>= fun h ->
      let sockaddr = Unix.ADDR_INET (h.Unix.h_addr_list.(0), port) in
      let state = create_state () in
      let logger = Logger.tee loggers in
      let log event = logger#event db (Unix.gettimeofday ()) event in
      let stop_signal = Lwt_condition.create () in
      Lwt_io.establish_server_with_client_address sockaddr
        (server_handler log ~stop_signal state)
      >>= fun server ->
      let events, send_event = Lwt_react.E.create () in
      (* Shut the listening socket down once [stop] is signalled. *)
      let server_stop =
        Lwt_condition.wait stop_signal >>= fun () ->
        Lwt_io.shutdown_server server
      in
      Lwt.return {
        events ; send_event ; stop_signal ;
        server_stop ; server ; state ;
        (* Reuse the logger built above instead of building a second one. *)
        logger ;
        db ;
      }

    (** Sends [event] to the backend's logger; [time] defaults to now. *)
    let log ?(time = Unix.gettimeofday ()) backend event =
      backend.logger#event backend.db time event

    (** Waits until some worker can satisfy [req]; returns that worker
        together with the granted resource. *)
    let request_resource backend req =
      Worker_allocator.request backend.state.alloc req >|= fun (worker_id, resource) ->
      String.Table.find_exn backend.state.workers worker_id, resource

    let release_resource backend worker_id res =
      Worker_allocator.release backend.state.alloc worker_id res

    (** Runs [perform] for workflow [w] on a worker that satisfies
        [requirement], logging the ready/started/ended events and
        returning the execution trace. The resource is released after
        [perform] yields an outcome. *)
    let build_trace backend w requirement perform =
      let ready = Unix.gettimeofday () in
      log ~time:ready backend (Logger.Workflow_ready w) ;
      request_resource backend requirement >>= fun (Worker worker, resource) ->
      let open Eval_thread.Infix in
      let start = Unix.gettimeofday () in
      log ~time:start backend (Logger.Workflow_started (w, resource)) ;
      let token = { worker_id = worker.id ; workflow_id = Bistro_internals.Workflow.id w } in
      perform token resource >>= fun outcome ->
      let _end_ = Unix.gettimeofday () in
      log ~time:_end_ backend (Logger.Workflow_ended { outcome ; start ; _end_ }) ;
      release_resource backend worker.id resource ;
      Eval_thread.return (
        Execution_trace.Run { ready ; start ; _end_ ; outcome }
      )
    (* | Error `Resource_unavailable ->
     *   let msg = "No worker with enough resource" in
     *   log backend (Logger.Workflow_allocation_error (w, msg)) ;
     *   wait_for_new_worker backend >>= fun () ->
     *   loop () *)

    (** Queues the evaluation of [f x] on the worker named in the token;
        the returned promise is resolved when the client reports the
        result. *)
    let eval backend { worker_id ; workflow_id } f x =
      let Worker worker = String.Table.find_exn backend.state.workers worker_id in
      let f () = f x in
      let t, u = Lwt.wait () in
      let job_waiter = Waiting_plugin { waiter = u ; f ; workflow_id } in
      Lwt_queue.push worker.pending_jobs job_waiter ;
      t

    (** Queues a shell command on the worker named in the token; the
        returned promise is resolved when the client reports the
        result. *)
    let run_shell_command backend { worker_id ; workflow_id } cmd =
      let Worker worker = String.Table.find_exn backend.state.workers worker_id in
      let t, u = Lwt.wait () in
      let job = Waiting_shell_command { waiter = u ; cmd ; workflow_id } in
      Lwt_queue.push worker.pending_jobs job ;
      t

    (** Broadcasts the stop signal to everything waiting on it. *)
    let stop backend =
      Lwt_condition.broadcast backend.stop_signal () ;
      Lwt.return ()
  end

  module Scheduler = Scheduler.Make(Backend)

  type t = Scheduler.t

  (** Starts the network backend on [port] (default 6666) and wraps it
      in a scheduler. *)
  let create ?allowed_containers ?loggers ?collect ?(port = 6666) db =
    Backend.create ?loggers ~port db >|= fun backend ->
    Scheduler.create ?allowed_containers ?loggers ?collect backend db

  let start sched =
    Scheduler.start sched

  let stop sched =
    Scheduler.stop sched

  let eval sched w =
    Scheduler.eval sched w

  (** Creates a server, evaluates [w], prints an error report on
      failure, stops the server. Blocks until everything is done. *)
  let simple_app ?allowed_containers ?loggers ?collect ?port ?(db = "_bistro") w =
    let t =
      create ?allowed_containers ?loggers ?collect ?port (Db.init_exn db) >>= fun server ->
      start server ;
      eval server w >|= (
        function
        | Ok _ -> ()
        | Error e ->
          print_endline @@ Scheduler.error_report server e
      ) >>= fun () ->
      stop server
    in
    Lwt_main.run t

  (** Command-line wrapper around [simple_app]. *)
  let simple_command ~summary w =
    let open Command.Let_syntax in
    Command.basic ~summary [%map_open
      let port = flag "--port" (required int) ~doc:"INT Port"
      and verbose = flag "--verbose" no_arg ~doc:" Display more info"
      in
      let loggers =
        if verbose then [
          Bistro_utils.Console_logger.create () ;
        ]
        else []
      in
      fun () -> simple_app ~port ~loggers w
    ]
end