Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file service.ml
open Lwt.Syntax
open Model

let log_src = Logs.Src.create ~doc:"database" "sihl.service.database"

module Logs = (val Logs.src_log log_src : Logs.LOG)

(** Context key under which a connection pool is stored. *)
let ctx_key_pool : pool Core.Ctx.key = Core.Ctx.create_key ()

(** [find_pool ctx] looks up the pool stored in [ctx], if any. *)
let find_pool ctx = Core.Ctx.find ctx_key_pool ctx

(** [add_pool pool ctx] stores [pool] in [ctx]. This two-argument version is
    shadowed further down by a one-argument [add_pool]. *)
let add_pool pool ctx = Core.Ctx.add ctx_key_pool pool ctx

(** [remove_pool ctx] removes the pool entry from [ctx]. *)
let remove_pool ctx = Core.Ctx.remove ctx_key_pool ctx

(* Reads [DATABASE_POOL_SIZE] from the configuration, falling back to 10. *)
let configured_pool_size () =
  Option.value (Core.Configuration.read_int "DATABASE_POOL_SIZE") ~default:10

(** [print_pool_usage pool] logs, at debug level, the current size of [pool]
    next to the configured maximum size. *)
let print_pool_usage pool =
  let n_connections = Caqti_lwt.Pool.size pool in
  let max_connections = configured_pool_size () in
  Logs.debug (fun m -> m "DB: Pool usage: %i/%i" n_connections max_connections)

(** [fetch_pool ()] returns the pool held in [pool_ref], creating it from
    [DATABASE_URL] and [DATABASE_POOL_SIZE] and storing it in [pool_ref] when
    none exists yet.

    @raise Invalid_argument if [DATABASE_URL] is not configured.
    @raise Exception if the pool can not be created. *)
let fetch_pool () =
  match !pool_ref with
  | Some pool ->
    Logs.debug (fun m -> m "DB: Skipping pool creation, re-using existing pool");
    pool
  | None ->
    let pool_size = configured_pool_size () in
    Logs.debug (fun m -> m "DB: Create pool with size %i" pool_size);
    let uri =
      match Core.Configuration.read_string "DATABASE_URL" with
      | Some url -> Uri.of_string url
      | None ->
        (* Same exception as the former [Option.get], but with a message that
           names the missing setting. *)
        invalid_arg "DB: DATABASE_URL is not configured"
    in
    (match Caqti_lwt.connect_pool ~max_size:pool_size uri with
    | Ok pool ->
      pool_ref := Some pool;
      pool
    | Error err ->
      let msg = "DB: Failed to connect to DB pool" in
      let cause = Caqti_error.show err in
      Logs.err (fun m -> m "%s %s" msg cause);
      (* Carry the underlying cause in the exception instead of repeating a
         static prefix. *)
      raise (Exception ("DB: Failed to create pool " ^ cause)))

(** [add_pool ctx] stores the pool returned by [fetch_pool ()] in [ctx]. *)
let add_pool ctx =
  let pool = fetch_pool () in
  add_pool pool ctx

(* Returns the given pool, calling [fetch_pool ()] only when there is none.
   [Option.value ~default:(fetch_pool ())] would evaluate [fetch_pool ()] on
   every call, even when a pool is already present. *)
let pool_or_fetch = function
  | Some pool -> pool
  | None -> fetch_pool ()

(* Turns a query result into a promise: [Ok v] resolves to [v], [Error e] is
   logged and rejected with [Exception]. *)
let unwrap_or_fail = function
  | Ok value -> Lwt.return value
  | Error error ->
    let msg = Caqti_error.show error in
    Logs.err (fun m -> m "DB: %s" msg);
    Lwt.fail (Exception msg)

(** [query ctx f] runs [f] on a database connection chosen from [ctx]: the
    transaction connection if one is set, otherwise the plain connection,
    otherwise a connection taken from the pool in [ctx] (or from
    [fetch_pool ()] when [ctx] has no pool).

    The returned promise is rejected with [Exception] if [f] yields an error
    or if [ctx] holds an unexpected combination of transaction, connection
    and pool. *)
let query ctx f =
  match find_transaction ctx, find_connection ctx, find_pool ctx with
  | Some connection, None, None | None, Some connection, _ ->
    let* result = f connection in
    unwrap_or_fail result
  | None, None, pool ->
    let pool = pool_or_fetch pool in
    print_pool_usage pool;
    let* result = Caqti_lwt.Pool.use f pool in
    unwrap_or_fail result
  | _ ->
    Logs.err (fun m ->
        m
          "DB: Connection, transaction or pool found in context, this should never \
           happen and might indicate connection leaks. Please report this issue.");
    Lwt.fail (Exception "Connection and pool found")

(* Runs [f] inside a transaction that has already been started on
   [connection]. Commits when [f] succeeds; rolls back when [f] or the commit
   fails and then rejects with the original exception, or with [Exception] if
   the rollback itself fails. *)
let run_in_transaction connection ctx f =
  let (module Connection : Caqti_lwt.CONNECTION) = connection in
  Logs.debug (fun m -> m "DB TX: Started transaction");
  (* Remove the pool so that all subsequent queries are executed on this
     connection. A transaction is bound to a single connection; it can not
     span multiple connections. *)
  let ctx_with_connection =
    ctx |> remove_pool |> remove_connection |> add_transaction connection
  in
  Lwt.catch
    (fun () ->
      let* result = f ctx_with_connection in
      let* commit_result = Connection.commit () in
      match commit_result with
      | Ok () ->
        Logs.debug (fun m -> m "DB TX: Successfully committed transaction");
        Lwt.return result
      | Error error ->
        Logs.err (fun m ->
            m "DB TX: Failed to commit transaction %s" (Caqti_error.show error));
        Lwt.fail (Exception "Failed to commit transaction"))
    (fun e ->
      let* rollback_result = Connection.rollback () in
      match rollback_result with
      | Ok () ->
        Logs.debug (fun m -> m "DB TX: Successfully rolled back transaction");
        Lwt.fail e
      | Error error ->
        Logs.err (fun m ->
            m "DB TX: Failed to rollback transaction %s" (Caqti_error.show error));
        Lwt.fail (Exception "Failed to rollback transaction"))

(** [atomic ctx f] runs [f] inside a database transaction and passes it a
    context that carries only the transaction connection.

    - If [ctx] already carries a transaction, [f] runs on that transaction.
    - If [ctx] carries a connection and no pool, a transaction is started on
      that connection.
    - If [ctx] carries neither, a connection is taken from the pool in [ctx]
      (or from [fetch_pool ()]) and a transaction is started on it.

    The transaction is committed when [f] succeeds and rolled back otherwise.
    The returned promise is rejected with [Exception] when starting,
    committing or rolling back fails, or when [ctx] holds an unexpected
    combination of transaction, connection and pool. *)
let atomic ctx f =
  match find_transaction ctx, find_connection ctx, find_pool ctx with
  | Some connection, None, None ->
    (* Make sure [f] can not use the pool or some other connection *)
    ctx |> remove_pool |> remove_connection |> add_transaction connection |> f
  | None, Some connection, None ->
    let (module Connection : Caqti_lwt.CONNECTION) = connection in
    let* start_result = Connection.start () in
    (match start_result with
    | Error msg ->
      Logs.debug (fun m ->
          m "DB TX: Failed to start transaction %s" (Caqti_error.show msg));
      Lwt.fail (Exception (Caqti_error.show msg))
    | Ok () -> run_in_transaction connection ctx f)
  | None, None, pool ->
    (* There is no transaction active, create a new one *)
    let pool = pool_or_fetch pool in
    print_pool_usage pool;
    let* pool_result =
      Caqti_lwt.Pool.use
        (fun connection ->
          Logs.debug (fun m -> m "DB TX: Fetched connection from pool");
          let (module Connection : Caqti_lwt.CONNECTION) = connection in
          let* start_result = Connection.start () in
          match start_result with
          | Error msg ->
            Logs.debug (fun m ->
                m "DB TX: Failed to start transaction %s" (Caqti_error.show msg));
            Lwt.return (Error msg)
          | Ok () ->
            let* result = run_in_transaction connection ctx f in
            Lwt.return (Ok result))
        pool
    in
    (match pool_result with
    | Ok result ->
      (* All good, return result of f ctx *)
      Lwt.return result
    | Error pool_err ->
      (* Failed to start the transaction *)
      Lwt.fail (Exception (Caqti_error.show pool_err)))
  | _ ->
    Logs.err (fun m ->
        m
          "DB: Connection, transaction or pool found in context together, this should \
           never happen and might indicate connection leaks. Please report this issue.");
    Lwt.fail (Exception "Connection and pool found")

(** Request that sets [FOREIGN_KEY_CHECKS] to the given boolean. *)
let set_fk_check_request =
  Caqti_request.exec Caqti_type.bool "SET FOREIGN_KEY_CHECKS = ?;"

(** [set_fk_check ctx ~check] executes {!set_fk_check_request} with [check]
    through {!query}. *)
let set_fk_check ctx ~check =
  query ctx (fun (module Connection : Caqti_lwt.CONNECTION) ->
      Connection.exec set_fk_check_request check)

(** [with_disabled_fk_check ctx f] runs [f] inside {!atomic} with foreign key
    checks switched off, and switches them back on afterwards whether [f]
    succeeds or fails. *)
let with_disabled_fk_check ctx f =
  atomic ctx (fun ctx ->
      let* () = set_fk_check ctx ~check:false in
      Lwt.finalize (fun () -> f ctx) (fun () -> set_fk_check ctx ~check:true))

(* Service lifecycle *)

(** [start ctx] adds the database pool to [ctx]. *)
let start ctx = ctx |> add_pool |> Lwt.return

(** [stop _] does nothing. *)
let stop _ = Lwt.return ()

let lifecycle = Core.Container.Lifecycle.create "db" ~start ~stop

(** [configure configuration] builds the database service from
    [configuration] and {!lifecycle}. *)
let configure configuration =
  let configuration = Core.Configuration.make configuration in
  Core.Container.Service.create ~configuration lifecycle