Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file data_db_service.ml
open Base
open Lwt.Syntax
open Data_db_core
module Sig = Data_db_service_sig

(* Database service: owns the connection pool and runs queries either on a
   pooled connection, on a connection pinned in the context, or inside a
   transaction.

   NOTE(review): [pool_ref], [Exception], [find_transaction],
   [find_connection], [find_pool], [add_pool], [add_connection],
   [add_transaction], [remove_pool] and [remove_connection] are not defined in
   this file; presumably they come from [Data_db_core] - confirm. *)
module Make
    (Config : Configuration.Service.Sig.SERVICE)
    (Log : Log.Service.Sig.SERVICE) : Sig.SERVICE = struct
  (* Log the current pool size next to the configured DATABASE_POOL_SIZE
     (default 10). *)
  let print_pool_usage pool =
    let n_connections = Caqti_lwt.Pool.size pool in
    let max_connections = Config.read_int ~default:10 "DATABASE_POOL_SIZE" in
    Log.debug (fun m ->
        m "DB: Pool usage: %i/%i" n_connections max_connections)

  (* Return the pool stored in [pool_ref], creating and storing it on first
     use. The pool is built from DATABASE_URL and DATABASE_POOL_SIZE.

     Raises [Exception] when the pool can not be created; the message now
     carries the underlying Caqti error instead of only a fixed text. *)
  let create_pool () =
    match !pool_ref with
    | Some pool ->
        Log.debug (fun m ->
            m "DB: Skipping pool creation, re-using existing pool");
        pool
    | None -> (
        let pool_size = Config.read_int ~default:10 "DATABASE_POOL_SIZE" in
        Log.debug (fun m -> m "DB: Create pool with size %i" pool_size);
        "DATABASE_URL" |> Config.read_string |> Uri.of_string
        |> Caqti_lwt.connect_pool ~max_size:pool_size
        |> function
        | Ok pool ->
            pool_ref := Some pool;
            pool
        | Error err ->
            let msg = "DB: Failed to connect to DB pool" in
            let details = Caqti_error.show err in
            Log.err (fun m -> m "%s %s" msg details);
            raise (Exception ("DB: Failed to create pool: " ^ details)) )

  (* Build an empty context that only carries the (possibly freshly created)
     pool. *)
  let ctx_with_pool () =
    let pool = create_pool () in
    Core.Ctx.(empty |> add_pool pool)

  (* Add the service's pool to [ctx]. This definition shadows the two-argument
     [add_pool] that was in scope before it; the inner call still refers to
     that earlier one because this binding is not recursive. *)
  let add_pool ctx =
    let pool = create_pool () in
    add_pool pool ctx

  (* Turn a Caqti result into a promise: [Ok v] resolves to [v], [Error e] is
     logged and rejected with [Exception] carrying the rendered error. *)
  let fail_on_error result =
    match result with
    | Ok value -> Lwt.return value
    | Error error ->
        let msg = Caqti_error.show error in
        Log.err (fun m -> m "DB: %s" msg);
        Lwt.fail (Exception msg)

  (* Reject because transaction, connection and pool are all present in the
     context at the same time. *)
  let fail_ambiguous_ctx () =
    Log.err (fun m ->
        m
          "DB: Connection AND transaction AND pool found in context, this \
           should never happen and might indicate connection leaks. Please \
           report this issue.");
    Lwt.fail (Exception "Connection and pool found")

  (* Reject because the context holds no usable combination of transaction,
     connection or pool. *)
  let fail_missing_pool () =
    Log.err (fun m -> m "DB: No connection pool found");
    Log.info (fun m -> m "DB: Have you applied the DB middleware?");
    Lwt.fail (Exception "No connection pool found")

  (* Run [f] on a database connection chosen from [ctx]:
     - the transaction's connection when only a transaction is present,
     - the pinned connection when one is present and no transaction is,
     - otherwise a connection borrowed from the pool for the call.
     A Caqti error is logged and turned into a rejected promise carrying
     [Exception]. *)
  let query ctx f =
    match (find_transaction ctx, find_connection ctx, find_pool ctx) with
    | Some connection, None, None ->
        let* result = f connection in
        fail_on_error result
    | None, Some connection, _ ->
        let* result = f connection in
        fail_on_error result
    | None, None, Some pool ->
        print_pool_usage pool;
        let* result = Caqti_lwt.Pool.use f pool in
        fail_on_error result
    | Some _, Some _, Some _ -> fail_ambiguous_ctx ()
    | _ -> fail_missing_pool ()

  (* Run [f] with a context that is bound to a single connection.

     When the context already carries a transaction or a connection, [f] is
     called with the pool removed. When it only carries a pool, one connection
     is borrowed for the duration of [f] and stored in the context handed to
     [f]. *)
  let with_connection ctx f =
    match (find_transaction ctx, find_connection ctx, find_pool ctx) with
    | Some _, None, None | None, Some _, None -> ctx |> remove_pool |> f
    | None, None, Some pool -> (
        print_pool_usage pool;
        let* pool_result =
          Caqti_lwt.Pool.use
            (fun connection ->
              Log.debug (fun m -> m "DB TX: Fetched connection from pool");
              let (module Connection : Caqti_lwt.CONNECTION) = connection in
              let ctx_with_connection =
                ctx |> remove_pool |> add_connection (module Connection)
              in
              (* The handler only re-raises; the wrapper is kept so that an
                 exception raised directly by [f] becomes a rejected promise
                 inside the pool callback. *)
              Lwt.catch
                (fun () ->
                  let* result = f ctx_with_connection in
                  Lwt.return @@ Ok result)
                (fun e -> Lwt.fail e))
            pool
        in
        match pool_result with
        | Ok result ->
            (* All good, return result of f ctx *)
            Lwt.return result
        | Error pool_err ->
            (* The pool reported an error while providing the connection *)
            Lwt.fail (Exception (pool_err |> Caqti_error.show)) )
    | Some _, Some _, Some _ -> fail_ambiguous_ctx ()
    | _ -> fail_missing_pool ()

  (* Start a transaction on [connection], run [f] with a context that carries
     only that transaction, then commit.

     Returns [Error e] when the transaction can not be started. If [f] or the
     commit fails, a rollback is attempted and the promise is rejected: with
     the original exception when the rollback succeeds, with
     [Exception "Failed to rollback transaction"] when it does not. *)
  let run_in_transaction ctx f connection =
    let (module Connection : Caqti_lwt.CONNECTION) = connection in
    let* start_result = Connection.start () in
    match start_result with
    | Error msg ->
        Log.debug (fun m ->
            m "DB TX: Failed to start transaction %s" (Caqti_error.show msg));
        Lwt.return @@ Error msg
    | Ok () ->
        Log.debug (fun m -> m "DB TX: Started transaction");
        (* Remove the pool so that all subsequent queries are executed on this
           connection. A transaction is bound to one connection, it can not
           span multiple connections. *)
        let ctx_with_connection =
          ctx |> remove_pool |> remove_connection
          |> add_transaction (module Connection)
        in
        Lwt.catch
          (fun () ->
            let* result = f ctx_with_connection in
            let* commit_result = Connection.commit () in
            match commit_result with
            | Ok () ->
                Log.debug (fun m ->
                    m "DB TX: Successfully committed transaction");
                Lwt.return @@ Ok result
            | Error error ->
                Log.err (fun m ->
                    m "DB TX: Failed to commit transaction %s"
                      (Caqti_error.show error));
                (* Rejecting here hands control to the handler below, so a
                   failed commit is followed by a rollback attempt too. *)
                Lwt.fail @@ Exception "Failed to commit transaction")
          (fun e ->
            let* rollback_result = Connection.rollback () in
            match rollback_result with
            | Ok () ->
                Log.debug (fun m ->
                    m "DB TX: Successfully rolled back transaction");
                Lwt.fail e
            | Error error ->
                Log.err (fun m ->
                    m "DB TX: Failed to rollback transaction %s"
                      (Caqti_error.show error));
                Lwt.fail @@ Exception "Failed to rollback transaction")

  (* Run [f] atomically.

     - Context already in a transaction: [f] runs inside it; no nested
       transaction is started.
     - Context with a pinned connection: a transaction is started on that
       connection.
     - Context with only a pool: a connection is borrowed and a transaction is
       started on it.
     Failures to start the transaction or to obtain a connection reject the
     promise with [Exception] carrying the rendered Caqti error. *)
  let atomic ctx f =
    let unwrap = function
      | Ok result -> Lwt.return result
      | Error err -> Lwt.fail (Exception (Caqti_error.show err))
    in
    match (find_transaction ctx, find_connection ctx, find_pool ctx) with
    | Some connection, None, None ->
        (* Make sure [f] can not use the pool or some other connection *)
        ctx |> remove_pool |> remove_connection
        |> add_transaction connection
        |> f
    | None, Some connection, None ->
        let* tx_result = run_in_transaction ctx f connection in
        unwrap tx_result
    | None, None, Some pool ->
        (* There is no transaction active, create a new one *)
        print_pool_usage pool;
        let* pool_result =
          Caqti_lwt.Pool.use
            (fun connection ->
              Log.debug (fun m -> m "DB TX: Fetched connection from pool");
              run_in_transaction ctx f connection)
            pool
        in
        unwrap pool_result
    | Some _, Some _, Some _ -> fail_ambiguous_ctx ()
    | _ -> fail_missing_pool ()

  (* Prepared statement toggling FOREIGN_KEY_CHECKS with a boolean parameter. *)
  let set_fk_check_request =
    Caqti_request.exec Caqti_type.bool "SET FOREIGN_KEY_CHECKS = ?;"

  (* Set FOREIGN_KEY_CHECKS to [check] on the connection selected by [query]. *)
  let set_fk_check ctx ~check =
    query ctx (fun (module Connection : Caqti_lwt.CONNECTION) ->
        Connection.exec set_fk_check_request check)

  (* Run [f] on a single connection with foreign key checks switched off, and
     switch them back on afterwards, whether [f] succeeds or fails. *)
  let with_disabled_fk_check ctx f =
    with_connection ctx (fun ctx ->
        let* () = set_fk_check ctx ~check:false in
        Lwt.finalize
          (fun () -> f ctx)
          (fun () -> set_fk_check ctx ~check:true))

  (* Lifecycle start hook: put the pool into the context. *)
  let start ctx = ctx |> add_pool |> Lwt.return

  (* Lifecycle stop hook: nothing to do. *)
  let stop _ = Lwt.return ()

  let lifecycle =
    Core.Container.Lifecycle.make "db"
      ~dependencies:[ Config.lifecycle; Log.lifecycle ]
      ~start ~stop
end