Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file caqti1_async.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112(* Copyright (C) 2017--2018 Petter A. Urkedal <paurkedal@gmail.com>
*
* This library is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version, with the OCaml static compilation exception.
*
* This library is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this library. If not, see <http://www.gnu.org/licenses/>.
*)[@@@ocaml.warning"-3"]openCoreopenAsyncmoduleSystem=structtype'aio='aDeferred.Or_error.tlet(>>=)mf=Deferred.Or_error.bindm~flet(>|=)=Deferred.Or_error.(>>|)letreturn=Deferred.Or_error.returnletfail=Deferred.Or_error.of_exnletjoin=Deferred.Or_error.all_unitletcatchfg=letopenDeferredinf()>>=function|Errorerr->g(Error.to_exnerr)|Ok_asr->returnrmoduleMvar=structtype'at='aIvar.tletcreate=Ivar.createletstorexv=Ivar.fillvxletfetchv=letopenDeferredinIvar.readv>>=funx->Deferred.Or_error.returnxendmoduleUnix=structtypefile_descr=Async_unix.Fd.tletfdinfo=Info.of_string"Caqti_async file descriptor"letwrap_fdfufd=letfd=Fd.create(Fd.Kind.Socket`Active)ufdfdinfoinletopenDeferredinffd>>=funr->Fd.(close~file_descriptor_handling:Do_not_close_file_descriptor)fd>>=fun()->returnrletpoll?(read=false)?(write=false)?timeoutfd=letwait_read=ifreadthenAsync_unix.Fd.ready_tofd`ReadelseDeferred.never()inletwait_write=ifwritethenAsync_unix.Fd.ready_tofd`WriteelseDeferred.never()inletwait_timeout=(matchtimeoutwith|Somet->Clock.after(Time.Span.of_sect)|None->Deferred.never())inletdid_read,did_write,did_timeout=reffalse,reffalse,reffalseinDeferred.enabled[Deferred.choicewait_read(funst->did_read:=st=`Ready);Deferred.choicewait_write(funst->did_write:=st=`Ready);Deferred.choicewait_timeout(fun()->did_timeout:=true);]>>|(funf->ignore(f());Ok(!did_read,!did_write,!did_timeout))endmoduleLog=structletlog_flevelfmt=ksprintf(funs->Log.string~level(Lazy.forceLog.Global.log)s;return())fmtleterror_ffmt=log_f`Errorfmtletwarning_ffmt=log_f`Infofmtletinfo_ffmt=log_f`Infofmtletdebug_ffmt=log_f`Debugfmt(* TODO: Check how log filtering works in async. *)letdebug_query_enabled()=falseletdebug_tuple_enabled()=falseletdebug_queryqiparams=beginmatchqiwith|`Oneshotqs->log_f`Debug"Sent query: %s"qs|`Prepared(qn,qs)->log_f`Debug"Sent query %s: %s"qnqsend>>=fun()->ifparams=[]thenreturn()elselog_f`Debug"with parameters: %s"(String.concat~sep:", "params)letdebug_tupletuple=log_f`Debug"Received tuple: %s"(String.concat~sep:", "tuple)endmodulePreemptive=structletdetachfx=In_thread.run(fun()->Or_error.try_with(fun()->fx))letrun_in_mainf=Or_error.ok_exn(Thread_safe.block_on_async_exnf)endendincludeCaqti1_connect.Make(System)