Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file caqti_async.ml
(* Copyright (C) 2014--2018 Petter A. Urkedal <paurkedal@gmail.com>
*
* This library is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version, with the OCaml static compilation exception.
*
* This library is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this library. If not, see <http://www.gnu.org/licenses/>.
*)

(* This is partly based on https://github.com/janestreet/lwt-async *)

open Caqti_prereq
open Core
open Async

(* Async-based implementation of the concurrency primitives which are handed
   to Caqti_connect.Make_unix at the end of this file. *)
module System = struct

  (* A future is an Async deferred; the operators below are thin aliases for
     the corresponding Deferred functions. *)
  type 'a future = 'a Deferred.t
  let (>>=) m f = Deferred.bind m ~f
  let (>|=) = Deferred.(>>|)
  let return = Deferred.return
  let join = Deferred.all_unit

  (* Write-once variable backed by Ivar. *)
  module Mvar = struct
    type 'a t = 'a Ivar.t
    let create = Ivar.create
    (* Note the argument order: the value comes first, the variable second. *)
    let store x v = Ivar.fill v x
    let fetch v = Ivar.read v
  end

  module Unix = struct
    type file_descr = Async_unix.Fd.t

    (* Info attached to every Fd.t created by wrap_fd. *)
    let fdinfo = Info.of_string "Caqti_async file descriptor"

    (* [wrap_fd f ufd] wraps the raw descriptor [ufd] in an Async Fd.t
       (registered as an active socket), passes it to [f], and returns the
       result of [f].

       The wrapper is closed with Do_not_close_file_descriptor, so only the
       Async-side Fd.t is released; the underlying descriptor stays open.
       The close runs in the [~finally] of Monitor.protect so that the
       wrapper is released also when [f] raises, instead of only on the
       success path. *)
    let wrap_fd f ufd =
      let fd = Fd.create (Fd.Kind.Socket `Active) ufd fdinfo in
      Monitor.protect
        (fun () -> f fd)
        ~finally:(fun () ->
          Fd.(close ~file_descriptor_handling:Do_not_close_file_descriptor) fd)

    (* [poll ?read ?write ?timeout fd] waits until one of the requested
       events occurs and returns [(did_read, did_write, did_timeout)].

       - [read] / [write] (default false): whether to wait for [fd] to become
         ready for reading / writing.  A disabled wait is replaced by
         [Deferred.never ()].
       - [timeout]: passed to Time.Span.of_sec; when absent there is no
         timeout.

       If no event is requested and no timeout is given, every choice is
       [Deferred.never ()], so nothing in this function determines the
       result. *)
    let poll ?(read = false) ?(write = false) ?timeout fd =
      let wait_read =
        if read then Async_unix.Fd.ready_to fd `Read else Deferred.never () in
      let wait_write =
        if write then Async_unix.Fd.ready_to fd `Write else Deferred.never () in
      let wait_timeout =
        (match timeout with
         | Some t -> Clock.after (Time.Span.of_sec t)
         | None -> Deferred.never ()) in
      let did_read, did_write, did_timeout = ref false, ref false, ref false in
      Deferred.enabled [
        (* A read or write flag is set only if the wait reported `Ready. *)
        Deferred.choice wait_read (fun st -> did_read := st = `Ready);
        Deferred.choice wait_write (fun st -> did_write := st = `Ready);
        Deferred.choice wait_timeout (fun () -> did_timeout := true);
      ] >>|
      (fun f ->
        (* [f ()] is called only for its side effects on the flags above;
           its result is discarded. *)
        ignore (f ());
        (!did_read, !did_write, !did_timeout))
  end

  module Log = struct
    type 'a log = ('a, unit Deferred.t) Logs.msgf -> unit Deferred.t

    (* Based on Logs_lwt.kmsg.

       [kmsg ?src level msgf] reports [msgf] on [src] (default Logs.default)
       at [level] and returns a deferred:
       - if [src] has no reporting level, nothing is counted or reported;
       - if [level] is greater than the level of [src], the error or warning
         counter is still incremented but the message is not reported;
       - otherwise the counter is incremented and the message is passed to
         Logs.report; the returned deferred is determined once the [over]
         callback has been called. *)
    let kmsg ?(src = Logs.default) level msgf =
      let count_it () =
        (match level with
         | Logs.Error -> Logs.incr_err_count ()
         | Logs.Warning -> Logs.incr_warn_count ()
         | _ -> ()) in
      (match Logs.Src.level src with
       | None -> return ()
       | Some level' when level > level' ->
          count_it ();
          return ()
       | Some _ ->
          count_it ();
          let ivar = Ivar.create () in
          (* [k] is the continuation given to Logs.report; it returns the
             deferred which [over] determines. *)
          let k () = Ivar.read ivar in
          let over () = Ivar.fill ivar () in
          Logs.report src level ~over k msgf)

    (* Level-specific shorthands for kmsg.  default_log_src is not defined
       in this file; presumably it comes from Caqti_prereq -- TODO confirm. *)
    let err   ?(src = default_log_src) msgf = kmsg ~src Logs.Error   msgf
    let warn  ?(src = default_log_src) msgf = kmsg ~src Logs.Warning msgf
    let info  ?(src = default_log_src) msgf = kmsg ~src Logs.Info    msgf
    let debug ?(src = default_log_src) msgf = kmsg ~src Logs.Debug   msgf
  end

  module Preemptive = struct
    (* [detach f x] evaluates [f x] through In_thread.run. *)
    let detach f x = In_thread.run (fun () -> f x)
    (* [run_in_main f] runs [f] through Thread_safe.block_on_async_exn. *)
    let run_in_main f = Thread_safe.block_on_async_exn f
  end

end

include Caqti_connect.Make_unix (System)