Source file worker.ml
(**
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the "hack" directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
 *)

open Hack_core

(*****************************************************************************
* Module building workers
*
* A worker is a subprocess executing an arbitrary function
*
 * You should first create a fixed number of workers and then reuse those,
 * both because the number of workers is limited and to make the
 * load-balancing of tasks better (cf multiWorker.ml)
*
* On Unix, we "spawn" workers when initializing Hack. Then, this
* worker, "fork" a slave for each incoming request. The forked "slave"
* will die after processing a single request.
*
* On Windows, we do not "prespawn" when initializing Hack, but we just
* allocate all the required information into a record. Then, we
* spawn a slave for each incoming request. It will also die after
* one request.
*
 * A worker never handles more than one request at a time.
*
 *****************************************************************************)

exception Worker_exited_abnormally of int * Unix.process_status
exception Worker_oomed
exception Worker_busy
exception Worker_killed

type send_job_failure =
  | Worker_already_exited of Unix.process_status
  | Other_send_job_failure of exn

exception Worker_failed_to_send_job of send_job_failure

(* Should we 'prespawn' the worker? *)
let use_prespawned = not Sys.win32

(* The maximum number of workers. *)
let max_workers = 1000
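(* A sketch of caller-side error handling around 'call' (defined below);
 * [worker], [job], [input] and [handle_dead_slave] are placeholders:
 *
 *   try ignore (call worker job input)
 *   with
 *   | Worker_failed_to_send_job (Worker_already_exited status) ->
 *       (* the slave died before the job could be sent *)
 *       handle_dead_slave status
 *   | Worker_failed_to_send_job (Other_send_job_failure exn) ->
 *       (* e.g. the job could not be marshalled *)
 *       raise exn
 *)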
(*****************************************************************************
 * The job executed by the worker.
*
* The 'serializer' is the job continuation: it is a function that must
 * be called at the end of the request in order to send back the result
 * to the master (this is "internal business"; it is not visible outside
* this module). The slave will provide the expected function.
* cf 'send_result' in 'slave_main'.
*
 *****************************************************************************)

type request = Request of (serializer -> unit)
and serializer = { send: 'a. 'a -> unit }
and void (* an empty type *)

type call_wrapper = { wrap: 'x 'b. ('x -> 'b) -> 'x -> 'b }
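(* Note: 'send' and 'wrap' are record fields with explicitly polymorphic
 * types, the usual OCaml encoding of first-class polymorphic functions.
 * A minimal sketch (not part of this module) of why this matters: a single
 * serializer value can send payloads of different types.
 *
 *   let s : serializer = { send = fun _ -> () } in
 *   s.send 42;
 *   s.send "forty-two"
 *)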
(*****************************************************************************
 * Everything we need to know about a worker.
*
 *****************************************************************************)

type t = {

  id: int; (* Simple id for the worker. This is not the worker pid: on
              Windows, we spawn a new worker for each job. *)

  (* The call wrapper will wrap any workload sent to the worker (via "call"
   * below) before invoking the workload.
   *
   * That is, when calling the worker with workload `f x`, it will be wrapped
   * as `wrap (f x)`.
   *
   * This allows universal handling of workloads at the time we create the
   * actual workers. For example, this can be useful to handle exceptions
   * uniformly across workers regardless of what workload is called on them.
   * See the sketch after this type definition. *)
  call_wrapper: call_wrapper;

  (* Sanity check: is the worker still available? *)
  mutable killed: bool;

  (* Sanity check: is the worker currently busy? *)
  mutable busy: bool;

  (* On Unix, a reference to the 'prespawned' worker. *)
  prespawned: (void, request) Daemon.handle option;

  (* On Windows, a function to spawn a slave. *)
  spawn: unit -> (void, request) Daemon.handle;

}
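(* A sketch of the kind of call wrapper the comment on 'call_wrapper' has
 * in mind (hypothetical; the real wrapper is installed by 'make_one'
 * below): log any exception escaping the workload, then re-raise it.
 *
 *   let logging_wrapper = { wrap = fun f x ->
 *     try f x
 *     with e ->
 *       Hh_logger.log "workload raised: %s" (Printexc.to_string e);
 *       raise e
 *   }
 *)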
(*****************************************************************************
 * The handle is what we get back when we start a job. It's a "future"
* (sometimes called a "promise"). The scheduler uses the handle to retrieve
* the result of the job when the task is done (cf multiWorker.ml).
*
 *****************************************************************************)

type 'a handle = 'a delayed ref

and 'a delayed =
  | Processing of 'a slave
  | Cached of 'a
  | Failed of exn

and 'a slave = {

  worker: t;      (* The associated worker *)
  slave_pid: int; (* The actual slave pid *)

  (* The file descriptor we might pass to select in order to
     wait for the slave to finish its job. *)
  infd: Unix.file_descr;

  (* A blocking function that returns the job result. *)
  result: unit -> 'a;

}
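(* A minimal sketch of the future in use, assuming a worker [w] obtained
 * from 'make' (below); 'call' and 'get_result' are defined later in this
 * file:
 *
 *   let h : int handle = call w (fun n -> n + 1) 41 in
 *   (* ...do other work, or wait via 'select'... *)
 *   assert (get_result h = 42)
 *)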
(*****************************************************************************
 * Entry point for spawned worker.
*
 *****************************************************************************)

let slave_main ic oc =
  let start_user_time = ref 0.0 in
  let start_system_time = ref 0.0 in
  let send_result data =
    let tm = Unix.times () in
    let end_user_time = tm.Unix.tms_utime +. tm.Unix.tms_cutime in
    let end_system_time = tm.Unix.tms_stime +. tm.Unix.tms_cstime in
    Measure.sample "worker_user_time" (end_user_time -. !start_user_time);
    Measure.sample "worker_system_time" (end_system_time -. !start_system_time);

    let stats = Measure.serialize (Measure.pop_global ()) in
    let s = Marshal.to_string (data, stats) [Marshal.Closures] in
    let len = String.length s in
    if len > 10 * 1024 * 1024 (* 10 MB *) then begin
      Hh_logger.log "WARNING: you are sending quite a lot of data (%d bytes), \
        which may have an adverse performance impact. If you are sending \
        closures, double-check to ensure that they have not captured large \
        values in their environment." len;
      Printf.eprintf "%s" (Printexc.raw_backtrace_to_string
        (Printexc.get_callstack 100));
    end;
    Daemon.output_string oc s;
    Daemon.flush oc
  in
  try
    Measure.push_global ();
    let Request do_process = Daemon.from_channel ic in
    let tm = Unix.times () in
    start_user_time := tm.Unix.tms_utime +. tm.Unix.tms_cutime;
    start_system_time := tm.Unix.tms_stime +. tm.Unix.tms_cstime;
    do_process { send = send_result };
    exit 0
  with
  | End_of_file ->
      exit 1
  | SharedMem.Out_of_shared_memory ->
      Exit_status.(exit Out_of_shared_memory)
  | SharedMem.Hash_table_full ->
      Exit_status.(exit Hash_table_full)
  | SharedMem.Heap_full ->
      Exit_status.(exit Heap_full)
  | SharedMem.Sql_assertion_failure err_num ->
      let exit_code = match err_num with
        | 11 -> Exit_status.Sql_corrupt
        | 14 -> Exit_status.Sql_cantopen
        | 21 -> Exit_status.Sql_misuse
        | _ -> Exit_status.Sql_assertion_failure
      in
      Exit_status.exit exit_code
  | e ->
      let error_backtrace = Printexc.get_backtrace () in
      let error_str = Printexc.to_string e in
      Printf.printf "Exception: %s\n" error_str;
      EventLogger.log_if_initialized (fun () ->
        EventLogger.worker_exception error_str);
      Printf.printf "Potential backtrace:\n%s" error_backtrace;
      exit 2

let win32_worker_main restore state (ic, oc) =
  restore state;
  slave_main ic oc

let unix_worker_main restore state (ic, oc) =
  restore state;
  let in_fd = Daemon.descr_of_in_channel ic in
  if !Utils.profile then Utils.log := prerr_endline;
  try
    while true do
      (* Wait for an incoming job: is there something to read?
         But we don't read it yet. It will be read by the forked slave. *)
      let readyl, _, _ = Unix.select [in_fd] [] [] (-1.0) in
      if readyl = [] then exit 0;
      (* We fork a slave for every incoming request,
         and let it die after one request. This is the quickest GC. *)
      match Fork.fork () with
      | 0 -> slave_main ic oc
      | pid ->
          (* Wait for the slave termination... *)
          match snd (Unix.waitpid [] pid) with
          | Unix.WEXITED 0 -> ()
          | Unix.WEXITED 1 ->
              raise End_of_file
          | Unix.WEXITED code ->
              Printf.printf "Worker exited (code: %d)\n" code;
              flush stdout;
              Pervasives.exit code
          | Unix.WSIGNALED x ->
              let sig_str = PrintSignal.string_of_signal x in
              Printf.printf "Worker interrupted with signal: %s\n" sig_str;
              exit 2
          | Unix.WSTOPPED x ->
              Printf.printf "Worker stopped with signal: %d\n" x;
              exit 3
    done;
    assert false
  with End_of_file -> exit 0

type 'a entry_state = 'a * Gc.control * SharedMem.handle
type 'a entry = ('a entry_state, request, void) Daemon.entry

let entry_counter = ref 0
let register_entry_point ~restore =
  incr entry_counter;
  let restore (st, gc_control, heap_handle) =
    restore st;
    SharedMem.connect heap_handle;
    Gc.set gc_control in
  let name = Printf.sprintf "slave_%d" !entry_counter in
  Daemon.register_entry_point
    name
    (if Sys.win32
     then win32_worker_main restore
     else unix_worker_main restore)
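(* A sketch of how an application registers its entry point; the ~restore
 * callback and [MyGlobalState] are placeholders for application code:
 *
 *   let entry = register_entry_point ~restore:MyGlobalState.restore
 *
 * This is typically done at toplevel, before 'make' (below) spawns any
 * worker, so that the entry point is known by name when a slave process
 * starts up.
 *)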
(**************************************************************************
 * Creates a pool of workers.
*
 **************************************************************************)

let workers = ref []

let current_worker_id = ref 0

(* Build one worker. *)
let make_one spawn id =
  if id >= max_workers then failwith "Too many workers";
  let prespawned = if not use_prespawned then None else Some (spawn ()) in
  let wrap f input =
    current_worker_id := id;
    f input
  in
  let worker = {
    call_wrapper = { wrap };
    id;
    busy = false;
    killed = false;
    prespawned;
    spawn;
  } in
  workers := worker :: !workers;
  worker

(** Make a few workers. When a workload is given to a worker (via "call"
 * below), the workload is wrapped in the call_wrapper. *)
let make ~saved_state ~entry ~nbr_procs ~gc_control ~heap_handle =
  let spawn _log_fd =
    Unix.clear_close_on_exec heap_handle.SharedMem.h_fd;
    let handle =
      Daemon.spawn
        (Daemon.null_fd (), Unix.stdout, Unix.stderr)
        entry
        (saved_state, gc_control, heap_handle)
    in
    Unix.set_close_on_exec heap_handle.SharedMem.h_fd;
    handle
  in
  let made_workers = ref [] in
  for n = 1 to nbr_procs do
    made_workers := make_one spawn n :: !made_workers
  done;
  !made_workers

let current_worker_id () = !current_worker_id
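(* A sketch of building the pool; [saved_state], [entry], [gc_control] and
 * [heap_handle] come from the application's initialization code, and the
 * worker count here is arbitrary:
 *
 *   let workers =
 *     make ~saved_state ~entry ~nbr_procs:4 ~gc_control ~heap_handle
 *)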
(**************************************************************************
 * Send a job to a worker
*
 **************************************************************************)

let call w (type a) (type b) (f : a -> b) (x : a) : b handle =
  if w.killed then raise Worker_killed;
  if w.busy then raise Worker_busy;
  (* Spawn the slave, if not prespawned. *)
  let { Daemon.pid = slave_pid; channels = (inc, outc) } as h =
    match w.prespawned with
    | None -> w.spawn ()
    | Some handle -> handle
  in
  (* Prepare ourselves to read the answer from the slave. *)
  let result () : b =
    match Unix.waitpid [Unix.WNOHANG] slave_pid with
    | 0, _ | _, Unix.WEXITED 0 ->
        let res : b * Measure.record_data = Daemon.input_value inc in
        if w.prespawned = None then Daemon.close h;
        Measure.merge ~from:(Measure.deserialize (snd res)) ();
        fst res
    | _, Unix.WEXITED i
      when i = Exit_status.(exit_code Out_of_shared_memory) ->
        raise SharedMem.Out_of_shared_memory
    | _, exit_status ->
        raise (Worker_exited_abnormally (slave_pid, exit_status))
  in
  (* Mark the worker as busy. *)
  let infd = Daemon.descr_of_in_channel inc in
  let slave = { result; slave_pid; infd; worker = w; } in
  w.busy <- true;
  let request =
    let { wrap } = w.call_wrapper in
    Request (fun { send } -> send (wrap f x))
  in
  (* Send the job to the slave. *)
  let () =
    try
      Daemon.to_channel outc
        ~flush:true ~flags:[Marshal.Closures]
        request
    with e -> begin
      match Unix.waitpid [Unix.WNOHANG] slave_pid with
      | 0, _ ->
          raise (Worker_failed_to_send_job (Other_send_job_failure e))
      | _, status ->
          raise (Worker_failed_to_send_job (Worker_already_exited status))
    end
  in
  (* And return the 'handle'. *)
  ref (Processing slave)
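(* Note the protocol: 'call' marks the worker busy, and only 'get_result'
 * (below) frees it. A sketch, with [job1]/[job2] as placeholder workloads:
 *
 *   let h1 = call w job1 input1 in
 *   (* calling [call w job2 input2] here would raise Worker_busy *)
 *   let r1 = get_result h1 in   (* [w] is free again *)
 *   let h2 = call w job2 input2 in
 *   (r1, h2)
 *)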
(**************************************************************************
 * Read results from a handle.
* This might block if the worker hasn't finished yet.
*
 **************************************************************************)

let is_oom_failure msg =
  (String_utils.string_starts_with msg "Subprocess") &&
  (String_utils.is_substring "signaled -7" msg)

let get_result d =
  match !d with
  | Cached x -> x
  | Failed exn -> raise exn
  | Processing s ->
      try
        let res = s.result () in
        s.worker.busy <- false;
        d := Cached res;
        res
      with
      | Failure msg when is_oom_failure msg ->
          raise Worker_oomed
      | exn ->
          s.worker.busy <- false;
          d := Failed exn;
          raise exn

(*****************************************************************************
* Our polling primitive on workers
 * Given a list of handles, returns the ones that are ready.
*
 *****************************************************************************)

type 'a selected = {
  readys: 'a handle list;
  waiters: 'a handle list;
}

let get_processing ds =
  List.rev_filter_map
    ds
    ~f:(fun d -> match !d with Processing p -> Some p | _ -> None)

let select ds =
  let processing = get_processing ds in
  let fds = List.map ~f:(fun { infd; _ } -> infd) processing in
  let ready_fds, _, _ =
    if fds = [] || List.length processing <> List.length ds then
      [], [], []
    else
      Unix.select fds [] [] ~-.1.
  in
  List.fold_right
    ~f:(fun d { readys; waiters } ->
      match !d with
      | Cached _ | Failed _ ->
          { readys = d :: readys; waiters }
      | Processing s when List.mem ready_fds s.infd ->
          { readys = d :: readys; waiters }
      | Processing _ ->
          { readys; waiters = d :: waiters })
    ~init:{ readys = []; waiters = [] }
    ds

let get_worker h =
  match !h with
  | Processing { worker; _ } -> worker
  | Cached _ | Failed _ -> invalid_arg "Worker.get_worker"
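(* A sketch of the collect loop a scheduler can build on top of 'select'
 * and 'get_result' (cf multiWorker.ml); [handles] is a list of pending
 * job handles:
 *
 *   let rec collect acc handles =
 *     if handles = [] then acc
 *     else
 *       let { readys; waiters } = select handles in
 *       let acc =
 *         List.fold_left readys ~init:acc
 *           ~f:(fun acc h -> get_result h :: acc)
 *       in
 *       collect acc waiters
 *)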
(**************************************************************************
 * Worker termination
 **************************************************************************)

let kill w =
  if not w.killed then begin
    w.killed <- true;
    match w.prespawned with
    | None -> ()
    | Some handle -> Daemon.kill handle
  end

let killall () =
  List.iter ~f:kill !workers