Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file netchannels.ml
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169(* $Id$
* ----------------------------------------------------------------------
*
*)openNetsys_typesopenNetstring_tstringexceptionClosed_channelexceptionBuffer_underrunexceptionCommand_failureofUnix.process_statuslet()=Netexn.register_printer(Command_failure(Unix.WEXITED0))(fune->matchewith|Command_failureps->letps_str=matchpswith|Unix.WEXITEDn-> "WEXITED "^string_of_intn|Unix.WSIGNALEDn->"WSIGNALED "^string_of_intn|Unix.WSTOPPEDn->"WSTOPPED "^string_of_intnin"Netchannels.Command_failure(" ^ps_str^")"|_->assertfalse)let()=Netsys_signal.init()classtyperec_in_channel=objectmethodinput:Bytes.t->int->int->intmethodclose_in:unit->unitendclasstyperaw_in_channel=objectinheritrec_in_channelmethodpos_in:int(* number of read characters *)endtypeinput_result=[`Dataofint|`Separatorofstring]classtypeenhanced_raw_in_channel=objectinheritraw_in_channelmethodprivateenhanced_input_line:unit->stringmethodprivateenhanced_input:Bytes.t->int->int->input_resultendclasstyperec_out_channel=objectmethodoutput:Bytes.t->int->int->intmethodclose_out:unit->unitmethodflush:unit->unitendclasstyperaw_out_channel=objectinheritrec_out_channelmethodpos_out:int(* number of written characters *)endclasstyperaw_io_channel=objectinheritraw_in_channelinheritraw_out_channelendclasstypecompl_in_channel=object(* Classic operations: *)methodreally_input:Bytes.t->int->int->unitmethodreally_input_string:int->stringmethodinput_char:unit ->charmethodinput_line:unit->stringmethodinput_byte:unit ->intendclasstypein_obj_channel=objectinheritraw_in_channelinheritcompl_in_channelendclasstypecompl_out_channel=object(* Classic operations: *)methodreally_output:Bytes.t->int->int->unitmethodreally_output_string:string ->int->int->unitmethodoutput_char:char->unitmethodoutput_bytes:Bytes.t->unitmethodoutput_string:string->unitmethodoutput_byte:int->unitmethodoutput_buffer:Buffer.t->unitmethodoutput_channel:?len:int->in_obj_channel->unit(* ~len: optionally limit the number of bytes 
*)endclasstypeout_obj_channel=objectinheritraw_out_channelinheritcompl_out_channelendclasstypeio_obj_channel=objectinheritin_obj_channelinheritout_obj_channelendclasstypetrans_out_obj_channel=objectinheritout_obj_channelmethodcommit_work:unit->unitmethodrollback_work:unit ->unitend;;(* error_behavior: currently not used. This was a proposal to control
* error handling, but it is not clear whether it is really
* useful or not.
* I do not delete these types because they remind us of this
* possibility. Maybe we find an outstanding example for them, and
* want to have them back.
*)typeerror_behavior=[`Close|`Funof(unit->unit)|`None]typeextended_error_behavior=[`Close|`Rollback|`Funof(unit->unit)|`None]typeclose_mode=[`Commit|`Rollback ];;(* Delegation *)classrec_in_channel_delegation ?(close=true)(ch:rec_in_channel)=object(self)methodinput=ch#inputmethod close_in()=ifclosethench#close_in()endclassraw_in_channel_delegation?(close=true)(ch:raw_in_channel)=object(self)methodinput=ch#inputmethod close_in()=ifclosethench#close_in()methodpos_in=ch#pos_inendclassin_obj_channel_delegation?(close=true)(ch:in_obj_channel)=object(self)methodinput=ch#inputmethod close_in()=ifclosethench#close_in()methodpos_in=ch#pos_inmethodreally_input=ch#really_inputmethodreally_input_string=ch#really_input_stringmethodinput_char=ch#input_charmethodinput_line=ch#input_linemethodinput_byte=ch#input_byteendclassrec_out_channel_delegation?(close=true)(ch:rec_out_channel)=object(self)methodoutput =ch#outputmethodclose_out()=ifclosethench#close_out()methodflush=ch#flushendclassraw_out_channel_delegation?(close=true)(ch:raw_out_channel)=object(self)methodoutput =ch#outputmethodclose_out()=ifclosethench#close_out()methodflush=ch#flushmethodpos_out=ch#pos_outendclassout_obj_channel_delegation?(close=true)(ch:out_obj_channel)=object(self)methodoutput =ch#outputmethodclose_out()=ifclosethench#close_out()methodflush=ch#flushmethodpos_out=ch#pos_outmethodreally_output=ch#really_outputmethodreally_output_string=ch#really_output_stringmethodoutput_char=ch#output_charmethodoutput_string=ch#output_stringmethodoutput_bytes=ch#output_bytesmethodoutput_byte=ch#output_bytemethodoutput_buffer=ch#output_buffermethodoutput_channel=ch#output_channelend(****************************** input ******************************)classinput_channel?(onclose=fun()->())ch(* : in_obj_channel *) =object (self)valch=chvalmutableclosed=falsemethodprivatecomplain_closed()=raiseClosed_channelmethodinputbufposlen=ifclosedthenself #complain_closed();tryiflen=0thenraiseSys_blocked_io;letn=input 
chbufposleninifn=0then raiseEnd_of_fileelsenwithSys_blocked_io->0methodreally_inputbufposlen=ifclosedthenself #complain_closed();really_inputchbufposlenmethodreally_input_stringlen=ifclosedthenself#complain_closed();#ifdefHAVE_BYTESreally_input_stringchlen#elseletbuf=String.createleninreally_inputchbuf0len;buf#endifmethodinput_char()=ifclosedthenself#complain_closed();input_charchmethodinput_line()=ifclosedthenself#complain_closed();input_linechmethodinput_byte()=ifclosedthenself#complain_closed();input_bytechmethodclose_in ()=ifnotclosedthen(close_inch;closed<-true;onclose())methodpos_in=ifclosedthenself#complain_closed();pos_inchend;;letinput_channel=newinput_channelclassinput_commandcmd=letch=Unix.open_process_incmdinobject(self)inheritinput_channelchassupermethodclose_in()=ifnotclosedthen(letp=Unix.close_process_inchinclosed<-true;ifp<>Unix.WEXITED0thenraise(Command_failurep);)end;;letinput_command=newinput_commandclass['t]input_generic nameops?(pos=0)?len(s:'t):in_obj_channel=object(self)valmutablestr=svalmutablestr_len=matchlenwithNone->ops.lengths|Somel->pos+lvalmutablestr_pos=posvalmutableclosed=falseinitializerifstr_pos<0||str_pos>ops.lengthstr||str_len<0||str_len>ops.lengthstheninvalid_arg("new Netchannels."^name)methodprivatecomplain_closed()=raiseClosed_channelmethodinputbufposlen=ifclosedthenself#complain_closed();ifpos <0||len<0||pos+len >Bytes.lengthbuftheninvalid_arg"input";letn=minlen(str_len-str_pos)inops.blit_to_bytesstrstr_posbufposn;str_pos<-str_pos+n;ifn=0&&len>0thenraiseEnd_of_fileelsenmethodreally_inputbufposlen=ifclosedthenself#complain_closed();ifpos <0||len<0||pos+len >Bytes.lengthbuftheninvalid_arg"really_input";letn=self#inputbufposleninifn<>lenthenraiseEnd_of_file;()methodreally_input_stringlen=ifclosedthenself#complain_closed();iflen<0theninvalid_arg"really_input_string";letbuf =Bytes.createleninletn=self 
#inputbuf0leninifn<>lenthenraiseEnd_of_file;Bytes.to_stringbufmethodinput_char()=ifclosedthenself#complain_closed();ifstr_pos>=str_lenthenraiseEnd_of_file;letc=ops.getstrstr_posinstr_pos<-str_pos+1;cmethodinput_line()=ifclosedthenself#complain_closed();tryletk=ops.index_fromstrstr_pos'\n'in(* CHECK: Are the different end of line conventions important here? *)letline=ops.substringstrstr_pos(k-str_pos)instr_pos<-k+1;linewithNot_found->ifstr_pos>=str_lenthenraiseEnd_of_file;(* Implicitly add linefeed at the end of the file: *)letline=ops.substringstrstr_pos(str_len-str_pos)instr_pos<-str_len;linemethodinput_byte()=Char.code(self#input_char())methodclose_in()=(* str <- ""; *)closed<-true;methodpos_in=ifclosedthenself#complain_closed();str_posend;;classinput_string=[string]input_generic"input_string"Netstring_tstring.string_opsletinput_string=newinput_stringclassinput_bytes=[Bytes.t]input_generic"input_bytes"Netstring_tstring.bytes_opsletinput_bytes=newinput_bytesclassinput_memory=[memory]input_generic"input_memory"Netstring_tstring.memory_opsletinput_memory=newinput_memorylet input_tstring?pos?lents=matchtswith|`Strings->input_string ?pos?lens|`Bytess->input_bytes?pos?lens|`Memorys->input_memory ?pos?lensclasstype nb_in_obj_channel=objectinheritin_obj_channelmethod shutdown :unit->unitendclass input_netbuffer?(keep_data=false)b:nb_in_obj_channel=object(self)valmutableb=bvalmutableoffset=0valmutableeof=falsevalmutableclosed=falsevalmutablech_pos=0methodprivatecomplain_closed()=raiseClosed_channelmethodprivateinput_into:typet.(int->int->t)->int->t=funflen->letn=minlen (Netbuffer.lengthb-offset)inifn=0&&len>0thenbeginifeofthenraiseEnd_of_fileelseraiseBuffer_underrunendelsebeginletresult=foffsetninifkeep_datathenoffset<-offset+nelseNetbuffer.deleteb0n;ch_pos<-ch_pos+n;resultendmethodinputbufposlen=ifclosedthenself#complain_closed();ifpos <0||len<0||pos>Bytes.lengthbuf-lentheninvalid_arg"input";self#input_into(funb_offsn->Netbuffer.blit 
bb_offsbufposn;n)lenmethodreally_inputbufposlen=ifclosedthenself#complain_closed();ifpos <0||len<0||pos+len >Bytes.lengthbuftheninvalid_arg"really_input";letn=self#inputbufposleninifn<>lenthenraiseEnd_of_file;()methodreally_input_stringlen=ifclosedthenself#complain_closed();iflen<0theninvalid_arg"really_input_string";self#input_into(funb_offsn->ifn<>lenthenraiseEnd_of_file;Netbuffer.subbb_offsn)lenmethodinput_char()=ifclosedthenself#complain_closed();lets=Bytes.create1inmatchself#inputs01with|1->Bytes.gets0|_->assertfalsemethodinput_line()=ifclosedthenself#complain_closed();tryletk=Netbuffer.index_fromboffset'\n'in(* CHECK: Are the different end of line conventions important here? *)letline=Netbuffer.subboffset(k-offset)inifkeep_datathenoffset<-offset+k+1elseNetbuffer.deleteb0(k+1);ch_pos<-ch_pos+k+1;linewithNot_found->ifeofthenbeginletn=Netbuffer.lengthb-offsetinifn=0thenraiseEnd_of_file;(* Implicitly add linefeed at the end of the file: *)letline=Netbuffer.subboffsetninifkeep_datathenoffset<-offset+nelseNetbuffer.clearb;ch_pos<-ch_pos+n;lineendelseraiseBuffer_underrunmethodinput_byte()=Char.code(self#input_char())methodclose_in()=closed<-true;methodpos_in=ifclosedthenself#complain_closed();ch_posmethodshutdown()=eof<-trueend;;letcreate_input_netbuffer ?keep_datab=letch=newinput_netbuffer?keep_databin(ch:>in_obj_channel),(ch#shutdown);;letlexbuf_of_in_obj_channel (objch :in_obj_channel):Lexing.lexbuf=letfill_bufferbuflen=tryletn=objch#inputbuf0leninifn=0thenfailwith"Netchannels.lexbuf_of_in_obj_channel: No data (non-blocking I/O?)";nwithEnd_of_file->0inLexing.from_functionfill_buffer;;letbytes_of_in_obj_channel(objch:in_obj_channel):Bytes.t=(* There are similarities to copy_channel below. *)(* The following algorithm uses only up to 2 * N memory, not 3 * N
* as with the Buffer module.
*)letslen=1024 inletl=ref[]inletk=ref0intrywhiletruedolets=Bytes.createsleninletn=objch #inputs0sleninifn=0then failwith"Netchannels.bytes_of_in_obj_channel: No data (non-blockingI/O?)";k:=!k+n;ifn<slenthenl:=(Bytes.subs0n)::!lelsel:=s::!l;done;assertfalsewithEnd_of_file->lets=Bytes.create!kinwhile!l<>[]domatch!lwithu::l'->letn=Bytes.lengthuink:=!k-n;Bytes.blitu0s!kn;l:=l'|[]->assertfalsedone;assert(!k=0);s;;letstring_of_in_obj_channel objch=Bytes.unsafe_to_string(bytes_of_in_obj_channelobjch)letlines_of_in_obj_channelch=letacc=ref[]intrywhiletruedoacc:=ch#input_line()::!accdone;assertfalsewith|End_of_file ->List.rev!acc;;letwith_in_obj_channelchf=tryletresult=fchin(trych#close_in()withClosed_channel->());resultwithe->(trych#close_in()withClosed_channel->());raisee;;classvirtualaugment_raw_in_channel =object(self)methodvirtualinput :Bytes.t->int->int->intmethodvirtual close_in :unit->unitmethodvirtualpos_in:intmethodreally_input sposlen=letrecread_restn=ifn<lenthenletm=self#inputs(pos+n)(len-n)inifm=0thenraiseSys_blocked_io;read_rest(n+m)else()inread_rest0methodreally_input_stringlen=letb=Bytes.createleninself#really_inputb0len;Bytes.unsafe_to_stringbmethodinput_char()=lets=Bytes.create1inself#really_inputs01;Bytes.gets0methodinput_byte()=lets=Bytes.create1inself#really_inputs01;Char.code(Bytes.gets0)methodinput_line()=lets=Bytes.create1inlet b=Buffer.create80inletm=self#inputs01inifm=0thenraiseSys_blocked_io;whileBytes.gets0<>'\n'doBuffer.add_char 
b(Bytes.gets0);tryletm=self#inputs01inifm=0thenraiseSys_blocked_io;withEnd_of_file->Bytes.sets0'\n'done;Buffer.contentsbend;;classlift_raw_in_channelr=object(self)inheritaugment_raw_in_channelmethodinputspl=r#inputsplmethodclose_in()=r#close_in()methodpos_in=r#pos_inend;;classlift_rec_in_channel?(start_pos_in=0)(r:rec_in_channel)=object(self)inheritaugment_raw_in_channelvalmutableclosed=falsevalmutablepos_in=start_pos_inmethodinputspl=ifclosedthenraiseClosed_channel;letn=r#inputsplinpos_in<-pos_in+n;nmethodclose_in()=ifnotclosedthen(closed<-true;r#close_in())methodpos_in=ifclosedthenraiseClosed_channel;pos_inend;;typeeol_status=EOL_not_found|EOL_partially_found ofint(* Position *)|EOL_found ofint*int(* Position, length *)exceptionPass_throughclassbuffered_raw_in_channel?(eol=["\n"])?(buffer_size=4096)?(pass_through =max_int)(ch:raw_in_channel):enhanced_raw_in_channel=object(self)valout=chvalbufsize=buffer_sizevalbuf=Bytes.createbuffer_sizevalmutablebufpos=0valmutablebuflen=0valmutableeof=falsevalmutableclosed=falseinitializerifList.exists(funs->s="")eoltheninvalid_arg"Netchannels.buffered_raw_in_channel";ifList.exists(funs->String.lengths>buffer_size)eoltheninvalid_arg"Netchannels.buffered_raw_in_channel";method inputsposlen=ifclosedthenraiseClosed_channel;tryiflen>0then(ifbufpos=buflenthen(iflen>=pass_throughthenraisePass_throughelseself#refill(););letn=minlen(buflen-bufpos)inBytes.blitbufbufpossposn;bufpos<-bufpos+n;n)else0withPass_through->ch#inputsposlenmethodprivaterefill()=letd=bufposinifd>0&&d<buflenthen(Bytes.blitbufdbuf0(buflen-d));bufpos<-0;buflen<-buflen-d;tryassert(bufsize>buflen);(* otherwise problems... *)letn=ch#inputbufbuflen(bufsize-buflen)in(* or End_of_file *)ifn=0thenraiseSys_blocked_io;buflen<-buflen+n;withEnd_of_fileasexn->eof<-true;raiseexnmethodclose_in()=ifnotclosedthen(ch#close_in();closed<-true)methodpos_in=(ch#pos_in)-(buflen-bufpos)methodprivatefind_eol()=(* Try all strings from [eol] in turn. For every string we may
* have three results:
* - Not found
* - Partially found
* - Found
* The eol delimiter is only found if there are no partial
* results, and at least one positive result. The longest
* string is taken.
*)let find_this_eoleol=(* Try to find the eol string [eol] in [buf] starting at
* [bufpos] up to [buflen]. Return [eol_status].
*)leteol0=eol.[0]intryletk=Bytes.index_frombufbufposeol0in(* or Not_found *)ifk>=buflenthenraiseNot_found;letk'=minbuflen (k+String.lengtheol)inlets=Bytes.sub_stringbufk(k'-k)inifs=eolthenEOL_found(k,String.lengtheol)elseifnot eof &&String.subeol0(String.lengths)=sthenEOL_partially_foundkelseEOL_not_foundwithNot_found->EOL_not_foundinlet recfind_best_eol besteol_result=matcheol_resultwithEOL_not_found ::eol_result'->find_best_eol besteol_result'|EOL_partially_foundposasr::eol_result'->(matchbestwithEOL_partially_foundpos'->ifpos<pos'thenfind_best_eol reol_result'elsefind_best_eol besteol_result'|_->find_best_eol reol_result')|EOL_found(pos,len)asr::eol_result'->(matchbestwithEOL_found(pos',len')->ifpos<pos'||(pos=pos'&&len>len')thenfind_best_eol reol_result'elsefind_best_eol besteol_result'|EOL_partially_found_->find_best_eol besteol_result'|EOL_not_found->find_best_eol reol_result')|[]->bestinleteol_results =List.mapfind_this_eoleolinfind_best_eolEOL_not_foundeol_resultsmethodprivateenhanced_inputsposlen:input_result=ifclosedthenraiseClosed_channel;iflen>0then(ifbufpos=buflenthen(self#refill();(* may raise End_of_file *));letresult=refNoneinwhile!result=Nonedoletbest=self#find_eol()inmatchbestwithEOL_not_found->letn=minlen(buflen-bufpos)inBytes.blitbufbufpossposn;bufpos<-bufpos+n;result:=Some(`Datan)|EOL_found(p,l)->ifp=bufposthen(bufpos<-bufpos+l;result:=Some(`Separator(Bytes.sub_stringbufpl)))else(letn=minlen(p-bufpos)inBytes.blitbufbufpossposn;bufpos<-bufpos+n;result:=Some(`Datan))|EOL_partially_foundp->ifp=bufposthen(tryself#refill()withEnd_of_file->()(* ... and continue! 
*))else(letn=minlen(p-bufpos)inBytes.blitbufbufpossposn;bufpos<-bufpos+n;result:=Some(`Datan))done;match!resultwithNone->assertfalse|Somer->r)else`Data0methodprivateenhanced_input_line()=ifclosedthenraiseClosed_channel;letb=Netbuffer.create80inleteol_found=reffalseinifbufpos=buflenthen(self#refill();(* may raise End_of_file *));whilenot!eol_founddoletbest=self#find_eol()intrymatchbestwithEOL_not_found->Netbuffer.add_subbytesbbufbufpos(buflen-bufpos);bufpos<-buflen;self#refill();(* may raise End_of_file *)|EOL_partially_foundpos->Netbuffer.add_subbytesbbufbufpos(pos-bufpos);bufpos<-pos;self#refill();(* may raise End_of_file *)|EOL_found(pos,len)->Netbuffer.add_subbytesbbufbufpos(pos-bufpos);bufpos<-pos+len;eol_found:=truewithEnd_of_file->bufpos<-0;buflen<-0;eof<-true;eol_found:=truedone;Netbuffer.contentsbend;;classlift_raw_in_channel_buf?eol?buffer_size ?pass_throughr=object(self)inheritbuffered_raw_in_channel?eol?buffer_size ?pass_throughrinheritaugment_raw_in_channelmethodinput_line()=self#enhanced_input_line()end;;typelift_in_arg=[`Recofrec_in_channel|`Rawofraw_in_channel]letlift_in?(eol=["\n"])?(buffered=true)?buffer_size?pass_through(x:lift_in_arg)=matchxwith`Recrwhennotbuffered ->ifeol<>["\n"]theninvalid_arg"Netchannels.lift_in";newlift_rec_in_channelr|`Recrwhenbuffered->letr'=newlift_rec_in_channelrinnewlift_raw_in_channel_buf~eol?buffer_size ?pass_through(r':>raw_in_channel)|`Rawrwhennotbuffered ->ifeol<>["\n"]theninvalid_arg"Netchannels.lift_in";newlift_raw_in_channelr|`Rawrwhenbuffered->newlift_raw_in_channel_buf~eol?buffer_size ?pass_throughr;;(****************************** output ******************************)exceptionNo_end_of_fileletcopy_channel?(buf=Bytes.create1024)?len(src_ch:in_obj_channel)(dest_ch :out_obj_channel)=(* Copies contents from src_ch to dest_ch. Returns [true] if at EOF.
*)letslen=Bytes.lengthbufinletk=ref0intrywhiletruedoletm=minslen(matchlenwithSomex->x-!k|None->max_int)inifm<=0thenraiseNo_end_of_file;letn=src_ch#inputbuf0minifn=0thenraiseSys_blocked_io;dest_ch#really_outputbuf0n;k:=!k+ndone;assertfalsewithEnd_of_file->true|No_end_of_file->false;;classoutput_channel?(onclose=fun()->())ch(* : out_obj_channel *)=leterrflag=reffalseinletmonitoredfarg=tryletr=farginerrflag:=false;rwith|error->errflag:=true;raiseerrorinobject(self)valch=chvalonclose=onclosevalmutableclosed=falsemethodprivatecomplain_closed()=raiseClosed_channelmethod outputbufposlen=ifclosedthenself#complain_closed();(* output does not support non-blocking I/O directly.
* Work around it: *)letp0=pos_outchintryoutput chbufposlen;errflag:=false;lenwith|Sys_blocked_io->letp1=pos_outchinerrflag:=false;p1 -p0|error->errflag:=true;raiseerrormethodreally_outputbufposlen=ifclosedthenself#complain_closed();monitored(output chbufpos)lenmethodreally_output_stringbufposlen=ifclosedthenself#complain_closed();#ifdefHAVE_BYTESmonitored(output_substringchbufpos)len#elsemonitored(outputchbufpos)len#endifmethod output_charc=ifclosedthenself#complain_closed();monitored(output_charch)cmethodoutput_strings=ifclosedthenself#complain_closed();monitored(output_stringch)smethodoutput_bytess=ifclosedthenself#complain_closed();#ifdefHAVE_BYTESmonitored(output_bytesch)s#elsemonitored(output_stringch)s#endifmethodoutput_byteb=ifclosedthenself#complain_closed();monitored(output_bytech)bmethod output_bufferb=ifclosedthenself#complain_closed();monitored(Buffer.output_bufferch)bmethodoutput_channel?lench=ifclosedthenself#complain_closed();ignore(monitored(copy_channel?lench)(self:#out_obj_channel:>out_obj_channel))methodflush()=ifclosedthenself#complain_closed();monitoredflushchmethodclose_out()=ifnotclosedthen((try(* if !errflag is set, we know that the immediately preceding
operation raised an exception, and we are now likely in the
exception handler
*)if!errflagthenclose_out_noerr chelseclose_outch;closed<-true;with|error->letbt=Printexc.get_backtrace()inNetlog.logf`Err"Netchannels.output_channel: \
Suppressed error in close_out: %s - backtrace: %s"(Netexn.to_stringerror)bt;close_out_noerr ch;closed<-true;);onclose())methodpos_out=ifclosedthenself #complain_closed();pos_outchend;;classoutput_command?onclosecmd=letch=Unix.open_process_outcmdinobject(self)inheritoutput_channel?onclosechassupermethodclose_out()=ifnotclosedthen(letp=Unix.close_process_outchinclosed<-true;onclose();ifp<>Unix.WEXITED 0thenraise(Command_failurep);(* Keep this *))end;;class output_buffer?(onclose=fun()->())buffer:out_obj_channel=object(self)valbuffer=buffervalonclose=onclosevalmutableclosed=falsemethodprivatecomplain_closed()=raiseClosed_channelmethodoutputbufposlen=ifclosedthenself#complain_closed();#ifdefHAVE_BYTESBuffer.add_subbytesbufferbufposlen;#elseBuffer.add_substringbufferbufposlen;#endiflenmethodreally_outputbufposlen=ifclosedthenself#complain_closed();#ifdefHAVE_BYTESBuffer.add_subbytesbufferbufposlen;#elseBuffer.add_substringbufferbufposlen;#endifmethodreally_output_stringbufposlen=ifclosedthenself#complain_closed();Buffer.add_substringbufferbufposlen;methodoutput_charc=ifclosedthenself#complain_closed();Buffer.add_charbuffercmethodoutput_strings=ifclosedthenself#complain_closed();Buffer.add_stringbuffersmethodoutput_bytess=ifclosedthenself#complain_closed();#ifdefHAVE_BYTESBuffer.add_bytesbuffers#elseBuffer.add_stringbuffers#endifmethodoutput_byteb=ifclosedthenself#complain_closed();Buffer.add_charbuffer(Char.chrb)methodoutput_bufferb=ifclosed thenself#complain_closed();Buffer.add_buffer bufferbmethodoutput_channel?lench=ifclosedthenself#complain_closed();ignore(copy_channel?lench(self:#out_obj_channel:>out_obj_channel))methodflush()=ifclosedthenself#complain_closed();()methodclose_out()=ifnotclosed then(closed<-true;onclose())methodpos_out=ifclosedthenself#complain_closed();Buffer.lengthbufferend;;classoutput_netbuffer?(onclose=fun()->())buffer:out_obj_channel=object(self)valbuffer=buffervalonclose 
=onclosevalmutableclosed=falsevalmutablech_pos=0methodprivatecomplain_closed()=raiseClosed_channelmethodoutputbufposlen=ifclosedthenself#complain_closed();Netbuffer.add_subbytesbufferbuf poslen;ch_pos <-ch_pos+len;lenmethodreally_outputbufposlen=ifclosedthenself#complain_closed();Netbuffer.add_subbytesbufferbufposlen;ch_pos<-ch_pos+len;methodreally_output_stringbufposlen=ifclosedthenself#complain_closed();Netbuffer.add_substringbufferbufposlen;ch_pos<-ch_pos+len;methodoutput_charc=ifclosedthenself#complain_closed();Netbuffer.add_stringbuffer(String.make1c);ch_pos<-ch_pos+1;methodoutput_strings=ifclosedthenself#complain_closed();Netbuffer.add_stringbuffers;ch_pos<-ch_pos+String.lengthsmethodoutput_bytess=ifclosedthenself#complain_closed();Netbuffer.add_bytesbuffers;ch_pos<-ch_pos+Bytes.lengthsmethodoutput_byteb=ifclosedthenself#complain_closed();Netbuffer.add_stringbuffer(String.make1(Char.chrb));ch_pos<-ch_pos+1;methodoutput_bufferb=ifclosedthenself#complain_closed();Netbuffer.add_stringbuffer(Buffer.contents b);ch_pos<-ch_pos+Buffer.lengthb;methodoutput_channel?lench=ifclosedthenself#complain_closed();ignore(copy_channel?lench(self:#out_obj_channel:>out_obj_channel))methodflush()=ifclosedthenself#complain_closed();()methodclose_out()=ifnotclosedthen(closed<-true;onclose())methodpos_out=ifclosedthenself#complain_closed();ch_pos(* We cannot return Netbuffer.length b as [pos_out] (like in the class
* [output_buffer]) because the user of this class is allowed to delete
* data from the netbuffer. So we manually count how many bytes are
* ever appended to the netbuffer.
* This behavior is especially needed by [pipe_channel] below.
*)end;;

(* [output_null] discards everything written to it and only counts the
 * number of discarded bytes, which [pos_out] reports. After [close_out]
 * every method except [close_out] raises [Closed_channel].
 * [onclose] is invoked exactly once, by the first call of [close_out].
 *)
class output_null ?(onclose = fun () -> ()) () : out_obj_channel =
object(self)
  val mutable closed = false
  val mutable pos = 0

  method private complain_closed() =
    raise Closed_channel

  method output s start len =
    if closed then self#complain_closed();
    pos <- pos + len;
    len

  method really_output s start len =
    if closed then self#complain_closed();
    pos <- pos + len

  method really_output_string s start len =
    if closed then self#complain_closed();
    pos <- pos + len

  method output_char _ =
    if closed then self#complain_closed();
    pos <- pos + 1

  method output_string s =
    if closed then self#complain_closed();
    pos <- pos + String.length s

  method output_bytes s =
    if closed then self#complain_closed();
    pos <- pos + Bytes.length s

  method output_byte _ =
    if closed then self#complain_closed();
    pos <- pos + 1

  method output_buffer b =
    if closed then self#complain_closed();
    pos <- pos + Buffer.length b

  method output_channel ?len ch =
    if closed then self#complain_closed();
    ignore
      (copy_channel ?len ch (self : #out_obj_channel :> out_obj_channel))

  method flush() =
    if closed then self#complain_closed()

  method close_out() =
    (* Closing twice is harmless; the [onclose] hook runs only the first
     * time. The flag is set first so that a failing hook is not re-run.
     *)
    if not closed then (
      closed <- true;
      onclose()
    )

  method pos_out =
    if closed then self#complain_closed();
    pos
end
;;


(* [with_out_obj_channel ch f] computes [f ch] and then flushes and closes
 * [ch]. If [f] (or the final flush/close) raises an exception, [ch] is
 * closed and the exception is re-raised. [Closed_channel] coming from the
 * flush or close calls themselves is ignored.
 *)
let with_out_obj_channel ch f =
  try
    let result = f ch in
    (* We _have_ to flush here because close_out often no longer
     * reports exceptions.
     *)
    (try ch#flush() with Closed_channel -> ());
    (try ch#close_out() with Closed_channel -> ());
    result
  with
    | e ->
        (try ch#close_out() with Closed_channel -> ());
        raise e
;;


(* Derives the convenience output methods from the four virtual primitives
 * [output], [close_out], [flush] and [pos_out].
 *)
class virtual augment_raw_out_channel =
object (self)
  method virtual output : Bytes.t -> int -> int -> int
  method virtual close_out : unit -> unit
  method virtual flush : unit -> unit
  method virtual pos_out : int

  method really_output s pos len =
    (* Repeat [output] until all [len] bytes are accepted. An [output]
     * call that accepts nothing is reported as [Sys_blocked_io].
     *)
    let rec print_rest n =
      if n < len then begin
        let m = self#output s (pos+n) (len-n) in
        if m = 0 then raise Sys_blocked_io;
        print_rest (n+m)
      end
    in
    print_rest 0

  method really_output_string s pos len =
    self#really_output (Bytes.unsafe_of_string s) pos len

  method output_char c =
    (* Go through [really_output] so that a short write of 0 bytes is
     * reported instead of silently dropping the character.
     *)
    self#really_output (Bytes.make 1 c) 0 1

  method output_byte n =
    (* See [output_char] for why [really_output] is used. *)
    self#really_output (Bytes.make 1 (Char.chr n)) 0 1

  method output_string s =
    self#really_output_string s 0 (String.length s)

  method output_bytes s =
    self#really_output s 0 (Bytes.length s)

  method output_buffer b =
    self#output_string (Buffer.contents b)

  method output_channel ?len ch =
    ignore
      (copy_channel ?len ch (self : #out_obj_channel :> out_obj_channel))
end
;;


(* Turns a [raw_out_channel] into a channel with the full set of output
 * methods by delegating the primitives to [r].
 *)
class lift_raw_out_channel (r : raw_out_channel) =
object (self)
  inherit augment_raw_out_channel

  method output s p l = r#output s p l
  method flush () = r#flush()
  method close_out () = r#close_out()
  method pos_out = r#pos_out
end
;;


(* Like [lift_raw_out_channel], but for a [rec_out_channel]: the position
 * is counted here (starting at [start_pos_out]), and the closed state is
 * tracked here as well.
 *)
class lift_rec_out_channel ?(start_pos_out = 0) (r : rec_out_channel) =
object (self)
  inherit augment_raw_out_channel

  val mutable closed = false
  val mutable pos_out = start_pos_out

  method output s p l =
    if closed then raise Closed_channel;
    let n = r#output s p l in
    pos_out <- pos_out + n;
    n

  method flush() =
    if closed then raise Closed_channel;
    r#flush()

  method close_out() =
    if not closed then (
      closed <- true;
      r#close_out()
    )

  method pos_out =
    if closed then raise Closed_channel;
    pos_out
end
;;


(* Puts a write buffer of [buffer_size] bytes in front of [ch]. When the
 * buffer is empty and at least [pass_through] bytes are to be written,
 * the request is handed to [ch] directly.
 *)
class buffered_raw_out_channel
        ?(buffer_size = 4096)
        ?(pass_through = max_int)
        (ch : raw_out_channel) : raw_out_channel =
object (self)
  val out = ch
  val bufsize = buffer_size
  val buf = Bytes.create buffer_size
  val mutable bufpos = 0
  val mutable closed = false

  method output s pos len =
    if closed then raise Closed_channel;
    if bufpos = 0 && len >= pass_through then
      ch#output s pos len
    else begin
      let n = min len (bufsize - bufpos) in
      Bytes.blit s pos buf bufpos n;
      bufpos <- bufpos + n;
      if bufpos = bufsize then self#flush();
      n
    end

  method flush() =
    (* NOTE(review): the loop does not terminate while [ch#output] keeps
     * returning 0 - confirm that the underlying channels never do that.
     *)
    let k = ref 0 in
    (try
       while !k < bufpos do
         k := !k + (ch#output buf !k (bufpos - !k))
       done
     with
       | error ->
           (* The first [!k] bytes have already been written. Keep only
            * the unwritten tail so that a later flush does not send
            * them a second time and [pos_out] stays correct.
            *)
           Bytes.blit buf !k buf 0 (bufpos - !k);
           bufpos <- bufpos - !k;
           raise error
    );
    bufpos <- 0;
    ch#flush()

  method close_out() =
    if not closed then (
      (* Errors of the final flush are logged, not raised. *)
      (try
         self#flush()
       with
         | error ->
             let bt = Printexc.get_backtrace() in
             Netlog.logf `Err
               "Netchannels.buffered_raw_out_channel: \
                Suppressed error in close_out: %s - backtrace: %s"
               (Netexn.to_string error) bt
      );
      ch#close_out();
      closed <- true
    )

  method pos_out =
    (ch#pos_out) + bufpos
end
;;


type lift_out_arg =
    [ `Rec of rec_out_channel
    | `Raw of raw_out_channel
    ]

(* [lift_out x] wraps the [`Rec] or [`Raw] channel [x] into an object with
 * the complete set of output methods. With [buffered] (the default) a
 * [buffered_raw_out_channel] is inserted, configured by [buffer_size]
 * and [pass_through].
 *)
let lift_out ?(buffered=true) ?buffer_size ?pass_through (x : lift_out_arg) =
  match x, buffered with
    | `Rec r, false ->
        new lift_rec_out_channel r
    | `Rec r, true ->
        let r' = new lift_rec_out_channel r in
        let r'' =
          new buffered_raw_out_channel
            ?buffer_size ?pass_through (r' :> raw_out_channel) in
        new lift_raw_out_channel r''
    | `Raw r, false ->
        new lift_raw_out_channel r
    | `Raw r, true ->
        let r' =
          new buffered_raw_out_channel ?buffer_size ?pass_through r in
        new lift_raw_out_channel r'
;;


(************************* raw channels *******************************)

(* Non-blocking counterpart of [Netsys.restart_wait] with the same
 * argument list: runs [f arg] once and maps the would-block conditions
 * to the result 0. The first three arguments are ignored.
 *)
let norestart _ _ _ f arg =
  try
    f arg
  with
    | Unix.Unix_error(Unix.EAGAIN,_,_)
    | Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
    | Netsys_types.EAGAIN_RD
    | Netsys_types.EAGAIN_WR ->
        0


(* Runs [Netsys.gshutdown] with [Unix.SHUTDOWN_ALL] on [fd] under
 * [Netsys.restart_wait]. [Netsys.Shutdown_not_supported] and [EPERM]
 * are ignored.
 *)
let shutdown_fd mode fd_style fd =
  try
    ignore
      (Netsys.restart_wait mode fd_style fd
         (fun () ->
            Netsys.gshutdown fd_style fd Unix.SHUTDOWN_ALL;
            0
         )
         ()
      )
  with
    | Netsys.Shutdown_not_supported -> ()
    | Unix.Unix_error(Unix.EPERM,_,_) -> ()


(* Raw input channel reading from the descriptor [fd]. If [fd_style] is
 * not given it is determined with [Netsys.get_fd_style]. With
 * [blocking] (the default) reads go through [Netsys.restart_wait],
 * otherwise through [norestart].
 *)
class input_descr_prelim ?(blocking=true) ?(start_pos_in = 0) ?fd_style fd =
  let fd_style =
    match fd_style with
      | None -> Netsys.get_fd_style fd
      | Some st -> st in
  let wrapper =
    if blocking then Netsys.restart_wait else norestart in
object (self)
  val fd_in = fd
  val mutable pos_in = start_pos_in
  val mutable closed_in = false

  method private complain_closed() =
    raise Closed_channel

  method input buf pos len =
    if closed_in then self#complain_closed();
    wrapper `R fd_style fd
      (fun () ->
         let n = Netsys.gread fd_style fd_in buf pos len in
         pos_in <- pos_in + n;
         (* A zero-length result for a non-empty request means EOF: *)
         if n = 0 && len > 0 then raise End_of_file;
         n
      )
      ()

  method close_in () =
    if not closed_in then (
      (* The gshutdown call only exists because of TLS: *)
      shutdown_fd `R fd_style fd;
      Netsys.gclose fd_style fd_in;
      closed_in <- true
    )

  method pos_in =
    if closed_in then self#complain_closed();
    pos_in
end
;;


(* [input_descr_prelim] restricted to the [raw_in_channel] interface. *)
class input_descr ?blocking ?start_pos_in ?fd_style fd : raw_in_channel =
  input_descr_prelim ?blocking ?start_pos_in ?fd_style fd
;;


(* Raw output channel writing to the descriptor [fd]; the arguments have
 * the same meaning as for [input_descr_prelim]. [flush] only checks
 * whether the channel is closed.
 *)
class output_descr_prelim ?(blocking=true) ?(start_pos_out = 0) ?fd_style fd =
  let fd_style =
    match fd_style with
      | None -> Netsys.get_fd_style fd
      | Some st -> st in
  let wrapper =
    if blocking then Netsys.restart_wait else norestart in
object (self)
  val fd_out = fd
  val mutable pos_out = start_pos_out
  val mutable closed_out = false

  method private complain_closed() =
    raise Closed_channel

  method output buf pos len =
    if closed_out then self#complain_closed();
    wrapper `W fd_style fd
      (fun () ->
         let n = Netsys.gwrite fd_style fd_out buf pos len in
         pos_out <- pos_out + n;
         n
      )
      ()

  method close_out () =
    if not closed_out then (
      (* FIXME. We block here even when non-blocking semantics
         is requested. We do this because most programmers would
         be surprised to get EAGAIN when closing a channel.
         Actually, this only affects Win32 output threads and TLS.
       *)
      shutdown_fd `W fd_style fd;
      Netsys.gclose fd_style fd_out;
      closed_out <- true
    )

  method pos_out =
    if closed_out then self#complain_closed();
    pos_out

  method flush () =
    if closed_out then self#complain_closed()
end
;;


(* [output_descr_prelim] restricted to the [raw_out_channel] interface. *)
class output_descr ?blocking ?start_pos_out ?fd_style fd : raw_out_channel =
  output_descr_prelim ?blocking ?start_pos_out ?fd_style fd
;;


(* Bidirectional raw channel over one descriptor. Only the descriptor
 * styles listed in the match below are accepted; any other style makes
 * the constructor fail. The descriptor itself is closed only when both
 * directions have been closed.
 *)
class socket_descr ?blocking ?(start_pos_in = 0) ?(start_pos_out = 0)
                   ?fd_style fd : raw_io_channel =
  let fd_style =
    match fd_style with
      | None -> Netsys.get_fd_style fd
      | Some st -> st in
  let () =
    match fd_style with
      | `Recv_send _
      | `Recv_send_implied
      | `W32_pipe
      | `TLS _ ->
          ()
      | _ ->
          failwith "Netchannels.socket_descr: This type of descriptor is \
                    unsupported" in
object (self)
  inherit input_descr_prelim ?blocking ~start_pos_in ~fd_style fd
  inherit output_descr_prelim ?blocking ~start_pos_out ~fd_style fd

  method private gen_close cmd =
    (* NOTE(review): [cmd] only decides whether the descriptor is closed;
     * [shutdown_fd] always requests SHUTDOWN_ALL, so closing one
     * direction appears to shut down both - confirm this is intended.
     *)
    shutdown_fd `W fd_style fd;
    if cmd = Unix.SHUTDOWN_ALL then
      Netsys.gclose fd_style fd

  method close_in () =
    if not closed_in then (
      closed_in <- true;
      if closed_out then
        self#gen_close Unix.SHUTDOWN_ALL
      else
        self#gen_close Unix.SHUTDOWN_RECEIVE
    )

  method close_out () =
    if not closed_out then (
      closed_out <- true;
      if closed_in then
        self#gen_close Unix.SHUTDOWN_ALL
      else
        self#gen_close Unix.SHUTDOWN_SEND
    )
end
;;


(************************** transactional *****************************)

(* Transactional channel collecting the written data in a [Buffer.t].
 * [commit_work] copies the collected data to [ch] and flushes [ch];
 * [rollback_work] throws it away. [close_out] first commits or rolls
 * back according to [close_mode] (errors are logged, not raised) and
 * then closes [ch].
 *)
class buffered_trans_channel ?(close_mode = (`Commit : close_mode))
                             (ch : out_obj_channel)
                             : trans_out_obj_channel =
  let closed = ref false in
  let transbuf = ref (Buffer.create 50) in
  let trans = ref (new output_buffer !transbuf) in
  let reset() =
    (* Start over with a fresh buffer and a fresh channel writing to it *)
    transbuf := Buffer.create 50;
    trans := new output_buffer !transbuf
  in
object (self)
  val out = ch
  val close_mode = close_mode

  method output = !trans#output
  method really_output = !trans#really_output
  method really_output_string = !trans#really_output_string
  method output_char = !trans#output_char
  method output_string = !trans#output_string
  method output_bytes = !trans#output_bytes
  method output_byte = !trans#output_byte
  method output_buffer = !trans#output_buffer
  method output_channel = !trans#output_channel
  method flush = !trans#flush

  method close_out() =
    if not !closed then (
      (try
         (match close_mode with
            | `Commit -> self#commit_work()
            | `Rollback -> self#rollback_work()
         )
       with
         | error ->
             let bt = Printexc.get_backtrace() in
             Netlog.logf `Err
               "Netchannels.buffered_trans_channel: \
                Suppressed error in close_out: %s - backtrace: %s"
               (Netexn.to_string error) bt
      );
      !trans#close_out();
      out#close_out();
      closed := true
    )

  method pos_out =
    out#pos_out + !trans#pos_out

  method commit_work() =
    try
      (* in any way avoid that the contents of transbuf are printed twice *)
      let b = !transbuf in
      reset();
      out#output_buffer b;
      out#flush()
    with
      | err ->
          self#rollback_work();   (* reset anyway *)
          raise err

  method rollback_work() =
    reset()
end
;;


(* Creates a new temporary file in [tmp_directory] and opens it twice.
 * Returns (filename, in_channel, out_channel).
 *
 * The file name is built from [tmp_prefix] and a counter starting at 0;
 * the file is created with O_EXCL and the permissions [mode]. When the
 * name already exists the next counter value is tried, until the
 * counter exceeds [limit].
 *
 * Raises [Failure] when the limit is exceeded, and [Sys_error] for any
 * other Unix error and when the two descriptors do not refer to the
 * same file.
 *)
let make_temporary_file
      ?(mode = 0o600)
      ?(limit = 1000)
      ?(tmp_directory = Netsys_tmp.tmp_directory())
      ?(tmp_prefix = "netstring")
      () =
  (* Best-effort close for the error paths; the original error wins. *)
  let close_quietly fd =
    try Unix.close fd with Unix.Unix_error(_,_,_) -> () in
  let rec try_creation n =
    try
      let fn =
        Filename.concat
          tmp_directory
          (Netsys_tmp.tmp_prefix tmp_prefix ^ "-" ^ (string_of_int n)) in
      let fd_in =
        Unix.openfile fn [ Unix.O_RDWR; Unix.O_CREAT; Unix.O_EXCL ] mode in
      (try
         let fd_out =
           Unix.openfile fn [ Unix.O_RDWR ] mode in
         (try
            (* For security reasons check that fd_in and fd_out are the
             * same file:
             *)
            let stat_in = Unix.fstat fd_in in
            let stat_out = Unix.fstat fd_out in
            if stat_in.Unix.st_dev <> stat_out.Unix.st_dev ||
               stat_in.Unix.st_rdev <> stat_out.Unix.st_rdev ||
               stat_in.Unix.st_ino <> stat_out.Unix.st_ino
            then
              raise(Sys_error("File has been replaced (security alert)"));
            let ch_in = Unix.in_channel_of_descr fd_in in
            let ch_out = Unix.out_channel_of_descr fd_out in
            (fn, ch_in, ch_out)
          with
            | err ->
                (* Do not leak the second descriptor *)
                close_quietly fd_out;
                raise err
         )
       with
         | err ->
             (* Do not leak the first descriptor *)
             close_quietly fd_in;
             raise err
      )
    with
      | Unix.Unix_error(Unix.EEXIST,_,_) ->
          (* This does not look very intelligent, but it is the only chance
           * to limit the number of trials.
           * Note that we get EACCES if the directory is not writeable.
           *)
          if n > limit then
            failwith ("Netchannels: Cannot create temporary file - too many files in this temp directory: " ^ tmp_directory);
          try_creation (n+1)
      | Unix.Unix_error(e,_,_) ->
          raise (Sys_error("Cannot create a temporary file in the directory " ^
                           tmp_directory ^ ": " ^ Unix.error_message e))
  in
  try_creation 0
;;


(* Transactional channel collecting the written data in an anonymous
 * temporary file (created with [make_temporary_file] and unlinked
 * immediately). [commit_work] copies the file contents to [ch] and
 * flushes [ch]; [rollback_work] empties the file. [close_out] first
 * commits or rolls back according to [close_mode] (errors are logged,
 * not raised), then closes the temporary file and [ch].
 *)
class tempfile_trans_channel ?(close_mode = (`Commit : close_mode))
                             ?tmp_directory
                             ?tmp_prefix
                             (ch : out_obj_channel)
                             : trans_out_obj_channel =
  let _transname, _transch_in, _transch_out =
    make_temporary_file ?tmp_directory ?tmp_prefix () in
  let closed = ref false in
object (self)
  val transch_out = _transch_out
  val mutable transch_in = _transch_in
  val trans = new output_channel _transch_out
  val mutable out = ch
  val close_mode = close_mode
  val mutable need_clear = false
    (* Set when [commit_work] starts and reset by [clear]; while set, the
     * next operation clears the file first.
     *)

  initializer
    try
      Sys.remove _transname
      (* Remove the file immediately. This requires "Unix semantics" of the
       * underlying file system, because we don't remove the file but only
       * the entry in the directory. So we can read and write the file and
       * allocate disk space, but the file is private from now on. (It's
       * not fully private, because another process can obtain a descriptor
       * between creation of the file and removal of the entry. We should
       * keep that in mind if privacy really matters.)
       * The disk space will be freed when the descriptor is closed.
       *)
    with
      | err ->
          close_in _transch_in;
          close_out _transch_out;
          raise err

  method output =
    if need_clear then self#clear();
    trans#output

  method really_output =
    if need_clear then self#clear();
    trans#really_output

  method really_output_string =
    if need_clear then self#clear();
    trans#really_output_string

  method output_char =
    if need_clear then self#clear();
    trans#output_char

  method output_string =
    if need_clear then self#clear();
    trans#output_string

  method output_bytes =
    if need_clear then self#clear();
    trans#output_bytes

  method output_byte =
    if need_clear then self#clear();
    trans#output_byte

  method output_buffer =
    if need_clear then self#clear();
    trans#output_buffer

  method output_channel =
    if need_clear then self#clear();
    trans#output_channel

  method flush =
    if need_clear then self#clear();
    trans#flush

  method close_out() =
    if not !closed then (
      if need_clear then self#clear();
      (try
         (match close_mode with
            | `Commit -> self#commit_work()
            | `Rollback -> self#rollback_work()
         )
       with
         | error ->
             let bt = Printexc.get_backtrace() in
             Netlog.logf `Err
               "Netchannels.tempfile_trans_channel: \
                Suppressed error in close_out: %s - backtrace: %s"
               (Netexn.to_string error) bt
      );
      close_in transch_in;
      trans#close_out();       (* closes transch_out *)
      out#close_out();
      closed := true
    )

  method pos_out =
    if need_clear then self#clear();
    out#pos_out + trans#pos_out

  method commit_work() =
    need_clear <- true;
    let len = trans#pos_out in
    trans#flush();
    seek_in transch_in 0;
    let trans' = new input_channel transch_in in
    (try
       out#output_channel ~len trans';
       out#flush()
     with
       | err ->
           self#rollback_work();
           raise err
    );
    self#clear()

  method rollback_work() =
    self#clear()

  method private clear() =
    (* delete the contents of the file *)
    (* First empty the file and reset the output channel: *)
    seek_out transch_out 0;
    Unix.ftruncate (Unix.descr_of_out_channel transch_out) 0;
    (* Renew the input channel. We create a new channel to avoid problems
     * with the internal buffer of the channel.
     * (Problem: transch_in has an internal buffer, and the buffer contains
     * old data now. So we drop the channel and create a new channel for the
     * same file descriptor. Note that we cannot set the file offset with
     * seek_in because neither the old nor the new channel is properly
     * synchronized with the file. So we fall back to lseek.)
     *)
    let fd = Unix.descr_of_in_channel transch_in in
    ignore(Unix.lseek fd 0 Unix.SEEK_END);        (* set the offset *)
    transch_in <- Unix.in_channel_of_descr fd;    (* renew channel *)
    (* Now check that everything worked: *)
    assert(pos_in transch_in = 0);
    assert(in_channel_length transch_in = 0);
    (* Note: the old transch_in will be automatically finalized, but the
     * underlying file descriptor will not be closed in this case
     *)
    need_clear <- false
end
;;


(* The identity conversion for [pipe]. [incoming_eof] is not used. *)
let id_conv incoming incoming_eof outgoing =
  (* Copies everything from [incoming] to [outgoing] *)
  let len = Netbuffer.length incoming in
  ignore
    (Netbuffer.add_inplace ~len outgoing
       (fun s_outgoing pos len' ->
          assert (len = len');
          Netbuffer.blit incoming 0 s_outgoing pos len';
          Netbuffer.clear incoming;
          len'
       )
    )
;;


(* [call_input refill f arg] computes [f arg]; if this raises
 * [Buffer_underrun], [refill] is called and [f arg] is tried exactly
 * once more.
 *)
let call_input refill f arg =
  (* Try to satisfy the request: *)
  try
    f arg
  with
    | Buffer_underrun ->
        (* Not enough data in the outgoing buffer. *)
        refill();
        f arg
;;


(* A pipe: data written to the output side is collected in the
 * "incoming" buffer, passed through [conv] into the "outgoing" buffer
 * when a read needs more data, and read back from the input side.
 *)
class pipe ?(conv = id_conv) ?(buffer_size = 1024) () : io_obj_channel =
  let _incoming = Netbuffer.create buffer_size in
  let _outgoing = Netbuffer.create buffer_size in
object (self)
  (* The properties as "incoming buffer" [output_super] are simply inherited
   * from [output_netbuffer]. The "outgoing buffer" [input_super] invocations
   * are delegated to [input_netbuffer]. Inheritance does not work because
   * there is no way to make the public method [shutdown] private again.
   *)
  inherit output_netbuffer _incoming as output_super

  val conv = conv
  val incoming = _incoming
  val outgoing = _outgoing
  val input_super = new input_netbuffer _outgoing
  val mutable incoming_eof = false
  val mutable pos_in = 0
    (* We must count positions ourselves. Can't use input_super#pos_in
     * because conv may manipulate the buffer.
     *)
  val mutable output_closed = false

  (* Input methods: *)

  method private refill() =
    conv incoming incoming_eof outgoing;
    if incoming_eof then input_super#shutdown()

  method input str pos len =
    let n = call_input self#refill (input_super#input str pos) len in
    pos_in <- pos_in + n;
    n

  method input_line() =
    (* The line terminator is consumed too, so the number of consumed
     * bytes is taken from the position of [input_super].
     *)
    let p = input_super#pos_in in
    let line = call_input self#refill (input_super#input_line) () in
    let p' = input_super#pos_in in
    pos_in <- pos_in + (p' - p);
    line

  method really_input str pos len =
    call_input self#refill (input_super#really_input str pos) len;
    pos_in <- pos_in + len

  method really_input_string len =
    let buf = Bytes.create len in
    call_input self#refill (input_super#really_input buf 0) len;
    pos_in <- pos_in + len;
    Bytes.unsafe_to_string buf

  method input_char() =
    let c = call_input self#refill (input_super#input_char) () in
    pos_in <- pos_in + 1;
    c

  method input_byte() =
    let b = call_input self#refill (input_super#input_byte) () in
    pos_in <- pos_in + 1;
    b

  method close_in() =
    (* [close_in] implies [close_out]: *)
    if not output_closed then (
      output_super#close_out();
      output_closed <- true
    );
    input_super#close_in()

  method pos_in = pos_in

  (* [close_out] also shuts down the input side of the pipe. *)
  method close_out() =
    if not output_closed then (
      output_super#close_out();
      output_closed <- true
    );
    incoming_eof <- true
end


(* Output channel writing into the pipe [p]; after every write, whatever
 * can be read from [p] is copied to [out] and [out] is flushed.
 * [close_out] closes the output side of [p] and transfers the rest; it
 * does not call [out#close_out].
 *)
class output_filter (p : io_obj_channel) (out : out_obj_channel)
                    : out_obj_channel =
object(self)
  val p = p
  val mutable p_closed = false  (* output side of p is closed *)
  val out = out
  val buf = Bytes.create 1024   (* for copy_channel *)

  method output s pos len =
    if p_closed then raise Closed_channel;
    let n = p#output s pos len in
    self#transfer();
    n

  method really_output s pos len =
    if p_closed then raise Closed_channel;
    p#really_output s pos len;
    self#transfer()

  method really_output_string s pos len =
    if p_closed then raise Closed_channel;
    p#really_output_string s pos len;
    self#transfer()

  method output_char c =
    if p_closed then raise Closed_channel;
    p#output_char c;
    self#transfer()

  method output_string s =
    if p_closed then raise Closed_channel;
    p#output_string s;
    self#transfer()

  method output_bytes s =
    if p_closed then raise Closed_channel;
    p#output_bytes s;
    self#transfer()

  method output_byte b =
    if p_closed then raise Closed_channel;
    p#output_byte b;
    self#transfer()

  method output_buffer b =
    if p_closed then raise Closed_channel;
    p#output_buffer b;
    self#transfer()

  method output_channel ?len ch =
    (* To avoid large intermediate buffers, the channel is copied
     * chunk by chunk
     *)
    if p_closed then raise Closed_channel;
    (* A negative counter means "no limit": *)
    let len_to_do = ref (match len with None -> -1 | Some l -> max 0 l) in
    let buf = buf in
    while !len_to_do <> 0 do
      let n = if !len_to_do < 0 then 1024 else min !len_to_do 1024 in
      if copy_channel ~buf ~len:n ch (p :> out_obj_channel) then
        (* EOF *)
        len_to_do := 0
      else
        if !len_to_do >= 0 then (
          len_to_do := !len_to_do - n;
          assert(!len_to_do >= 0)
        );
      self#transfer()
    done

  method flush() =
    p#flush();
    self#transfer();
    out#flush()

  method close_out() =
    if not p_closed then (
      p#close_out();
      (* An error of the final transfer is reported to the caller.
       * However, we prevent that another, immediately following
       * [close_out] reports the same error again. This is done by
       * setting p_closed before the transfer.
       *)
      p_closed <- true;
      self#transfer()
    )

  method pos_out = p#pos_out

  method private transfer() =
    (* Copy as much as possible from [p] to [out] *)
    try
      (* Call [copy_channel] directly (and not the method [output_channel])
       * because we can pass the copy buffer ~buf
       *)
      ignore(copy_channel ~buf (p :> in_obj_channel) out);
      out#flush()
    with
      | Buffer_underrun -> ()
end


(* [filter_input refill f arg] computes [f arg]; whenever this raises
 * [Buffer_underrun], [refill] is called and the request is repeated.
 *)
let rec filter_input refill f arg =
  (* Try to satisfy the request: *)
  try
    f arg
  with
    | Buffer_underrun ->
        (* Not enough data in the outgoing buffer. *)
        refill();
        filter_input refill f arg
;;


(* Input channel reading from the pipe [p]; when [p] runs out of data,
 * the next chunk is copied from [inp] into [p]. When [inp] is at EOF
 * the output side of [p] is closed. [close_in] closes [p] only; it
 * does not call [inp#close_in].
 *)
class input_filter (inp : in_obj_channel) (p : io_obj_channel)
                   : in_obj_channel =
object(self)
  val inp = inp
  val p = p
  val buf = Bytes.create 1024   (* for copy_channel *)

  method private refill() =
    (* Copy some data from [inp] to [p] *)
    (* Call [copy_channel] directly (and not the method [output_channel])
     * because we can pass the copy buffer ~buf
     *)
    let eof =
      copy_channel
        ~len:(Bytes.length buf) ~buf inp (p :> out_obj_channel) in
    if eof then p#close_out()

  method input str pos =
    filter_input self#refill (p#input str pos)

  method input_line =
    filter_input self#refill (p#input_line)

  method really_input str pos =
    filter_input self#refill (p#really_input str pos)

  method really_input_string =
    filter_input self#refill p#really_input_string

  method input_char =
    filter_input self#refill (p#input_char)

  method input_byte =
    filter_input self#refill (p#input_byte)

  method close_in() =
    p#close_in()

  method pos_in = p#pos_in
end