Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file server_pool.ml
(**
   This module is built around [Resource_pool]. While a pool of type
   [Resource_pool.t] manages a number of resources, here we manage a cluster of
   such pools. A typical use case would be a cluster of servers, where for each
   server we maintain a number of connections. A user of this module can call
   [use] to access one of the connections, which are served in a round-robin
   fashion.
*)

[@@@ocaml.warning "+A-9-44-48"]

let ( >>= ) = Lwt.( >>= )

let section = Lwt_log.Section.make "server-pool"
let () = Lwt_log.Section.set_level section Lwt_log.Info

(** Description of the servers managed by a pool built with {!Make}. *)
module type CONF = sig
  (** An open connection to a server. *)
  type connection

  (** Whatever is needed to connect to a server. *)
  type server

  (** Key identifying a server; used as key of a polymorphic [Hashtbl]. *)
  type serverid

  (** Printable form of a server id; only used in log messages. *)
  val serverid_to_string : serverid -> string

  (** Opens a new connection to the given server. *)
  val connect : server -> connection Lwt.t

  (** Closes a connection. Exceptions raised here are ignored by the pool. *)
  val close : connection -> unit Lwt.t

  (** Delay passed to [Lwt_unix.sleep] before each health check of a
      suspended server. *)
  val check_delay : float

  (** Health check run on suspended servers; [true] means the server may be
      reactivated. An exception is treated like [false]. *)
  val check_server : serverid -> server -> bool Lwt.t
end

module Make (Conf : CONF) = struct
  let show = Conf.serverid_to_string

  (** Bookkeeping record kept for every registered server. *)
  type server_status = {
    serverid : Conf.serverid;
    desired : int;
        (** number of entries the server should have in [server_pool] *)
    current : int;
        (** entry count as maintained by [update_current_count] *)
    essential : bool;  (** essential servers are never suspended *)
    suspended : bool;  (** set by [suspend_server], cleared on reactivation *)
    check_server : unit -> bool Lwt.t;
        (** health check used while the server is suspended *)
    connections : Conf.connection Resource_pool.t;
  }

  (** Builds the status of a freshly added server: no entries counted yet
      ([current = 0]) and not suspended. *)
  let mk_server_status ~serverid ~desired ~essential ~check_server ~connections
      =
    {
      serverid;
      desired;
      current = 0;
      essential;
      suspended = false;
      check_server;
      connections;
    }

  (** All registered servers, indexed by id. *)
  let servers : (Conf.serverid, server_status) Hashtbl.t = Hashtbl.create 9

  (** Status of [serverid], or [None] if it is not (or no longer) registered. *)
  let get_status serverid = Hashtbl.find_opt servers serverid

  (** Ids and connection pools of all servers that are neither essential nor
      currently suspended. *)
  let non_essential_active_connection_pools () =
    let accum serverid { essential; suspended; connections; _ } acc =
      if (not essential) && not suspended then (serverid, connections) :: acc
      else acc
    in
    Hashtbl.fold accum servers []

  (** Unregisters [serverid]. Its entries in [server_pool] are not removed
      here; they are detected and rejected later (see [use]). *)
  let remove serverid =
    Lwt_log.ign_notice_f ~section "removing server %s" (show serverid);
    Hashtbl.remove servers serverid

  (** Replaces the [current] counter of [serverid] by [f current]. Does
      nothing if the server is not registered. *)
  let update_current_count serverid f =
    match get_status serverid with
    | None -> ()
    | Some status ->
        let status = { status with current = f status.current } in
        Hashtbl.replace servers serverid status;
        Lwt_log.ign_debug_f ~section
          "current number of instances for %s: %d/%d" (show serverid)
          status.current status.desired

  (* Each server holds its own connection_pool, so a server pool is a pool of
     connection pools. HOWEVER, [server_pool] will not contain one
     connection pool per server, but [n] times the same connection pool per
     server, where [n] is the (maximum) size of the servers connection pool. *)
  let server_pool : server_status Resource_pool.t =
    let nil () = failwith "Bs_db.server_pool: invalid connection attempt" in
    (* We supply [0] as the first argument to [Resource_pool.create] as it
       will prevent [Resource_pool] from ever creating a new resource on its
       own. This is what we want since new servers are to be added by the user
       of this module. *)
    let n = 0
    and dispose { serverid; _ } =
      update_current_count serverid pred;
      Lwt.return_unit
    and check _ { serverid; _ } =
      match get_status serverid with
      | None -> Lwt.return_false (* remove retired server from pool *)
      | Some _ -> Lwt.return_true
      (* | Some status -> f status.essential *)
      (* For now, do not dispose of servers upon Resource_invalid {safe =
         true} as there is currently no mechanism for reinstating them.
         Potentially it might be advisable to dispose of them temporarily. *)
    in
    Resource_pool.create ~check ~dispose n nil

  (** [true] iff [serverid] is currently registered. *)
  let server_exists serverid = Hashtbl.mem servers serverid

  (** [add_many ~num_conn new_servers] registers every [(serverid, server)]
      pair whose id is not registered yet and gives each of them [num_conn]
      entries in [server_pool]. Pairs with an already registered id are
      skipped.

      @param essential if [true] the servers are never suspended
        (default [false]).
      @param connect_immediately if [true], [num_conn] connections per server
        are opened in the background right away instead of on demand
        (default [false]). A failure to open one of these connections is
        logged and otherwise ignored. *)
  let add_many ?(essential = false) ?(connect_immediately = false) ~num_conn
      new_servers =
    let mk_connection_pool (serverid, server) : server_status =
      Lwt_log.ign_notice_f ~section "adding server: %s" (show serverid);
      let connect () =
        Lwt_log.ign_info_f ~section "opening connection to %s" (show serverid);
        Conf.connect server
      in
      let dispose conn =
        (* TODO: reopen closed connections if connect_immediately is true ? *)
        Lwt_log.ign_info_f ~section "closing connection to %s" (show serverid);
        Lwt.catch (fun () -> Conf.close conn) (fun _ -> Lwt.return_unit)
      in
      let check_server () = Conf.check_server serverid server in
      (* never close connections *)
      let check _ _ = Lwt.return_true in
      let connections = Resource_pool.create num_conn ~check ~dispose connect in
      let status =
        mk_server_status ~serverid ~desired:num_conn ~essential ~check_server
          ~connections
      in
      Hashtbl.add servers serverid status;
      if connect_immediately then
        for _ = 1 to num_conn do
          Lwt.async (fun () ->
              (* A rejected promise under [Lwt.async] ends up in
                 [Lwt.async_exception_hook]; catch connection failures here so
                 that an unreachable server cannot take the process down. *)
              Lwt.catch
                (fun () ->
                  connect () >>= fun c ->
                  try
                    Resource_pool.add connections c;
                    Lwt.return_unit
                  with Resource_pool.Resource_limit_exceeded -> dispose c)
                (fun e ->
                  Lwt_log.ign_warning_f ~section
                    "could not open connection to %s: %s" (show serverid)
                    (Printexc.to_string e);
                  Lwt.return_unit))
        done;
      status
    in
    let pools =
      new_servers
      |> List.filter (fun (serverid, _) -> not (server_exists serverid))
      |> List.map mk_connection_pool
    in
    (* Adding round by round interleaves the servers' entries in
       [server_pool]. *)
    for _ = 1 to num_conn do
      List.iter
        (fun conn_pool ->
          Resource_pool.add ~omit_max_check:true server_pool conn_pool;
          update_current_count conn_pool.serverid succ)
        pools
    done

  (** Same as [add_many] for a single server. *)
  let add_one ?essential ?connect_immediately ~num_conn serverid server =
    add_many ?essential ?connect_immediately ~num_conn [ (serverid, server) ]

  (** [add_existing ~num_conn serverid connections] registers a server whose
      connection pool [connections] was created by the caller, and gives it
      [num_conn] entries in [server_pool]. Unlike [add_many] this does not
      check whether [serverid] is already registered.

      @param essential if [true] the server is never suspended
        (default [false]).
      @param check_server health check used while the server is suspended
        (default: always healthy). *)
  let add_existing ?(essential = false)
      ?(check_server = fun () -> Lwt.return_true) ~num_conn serverid
      connections =
    Lwt_log.ign_notice_f ~section "adding existing server: %s" (show serverid);
    let status =
      mk_server_status ~serverid ~desired:num_conn ~essential ~check_server
        ~connections
    in
    Hashtbl.add servers serverid status;
    for _ = 1 to num_conn do
      Resource_pool.add ~omit_max_check:true server_pool status;
      update_current_count serverid succ
    done

  (** Polls [check_server] every [Conf.check_delay] until it reports the
      server healthy, then clears the [suspended] flag and tops the server's
      entries in [server_pool] up to [desired]. Polling stops without effect
      as soon as the server is no longer registered. *)
  let reactivate_server ~check_server connection_pool =
    let serverid = connection_pool.serverid in
    let check () =
      Lwt_unix.sleep Conf.check_delay >>= fun () ->
      Lwt_log.ign_debug_f ~section "checking server health of %s"
        (show serverid);
      Lwt.catch check_server (fun e ->
          Lwt_log.ign_info_f ~section "exception during health check of %s: %s"
            (show serverid) (Printexc.to_string e);
          Lwt.return_false)
    in
    let reactivate () =
      match get_status serverid with
      | None -> Lwt.return_unit
      | Some status ->
          Lwt_log.ign_notice_f ~section "reactivating healthy server %s"
            (show serverid);
          Hashtbl.replace servers serverid { status with suspended = false };
          for _ = status.current to status.desired - 1 do
            Resource_pool.add ~omit_max_check:true server_pool connection_pool;
            update_current_count serverid succ
          done;
          Lwt.return_unit
    in
    let rec loop () =
      if not (server_exists serverid) then Lwt.return_unit
      else check () >>= fun healthy -> if healthy then reactivate () else loop ()
    in
    loop ()

  (** Marks the server as suspended, clears its connection pool and starts
      the background health-check loop that will reactivate it. Does nothing
      for unregistered, essential or already suspended servers. *)
  let suspend_server ~check_server connection_pool =
    let serverid = connection_pool.serverid in
    match get_status serverid with
    | None -> ()
    | Some status ->
        if not (status.essential || status.suspended) then begin
          Lwt_log.ign_warning_f ~section "suspending %s" (show serverid);
          Hashtbl.replace servers serverid { status with suspended = true };
          Lwt.async (fun () ->
              Resource_pool.clear connection_pool.connections >>= fun () ->
              reactivate_server ~check_server connection_pool)
        end

  (** [use f] takes the next server from [server_pool] and runs [f] on one of
      its connections.

      Entries of removed or suspended servers are rejected with
      [Resource_pool.Resource_invalid {safe = true}]. If [f] fails with
      [Resource_invalid {safe = false}] the server is suspended before the
      exception is passed on.

      @param usage_attempts forwarded to [Resource_pool.use] on the server's
        connection pool. *)
  let use ?usage_attempts f =
    (* We use retry here, since elements cannot be removed from an
       [Resource_pool.t] directly. Therefore we detect whether a server has
       been removed by our own means and try again with another server if this
       was the case. Once a server has been removed (by the use of [remove])
       there will be [n] such retries before all traces of a server have been
       erased, where [n] equals the value used for [num_conn] when the server
       was added. *)
    Resource_pool.use ~usage_attempts:9 server_pool @@ fun connection_pool ->
    let { serverid; connections; _ } = connection_pool in
    match get_status serverid with
    | None ->
        Lwt_log.info_f ~section "cannot use %s (removed)" (show serverid)
        >>= fun () -> Lwt.fail Resource_pool.(Resource_invalid { safe = true })
    | Some { suspended = true; _ } ->
        Lwt_log.info_f ~section "not using %s (suspended)" (show serverid)
        >>= fun () -> Lwt.fail Resource_pool.(Resource_invalid { safe = true })
    | Some { check_server; _ } ->
        Lwt_log.debug_f ~section "using connection to %s" (show serverid)
        >>= fun () ->
        Lwt.catch
          (fun () -> Resource_pool.use ?usage_attempts connections f)
          (fun e ->
            match e with
            | Resource_pool.(Resource_invalid { safe = true }) ->
                Lwt_log.warning ~section
                  "connection unusable (safe to retry using another server)"
                >>= fun () -> Lwt.fail e
            | Resource_pool.(Resource_invalid { safe = false }) ->
                Lwt_log.warning ~section
                  "connection unusable (unsafe to retry using another server)"
                >>= fun () ->
                suspend_server ~check_server connection_pool;
                Lwt.fail e
            | e -> Lwt.fail e)

  (** Statuses of all registered servers, in unspecified order. *)
  let server_statuses () =
    Hashtbl.fold (fun _ status l -> status :: l) servers []

  (** Ids of all registered servers, in unspecified order. Note that this
      definition shadows the [servers] table for any code that follows. *)
  let servers () = Hashtbl.fold (fun server _ l -> server :: l) servers []
end