Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file mirage_flow_lwt.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376(*
* Copyright (c) 2011-present Anil Madhavapeddy <anil@recoil.org>
* Copyright (c) 2013-present Thomas Gazagnaire <thomas@gazagnaire.org>
* Copyright (C) 2016-present David Scott <dave.scott@docker.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)openLwt.Infixletsrc=Logs.Src.create"mirage-flow-lwt"moduleLog=(valLogs.src_logsrc:Logs.LOG)moduletypeS=Mirage_flow.Swithtype'aio='aLwt.tandtypebuffer=Cstruct.tmoduletypeABSTRACT=Mirage_flow.ABSTRACTwithtype'aio='aLwt.tandtypebuffer=Cstruct.tmoduletypeCONCRETE=Mirage_flow.CONCRETEwithtype'aio='aLwt.tandtypebuffer=Cstruct.tmoduleConcrete(S:S)=Mirage_flow.Concrete(S)(Lwt)moduletypeSHUTDOWNABLE=Mirage_flow.SHUTDOWNABLEwithtype'aio='aLwt.tandtypebuffer=Cstruct.ttypetime=int64type'astats={read_bytes:int64ref;read_ops:int64ref;write_bytes:int64ref;write_ops:int64ref;finish:timeoptionref;start:time;time:unit->time;t:(unit,'a)resultLwt.t;}letstatst=letduration:int64=match!(t.finish)with|None->Int64.sub(t.time())t.start|Somex->Int64.subxt.startin{Mirage_flow.read_bytes=!(t.read_bytes);read_ops=!(t.read_ops);write_bytes=!(t.write_bytes);write_ops=!(t.write_ops);duration;}moduleCopy(Clock:Mirage_clock.MCLOCK)(A:S)(B:S)=structtypeerror=[`AofA.error|`BofB.write_error]letpp_errorppf=function|`Ae->A.pp_errorppfe|`Be->B.pp_write_errorppfeletstart(clock:Clock.t)(a:A.flow)(b:B.flow)=letread_bytes=ref0Linletread_ops=ref0Linletwrite_bytes=ref0Linletwrite_ops=ref0Linletfinish=refNoneinletstart=Clock.elapsed_nsclockinletrecloopc()=A.reada>>=function|Errore->finish:=Some(Clock.elapsed_nsc);Lwt.return(Error(`Ae))|Ok`Eof->finish:=Some(Clock.elapsed_nsc);Lwt.return(Ok())|Ok(`Databuffer)->read_ops:=Int64.succ!read_ops;read_bytes:=Int64.(add!read_bytes(of_int@@Cstruct.lenbuffer));B.writebbuffer>>=function|Ok()->write_ops:=Int64.succ!write_ops;write_bytes:=Int64.(add!write_bytes(of_int@@Cstruct.lenbuffer));loopc()|Errore->finish:=Some(Clock.elapsed_nsc);Lwt.return(Error(`Be))in{read_bytes;read_ops;write_bytes;write_ops;finish;start;time=(fun()->Clock.elapsed_nsclock);t=loopclock();}letwaitt=t.tletcopyclock~src:a~dst:b=lett=startclockabinwaitt>|=function|Ok()->Ok(statst)|Errore->ErroreendmoduleProxy(Clock:Mirage_clock.MCLOCK)(A:SHUTDOWNABLE)(B:SHUTDOWNABLE)=structmoduleA_to_B=Copy(Clock)(A)(B)moduleB_to_A=Copy(Clock)(B)(A)typeerror=[|`AofA_to_B.error|`BofB_to_A.error|`A_and_BofA_to_B.error*B_to_A.error]letpp_errorppf=function|`A_and_B(e1,e2)->Fmt.pfppf"flow proxy a: %a; flow proxy b: %a"A_to_B.pp_errore1B_to_A.pp_errore2|`Ae->Fmt.pfppf"flow proxy a: %a"A_to_B.pp_errore|`Be->Fmt.pfppf"flow proxy b: %a"B_to_A.pp_erroreletproxyclockab=leta2b=lett=A_to_B.startclockabinA_to_B.waitt>>=funresult->A.shutdown_reada>>=fun()->B.shutdown_writeb>|=fun()->letstats=statstinmatchresultwith|Ok()->Okstats|Errore->Erroreinletb2a=lett=B_to_A.startclockbainB_to_A.waitt>>=funresult->B.shutdown_readb>>=fun()->A.shutdown_writea>|=fun()->letstats=statstinmatchresultwith|Ok()->Okstats|Errore->Erroreina2b>>=funa_stats->b2a>|=funb_stats->matcha_stats,b_statswith|Oka_stats,Okb_stats->Ok(a_stats,b_stats)|Errore1,Errore2->Error(`A_and_B(e1,e2))|Errore1,_->Error(`Ae1)|_,Errore2->Error(`Be2)endmoduleF=structlet(>>=)=Lwt.bindtype'aio='aLwt.ttypebuffer=Cstruct.ttyperefill=Cstruct.t->int->int->intLwt.ttypeerrorletpp_errorppf(_:error)=Fmt.stringppf"Mirage_flow_lwt.Fun.error"typewrite_error=Mirage_flow.write_errorletpp_write_error=Mirage_flow.pp_write_errorletseqf1f2bufofflen=f1bufofflen>>=function|0->f2bufofflen|n->Lwt.returnnletzero_buf_off_len=Lwt.return0letreciterfn=function|[]->zero|h::t->seq(fnh)(iterfnt)typeflow={close:unit->unitLwt.t;input:refill;output:refill;mutablebuf:Cstruct.t;mutableic_closed:bool;mutableoc_closed:bool;}letdefault_buffer_size=4096letmake?(close=fun()->Lwt.return_unit)?input?output()=letbuf=Cstruct.createdefault_buffer_sizeinletic_closed=input=Noneinletoc_closed=output=Noneinletinput=matchinputwithNone->zero|Somex->xinletoutput=matchoutputwithNone->zero|Somex->xin{close;input;output;buf;ic_closed;oc_closed;}letinput_fnlenblitstr=letstr_off=ref0inletstr_len=lenstrinfunbufofflen->if!str_off>=str_lenthenLwt.return0else(letlen=min(str_len-!str_off)leninblitstr!str_offbufofflen;str_off:=!str_off+len;Lwt.returnlen)letoutput_fnlenblitstr=letstr_off=ref0inletstr_len=lenstrinfunbufofflen->if!str_off>=str_lenthenLwt.return0else(letlen=min(str_len-!str_off)leninblitbufoffstr!str_offlen;str_off:=!str_off+len;Lwt.returnlen)letmkfn_ifn_o?input?output()=letinput=matchinputwithNone->None|Somex->Some(fn_ix)inletoutput=matchoutputwithNone->None|Somex->Some(fn_ox)inmake?input?output()letinput_string=input_fnString.lengthCstruct.blit_from_stringletoutput_bytes=output_fnBytes.lengthCstruct.blit_to_bytesletstring=mkinput_stringoutput_bytesletinput_cstruct=input_fnCstruct.lenCstruct.blitletoutput_cstruct=output_fnCstruct.lenCstruct.blitletcstruct=mkinput_cstructoutput_cstructletinput_strings=iterinput_stringletoutput_bytess=iteroutput_bytesletstrings=mkinput_stringsoutput_bytessletinput_cstructs=iterinput_cstructletoutput_cstructs=iteroutput_cstructletcstructs=mkinput_cstructsoutput_cstructsletrefillch=ifCstruct.lench.buf=0then(letbuf=Cstruct.createdefault_buffer_sizeinch.buf<-buf)letreadch=ifch.ic_closedthenLwt.return@@Ok`Eofelse(refillch;ch.inputch.buf0default_buffer_size>>=funn->ifn=0then(ch.ic_closed<-true;Lwt.return(Ok`Eof);)else(letret=Cstruct.subch.buf0ninletbuf=Cstruct.shiftch.bufninch.buf<-buf;Lwt.return(Ok(`Dataret))))letwritechbuf=ifch.oc_closedthenLwt.return@@Error`Closedelse(letlen=Cstruct.lenbufinletrecauxoff=ifoff=lenthenLwt.return(Ok())else(ch.outputbufoff(len-off)>>=funn->ifn=0then(ch.oc_closed<-true;Lwt.return@@Error`Closed)elseaux(off+n))inaux0)letwritevchbufs=ifch.oc_closedthenLwt.return@@Error`Closedelseletrecaux=function|[]->Lwt.return(Ok())|h::t->writechh>>=function|Errore->Lwt.return(Errore)|Ok()->auxtinauxbufsletclosech=ch.ic_closed<-true;ch.oc_closed<-true;ch.close()endtype'aio='aLwt.ttypebuffer=Cstruct.ttypeerror=[`Msgofstring]typewrite_error=[Mirage_flow.write_error|error]letpp_errorppf(`Msgs)=Fmt.stringppfsletpp_write_errorppf=function|#Mirage_flow.write_errorase->Mirage_flow.pp_write_errorppfe|#errorase->pp_errorppfetypeflow=|Flow:string*(moduleCONCRETEwithtypeflow='a)*'a->flowtypet=flowletcreate(typea)(moduleM:Swithtypeflow=a)tname=letm=(moduleConcrete(M):CONCRETEwithtypeflow=a)inFlow(name,m,t)letread(Flow(_,(moduleF),flow))=F.readflowletwrite(Flow(_,(moduleF),flow))b=F.writeflowbletwritev(Flow(_,(moduleF),flow))b=F.writevflowbletclose(Flow(_,(moduleF),flow))=F.closeflowletppppf(Flow(name,_,_))=Fmt.stringppfnameletforward?(verbose=false)~src~dst=letrecloop()=readsrc>>=function|Ok`Eof->Log.err(funl->l"forward[%a => %a] EOF"ppsrcppdst);Lwt.return_unit|Errore->Log.err(funl->l"forward[%a => %a] %a"ppsrcppdstpp_errore);Lwt.return_unit|Ok(`Databuf)->Log.debug(funl->letpayload=ifverbosethenFmt.strf"[%S]"@@Cstruct.to_stringbufelseFmt.strf"%d bytes"(Cstruct.lenbuf)inl"forward[%a => %a] %s"ppsrcppdstpayload);writedstbuf>>=function|Ok()->loop()|Errore->Log.err(funl->l"forward[%a => %a] %a"ppsrcppdstpp_write_errore);Lwt.return_unitinloop()letproxy?verbosef1f2=Lwt.join[forward?verbose~src:f1~dst:f2;forward?verbose~src:f2~dst:f1;]