Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file low_level_process.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481openCoremoduleSys=Caml.Sysletrectemp_failure_retryf=tryf()withUnix.Unix_error(EINTR,_,_)->temp_failure_retryfletclose_non_intrfd=temp_failure_retry(fun()->Unix.closefd)(* Creates a unix pipe with both sides set close on exec *)letcloexec_pipe()=let(fd1,fd2)asres=Unix.pipe()inUnix.set_close_on_execfd1;Unix.set_close_on_execfd2;resmoduleProcess_info=structtypet={pid:Pid.t;stdin:Unix.File_descr.t;stdout:Unix.File_descr.t;stderr:Unix.File_descr.t;}end(* We use a slightly more powerful version of create process than the one in
core. This version is not quite as carefuly code reviewed but allows us to
have more control over the forked side of the process (e.g.: chdir).
*)letinternal_create_process?working_dir?setuid?setgid~env~prog~args()=letclose_on_err=ref[]intrylet(in_read,in_write)=cloexec_pipe()inclose_on_err:=in_read::in_write::!close_on_err;let(out_read,out_write)=cloexec_pipe()inclose_on_err:=out_read::out_write::!close_on_err;let(err_read,err_write)=cloexec_pipe()inclose_on_err:=err_read::err_write::!close_on_err;letpid=Unix_extended.fork_execprogargs?working_dir?setuid?setgid~env~stdin:in_read~stdout:out_write~stderr:err_writeinclose_non_intrin_read;close_non_introut_write;close_non_intrerr_write;{Process_info.pid=pid;stdin=in_write;stdout=out_read;stderr=err_read}withe->List.iter~f:(funfd->tryclose_non_intrfdwith_->())!close_on_err;raisee(**
Remembers the last n-characters appended to it....
*)moduleTail_buffer=struct(** remembers the output in a circular buffer.
looped is used to tell whether we loop around the
boundary of the buffer.
*)typet={buffer:Bytes.t;length:int;mutablelooped:bool;mutableposition:int;}letcontentsb=ifnotb.loopedthenBytes.To_string.subb.buffer~pos:0~len:b.positionelseletdst=Bytes.create(b.length+3)inBytes.setdst0'.';Bytes.setdst1'.';Bytes.setdst2'.';Bytes.blit~src:b.buffer~dst~dst_pos:3~src_pos:b.position~len:(b.length-b.position);Bytes.blit~src:b.buffer~dst~dst_pos:(b.length-b.position+3)~src_pos:0~len:(b.position);Bytes.unsafe_to_string~no_mutation_while_string_reachable:dstletcreatelen={buffer=Bytes.createlen;length=len;looped=false;position=0}letaddbsrclen=ifb.length<=lenthenbeginBytes.blit~src~dst:b.buffer~dst_pos:0~src_pos:(len-b.length)~len:(b.length);b.looped<-true;b.position<-0endelseletleftover=b.length-b.positioninif(len<leftover)thenbeginBytes.blit~src~dst:b.buffer~dst_pos:b.position~src_pos:0~len;b.position<-b.position+len;endelsebeginBytes.blit~src~dst:b.buffer~dst_pos:b.position~src_pos:0~len:leftover;b.looped<-true;letlen=(len-leftover)inBytes.blit~src~dst:b.buffer~dst_pos:0~src_pos:leftover~len;b.position<-lenendendmoduleStatus=structtypet=[`TimeoutofTime.Span.t|`Exitedofint|`SignaledofSignal.t(* WStopped is impossible*)][@@derivingsexp_of]letto_string=function|`Exitedi->sprintf"exited with code %d"i|`Signaleds->sprintf!"died after receiving %{Signal} (signal number %d)"s(Signal.to_system_ints)|`Timeouts->sprintf!"Timed out (ran for %{Time.Span})"sendmoduleCommand_result=structtypet={status:Status.t;stdout_tail:string;stderr_tail:string}endletwaitpid_nohangpid=matchUnix.wait_nohang(`Pidpid)with|None->None|Some(v,res)->assertPid.(v=pid);Someres(** wait for a given pid to exit;
returns true when the process exits and false if the process is still runing
after waiting for [span]
*)letwait_for_exit~is_childspanpid=letend_time=Time.add(Time.now())spaninletexited()=ifis_childthenbeginmatchwaitpid_nohangpidwith|None->true|Some_->falseendelse(* This is the equivalent of calling the C kill with 0 (test whether a process
exists) *)matchSignal.send(Signal.of_system_int0)(`Pidpid)with|`Ok->true|`No_such_process->falseinletrecloop()=ifTime.(>)(Time.now())end_timethenfalse(*We need to explicitely waitpid the child otherwise we are sending
signals to a zombie*)elseifnot(exited())thentrueelsebeginTime.pause(sec0.1);loop()endinloop()letkill?(is_child=false)?(wait_for=sec2.0)?(signal=Signal.term)pid=Signal.send_exnsignal(`Pidpid);ifnot(wait_for_exit~is_childwait_forpid)thenbeginbeginmatchSignal.sendSignal.kill(`Pidpid)with|`No_such_process->ifis_childthenfailwith"Process.kill got `No_such_process even though the process was a \
child we never waited for"|`Ok->()end;ifnot(wait_for_exit~is_childwait_forpid)thenbeginfailwithf"Process.kill failed to kill %i%s"(Pid.to_intpid)(ifis_childthen""else" (or the process wasn't collected by its parent)")()endendtypet={mutableopen_fds:Unix.File_descr.tlist;mutablein_fds:Unix.File_descr.tlist;mutableout_fds:Unix.File_descr.tlist;keep_open:bool;buf:Bytes.t;in_cnt:String.t;in_len:int;out_callbacks:(Unix.File_descr.t*(Bytes.t->int->unit))list;pid:Pid.t;mutablein_pos:int;}letclose_pooledstatefd=ifList.memstate.open_fdsfd~equal:Unix.File_descr.equalthenclose_non_intrfd;state.open_fds<-List.filter~f:((<>)fd)state.open_fds;state.out_fds<-List.filter~f:((<>)fd)state.out_fds;state.in_fds<-List.filter~f:((<>)fd)state.in_fdsletprocess_io~read~writestate=List.iterwrite~f:(funfd->(tryletlen=temp_failure_retry(fun()->Unix.single_write_substringfd~buf:state.in_cnt~pos:state.in_pos~len:(state.in_len-state.in_pos))instate.in_pos<-state.in_pos+len;(* Close the process's in_channel iff we are done writing to it*)iflen=0thenifstate.keep_openthenstate.in_fds<-List.filter~f:((<>)fd)state.in_fdselseclose_pooledstatefdwithUnix.Unix_error(EPIPE,_,_)->close_pooledstatefd));List.iterread~f:(funfd->letlen=temp_failure_retry(fun()->Unix.readfd~buf:state.buf~pos:0~len:(Bytes.lengthstate.buf))iniflen=0thenclose_pooledstatefdelseletcallback=List.Assoc.find_exn~equal:Unix.File_descr.equalstate.out_callbacksfdincallbackstate.buflen)letavailable_fds=letuse_selectstate~timeout=let{Unix.Select_fds.read;write;_;}=temp_failure_retry(fun()->Unix.select~read:state.out_fds~write:state.in_fds~except:[]~timeout())inread,writeinletuse_epollepoll_create=funstate~timeout->letmoduleEpoll=Linux_ext.Epollinlettimeout=matchtimeoutwith|(`Immediately|`Never)astimeout->timeout|`Afterspan->`Afterspaninletepoll_t=letfds=List.map~f:Unix.File_descr.to_int(state.in_fds@state.out_fds)inletmax_ready_events=List.lengthfdsinletnum_file_descrs=1+List.fold~init:max_ready_events~f:Int.maxfdsinepoll_create~num_file_descrs~max_ready_eventsinList.iterstate.in_fds~f:(funfd->Epoll.setepoll_tfdEpoll.Flags.out);List.iterstate.out_fds~f:(funfd->Epoll.setepoll_tfdEpoll.Flags.in_);letread,write=matchtemp_failure_retry(fun()->Epoll.waitepoll_t~timeout)with|`Timeout->([],[])|`Ok->Epoll.fold_readyepoll_t~init:([],[])~f:(fun(read,write)fdflags->lettake_matching_flagsaccfdflags~wanted=ifEpoll.Flags.do_intersectwantedflagsthenfd::accelseaccinletread=take_matching_flagsreadfdflags~wanted:Epoll.Flags.in_inletwrite=take_matching_flagswritefdflags~wanted:Epoll.Flags.outin(read,write))inEpoll.closeepoll_t;(read,write)inmatchLinux_ext.Epoll.createwith|Error_->use_select|Okepoll_create->use_epollepoll_create;;letcreate~keep_open~use_extra_path~working_dir~setuid~setgid~prog~args~stdoutf~stderrf~input_string~env=letfull_prog=Shell_internal.path_expand?use_extra_pathproginletprocess_info=internal_create_process?working_dir?setuid?setgid~env~prog:full_prog~args()inletout_fd=process_info.Process_info.stdoutandin_fd=process_info.Process_info.stdinanderr_fd=process_info.Process_info.stderrandpid=process_info.Process_info.pidin{keep_open;open_fds=[in_fd;out_fd;err_fd];in_fds=[in_fd];out_fds=[err_fd;out_fd];buf=Bytes.create4096;in_cnt=input_string;in_pos=0;in_len=String.lengthinput_string;out_callbacks=[out_fd,stdoutf;err_fd,stderrf];pid}letrecfinish_readingstate=matchavailable_fdsstate~timeout:`Immediatelywith|[],_->()|read,_->process_iostate~read~write:[];finish_readingstateletrecrun_loop~start_time~timeoutstate=letread,write=available_fdsstate~timeout:(`After(Time_ns.Span.of_sec0.1))inbegintryprocess_iostate~read~writewithe->kill~is_child:truestate.pid;raiseeend;letelapsed=Time.diff(Time.now())start_timeinmatchtimeoutwith|SometimeoutwhenTime.Span.(elapsed>timeout)->kill~is_child:truestate.pid;finish_readingstate;`Timeoutelapsed|None|Some_->matchwaitpid_nohangstate.pidwith|None->run_loop~start_time~timeoutstate|Somestatus->finish_readingstate;matchstatuswith|Ok()->`Exited0|Error(`Exit_non_zeroi)->`Exitedi|Error(`Signals)->`Signaledsletrun?timeout?use_extra_path?working_dir?setuid?setgid?(env=`Extend[])?input:(input_string="")?(keep_open=false)?(stdoutf=(fun_string_len->()))?(stderrf=(fun_string_len->()))?(tail_len=2048)~prog~args()=letstdout_tail=Tail_buffer.createtail_lenandstderr_tail=Tail_buffer.createtail_leninletstdoutfsbuflen=stdoutfsbuflen;Tail_buffer.addstdout_tailsbuflenandstderrfsbuflen=stderrfsbuflen;Tail_buffer.addstderr_tailsbufleninletstatus=protectx(Sys.signalSys.sigpipeSys.Signal_ignore,create~keep_open~use_extra_path~working_dir~setuid~setgid~stderrf~stdoutf~prog~args~env~input_string)~f:(fun(_old_sigpipe,state)->run_loopstate~start_time:(Time.now())~timeout;)~finally:(fun(old_sigpipe,state)->List.iterstate.open_fds~f:close_non_intr;ignore(Sys.signalSys.sigpipeold_sigpipe:Sys.signal_behavior))in{Command_result.status=status;stdout_tail=Tail_buffer.contentsstdout_tail;stderr_tail=Tail_buffer.contentsstderr_tail}(* Externally export this *)letkill?is_child?wait_for?(signal=Signal.term)pid=kill?is_child?wait_for~signalpidlet%test_module_=(modulestructletwith_fdsn~f=letrestore_max_fds=letmoduleRLimit=Core.Unix.RLimitinletmax_fds=RLimit.getRLimit.num_file_descriptorsinmatchmax_fds.RLimit.curwith|RLimit.Infinity->None|RLimit.LimitlimitwhenInt64.(of_intInt.(2*n)<limit)->None|RLimit.Limit_->RLimit.setRLimit.num_file_descriptors{max_fdswithRLimit.cur=RLimit.Limit(Int64.of_int(2*n))};Somemax_fdsinletfds=List.initn~f:(fun_->Unix.openfile~mode:[Unix.O_RDONLY]"/dev/null")inletretval=Or_error.try_withfinList.iterfds~f:(funfd->Unix.closefd);Option.iterrestore_max_fds~f:(funmax_fds->letmoduleRLimit=Core.Unix.RLimitinRLimit.setRLimit.num_file_descriptorsmax_fds);Or_error.ok_exnretvalletrun_process()=ignore(run~prog:"true"~args:[]())let%test_unit_=with_fds10~f:run_processlet%test_unit_=with_fds1055~f:(fun()->[%test_eq:bool](Result.is_okLinux_ext.Epoll.create)(Result.is_ok(Result.try_withrun_process)))end)