Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file picos_io.ml
open Picos
module Select = Picos_io_select
module Fd = Picos_io_fd
module Htbl = Picos_aux_htbl

(* Table used as a set (every value is [()]) of the descriptors, as returned
   by [Fd.unsafe_get], that were put into non-blocking mode through this
   module.  Entries are added by [openfile] (when [O_NONBLOCK] is among the
   flags) and by [set_nonblock]; they are removed by [close] and by
   [clear_nonblock].  The table is consulted by [intr_req]. *)
let nonblock_fds = Htbl.create ~hashed_type:(module Fd.Resource) ()

(* Replacement for the standard [Unix] module.  Everything from the standard
   module is included and then the operations that take or return file
   descriptors are redefined to work on [Fd.t].  Operations that may report
   [EAGAIN], [EWOULDBLOCK] or [EINTR] go through the retry wrappers below and
   wait with [Select.await_on] instead of failing. *)
module Unix = struct
  include Unix

  type file_descr = Fd.t

  (* Both predicates compare against a constant constructor, for which
     physical equality [==] is sufficient. *)
  let is_oob flag = MSG_OOB == flag
  let is_nonblock flag = O_NONBLOCK == flag

  (* The retry wrappers below are written to avoid closure allocations. *)

  (* [EAGAIN] (and [EWOULDBLOCK]) indicates that the operation would have
     blocked and so we await using the [select] thread for the file descriptor.

     [EINTR] indicates that the operation was interrupted by a signal, which
     usually shouldn't happen with non-blocking operations.  We don't want to
     block, so we do the same thing as with [EAGAIN]. *)

  (* [intr_req fd] returns the interrupt request to hold while a system call
     on [fd] is in progress.  On Windows, or when [fd] is registered in
     [nonblock_fds], this is [Select.Intr.nothing]; otherwise a request with a
     10 microsecond delay is made.

     NOTE(review): presumably the request makes a system call that is still
     blocked after the delay fail with [EINTR] - confirm against
     [Picos_io_select]. *)
  let[@inline] intr_req fd =
    if Sys.win32 || Htbl.mem nonblock_fds (Fd.unsafe_get fd) then
      Select.Intr.nothing
    else Select.Intr.req ~seconds:0.000_01 (* 10μs - TODO *)

  (* [again_0 fd fn op] calls [fn] on the descriptor of [fd].  The interrupt
     request is cleared on every exit path.  On [EAGAIN], [EINTR] or
     [EWOULDBLOCK] the call is retried with the [fd] returned by
     [Select.await_on fd op]; any other exception is re-raised. *)
  let rec again_0 fd fn op =
    let intr = intr_req fd in
    match fn (Fd.unsafe_get fd) with
    | result ->
        Select.Intr.clr intr;
        result
    | exception Unix.Unix_error ((EAGAIN | EINTR | EWOULDBLOCK), _, _) ->
        Select.Intr.clr intr;
        again_0 (Select.await_on fd op) fn op
    | exception exn ->
        Select.Intr.clr intr;
        raise exn

  (* Same as [again_0], except that the optional [?cloexec] argument is
     forwarded to [fn] on every attempt. *)
  let rec again_cloexec_0 ?cloexec fd fn op =
    let intr = intr_req fd in
    match fn ?cloexec (Fd.unsafe_get fd) with
    | result ->
        Select.Intr.clr intr;
        result
    | exception Unix.Unix_error ((EAGAIN | EINTR | EWOULDBLOCK), _, _) ->
        Select.Intr.clr intr;
        again_cloexec_0 ?cloexec (Select.await_on fd op) fn op
    | exception exn ->
        Select.Intr.clr intr;
        raise exn

  (* Same as [again_0] for an [fn] that takes three arguments after the
     descriptor. *)
  let rec again_3 fd x1 x2 x3 fn op =
    let intr = intr_req fd in
    match fn (Fd.unsafe_get fd) x1 x2 x3 with
    | result ->
        Select.Intr.clr intr;
        result
    | exception Unix.Unix_error ((EAGAIN | EINTR | EWOULDBLOCK), _, _) ->
        Select.Intr.clr intr;
        again_3 (Select.await_on fd op) x1 x2 x3 fn op
    | exception exn ->
        Select.Intr.clr intr;
        raise exn

  (* Same as [again_0] for an [fn] that takes four arguments after the
     descriptor. *)
  let rec again_4 fd x1 x2 x3 x4 fn op =
    let intr = intr_req fd in
    match fn (Fd.unsafe_get fd) x1 x2 x3 x4 with
    | result ->
        Select.Intr.clr intr;
        result
    | exception Unix.Unix_error ((EAGAIN | EINTR | EWOULDBLOCK), _, _) ->
        Select.Intr.clr intr;
        again_4 (Select.await_on fd op) x1 x2 x3 x4 fn op
    | exception exn ->
        Select.Intr.clr intr;
        raise exn

  (* Same as [again_0] for an [fn] that takes five arguments after the
     descriptor. *)
  let rec again_5 fd x1 x2 x3 x4 x5 fn op =
    let intr = intr_req fd in
    match fn (Fd.unsafe_get fd) x1 x2 x3 x4 x5 with
    | result ->
        Select.Intr.clr intr;
        result
    | exception Unix.Unix_error ((EAGAIN | EINTR | EWOULDBLOCK), _, _) ->
        Select.Intr.clr intr;
        again_5 (Select.await_on fd op) x1 x2 x3 x4 x5 fn op
    | exception exn ->
        Select.Intr.clr intr;
        raise exn

  (* [EINPROGRESS] indicates that a socket operation is being performed
     asynchronously.  We await using the [select] thread for the operation to
     complete and then get the error from the socket. *)

  (* [progress_1 fd x1 fn op name] calls [fn] once on the descriptor of [fd]
     and [x1].  Unlike the [again_*] wrappers the call is never retried: on
     one of the matched errors it waits with [Select.await_on fd op] and then
     reads the pending socket error.  A pending error is raised as a
     [Unix.Unix_error] whose function name is [name] and whose argument is
     the empty string. *)
  let progress_1 fd x1 fn op name =
    let intr = intr_req fd in
    match fn (Fd.unsafe_get fd) x1 with
    | () -> Select.Intr.clr intr
    | exception
        Unix.Unix_error ((EAGAIN | EINPROGRESS | EINTR | EWOULDBLOCK), _, _)
      -> begin
        (* The documentation of [bind] and [connect] does not mention [EAGAIN]
           (or [EWOULDBLOCK]), but on Windows we do seem to get those errors
           from [connect].

           The documentation of [bind] does not mention [EINTR].  Matching on
           that shouldn't cause issues with [bind].

           For [connect] both [EINPROGRESS] and [EINTR] mean that connection
           will be established asynchronously and we use [select] to wait. *)
        Select.Intr.clr intr;
        let fd = Select.await_on fd op in
        match Unix.getsockopt_error (Fd.unsafe_get fd) with
        | None -> ()
        | Some error -> raise (Unix.Unix_error (error, name, ""))
      end
    | exception exn ->
        Select.Intr.clr intr;
        raise exn

  (* The standard descriptors wrapped as [Fd.t].  They are created with
     [~dispose:false].

     NOTE(review): presumably this keeps the underlying descriptors from
     being closed by [Fd] - confirm against [Picos_io_fd]. *)
  let stdin = Fd.create ~dispose:false Unix.stdin
  and stdout = Fd.create ~dispose:false Unix.stdout
  and stderr = Fd.create ~dispose:false Unix.stderr

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/open.html *)
  (* Opens [path] and wraps the result.  When [O_NONBLOCK] is among [flags]
     the new descriptor is registered in [nonblock_fds].  The registration
     itself happens outside of the [assert], which only checks that the
     descriptor was not already present in the table. *)
  let openfile path flags file_perm =
    let fd = Fd.create (Unix.openfile path flags file_perm) in
    if List.exists is_nonblock flags then begin
      let if_not_added_fd_has_been_closed_outside =
        Htbl.try_add nonblock_fds (Fd.unsafe_get fd) ()
      in
      assert if_not_added_fd_has_been_closed_outside
    end;
    fd

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/close.html *)
  (* Drops the descriptor from [nonblock_fds], whether or not it was
     registered, and then calls [Fd.decr ~close:true]. *)
  let close fd =
    let _ : bool = Htbl.try_remove nonblock_fds (Fd.unsafe_get fd) in
    Fd.decr ~close:true fd

  (* Closes the first and then the second descriptor of the pair. *)
  let close_pair (fd1, fd2) =
    close fd1;
    close fd2

  (* The wrappers from here to [single_write_substring] retry through
     [again_0] or [again_3]; the last argument selects what is awaited before
     a retry: [`R] for [read] and [`W] for the others. *)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/fsync.html *)
  let fsync fd = again_0 fd Unix.fsync `W

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/read.html *)
  let read fd bytes pos len = again_3 fd bytes pos len Unix.read `R

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html *)
  let write fd bytes pos len = again_3 fd bytes pos len Unix.write `W

  let single_write fd bytes pos len =
    again_3 fd bytes pos len Unix.single_write `W

  let write_substring fd string pos len =
    again_3 fd string pos len Unix.write_substring `W

  let single_write_substring fd string pos len =
    again_3 fd string pos len Unix.single_write_substring `W

  (* *)

  (*let in_channel_of_descr _ = failwith "TODO"*)
  (*let out_channel_of_descr _ = failwith "TODO"*)
  (*let descr_of_in_channel _ = failwith "TODO"*)
  (*let descr_of_out_channel _ = failwith "TODO"*)

  (* *)

  (* The wrappers from here to [tcflow] that call the standard [Unix]
     function directly on [Fd.unsafe_get fd] perform no retry and no
     waiting. *)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/lseek.html *)
  let lseek fd amount seek_command =
    Unix.lseek (Fd.unsafe_get fd) amount seek_command

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/ftruncate.html *)
  let ftruncate fd size = Unix.ftruncate (Fd.unsafe_get fd) size

  (* *)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/fstat.html *)
  let fstat fd = Unix.fstat (Fd.unsafe_get fd)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/isatty.html *)
  let isatty fd = Unix.isatty (Fd.unsafe_get fd)

  (* *)

  (* [Unix.LargeFile] with its descriptor taking functions redefined to work
     on [Fd.t]. *)
  module LargeFile = struct
    include Unix.LargeFile

    (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/lseek.html *)
    let lseek fd amount seek_command =
      Unix.LargeFile.lseek (Fd.unsafe_get fd) amount seek_command

    (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/ftruncate.html *)
    let ftruncate fd size = Unix.LargeFile.ftruncate (Fd.unsafe_get fd) size

    (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/fstat.html *)
    let fstat fd = Unix.LargeFile.fstat (Fd.unsafe_get fd)
  end

  (* *)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/mmap.html

     This can raise EAGAIN, but it probably should not be handled? *)
  let map_file fd ?pos kind layout shared dims =
    Unix.map_file (Fd.unsafe_get fd) ?pos kind layout shared dims

  (* *)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/fchmod.html *)
  let fchmod fd file_perm = Unix.fchmod (Fd.unsafe_get fd) file_perm

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/fchown.html *)
  let fchown fd uid gid = Unix.fchown (Fd.unsafe_get fd) uid gid

  (* *)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html *)
  (* The duplicate is wrapped in a fresh [Fd.t]; [nonblock_fds] is not
     updated for it. *)
  let dup ?cloexec fd = Fd.create (Unix.dup ?cloexec (Fd.unsafe_get fd))

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html *)
  let dup2 ?cloexec src dst =
    Unix.dup2 ?cloexec (Fd.unsafe_get src) (Fd.unsafe_get dst)

  (* Switches the descriptor to non-blocking mode and records that in
     [nonblock_fds]; the result of [Htbl.try_add] is ignored, so doing this
     twice is harmless. *)
  let set_nonblock fd =
    Unix.set_nonblock (Fd.unsafe_get fd);
    Htbl.try_add nonblock_fds (Fd.unsafe_get fd) () |> ignore

  (* Switches the descriptor to blocking mode and drops it from
     [nonblock_fds]; the result of [Htbl.try_remove] is ignored. *)
  let clear_nonblock fd =
    Unix.clear_nonblock (Fd.unsafe_get fd);
    Htbl.try_remove nonblock_fds (Fd.unsafe_get fd) |> ignore

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/fcntl.html *)
  let set_close_on_exec fd = Unix.set_close_on_exec (Fd.unsafe_get fd)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/fcntl.html *)
  let clear_close_on_exec fd = Unix.clear_close_on_exec (Fd.unsafe_get fd)

  (* *)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html *)
  (* Both ends of the pipe are wrapped; the pair is in the order returned by
     [Unix.pipe]. *)
  let pipe ?cloexec () =
    let inn, out = Unix.pipe ?cloexec () in
    (Fd.create inn, Fd.create out)

  (* *)

  (* In the two functions below the parameters [stdin], [stdout] and [stderr]
     shadow the module level values of the same names. *)
  let create_process prog args stdin stdout stderr =
    Unix.create_process prog args (Fd.unsafe_get stdin) (Fd.unsafe_get stdout)
      (Fd.unsafe_get stderr)

  let create_process_env prog args env stdin stdout stderr =
    Unix.create_process_env prog args env (Fd.unsafe_get stdin)
      (Fd.unsafe_get stdout) (Fd.unsafe_get stderr)

  (*let open_process_in _ = failwith "TODO"*)
  (*let open_process_out _ = failwith "TODO"*)
  (*let open_process _ = failwith "TODO"*)
  (*let open_process_full _ = failwith "TODO"*)
  (*let open_process_args _ = failwith "TODO"*)
  (*let open_process_args_in _ = failwith "TODO"*)
  (*let open_process_args_out _ = failwith "TODO"*)
  (*let open_process_args_full _ = failwith "TODO"*)
  (*let process_in_pid _ = failwith "TODO"*)
  (*let process_out_pid _ = failwith "TODO"*)
  (*let process_pid _ = failwith "TODO"*)
  (*let process_full_pid _ = failwith "TODO"*)
  (*let close_process_in _ = failwith "TODO"*)
  (*let close_process_out _ = failwith "TODO"*)
  (*let close_process _ = failwith "TODO"*)
  (*let close_process_full _ = failwith "TODO"*)

  (* *)

  (* Conversions between a list of [wait_flag] and a two bit integer:
     [WNOHANG] is bit [0b10] and [WUNTRACED] is bit [0b01]. *)
  module Wait_flag = struct
    let nohang_bit = 0b10
    let untraced_bit = 0b01

    (* Note that this is optimized to the identity function. *)
    let to_int = function WNOHANG -> 0 | WUNTRACED -> 1

    (* Maps [WNOHANG] (0) to [0b10] and [WUNTRACED] (1) to [0b01]; the two
       assertions below check exactly that when the module is initialized. *)
    let to_bit flag = nohang_bit - to_int flag
    let () = assert (to_bit WNOHANG = nohang_bit)
    let () = assert (to_bit WUNTRACED = untraced_bit)

    (* Accumulates the bits of all [flags] into [bits]. *)
    let rec to_bits flags bits =
      match flags with
      | [] -> bits
      | flag :: flags -> to_bits flags (bits lor to_bit flag)

    (* Shadows the accumulator version: bits of [flags] starting from 0. *)
    let to_bits flags = to_bits flags 0

    (* Lookup table from bits to a flag list, immediately shadowed by the
       lookup function.  The access is unchecked, so [bits] must be within
       [0, 3], which holds for every value produced by [to_bits] and
       [nohang_bit]. *)
    let to_flags = [| []; [ WUNTRACED ]; [ WNOHANG ]; [ WNOHANG; WUNTRACED ] |]
    let to_flags bits = Array.unsafe_get to_flags bits
  end

  (* [waitpid_unix ~bits ~pid] is [waitpid] for non-Windows systems.

     With the [WNOHANG] bit set this is a single [Unix.waitpid] call.
     Otherwise [Unix.waitpid] is still called with [WNOHANG] added so that
     the call itself does not block.  Before each attempt a fresh computation
     is registered with [Select.return_on_sigchld].  When no child status is
     available (pid 0) the computation is awaited and the whole procedure is
     repeated.  On every other path ([EINTR], a result, or another exception)
     the computation is finished before continuing.

     NOTE(review): presumably [Select.return_on_sigchld] completes the
     computation when [SIGCHLD] is received - confirm against
     [Picos_io_select]. *)
  let rec waitpid_unix ~bits ~pid =
    if bits land Wait_flag.nohang_bit <> 0 then
      Unix.waitpid (Wait_flag.to_flags bits) pid
    else
      let computation = Computation.create ~mode:`LIFO () in
      Select.return_on_sigchld computation ();
      match
        Unix.waitpid (Wait_flag.to_flags (bits lor Wait_flag.nohang_bit)) pid
      with
      | exception Unix_error (EINTR, _, _) ->
          Computation.finish computation;
          waitpid_unix ~bits ~pid
      | (pid_or_0, _) as result ->
          if pid_or_0 = 0 then begin
            Computation.await computation;
            waitpid_unix ~bits ~pid
          end
          else begin
            Computation.finish computation;
            result
          end
      | exception exn ->
          Computation.finish computation;
          raise exn

  (* [waitpid] for Windows: only the [WNOHANG] case is supported, anything
     else raises [Invalid_argument]. *)
  let waitpid_win32 ~bits ~pid =
    if bits land Wait_flag.nohang_bit <> 0 then
      Unix.waitpid (Wait_flag.to_flags bits) pid
    else
      (* One way to provide a scheduler friendly [waitpid] on Windows would be
         to use a thread pool to run blocking operations on.  PR for a thread
         pool implemented in Picos would be welcome! *)
      invalid_arg "currently not supported on Windows without WNOHANG"

  (* Dispatches on the platform.  On Windows a [pid] of [-1] is passed
     straight to the standard [Unix.waitpid]. *)
  let waitpid flags pid =
    let bits = Wait_flag.to_bits flags in
    if Sys.win32 then
      if pid <> -1 then waitpid_win32 ~bits ~pid
      else begin
        (* This should raise? *)
        Unix.waitpid flags pid
      end
    else waitpid_unix ~bits ~pid

  (* On non-Windows systems this is [waitpid_unix] with no flags and a pid of
     [-1]. *)
  let wait () =
    if not Sys.win32 then waitpid_unix ~bits:0 ~pid:(-1)
    else begin
      (* This should raise [Invalid_argument] *)
      Unix.wait ()
    end

  (* *)

  let sh = "/bin/sh"

  (* Runs [cmd] through [sh] with the [-c] option, using the module level
     [stdin], [stdout] and [stderr], then waits for the created process with
     [waitpid] and returns its status.  Raises [Invalid_argument] on
     Windows. *)
  let system cmd =
    if Sys.win32 then
      (* One way to provide a scheduler friendly [system] on Windows would be to
         use a thread pool to run blocking operations on.  PR for a thread pool
         implemented in Picos would be welcome! *)
      invalid_arg "currently not supported on Windows"
    else
      create_process sh [| sh; "-c"; cmd |] stdin stdout stderr
      |> waitpid [] |> snd

  (* *)

  (* Both sleeps delegate to [Fiber.sleep]; [sleep] converts its integer
     number of seconds to a float first. *)
  let sleepf seconds = Fiber.sleep ~seconds
  let sleep seconds = Fiber.sleep ~seconds:(Float.of_int seconds)

  (* *)

  (* Used only by [select] below: [overall] is canceled with [Done] to signal
     that the results can be collected. *)
  exception Done

  (* Backtrace passed along with [Done]; captured once with a depth of 0. *)
  let empty_bt = Printexc.get_callstack 0

  (* [select rds wrs exs seconds] returns the sublists of [rds], [wrs] and
     [exs], in their original order, for which the corresponding computation
     completed with [true].

     Outline, as far as it can be read from this code:

     - [overall] is the computation that the caller awaits.  It is never
       expected to return normally, hence the [assert false]: it is canceled
       with [Done], or awaiting it raises some other exception.

     - [prepare] creates one computation per descriptor, attaches the shared
       [canceler] trigger to it and, if that succeeded, registers it with
       [Select.return_on] so that it gets [true].  The action of [canceler]
       schedules cancelation of [overall] with [Done] after 0 seconds.

     - The action of [finisher] returns [false] to every per-descriptor
       computation.  If [finisher] cannot be attached to [overall] it is
       signaled right away.  Otherwise, when [seconds] is not negative,
       cancelation of [overall] with [Done] is scheduled after [seconds];
       with a negative [seconds] no timeout is scheduled.

     - When awaiting [overall] raises anything other than [Done], [overall]
       is canceled with [Done] and the exception is raised again.

     NOTE(review): presumably the triggers are signaled when the computation
     they are attached to completes, which is what ties the pieces together -
     confirm against the Picos [Computation] and [Trigger] documentation. *)
  let[@alert "-handler"] select rds wrs exs seconds =
    let overall = Computation.create ~mode:`LIFO () in
    let canceler =
      Trigger.from_action overall () @@ fun _ overall _ ->
      Select.cancel_after overall ~seconds:0.0 Done empty_bt
    in
    let prepare op fd =
      let computation = Computation.create ~mode:`LIFO () in
      if Computation.try_attach computation canceler then
        Select.return_on computation fd op true;
      computation
    in
    let rdcs = List.map (prepare `R) rds in
    let wrcs = List.map (prepare `W) wrs in
    let excs = List.map (prepare `E) exs in
    let finisher =
      (* [rdcs] and [wrcs] are handed to the action as its arguments, while
         [excs] is captured from the enclosing scope. *)
      Trigger.from_action rdcs wrcs @@ fun _ rdcs wrcs ->
      let return_false computation = Computation.return computation false in
      List.iter return_false rdcs;
      List.iter return_false wrcs;
      List.iter return_false excs
    in
    if not (Computation.try_attach overall finisher) then
      Trigger.signal finisher
    else if 0.0 <= seconds then
      Select.cancel_after overall ~seconds Done empty_bt;
    match Computation.await overall with
    | () -> assert false
    | exception Done ->
        (* Keeps each [x] whose paired [y] satisfies [pred]; stops at the end
           of the shorter list. *)
        let[@tail_mod_cons] rec zip_filter pred xs ys =
          match (xs, ys) with
          | x :: xs, y :: ys ->
              if pred y then x :: zip_filter pred xs ys
              else zip_filter pred xs ys
          | _, _ -> []
        in
        ( zip_filter Computation.await rds rdcs,
          zip_filter Computation.await wrs wrcs,
          zip_filter Computation.await exs excs )
    | exception cancelation_exn ->
        Computation.cancel overall Done empty_bt;
        raise cancelation_exn

  (* *)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/lockf.html *)
  let lockf fd lock_command length =
    Unix.lockf (Fd.unsafe_get fd) lock_command length

  (* *)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/socket.html *)
  let socket ?cloexec socket_domain socket_type protocol =
    Fd.create (Unix.socket ?cloexec socket_domain socket_type protocol)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/socketpair.html *)
  (* [mystery] is passed on as the last argument of [Unix.socketpair]. *)
  let socketpair ?cloexec socket_domain socket_type mystery =
    let fst, snd =
      Unix.socketpair ?cloexec socket_domain socket_type mystery
    in
    (Fd.create fst, Fd.create snd)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/accept.html *)
  (* Retries through [again_cloexec_0], awaiting [`R], and wraps the accepted
     descriptor. *)
  let accept ?cloexec fd =
    let fd, sockaddr = again_cloexec_0 ?cloexec fd Unix.accept `R in
    (Fd.create fd, sockaddr)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/bind.html *)
  let bind fd sockaddr = progress_1 fd sockaddr Unix.bind `W "bind"

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html *)
  let connect fd sockaddr = progress_1 fd sockaddr Unix.connect `W "connect"

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/listen.html *)
  let listen fd max_pending = Unix.listen (Fd.unsafe_get fd) max_pending

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/shutdown.html *)
  let shutdown fd shutdown_command =
    Unix.shutdown (Fd.unsafe_get fd) shutdown_command

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/getsockname.html *)
  let getsockname fd = Unix.getsockname (Fd.unsafe_get fd)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/getpeername.html *)
  let getpeername fd = Unix.getpeername (Fd.unsafe_get fd)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/recv.html *)
  (* Awaits [`E] instead of [`R] when [MSG_OOB] is among [flags]. *)
  let recv fd bytes offset length flags =
    again_4 fd bytes offset length flags Unix.recv
      (if List.exists is_oob flags then `E else `R)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/recvfrom.html *)
  (* Awaits [`E] instead of [`R] when [MSG_OOB] is among [flags]. *)
  let recvfrom fd bytes offset length flags =
    again_4 fd bytes offset length flags Unix.recvfrom
      (if List.exists is_oob flags then `E else `R)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/send.html *)
  let send fd bytes offset length flags =
    again_4 fd bytes offset length flags Unix.send `W

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/send.html *)
  let send_substring fd string offset length flags =
    again_4 fd string offset length flags Unix.send_substring `W

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/sendto.html *)
  let sendto fd bytes offset length flags sockaddr =
    again_5 fd bytes offset length flags sockaddr Unix.sendto `W

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/sendto.html *)
  let sendto_substring fd string offset length flags sockaddr =
    again_5 fd string offset length flags sockaddr Unix.sendto_substring `W

  (* *)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/getsockopt.html *)
  let getsockopt fd option = Unix.getsockopt (Fd.unsafe_get fd) option

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/setsockopt.html *)
  let setsockopt fd option bool =
    Unix.setsockopt (Fd.unsafe_get fd) option bool

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/getsockopt.html *)
  let getsockopt_int fd option = Unix.getsockopt_int (Fd.unsafe_get fd) option

  let setsockopt_int fd option int =
    Unix.setsockopt_int (Fd.unsafe_get fd) option int

  let getsockopt_optint fd option =
    Unix.getsockopt_optint (Fd.unsafe_get fd) option

  let setsockopt_optint fd option optint =
    Unix.setsockopt_optint (Fd.unsafe_get fd) option optint

  let getsockopt_float fd option =
    Unix.getsockopt_float (Fd.unsafe_get fd) option

  let setsockopt_float fd option float =
    Unix.setsockopt_float (Fd.unsafe_get fd) option float

  let getsockopt_error fd = Unix.getsockopt_error (Fd.unsafe_get fd)

  (* *)

  (*let open_connection _ = failwith "TODO"*)
  (*let shutdown_connection _ = failwith "TODO"*)
  (*let establish_server _ = failwith "TODO"*)

  (* *)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/tcgetattr.html *)
  let tcgetattr fd = Unix.tcgetattr (Fd.unsafe_get fd)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/tcsetattr.html *)
  let tcsetattr fd setattr_when terminal_io =
    Unix.tcsetattr (Fd.unsafe_get fd) setattr_when terminal_io

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/tcsendbreak.html *)
  let tcsendbreak fd duration = Unix.tcsendbreak (Fd.unsafe_get fd) duration

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/tcdrain.html *)
  let tcdrain fd = Unix.tcdrain (Fd.unsafe_get fd)

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/tcflush.html *)
  let tcflush fd flush_queue = Unix.tcflush (Fd.unsafe_get fd) flush_queue

  (* https://pubs.opengroup.org/onlinepubs/9699919799/functions/tcflow.html *)
  let tcflow fd flow_action = Unix.tcflow (Fd.unsafe_get fd) flow_action
end