Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file service.ml
open Lwt.Syntax
open Model

(** Database service backed by a Caqti connection pool.

    Every operation inspects the context for a transaction, a plain
    connection and a pool (via [find_transaction], [find_connection] and
    [find_pool]) and picks where to run based on which of them are present.
    Failures are reported by rejecting the returned promise with
    [Exception]. *)
module Default : Sig.SERVICE = struct
  (* Configured maximum pool size: [DATABASE_POOL_SIZE], or 10 when unset. *)
  let pool_size () =
    Option.value (Core.Configuration.read_int "DATABASE_POOL_SIZE") ~default:10
  ;;

  (* Turns a Caqti result into a promise: [Ok v] resolves to [v], [Error e]
     rejects with [Exception] carrying the rendered error. The error is also
     logged when [log_error] is true. *)
  let lwt_of_result ~log_error = function
    | Ok value -> Lwt.return value
    | Error error ->
      let msg = Caqti_error.show error in
      if log_error then Logs.err (fun m -> m "DB: %s" msg);
      Lwt.fail (Exception msg)
  ;;

  (* Shared failure path for a context that carries a transaction, a
     connection and a pool all at once. *)
  let fail_on_conflicting_ctx () =
    Logs.err (fun m ->
        m
          "DB: Connection AND transaction AND pool found in context, this should never \
           happen and might indicate connection leaks. Please report this issue.");
    Lwt.fail (Exception "Connection and pool found")
  ;;

  (** Logs, at debug level, the current pool size against the configured
      maximum. *)
  let print_pool_usage pool =
    let n_connections = Caqti_lwt.Pool.size pool in
    let max_connections = pool_size () in
    Logs.debug (fun m -> m "DB: Pool usage: %i/%i" n_connections max_connections)
  ;;

  (** Returns the pool stored in [pool_ref], creating and storing it first
      when there is none yet.

      @raise Invalid_argument if [DATABASE_URL] is not configured.
      @raise Exception if connecting the pool fails. *)
  let create_pool () =
    match !pool_ref with
    | Some pool ->
      Logs.debug (fun m -> m "DB: Skipping pool creation, re-using existing pool");
      pool
    | None ->
      let max_size = pool_size () in
      Logs.debug (fun m -> m "DB: Create pool with size %i" max_size);
      let database_url =
        match Core.Configuration.read_string "DATABASE_URL" with
        | Some url -> url
        | None ->
          (* Same exception type as the former [Option.get], but with a
             message that names the missing setting. *)
          Logs.err (fun m -> m "DB: DATABASE_URL is not configured");
          invalid_arg "DB: DATABASE_URL is not configured"
      in
      let connect_result =
        database_url |> Uri.of_string |> Caqti_lwt.connect_pool ~max_size
      in
      (match connect_result with
      | Ok pool ->
        pool_ref := Some pool;
        pool
      | Error err ->
        let detail = Caqti_error.show err in
        Logs.err (fun m -> m "DB: Failed to connect to DB pool %s" detail);
        (* Carry the Caqti error in the exception so callers see the cause. *)
        raise (Exception ("DB: Failed to create pool: " ^ detail)))
  ;;

  (** Builds an empty context that carries the (possibly newly created)
      pool. *)
  let ctx_with_pool () =
    let pool = create_pool () in
    Core.Ctx.(empty |> add_pool pool)
  ;;

  (** Adds the (possibly newly created) pool to [ctx]. This shadows
      [add_pool] from [Model] for the rest of the module. *)
  let add_pool ctx =
    let pool = create_pool () in
    add_pool pool ctx
  ;;

  (** [query ctx f] runs [f] on the transaction or connection found in [ctx],
      or on a connection taken from the pool when only a pool is present.

      The promise is rejected with [Exception] when [f] returns an error or
      when [ctx] holds no usable combination of transaction, connection and
      pool. *)
  let query ctx f =
    match find_transaction ctx, find_connection ctx, find_pool ctx with
    | Some connection, None, None | None, Some connection, _ ->
      let* result = f connection in
      lwt_of_result ~log_error:true result
    | None, None, Some pool ->
      print_pool_usage pool;
      let* result = Caqti_lwt.Pool.use f pool in
      lwt_of_result ~log_error:true result
    | Some _, Some _, Some _ -> fail_on_conflicting_ctx ()
    | _ ->
      Logs.err (fun m -> m "DB: No connection pool found");
      Logs.info (fun m -> m "DB: Have you applied the DB middleware?");
      Lwt.fail (Exception "No connection pool found")
  ;;

  (** [with_connection ctx f] calls [f] with a context that has no pool.

      When [ctx] already holds a transaction or a connection, [f] receives
      [ctx] with the pool removed. When [ctx] holds only a pool, a connection
      is taken from it for the duration of [f] and added to the context
      passed to [f]. *)
  let with_connection ctx f =
    match find_transaction ctx, find_connection ctx, find_pool ctx with
    | Some _, None, None | None, Some _, None -> ctx |> remove_pool |> f
    | None, None, Some pool ->
      print_pool_usage pool;
      let* pool_result =
        Caqti_lwt.Pool.use
          (fun connection ->
            Logs.debug (fun m -> m "DB TX: Fetched connection from pool");
            let (module Connection : Caqti_lwt.CONNECTION) = connection in
            let ctx_with_connection =
              ctx |> remove_pool |> add_connection (module Connection)
            in
            (* [Lwt.catch] is kept so that an exception raised synchronously
               by [f] becomes a rejected promise, as before. *)
            Lwt.catch
              (fun () ->
                let* result = f ctx_with_connection in
                Lwt.return (Ok result))
              Lwt.fail)
          pool
      in
      (* An [Error] here comes from the pool itself, since [f]'s outcome is
         always wrapped in [Ok] above. *)
      lwt_of_result ~log_error:false pool_result
    | Some _, Some _, Some _ -> fail_on_conflicting_ctx ()
    | _ ->
      Logs.err (fun m -> m "No connection pool found");
      Logs.info (fun m -> m "Have you applied the DB middleware?");
      Lwt.fail (Exception "No connection pool found")
  ;;

  (* Runs [f] inside a transaction on the given connection.

     Resolves to [Error e] when the transaction cannot be started and to
     [Ok result] after a successful commit. If [f] fails, or the commit
     fails, a rollback is attempted: the original exception is re-raised when
     the rollback succeeds, otherwise the promise is rejected with
     [Exception "Failed to rollback transaction"]. *)
  let run_in_transaction (module Connection : Caqti_lwt.CONNECTION) ctx f =
    let* start_result = Connection.start () in
    match start_result with
    | Error start_error ->
      Logs.err (fun m ->
          m "DB TX: Failed to start transaction %s" (Caqti_error.show start_error));
      Lwt.return (Error start_error)
    | Ok () ->
      Logs.debug (fun m -> m "DB TX: Started transaction");
      (* Remove the pool and the plain connection so that all subsequent
         queries run on this connection. A transaction cannot span multiple
         connections. *)
      let ctx_with_transaction =
        ctx |> remove_pool |> remove_connection |> add_transaction (module Connection)
      in
      Lwt.catch
        (fun () ->
          let* result = f ctx_with_transaction in
          let* commit_result = Connection.commit () in
          match commit_result with
          | Ok () ->
            Logs.debug (fun m -> m "DB TX: Successfully committed transaction");
            Lwt.return (Ok result)
          | Error error ->
            Logs.err (fun m ->
                m "DB TX: Failed to commit transaction %s" (Caqti_error.show error));
            Lwt.fail (Exception "Failed to commit transaction"))
        (fun e ->
          let* rollback_result = Connection.rollback () in
          match rollback_result with
          | Ok () ->
            Logs.debug (fun m -> m "DB TX: Successfully rolled back transaction");
            Lwt.fail e
          | Error error ->
            Logs.err (fun m ->
                m "DB TX: Failed to rollback transaction %s" (Caqti_error.show error));
            Lwt.fail (Exception "Failed to rollback transaction"))
  ;;

  (** [atomic ctx f] runs [f] transactionally.

      - If [ctx] already holds a transaction, [f] runs on it and nothing is
        started or committed here.
      - If [ctx] holds a plain connection, a transaction is started on it.
      - If [ctx] holds only a pool, a connection is taken from the pool and a
        transaction is started on that connection.

      The promise is rejected with [Exception] when the transaction cannot be
      started, committed or rolled back, and with the exception of [f] when
      [f] fails and the rollback succeeds. *)
  let atomic ctx f =
    match find_transaction ctx, find_connection ctx, find_pool ctx with
    | Some connection, None, None ->
      (* Make sure [f] can not use the pool or some other connection *)
      ctx |> remove_pool |> remove_connection |> add_transaction connection |> f
    | None, Some connection, None ->
      let* tx_result = run_in_transaction connection ctx f in
      lwt_of_result ~log_error:false tx_result
    | None, None, Some pool ->
      (* There is no transaction active, create a new one *)
      print_pool_usage pool;
      let* pool_result =
        Caqti_lwt.Pool.use
          (fun connection ->
            Logs.debug (fun m -> m "DB TX: Fetched connection from pool");
            run_in_transaction connection ctx f)
          pool
      in
      (* [Error] means the pool failed or the transaction could not start. *)
      lwt_of_result ~log_error:false pool_result
    | Some _, Some _, Some _ -> fail_on_conflicting_ctx ()
    | _ ->
      Logs.err (fun m -> m "No connection pool found");
      Logs.info (fun m -> m "Have you applied the DB middleware?");
      Lwt.fail (Exception "No connection pool found")
  ;;

  (* Prepared statement toggling foreign key checks; takes one boolean. *)
  let set_fk_check_request =
    Caqti_request.exec Caqti_type.bool "SET FOREIGN_KEY_CHECKS = ?;"
  ;;

  (** Executes [SET FOREIGN_KEY_CHECKS] with [check] through {!query}. *)
  let set_fk_check ctx ~check =
    query ctx (fun (module Connection : Caqti_lwt.CONNECTION) ->
        Connection.exec set_fk_check_request check)
  ;;

  (** [with_disabled_fk_check ctx f] runs [f] via {!with_connection} with
      foreign key checks switched off, and switches them back on afterwards
      whether [f] succeeds or fails. *)
  let with_disabled_fk_check ctx f =
    with_connection ctx (fun ctx ->
        let* () = set_fk_check ctx ~check:false in
        Lwt.finalize (fun () -> f ctx) (fun () -> set_fk_check ctx ~check:true))
  ;;

  (** Lifecycle start hook: adds the pool to the context. *)
  let start ctx = ctx |> add_pool |> Lwt.return

  (** Lifecycle stop hook: does nothing. *)
  let stop _ = Lwt.return ()

  let lifecycle = Core.Container.Lifecycle.create "db" ~start ~stop

  (** Creates the service from [configuration] and the ["db"] lifecycle. *)
  let configure configuration =
    let configuration = Core.Configuration.make configuration in
    Core.Container.Service.create ~configuration lifecycle
  ;;
end