Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file async_udp.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384open!Coreopen!Asyncopen!Importopen!Int.Replace_polymorphic_comparetypewrite_buffer=(read_write,Iobuf.seek)Iobuf.tletdefault_capacity=1472letdefault_retry=12moduleConfig=structtypet={capacity:int;init:write_buffer;stop:unitDeferred.t;max_ready:int}[@@derivingfields]letcreate?(capacity=default_capacity)?(init=Iobuf.create~len:capacity)?(stop=Deferred.never())?(max_ready=default_retry)()={capacity;init;stop;max_ready};;endletfailiobufmessageasexp_of_a=(* Render buffers immediately, before we have a chance to change them. *)failwithsmessage(a,[%sexp_of:(_,_)Iobuf.Hexdump.toption]iobuf)(Tuple.T2.sexp_of_tsexp_of_aident);;(* Don't use [Or_error.map] to extract any of the [send] functions. The natural usage
results in partially applied functions! *)letsendto_sync()=matchIobuf.sendto_nonblocking_no_sigpipe()with|Error_ase->e|Oksendto->Ok(funfdbufaddr->Fd.with_file_descr_exnfd~nonblocking:true(fundesc->sendtobufdesc(Unix.Socket.Address.to_sockaddraddr)));;letsend_sync()=matchIobuf.send_nonblocking_no_sigpipe()with|Error_ase->e|Oksend->Ok(funfdbuf->Fd.with_file_descr_exnfd~nonblocking:true(fundesc->sendbufdesc));;(** [ready_iter fd ~stop ~max_ready ~f] iterates [f] over [fd], handling [EINTR] by
retrying immediately (at most [max_ready] times in a row) and [EWOULDBLOCK]/[EAGAIN]
by retrying when ready. Iteration is terminated when [fd] closes, [stop] fills, or
[f] returns [User_stopped].
[ready_iter] may fill [stop] itself.
By design, this function will not return to the Async scheduler until [fd] is no
longer ready to transfer data or [f] has succeeded [max_ready] consecutive times. To
avoid starvation, use [stop] or [User_stopped] and/or choose [max_ready] carefully to
allow other Async jobs to run.
@raise Unix.Unix_error on errors other than [EINTR] and [EWOULDBLOCK]/[EAGAIN]. *)moduleReady_iter=structmoduleOk=structtypet=|Poll_again|User_stopped[@@derivingsexp_of,enumerate,compare]letof_int_exn=function|0->Poll_again|1->User_stopped|i->failwithf"invalid ready iter ok %d"i();;letto_int=function|Poll_again->0|User_stopped->1;;endincludeUnix.Syscall_result.Make(Ok)()letpoll_again=create_okPoll_againletuser_stopped=create_okUser_stoppedendletready_iterfd~stop~max_ready~fread_or_write~syscall_name=letrecinner_loopifile_descr:Ready_iter.Ok.t=ifi<max_ready&&Ivar.is_emptystop&&Fd.is_openfdthen(matchffile_descr|>Ready_iter.to_result(* doesn't allocate *)with|OkPoll_again|ErrorEINTR->inner_loop(i+1)file_descr|OkUser_stopped->User_stopped|Error(EWOULDBLOCK|EAGAIN)->Poll_again(* This looks extreme but serves the purpose of effectively terminating the
[interruptible_every_ready_iter] job and the [ready_iter] loop. *)|Errore->raise(Unix.Unix_error(e,syscall_name,"")))elsePoll_againin(* [Fd.with_file_descr] is for [Raw_fd.set_nonblock_if_necessary].
[with_file_descr_deferred] would be the more natural choice, but it doesn't call
[set_nonblock_if_necessary]. *)matchFd.with_file_descr~nonblocking:truefd(funfile_descr->Fd.interruptible_every_ready_tofdread_or_write~interrupt:(Ivar.readstop)(funfile_descr->matchinner_loop0file_descrwith|Poll_again->()|User_stopped->Ivar.fill_if_emptystop())file_descr)with(* Avoid one ivar creation and wait immediately by returning the result from
[Fd.interruptible_every_ready_to] directly. *)|`Okdeferred->deferred|`Already_closed->return`Closed|`Errore->raisee;;letsendto()=matchIobuf.sendto_nonblocking_no_sigpipe()with|Error_ase->e|Oksendto->Ok(funfdbufaddr->letaddr=Unix.Socket.Address.to_sockaddraddrinletstop=Ivar.create()inready_iterfd~max_ready:default_retry~stop`Write~syscall_name:"sendto"~f:(funfile_descr->matchUnix.Syscall_result.Unit.to_result(sendtobuffile_descraddr)with|Ok()->Ready_iter.user_stopped|Errore->Ready_iter.create_errore)>>=function|`Interrupted->Deferred.unit|(`Bad_fd|`Closed|`Unsupported)aserror->fail(Somebuf)"Udp.sendto"(error,addr)[%sexp_of:[`Bad_fd|`Closed|`Unsupported]*Core.Unix.sockaddr]);;letsend()=matchIobuf.send_nonblocking_no_sigpipe()with|Error_ase->e|Oksend->Ok(funfdbuf->letstop=Ivar.create()inready_iterfd~max_ready:default_retry~stop`Write~syscall_name:"send"~f:(funfile_descr->matchUnix.Syscall_result.Unit.to_result(sendbuffile_descr)with|Ok()->Ready_iter.user_stopped|Errore->Ready_iter.create_errore)>>=function|`Interrupted->Deferred.unit|(`Bad_fd|`Closed|`Unsupported)aserror->fail(Somebuf)"Udp.send"error[%sexp_of:[`Bad_fd|`Closed|`Unsupported]]);;letbind?ifname?sourceaddr=letsocket=Socket.createSocket.Type.udpinletis_multicasta=Unix.Cidr.does_matchUnix.Cidr.multicast(Socket.Address.Inet.addra)inifis_multicastaddrthen((* We do not treat [mcast_join] as a blocking operation because it only instructs
the kernel to send an IGMP message, which the kernel handles asynchronously. *)tryCore.Unix.mcast_join?source?ifname(Fd.file_descr_exn(Socket.fdsocket))(Socket.Address.to_sockaddraddr)with|exn->raise_s[%message"Async_udp.bind unable to join multicast group"(addr:Socket.Address.Inet.t)(ifname:stringoption)(exn:Exn.t)]);Socket.bind_inetsocketaddr;;letbind_any()=letbind_addr=Socket.Address.Inet.create_bind_any~port:0inletsocket=Socket.createSocket.Type.udpin(* When bind() is called with a port number of zero, a non-conflicting local port
address is chosen (i.e., an ephemeral port). In almost all cases where we use
this, we want a unique port, and hence prevent reuseaddr. *)trySocket.bind_inetsocket~reuseaddr:falsebind_addrwith|bind_exn->letsocket_fd=Socket.fdsocketindon't_wait_for(* Errors from [close] are generally harmless, so we ignore them *)(Monitor.handle_errors(fun()->Fd.closesocket_fd)(fun(_:exn)->()));raisebind_exn;;moduleLoop_result=structtypet=|Closed|Stopped[@@derivingsexp_of,compare]letof_fd_interruptible_every_ready_to_result_exnbuffunction_namexsexp_of_xresult=matchresultwith|(`Bad_fd|`Unsupported)aserror->failbuffunction_name(error,x)[%sexp_of:[`Bad_fd|`Unsupported]*x]|`Closed->Closed|`Interrupted->Stopped;;endletrecvfrom_loop_with_buffer_replacement?(config=Config.create())fdf=letstop=Ivar.create()inConfig.stopconfig>>>Ivar.fill_if_emptystop;letbuf=ref(Config.initconfig)inready_iter~stop~max_ready:config.max_readyfd`Read~syscall_name:"recvfrom"~f:(funfile_descr->matchIobuf.recvfrom_assume_fd_is_nonblocking!buffile_descrwith|exceptionUnix.Unix_error(e,_,_)->Ready_iter.create_errore|ADDR_UNIXdom->fail(Some!buf)"Unix domain socket addresses not supported"dom[%sexp_of:string]|ADDR_INET(host,port)->Iobuf.flip_lo!buf;buf:=f!buf(`Inet(host,port));Iobuf.reset!buf;Ready_iter.poll_again)>>|Loop_result.of_fd_interruptible_every_ready_to_result_exn(Some!buf)"recvfrom_loop_without_buffer_replacement"fd[%sexp_of:Fd.t];;letrecvfrom_loop?configfdf=recvfrom_loop_with_buffer_replacement?configfd(funba->fba;b);;(* We don't care about the address, so read instead of recvfrom. *)letread_loop_with_buffer_replacement?(config=Config.create())fdf=letstop=Ivar.create()inConfig.stopconfig>>>Ivar.fill_if_emptystop;letbuf=ref(Config.initconfig)inready_iter~stop~max_ready:config.max_readyfd`Read~syscall_name:"read"~f:(funfile_descr->letresult=Iobuf.read_assume_fd_is_nonblocking!buffile_descrinifUnix.Syscall_result.Unit.is_okresultthen(Iobuf.flip_lo!buf;buf:=f!buf;Iobuf.reset!buf;Ready_iter.poll_again)elseUnix.Syscall_result.Unit.reinterpret_error_exnresult)>>|Loop_result.of_fd_interruptible_every_ready_to_result_exn(Some!buf)"read_loop_with_buffer_replacement"fd[%sexp_of:Fd.t];;letread_loop?configfdf=read_loop_with_buffer_replacement?configfd(funb->fb;b);;(* Too small a [max_count] here negates the value of [recvmmsg], while too large risks
starvation of other ready file descriptors. 32 was chosen empirically to stay below
~64kb of data, assuming a standard Ethernet MTU. *)letdefault_recvmmsg_loop_max_count=32letrecvmmsg_loop=letcreate_buffers~max_countconfig=letlen=Config.capacityconfiginiflen<=2048then(letbstr=Bigstring.create(2048*max_count)inArray.initmax_count~f:(funindex->Iobuf.of_bigstring~pos:(index*2048)~lenbstr))elseArray.initmax_count~f:(function|0->Config.initconfig|_->Iobuf.create~len:(Iobuf.length(Config.initconfig)))inmatchIobuf.recvmmsg_assume_fd_is_nonblockingwith|Error_ase->e|Okrecvmmsg->Ok(fun?(config=Config.create())?(max_count=default_recvmmsg_loop_max_count)?(on_wouldblock=fun()->())fdf->letbufs=create_buffers~max_countconfiginletcontext=Iobuf.Recvmmsg_context.createbufsinletstop=Ivar.create()inConfig.stopconfig>>>Ivar.fill_if_emptystop;ready_iter~stop~max_ready:config.max_readyfd`Read~syscall_name:"recvmmsg"~f:(funfile_descr->letresult=recvmmsgfile_descrcontextinifUnix.Syscall_result.Int.is_okresultthen(letcount=Unix.Syscall_result.Int.ok_exnresultinifcount>Array.lengthbufsthenfailwithf"Unexpected result from Iobuf.recvmmsg_assume_fd_is_nonblocking: \
count (%d) > Array.length bufs (%d)"count(Array.lengthbufs)()else((* [recvmmsg_assume_fd_is_nonblocking] implicitly calls [flip_lo]
before and [reset] after the call, so we mustn't. *)fbufs~count;Ready_iter.poll_again))else((matchUnix.Syscall_result.Int.error_exnresultwith|EWOULDBLOCK|EAGAIN->on_wouldblock()|_->());Unix.Syscall_result.Int.reinterpret_error_exnresult))>>|Loop_result.of_fd_interruptible_every_ready_to_result_exnNone"recvmmsg_loop"fd[%sexp_of:Fd.t]);;modulePrivate=structmoduleReady_iter=Ready_iterend