Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file iO.ml
(******************************************************************************
* capnp-ocaml
*
* Copyright (c) 2013-2014, Paul Pelzl
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
******************************************************************************)

open Capnp

module Deque = Core_kernel.Deque

(** Stream compression applied to serialized messages. *)
type compression_t = [ `None | `Packing ]

(** Raised by [ReadContext.dequeue_message] when the framing layer reports an
    unsupported frame. *)
exception Unsupported_message_frame

(** Buffered, incremental writer of serialized messages to a descriptor of
    type ['a] (for example a Unix file descriptor or an [out_channel]). *)
module WriteContext = struct
  type 'a t = {
    (** File descriptor we're writing to *)
    fd : 'a;

    (** Compression format *)
    comp : compression_t;

    (** Function for writing to the descriptor *)
    write : 'a -> buf:string -> pos:int -> len:int -> int;

    (** Data remaining to write to the descriptor *)
    fragments : string Deque.t;

    (** Total number of bytes stored in [fragments] *)
    mutable fragments_size : int;

    (** Position within the first fragment where writing should begin *)
    mutable first_fragment_pos : int;
  }

  (** [create ~write ~compression fd] builds an empty write context that will
      push data to [fd] through [write]. *)
  let create ~write ~compression fd = {
    fd;
    comp = compression;
    write;
    fragments = Deque.create ();
    fragments_size = 0;
    first_fragment_pos = 0;
  }

  (** [enqueue_message context message] serializes [message] with the
      context's compression setting and appends every buffer produced by
      [Codecs.serialize_iter] to the pending queue.  Nothing is written to the
      descriptor yet. *)
  let enqueue_message context message =
    Codecs.serialize_iter message ~compression:context.comp ~f:(fun buf ->
      Deque.enqueue_back context.fragments buf;
      context.fragments_size <- context.fragments_size + (String.length buf))

  (** Number of queued bytes which have not yet been handed to the
      descriptor's write function. *)
  let bytes_remaining context =
    context.fragments_size - context.first_fragment_pos

  (** [write context] performs a single call to the underlying write function
      on the unwritten tail of the first queued fragment and returns the number
      of bytes that call reported as written.  Returns [0] without calling the
      write function when the queue is empty.  Exceptions raised by the write
      function propagate to the caller. *)
  let write context =
    if Deque.is_empty context.fragments then
      0
    else
      let first_fragment = Deque.peek_front_exn context.fragments in
      let first_fragment_remaining =
        String.length first_fragment - context.first_fragment_pos
      in
      let bytes_written =
        context.write context.fd
          ~buf:first_fragment
          ~pos:context.first_fragment_pos
          ~len:first_fragment_remaining
      in
      let () =
        if bytes_written = first_fragment_remaining then
          (* The fragment is fully flushed: drop it and restart at offset 0
             of whichever fragment is next. *)
          let (_ : string) = Deque.dequeue_front_exn context.fragments in
          let () =
            context.fragments_size <-
              context.fragments_size - (String.length first_fragment)
          in
          context.first_fragment_pos <- 0
        else
          (* Partial write: remember how far into the fragment we got. *)
          context.first_fragment_pos <-
            context.first_fragment_pos + bytes_written
      in
      bytes_written

  (** [write_message context message] enqueues [message] and then calls
      [write] repeatedly until [bytes_remaining] reaches zero. *)
  let write_message context message =
    let () = enqueue_message context message in
    while bytes_remaining context > 0 do
      let (_ : int) = write context in
      ()
    done
end

(** Buffered, incremental reader of framed messages from a descriptor of
    type ['a]. *)
module ReadContext = struct
  type 'a t = {
    (** File descriptor we're reading from *)
    fd : 'a;

    (** Stream format *)
    stream : Codecs.FramedStream.t;

    (** Function for reading from the descriptor *)
    read : 'a -> buf:Bytes.t -> pos:int -> len:int -> int;

    (** Persistent read buffer *)
    read_buf : Bytes.t;
  }

  (** [create ~read ~compression fd] builds a read context with an empty
      framed stream and a 64 KiB scratch buffer. *)
  let create ~read ~compression fd = {
    fd;
    stream = Codecs.FramedStream.empty compression;
    read;
    read_buf = Bytes.create (64 * 1024);
    (* Size of ocaml internal Unix buffer *)
  }

  (** [dequeue_message context] returns [Some message] if the stream yields a
      frame, or [None] if the framing layer reports the frame as incomplete.
      @raise Unsupported_message_frame if the framing layer reports an
      unsupported frame. *)
  let dequeue_message context =
    match Codecs.FramedStream.get_next_frame context.stream with
    | Result.Ok message ->
        Some message
    | Result.Error Codecs.FramingError.Incomplete ->
        None
    | Result.Error Codecs.FramingError.Unsupported ->
        raise Unsupported_message_frame

  (** Value of [Codecs.FramedStream.bytes_available] for the context's
      stream. *)
  let bytes_available context =
    Codecs.FramedStream.bytes_available context.stream

  (** [read context] performs a single call to the underlying read function
      into the scratch buffer.  When the call reports a positive byte count,
      those bytes are copied out and appended to the framed stream.  The byte
      count reported by the read function is returned unchanged. *)
  let read context =
    let bytes_read =
      context.read context.fd
        ~buf:context.read_buf
        ~pos:0
        ~len:(Bytes.length context.read_buf)
    in
    if bytes_read > 0 then
      (* [Bytes.sub_string] copies, so the stream never aliases [read_buf],
         which is overwritten by the next read. *)
      let substr = Bytes.sub_string context.read_buf 0 bytes_read in
      let () = Codecs.FramedStream.add_fragment context.stream substr in
      bytes_read
    else
      bytes_read

  (** [read_message context] alternates between [dequeue_message] and [read]
      until a message is available (returning [Some message]) or [read]
      returns [0] (returning [None]). *)
  let read_message context =
    let rec loop () =
      match dequeue_message context with
      | Some message ->
          Some message
      | None ->
          let bytes_read = read context in
          if bytes_read = 0 then
            None
          else
            loop ()
    in
    loop ()
end

(** [loop_eintr f] calls [f ()] and retries it for as long as it raises
    [Unix_error (EINTR, _, _)].  Any other exception propagates. *)
let rec loop_eintr f =
  try
    f ()
  with UnixLabels.Unix_error (UnixLabels.EINTR, _, _) ->
    loop_eintr f

(** [create_write_context_for_fd ?restart ~compression fd] builds a write
    context whose write function is [UnixLabels.single_write].  When [restart]
    is [true] (the default) writes interrupted with [EINTR] are retried. *)
let create_write_context_for_fd ?(restart = true) ~compression fd =
  let unix_write fd' ~buf ~pos ~len =
    let f () =
      (* The queued fragments are strings; they are only read here. *)
      UnixLabels.single_write fd' ~buf:(Bytes.unsafe_of_string buf) ~pos ~len
    in
    if restart then
      loop_eintr f
    else
      f ()
  in
  WriteContext.create ~write:unix_write ~compression fd

(** [create_write_context_for_channel ~compression chan] builds a write
    context over an output channel.  The write function outputs the whole
    requested substring and reports [len] bytes written. *)
let create_write_context_for_channel ~compression chan =
  let chan_write chan' ~buf ~pos ~len =
    let () = Core_kernel.Out_channel.output_substring chan' ~buf ~pos ~len in
    len
  in
  WriteContext.create ~write:chan_write ~compression chan

(** [create_read_context_for_fd ?restart ~compression fd] builds a read
    context whose read function is [UnixLabels.read].  When [restart] is
    [true] (the default) reads interrupted with [EINTR] are retried. *)
let create_read_context_for_fd ?(restart = true) ~compression fd =
  let unix_read fd' ~buf ~pos ~len =
    let f () = UnixLabels.read fd' ~buf ~pos ~len in
    if restart then
      loop_eintr f
    else
      f ()
  in
  ReadContext.create ~read:unix_read ~compression fd

(** [create_read_context_for_channel ~compression chan] builds a read context
    over an input channel, reading with [Pervasives.input]. *)
let create_read_context_for_channel ~compression chan =
  let in_chan_read ic ~buf ~pos ~len =
    Pervasives.input ic buf pos len
  in
  ReadContext.create ~read:in_chan_read ~compression chan

(** [write_message_to_fd ?restart ~compression message fd] serializes
    [message] and writes it to [fd] until nothing remains queued.  If a write
    raises [EAGAIN] or [EWOULDBLOCK], the function waits in
    [UnixLabels.select] (negative timeout) for [fd] to be reported in the
    write or exceptional set before retrying.  [restart] (default [true])
    controls whether [EINTR] is retried, for both the writes and the
    [select]. *)
let write_message_to_fd ?(restart = true) ~compression message fd =
  let context = create_write_context_for_fd ~restart ~compression fd in
  let () = WriteContext.enqueue_message context message in
  while WriteContext.bytes_remaining context > 0 do
    try
      let (_ : int) = WriteContext.write context in
      ()
    with
    | UnixLabels.Unix_error (UnixLabels.EAGAIN, _, _)
    | UnixLabels.Unix_error (UnixLabels.EWOULDBLOCK, _, _) ->
        (* Avoid burning CPU time looping on EAGAIN *)
        let (_, _, _) =
          let select () =
            UnixLabels.select
              ~read:[] ~write:[fd] ~except:[fd] ~timeout:(-1.0)
          in
          if restart then
            loop_eintr select
          else
            select ()
        in
        ()
  done

(** [write_message_to_channel ~compression message chan] serializes [message]
    and outputs all of it to [chan].  The channel is not flushed here. *)
let write_message_to_channel ~compression message chan =
  let context = create_write_context_for_channel ~compression chan in
  WriteContext.write_message context message

(** [write_message_to_file ?perm ~compression message filename] writes
    [message] to [filename], opened in binary mode via
    [Core_kernel.Out_channel.with_file]. *)
let write_message_to_file ?perm ~compression message filename =
  Core_kernel.Out_channel.with_file filename ~binary:true ?perm ~f:(fun oc ->
    write_message_to_channel ~compression message oc)

(** [write_message_to_file_robust ?perm ~compression message filename] writes
    [message] to a temporary file created in the same directory as
    [filename], flushes and fsyncs it, renames it over [filename], applies
    [perm] if given, and finally makes a best-effort attempt to fsync the
    parent directory.

    If writing, syncing or renaming fails, the temporary file is removed
    (best effort) and the original exception is re-raised. *)
let write_message_to_file_robust ?perm ~compression message filename =
  let parent_dir = Filename.dirname filename in
  let tmp_prefix = (Filename.basename filename) ^ "-tmp" in
  let (tmp_filename, tmp_oc) =
    Filename.open_temp_file
      ~mode:[Open_binary] ~temp_dir:parent_dir tmp_prefix ""
  in
  let () =
    try
      let () =
        Core_kernel.Exn.protectx tmp_oc
          ~finally:Core_kernel.Out_channel.close
          ~f:(fun oc ->
            let () = write_message_to_channel ~compression message oc in
            (* Push channel buffers to the kernel before asking the kernel
               to push them to stable storage. *)
            let () = Core_kernel.Out_channel.flush oc in
            let fd = UnixLabels.descr_of_out_channel oc in
            ExtUnix.Specific.fsync fd)
      in
      UnixLabels.rename ~src:tmp_filename ~dst:filename
    with exn ->
      (* Do not leave a stray temporary file next to the destination when
         the message could not be written or moved into place. *)
      let () =
        begin
          try UnixLabels.unlink tmp_filename
          with UnixLabels.Unix_error (_, _, _) -> ()
        end
      in
      raise exn
  in
  let () =
    (* [open_temp_file] always creates as 0o600, so we may need to touch up
       permissions *)
    match perm with
    | Some perm -> UnixLabels.chmod filename ~perm
    | None -> ()
  in
  (* Attempt to sync directory metadata, so the rename is durably
     recorded.  May not work as expected on all platforms, so
     suppress errors. *)
  try
    let fd =
      UnixLabels.openfile parent_dir ~mode:[UnixLabels.O_RDONLY] ~perm:0o600
    in
    Core_kernel.Exn.protectx fd
      ~finally:UnixLabels.close ~f:ExtUnix.Specific.fsync
  with UnixLabels.Unix_error (_, _, _) ->
    ()

(** [read_single_message_from_fd ?restart ~compression fd] reads from [fd]
    until one complete message has been framed, returning [Some message], or
    until a read returns [0], returning [None].  If a read raises [EAGAIN] or
    [EWOULDBLOCK], the function waits in [UnixLabels.select] (negative
    timeout) for [fd] to be reported in the read or exceptional set before
    retrying.  [restart] (default [true]) controls whether [EINTR] is
    retried.
    @raise Unsupported_message_frame if the framing layer reports an
    unsupported frame. *)
let read_single_message_from_fd ?(restart = true) ~compression fd =
  let context = create_read_context_for_fd ~restart ~compression fd in
  let rec read_loop () =
    try
      ReadContext.read context
    with
    | UnixLabels.Unix_error (UnixLabels.EAGAIN, _, _)
    | UnixLabels.Unix_error (UnixLabels.EWOULDBLOCK, _, _) ->
        (* Avoid burning CPU time looping on EAGAIN *)
        let (_, _, _) =
          let select () =
            UnixLabels.select
              ~read:[fd] ~write:[] ~except:[fd] ~timeout:(-1.0)
          in
          if restart then
            loop_eintr select
          else
            select ()
        in
        read_loop ()
  in
  let rec loop () =
    let bytes_read = read_loop () in
    if bytes_read = 0 then
      None
    else
      match ReadContext.dequeue_message context with
      | Some message -> Some message
      | None -> loop ()
  in
  loop ()

(** [read_single_message_from_channel ~compression chan] reads from [chan]
    until one complete message has been framed, returning [Some message], or
    until a read returns [0], returning [None].
    @raise Unsupported_message_frame if the framing layer reports an
    unsupported frame. *)
let read_single_message_from_channel ~compression chan =
  let context = create_read_context_for_channel ~compression chan in
  let rec loop () =
    let bytes_read = ReadContext.read context in
    if bytes_read = 0 then
      None
    else
      match ReadContext.dequeue_message context with
      | Some message -> Some message
      | None -> loop ()
  in
  loop ()

(** [read_message_from_file ~compression filename] opens [filename] in binary
    mode and returns the result of [read_single_message_from_channel] on
    it. *)
let read_message_from_file ~compression filename =
  Core_kernel.In_channel.with_file filename ~binary:true ~f:(fun ic ->
    read_single_message_from_channel ~compression ic)