package sihl
The modular functional web framework
Install
Dune Dependency
Authors
Maintainers
Sources
sihl-0.1.1.tbz
sha256=eac58e5ee9c869aa3b0f0bcee936b01c53bf7fe1febb42edd607268dfb11f4e9
sha512=012b6cf1cf6af0966059761b4916ea8aa590aa8d5809a6f480cb17e23ee10c3b9245062c4f0cf9ad98ab950391c0827c9780999d39fa16a93f7aab4b12f9ab8c
doc/src/sihl.queue/queue_service.ml.html
Source file queue_service.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
open Base
open Lwt.Syntax
module Job = Queue_core.Job
module WorkableJob = Queue_core.WorkableJob
module JobInstance = Queue_core.JobInstance
module Sig = Queue_service_sig

(* Jobs added through [register_jobs]; read on every tick of the schedule
   created in [start_queue] and cleared by the lifecycle stop handler. *)
let registered_jobs : WorkableJob.t list ref = ref []

(* Value returned by [ScheduleService.schedule] in [start_queue]; invoked by
   the lifecycle stop handler to stop the schedule. *)
let stop_schedule : (unit -> unit) option ref = ref None

module MakePolling
    (Log : Log.Service.Sig.SERVICE)
    (ScheduleService : Schedule.Service.Sig.SERVICE)
    (Repo : Sig.REPO) : Sig.SERVICE = struct
  (** [dispatch ctx ~job ?delay input] builds a job instance from [input],
      the optional [delay] and the current time, and hands it to
      [Repo.enqueue]. *)
  let dispatch ctx ~job ?delay input =
    let name = Job.name job in
    Log.debug (fun m -> m "QUEUE: Dispatching job %s" name);
    let now = Ptime_clock.now () in
    let job_instance = JobInstance.create ~input ~delay ~now job in
    Repo.enqueue ctx ~job_instance

  (** [run_job ctx input ~job ~job_instance] runs the handler of [job] with
      [input].

      Resolves to [Some ()] when the handler returns [Ok ()]. When the handler
      returns [Error _] or raises, the failure handler of [job] is run and the
      result is [None]. Exceptions raised by either handler are caught and
      converted into an [Error] carrying the exception text. *)
  let run_job ctx input ~job ~job_instance =
    let job_instance_id = JobInstance.id job_instance in
    let* result =
      Lwt.catch
        (fun () -> WorkableJob.work job ctx ~input)
        (fun exn ->
          let exn_string = Exn.to_string exn in
          Lwt.return
          @@ Error
               ( "Exception caught while running job, this is a bug in your \
                  job handler, make sure to not throw exceptions "
               ^ exn_string ))
    in
    match result with
    | Error msg -> (
        Log.err (fun m ->
            m "QUEUE: Failure while running job instance %a %s" JobInstance.pp
              job_instance msg);
        (* The job failed: give the job a chance to clean up. *)
        let* result =
          Lwt.catch
            (fun () -> WorkableJob.failed job ctx)
            (fun exn ->
              let exn_string = Exn.to_string exn in
              Lwt.return
              @@ Error
                   ( "Exception caught while cleaning up job, this is a bug in \
                      your job failure handler, make sure to not throw \
                      exceptions "
                   ^ exn_string ))
        in
        match result with
        | Error msg ->
            Log.err (fun m ->
                m
                  "QUEUE: Failure while run failure handler for job instance \
                   %a %s"
                  JobInstance.pp job_instance msg);
            Lwt.return None
        | Ok () ->
            Log.err (fun m ->
                m "QUEUE: Clean up job %a" Uuidm.pp job_instance_id);
            Lwt.return None )
    | Ok () ->
        Log.debug (fun m ->
            m "QUEUE: Successfully ran job instance %a" Uuidm.pp
              job_instance_id);
        Lwt.return @@ Some ()

  (** [update ctx ~job_instance] persists [job_instance] through
      [Repo.update]. *)
  let update ctx ~job_instance = Repo.update ctx ~job_instance

  (** [work_job ctx ~job ~job_instance] runs [job_instance] if
      [JobInstance.should_run] says so at the current time.

      After a run the try counter is incremented and the next run time is
      updated. The instance is marked succeeded on success, and marked failed
      once its tries reach [WorkableJob.max_tries job]. The resulting instance
      is stored with [update]. Instances that should not run are left
      untouched. *)
  let work_job ctx ~job ~job_instance =
    let now = Ptime_clock.now () in
    if JobInstance.should_run ~job_instance ~now then
      let input_string = JobInstance.input job_instance in
      let* job_run_status = run_job ctx input_string ~job ~job_instance in
      let job_instance =
        job_instance |> JobInstance.incr_tries
        |> JobInstance.update_next_run_at job
      in
      let job_instance =
        match job_run_status with
        | None ->
            if JobInstance.tries job_instance >= WorkableJob.max_tries job then
              JobInstance.set_failed job_instance
            else job_instance
        | Some () -> JobInstance.set_succeeded job_instance
      in
      update ctx ~job_instance
    else (
      Log.debug (fun m ->
          m "QUEUE: Not going to run job instance %a" JobInstance.pp
            job_instance);
      Lwt.return () )

  (** [work_queue ctx ~jobs] fetches the workable job instances from
      [Repo.find_workable] and works each of them, one after the other, with
      the job in [jobs] whose name equals the instance name. Instances without
      a matching job are skipped. *)
  let work_queue ctx ~jobs =
    let* pending_job_instances = Repo.find_workable ctx in
    let n_job_instances = List.length pending_job_instances in
    if n_job_instances > 0 then (
      Log.debug (fun m ->
          m "QUEUE: Start working queue of length %d" n_job_instances);
      let rec loop job_instances =
        match job_instances with
        | [] -> Lwt.return ()
        | job_instance :: job_instances ->
            let job =
              List.find jobs ~f:(fun job ->
                  job |> WorkableJob.name
                  |> String.equal (JobInstance.name job_instance))
            in
            let* () =
              match job with
              | None -> Lwt.return ()
              | Some job -> work_job ctx ~job ~job_instance
            in
            (* Keep going after a worked instance; stopping here would leave
               every remaining pending instance unprocessed for this tick. *)
            loop job_instances
      in
      let* () = loop pending_job_instances in
      Log.debug (fun m -> m "QUEUE: Finish working queue");
      Lwt.return () )
    else Lwt.return ()

  (** [register_jobs _ ~jobs] converts [jobs] with [WorkableJob.of_job] and
      appends them to [registered_jobs]. The context argument is ignored. *)
  let register_jobs _ ~jobs =
    let jobs_to_register = jobs |> List.map ~f:WorkableJob.of_job in
    registered_jobs := List.append !registered_jobs jobs_to_register;
    Lwt.return ()

  (** [start_queue ctx] schedules the queue worker with
      [Schedule.every_second] under the label ["job_queue"] and stores the
      value returned by [ScheduleService.schedule] in [stop_schedule]. *)
  let start_queue ctx =
    Log.debug (fun m -> m "QUEUE: Start job queue");
    (* This function runs on every tick of the schedule; the context the jobs
       run with is created here on each tick. *)
    let scheduled_function () =
      let jobs = !registered_jobs in
      if List.is_empty jobs then (
        Log.debug (fun m ->
            m "QUEUE: No jobs found to run, trying again later");
        Lwt.return () )
      else
        let job_strings =
          jobs |> List.map ~f:WorkableJob.name |> String.concat ~sep:", "
        in
        Log.debug (fun m ->
            m "QUEUE: Run job queue with registered jobs: %s" job_strings);
        (* Combine all context middleware functions of registered jobs to get
           the context the jobs run with. *)
        let combined_context_fn =
          jobs
          |> List.map ~f:WorkableJob.with_context
          |> List.fold ~init:Fn.id ~f:Fn.compose
        in
        let ctx = combined_context_fn Core.Ctx.empty in
        work_queue ctx ~jobs
    in
    let schedule =
      Schedule.create Schedule.every_second ~f:scheduled_function
        ~label:"job_queue"
    in
    stop_schedule := Some (ScheduleService.schedule ctx schedule);
    Lwt.return ()

  (** Lifecycle named ["queue"], depending on the schedule and log services.

      Start: registers the repo migration and cleaner, then starts the queue.
      Stop: clears [registered_jobs] and calls the stored stop function, or
      logs a warning if none was stored. *)
  let lifecycle =
    Core.Container.Lifecycle.make "queue"
      ~dependencies:[ ScheduleService.lifecycle; Log.lifecycle ]
      (fun ctx ->
        Repo.register_migration ();
        Repo.register_cleaner ();
        start_queue ctx |> Lwt.map (fun () -> ctx))
      (fun _ ->
        registered_jobs := [];
        match !stop_schedule with
        | Some stop ->
            stop ();
            (* Forget the used stopper so a repeated stop does not call it
               a second time. *)
            stop_schedule := None;
            Lwt.return ()
        | None ->
            Log.warn (fun m -> m "QUEUE: Can not stop schedule");
            Lwt.return ())
end

module Repo = Queue_service_repo
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>