Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file data_migration_service.ml
open Base
open Lwt.Syntax
module Sig = Data_migration_service_sig
module Model = Data_migration_core

(** Migration service.

    Built from a logger, a command service (used to register the [migrate]
    command), a database service and a repository that persists one migration
    state row per namespace. *)
module Make
    (Log : Log.Service.Sig.SERVICE)
    (CmdService : Cmd.Service.Sig.SERVICE)
    (Db : Data_db_service_sig.SERVICE)
    (MigrationRepo : Sig.REPO) : Sig.SERVICE = struct
  (** [setup ctx] asks the repository to create the migration state table if
      it does not exist yet. *)
  let setup ctx =
    Log.debug (fun m -> m "MIGRATION: Setting up table if not exists");
    MigrationRepo.create_table_if_not_exists ctx

  (** [has ctx ~namespace] resolves to [true] iff the repository returns a
      state for [namespace]. *)
  let has ctx ~namespace =
    MigrationRepo.get ctx ~namespace |> Lwt.map Option.is_some

  (** [get ctx ~namespace] resolves to the stored state of [namespace].

      Raises [Data_migration_core.Exception] (from within the [let*]
      continuation) when the repository has no state for [namespace]. *)
  let get ctx ~namespace =
    let* state = MigrationRepo.get ctx ~namespace in
    match state with
    | Some state -> Lwt.return state
    | None ->
      raise
        (Data_migration_core.Exception
           (Printf.sprintf
              "MIGRATION: Could not get migration state for %s"
              namespace))

  (** [upsert ctx state] persists [state] through the repository. *)
  let upsert ctx state = MigrationRepo.upsert ctx ~state

  (* Shared read / transform / write sequence behind [mark_dirty],
     [mark_clean] and [increment]: loads the state of [namespace], applies
     [transform], persists the result and resolves to it. Not part of
     [Sig.SERVICE]. *)
  let update_state ctx ~namespace transform =
    let* state = get ctx ~namespace in
    let updated = transform state in
    let* () = upsert ctx updated in
    Lwt.return updated

  (** [mark_dirty ctx ~namespace] stores and returns the state of [namespace]
      after applying [Model.mark_dirty]. *)
  let mark_dirty ctx ~namespace = update_state ctx ~namespace Model.mark_dirty

  (** [mark_clean ctx ~namespace] stores and returns the state of [namespace]
      after applying [Model.mark_clean]. *)
  let mark_clean ctx ~namespace = update_state ctx ~namespace Model.mark_clean

  (** [increment ctx ~namespace] stores and returns the state of [namespace]
      after applying [Model.increment]. *)
  let increment ctx ~namespace = update_state ctx ~namespace Model.increment

  (** [register migration] adds [migration] to the global registry; the
      registry's return value is discarded. *)
  let register migration =
    Data_migration_core.Registry.register migration |> ignore

  (** [get_migrations _] resolves to everything currently in the registry. The
      argument is ignored. *)
  let get_migrations _ = Lwt.return (Data_migration_core.Registry.get_all ())

  (** [execute_steps ctx (namespace, steps)] runs [steps] in list order.

      Each step's [statement] is executed as a one-shot request; steps with
      [check_fk = false] are executed inside [Db.with_disabled_fk_check].
      After every step the stored state of [namespace] is incremented, so a
      failure part-way through leaves the state at the last completed step. *)
  let execute_steps ctx migration =
    let namespace, steps = migration in
    (* Executes a single raw statement on the connection provided by [Db]. *)
    let run_statement ctx statement =
      Db.query ctx (fun (module Connection : Caqti_lwt.CONNECTION) ->
          let req =
            Caqti_request.exec ~oneshot:true Caqti_type.unit statement
          in
          Connection.exec req ())
    in
    let rec run steps =
      match steps with
      | [] -> Lwt.return ()
      | Model.Migration.{ label; statement; check_fk } :: steps ->
        let* () =
          if check_fk
          then (
            Log.debug (fun m -> m "MIGRATION: Running %s" label);
            run_statement ctx statement)
          else
            Db.with_disabled_fk_check ctx (fun ctx ->
                Log.debug (fun m ->
                    m "MIGRATION: Running %s without fk checks" label);
                run_statement ctx statement)
        in
        Log.debug (fun m -> m "MIGRATION: Ran %s" label);
        let* _ = increment ctx ~namespace in
        run steps
    in
    (match List.length steps with
    | 0 ->
      Log.debug (fun m ->
          m "MIGRATION: No migrations to apply for %s" namespace)
    | n ->
      Log.debug (fun m ->
          m "MIGRATION: Applying %i migrations for %s" n namespace));
    run steps

  (** [execute_migration ctx migration] applies the pending steps of one
      migration.

      The sequence is: ensure the state table exists, load the namespace's
      state (creating it when absent), flag it dirty, run the steps selected
      by [Model.steps_to_apply], then flag it clean again.

      Resolves to [Ok ()] when every step ran. Raises
      [Data_migration_core.Exception] if the stored state is already dirty;
      that situation has to be repaired manually. *)
  let execute_migration ctx migration =
    let namespace, _ = migration in
    Log.debug (fun m -> m "MIGRATION: Execute migrations for %s" namespace);
    let* () = setup ctx in
    (* A single read decides between "dirty", "known" and "new"; no separate
       existence check followed by a second fetch. *)
    let* stored = MigrationRepo.get ctx ~namespace in
    let* state =
      match stored with
      | Some state when Model.dirty state ->
        let msg =
          Printf.sprintf
            "Dirty migration found for %s, has to be fixed manually"
            namespace
        in
        Log.err (fun m -> m "MIGRATION: %s" msg);
        raise (Data_migration_core.Exception msg)
      | Some state ->
        (* Flag the run as in progress before touching the schema. *)
        let dirty_state = Model.mark_dirty state in
        let* () = upsert ctx dirty_state in
        Lwt.return dirty_state
      | None ->
        Log.debug (fun m -> m "MIGRATION: Setting up table for %s" namespace);
        let state = Model.create ~namespace in
        let* () = upsert ctx state in
        Lwt.return state
    in
    let migration_to_apply = Model.steps_to_apply migration state in
    let* () = execute_steps ctx migration_to_apply in
    let* _ = mark_clean ctx ~namespace in
    Lwt.return @@ Ok ()

  (** [execute ctx migrations] runs [migrations] one after another, in list
      order, stopping at the first failure.

      Raises [Model.Exception] when a migration resolves to [Error _]. *)
  let execute ctx migrations =
    (match List.length migrations with
    | 0 -> Log.debug (fun m -> m "MIGRATION: No migrations to execute")
    | n -> Log.debug (fun m -> m "MIGRATION: Executing %i migrations" n));
    let rec run = function
      | [] -> Lwt.return ()
      | migration :: rest ->
        let* result = execute_migration ctx migration in
        (match result with
        | Ok () -> run rest
        | Error err ->
          Log.err (fun m ->
              m
                "MIGRATION: Error while running migration %a: %s"
                Model.Migration.pp
                migration
                err);
          raise (Model.Exception err))
    in
    run migrations

  (** [run_all ctx] executes every registered migration. *)
  let run_all ctx =
    let* migrations = get_migrations ctx in
    execute ctx migrations

  (** The [migrate] command: runs all registered migrations on a fresh context
      to which [Db.add_pool] has been applied. *)
  let migrate_cmd =
    Cmd.make
      ~name:"migrate"
      ~description:"Run all migrations"
      ~fn:(fun _ ->
        let ctx = Core.Ctx.empty |> Db.add_pool in
        run_all ctx)
      ()

  (** Lifecycle of the migration service. Starting it registers
      [migrate_cmd] with the command service; stopping it does nothing. *)
  let lifecycle =
    Core.Container.Lifecycle.make
      "migration"
      ~dependencies:[ CmdService.lifecycle; Db.lifecycle; Log.lifecycle ]
      (fun ctx ->
        CmdService.register_command migrate_cmd;
        Lwt.return ctx)
      (fun _ -> Lwt.return ())
end

(** Repository implementations storing migration state in the
    [core_migration_state] table. *)
module Repo = struct
  (** MariaDB flavour of the migration state repository. *)
  module MakeMariaDb (Db : Data_db_service_sig.SERVICE) : Sig.REPO = struct
    (* NOTE(review): [version] is nullable in this schema while [get_request]
       decodes it as a plain [int]; presumably every row is written through
       [upsert], which always supplies a value - confirm. *)
    let create_request =
      Caqti_request.exec
        Caqti_type.unit
        {sql|
        CREATE TABLE IF NOT EXISTS core_migration_state (
          namespace VARCHAR(128) NOT NULL,
          version INTEGER,
          dirty BOOL NOT NULL,
          PRIMARY KEY (namespace)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
        |sql}

    (** Creates the [core_migration_state] table unless it already exists. *)
    let create_table_if_not_exists ctx =
      Db.query ctx (fun (module Connection : Caqti_lwt.CONNECTION) ->
          Connection.exec create_request ())

    let get_request =
      Caqti_request.find_opt
        Caqti_type.string
        Caqti_type.(tup3 string int bool)
        {sql|
        SELECT
          namespace,
          version,
          dirty
        FROM core_migration_state
        WHERE namespace = ?;
        |sql}

    (** [get ctx ~namespace] resolves to the state stored for [namespace], or
        [None] when there is no such row. *)
    let get ctx ~namespace =
      Db.query ctx (fun (module Connection : Caqti_lwt.CONNECTION) ->
          Connection.find_opt get_request namespace)
      |> Lwt.map (Option.map ~f:Model.of_tuple)

    let upsert_request =
      Caqti_request.exec
        Caqti_type.(tup3 string int bool)
        {sql|
        INSERT INTO core_migration_state (
          namespace,
          version,
          dirty
        ) VALUES (
          ?,
          ?,
          ?
        ) ON DUPLICATE KEY UPDATE
        version = VALUES(version),
        dirty = VALUES(dirty)
        |sql}

    (** [upsert ctx ~state] inserts [state], or overwrites [version] and
        [dirty] of the existing row with the same namespace. *)
    let upsert ctx ~state =
      Db.query ctx (fun (module Connection : Caqti_lwt.CONNECTION) ->
          Connection.exec upsert_request (Model.to_tuple state))
  end

  (** PostgreSQL flavour of the migration state repository. *)
  module MakePostgreSql (Db : Data_db_service_sig.SERVICE) : Sig.REPO = struct
    (* NOTE(review): [version] is nullable in this schema while [get_request]
       decodes it as a plain [int]; presumably every row is written through
       [upsert], which always supplies a value - confirm. *)
    let create_request =
      Caqti_request.exec
        Caqti_type.unit
        {sql|
        CREATE TABLE IF NOT EXISTS core_migration_state (
          namespace VARCHAR(128) NOT NULL PRIMARY KEY,
          version INTEGER,
          dirty BOOL NOT NULL
        );
        |sql}

    (** Creates the [core_migration_state] table unless it already exists. *)
    let create_table_if_not_exists ctx =
      Db.query ctx (fun (module Connection : Caqti_lwt.CONNECTION) ->
          Connection.exec create_request ())

    let get_request =
      Caqti_request.find_opt
        Caqti_type.string
        Caqti_type.(tup3 string int bool)
        {sql|
        SELECT
          namespace,
          version,
          dirty
        FROM core_migration_state
        WHERE namespace = ?;
        |sql}

    (** [get ctx ~namespace] resolves to the state stored for [namespace], or
        [None] when there is no such row. *)
    let get ctx ~namespace =
      Db.query ctx (fun (module Connection : Caqti_lwt.CONNECTION) ->
          Connection.find_opt get_request namespace)
      |> Lwt.map (Option.map ~f:Model.of_tuple)

    let upsert_request =
      Caqti_request.exec
        Caqti_type.(tup3 string int bool)
        {sql|
        INSERT INTO core_migration_state (
          namespace,
          version,
          dirty
        ) VALUES (
          ?,
          ?,
          ?
        ) ON CONFLICT (namespace)
        DO UPDATE SET version = EXCLUDED.version,
        dirty = EXCLUDED.dirty
        |sql}

    (** [upsert ctx ~state] inserts [state], or overwrites [version] and
        [dirty] of the existing row with the same namespace. *)
    let upsert ctx ~state =
      Db.query ctx (fun (module Connection : Caqti_lwt.CONNECTION) ->
          Connection.exec upsert_request (Model.to_tuple state))
  end
end