package sihl
The modular functional web framework
Install
Dune Dependency
Authors
Maintainers
Sources
sihl-0.1.1.tbz
sha256=eac58e5ee9c869aa3b0f0bcee936b01c53bf7fe1febb42edd607268dfb11f4e9
sha512=012b6cf1cf6af0966059761b4916ea8aa590aa8d5809a6f480cb17e23ee10c3b9245062c4f0cf9ad98ab950391c0827c9780999d39fa16a93f7aab4b12f9ab8c
doc/src/sihl.queue/queue_service_repo.ml.html
Source file queue_service_repo.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
open Base module Job = Queue_core.Job module JobInstance = Queue_core.JobInstance module MakeMemory (RepoService : Data.Repo.Service.Sig.SERVICE) : Queue_service_sig.REPO = struct let state = ref (Map.empty (module String)) let ordered_ids = ref [] let register_cleaner _ = let cleaner _ = state := Map.empty (module String); ordered_ids := []; Lwt.return () in RepoService.register_cleaner cleaner let register_migration () = () let enqueue _ ~job_instance = let id = JobInstance.id job_instance |> Data.Id.to_string in ordered_ids := List.cons id !ordered_ids; state := Map.add_exn !state ~key:id ~data:job_instance; Lwt.return () let update _ ~job_instance = let id = JobInstance.id job_instance |> Data.Id.to_string in state := Map.set !state ~key:id ~data:job_instance; Lwt.return () let find_workable _ = let all_job_instances = List.map !ordered_ids ~f:(fun id -> Map.find !state id) in let now = Ptime_clock.now () in let rec filter_pending all_job_instances result = match all_job_instances with | Some job_instance :: job_instances -> if JobInstance.should_run ~job_instance ~now then filter_pending job_instances (List.cons job_instance result) else filter_pending job_instances result | None :: job_instances -> filter_pending job_instances result | [] -> result in Lwt.return @@ filter_pending all_job_instances [] end module Model = struct open Queue_core.JobInstance let status = let encode m = Ok (Status.to_string m) in let decode = Status.of_string in Caqti_type.(custom ~encode ~decode string) let t = let encode m = Ok ( m.id, ( m.name, (m.input, (m.tries, (m.next_run_at, (m.max_tries, m.status)))) ) ) in let decode (id, (name, (input, (tries, (next_run_at, (max_tries, status)))))) = Ok { id; name; input; tries; next_run_at; max_tries; status } in Caqti_type.( custom ~encode ~decode (tup2 Data.Id.t (tup2 string (tup2 (option string) (tup2 int (tup2 ptime (tup2 int status))))))) end module MakeMariaDb (DbService : Data.Db.Service.Sig.SERVICE) (RepoService : Data.Repo.Service.Sig.SERVICE) (MigrationService : Data.Migration.Service.Sig.SERVICE) : Queue_service_sig.REPO = struct let enqueue_request = Caqti_request.exec Model.t {sql| INSERT INTO queue_jobs ( uuid, name, input, tries, next_run_at, max_tries, status ) VALUES ( ?, ?, ?, ?, ?, ?, ? ) |sql} let enqueue ctx ~job_instance = DbService.query ctx (fun connection -> let module Connection = (val connection : Caqti_lwt.CONNECTION) in Connection.exec enqueue_request job_instance) let update_request = Caqti_request.exec Model.t {sql| UPDATE queue_jobs SET name = $2, input = $3, tries = $4, next_run_at = $5, max_tries = $6, status = $7 WHERE queue_jobs.uuid = $1 |sql} let update ctx ~job_instance = DbService.query ctx (fun connection -> let module Connection = (val connection : Caqti_lwt.CONNECTION) in Connection.exec update_request job_instance) let find_workable_request = Caqti_request.collect Caqti_type.unit Model.t {sql| SELECT uuid, name, input, tries, next_run_at, max_tries, status FROM queue_jobs WHERE status = "pending" AND next_run_at <= NOW() AND tries < max_tries ORDER BY id DESC |sql} let find_workable ctx = DbService.query ctx (fun connection -> let module Connection = (val connection : Caqti_lwt.CONNECTION) in Connection.collect_list find_workable_request ()) let clean_request = Caqti_request.exec Caqti_type.unit {sql| TRUNCATE TABLE email_templates; |sql} let clean ctx = DbService.query ctx (fun connection -> let module Connection = (val connection : Caqti_lwt.CONNECTION) in Connection.exec clean_request ()) module Migration = struct let fix_collation = Data.Migration.create_step ~label:"fix collation" "SET collation_server = 'utf8mb4_unicode_ci';" let create_jobs_table = Data.Migration.create_step ~label:"create jobs table" {sql| CREATE TABLE IF NOT EXISTS queue_jobs ( id BIGINT UNSIGNED AUTO_INCREMENT, uuid BINARY(16) NOT NULL, name VARCHAR(128) NOT NULL, input TEXT NULL, tries BIGINT UNSIGNED, next_run_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, max_tries BIGINT UNSIGNED, status VARCHAR(128) NOT NULL, PRIMARY KEY (id), CONSTRAINT unique_uuid UNIQUE KEY (uuid) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; |sql} let migration = Data.Migration.( empty "queue" |> add_step fix_collation |> add_step create_jobs_table) end let register_cleaner () = RepoService.register_cleaner clean let register_migration () = MigrationService.register Migration.migration end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>