package sihl
The modular functional web framework
Install
Dune Dependency
Authors
Maintainers
Sources
sihl-0.1.3.tbz
sha256=3d1acdd1eae24a7131033656f90b5d20c1621e6ef92957edf88a09b8b5f2d9e9
sha512=d224f54e20a9465c7a03d534dadcb2b9a181ae87c13731840db945aab37534f6f3982c5cb25a197e90c17d8772da062b19fa92bb93ed53a8b736c3776a7776db
doc/src/sihl.queue/queue_service.ml.html
Source file queue_service.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
open Base open Lwt.Syntax module Job = Queue_core.Job module WorkableJob = Queue_core.WorkableJob module JobInstance = Queue_core.JobInstance module Sig = Queue_service_sig let registered_jobs : WorkableJob.t list ref = ref [] let stop_schedule : (unit -> unit) option ref = ref None module MakePolling (Log : Log.Service.Sig.SERVICE) (ScheduleService : Schedule.Service.Sig.SERVICE) (Repo : Sig.REPO) : Sig.SERVICE = struct let dispatch ctx ~job ?delay input = let name = Job.name job in Log.debug (fun m -> m "QUEUE: Dispatching job %s" name); let now = Ptime_clock.now () in let job_instance = JobInstance.create ~input ~delay ~now job in Repo.enqueue ctx ~job_instance let run_job ctx input ~job ~job_instance = let job_instance_id = JobInstance.id job_instance in let* result = Lwt.catch (fun () -> WorkableJob.work job ctx ~input) (fun exn -> let exn_string = Exn.to_string exn in Lwt.return @@ Error ( "Exception caught while running job, this is a bug in your \ job handler, make sure to not throw exceptions " ^ exn_string )) in match result with | Error msg -> ( Log.err (fun m -> m "QUEUE: Failure while running job instance %a %s" JobInstance.pp job_instance msg); let* result = Lwt.catch (fun () -> WorkableJob.failed job ctx) (fun exn -> let exn_string = Exn.to_string exn in Lwt.return @@ Error ( "Exception caught while cleaning up job, this is a bug in \ your job failure handler, make sure to not throw \ exceptions " ^ exn_string )) in match result with | Error msg -> Log.err (fun m -> m "QUEUE: Failure while run failure handler for job instance \ %a %s" JobInstance.pp job_instance msg); Lwt.return None | Ok () -> Log.err (fun m -> m "QUEUE: Clean up job %a" Uuidm.pp job_instance_id); Lwt.return None ) | Ok () -> Log.debug (fun m -> m "QUEUE: Successfully ran job instance %a" Uuidm.pp job_instance_id); Lwt.return @@ Some () let update ctx ~job_instance = Repo.update ctx ~job_instance let work_job ctx ~job ~job_instance = let now = Ptime_clock.now () in if JobInstance.should_run ~job_instance ~now then let input_string = JobInstance.input job_instance in let* job_run_status = run_job ctx input_string ~job ~job_instance in let job_instance = job_instance |> JobInstance.incr_tries |> JobInstance.update_next_run_at job in let job_instance = match job_run_status with | None -> if JobInstance.tries job_instance >= WorkableJob.max_tries job then JobInstance.set_failed job_instance else job_instance | Some () -> JobInstance.set_succeeded job_instance in update ctx ~job_instance else ( Log.debug (fun m -> m "QUEUE: Not going to run job instance %a" JobInstance.pp job_instance); Lwt.return () ) let work_queue ctx ~jobs = let* pending_job_instances = Repo.find_workable ctx in let n_job_instances = List.length pending_job_instances in if n_job_instances > 0 then ( Log.debug (fun m -> m "QUEUE: Start working queue of length %d" (List.length pending_job_instances)); let rec loop job_instances jobs = match job_instances with | [] -> Lwt.return () | job_instance :: job_instances -> ( let job = List.find jobs ~f:(fun job -> job |> WorkableJob.name |> String.equal (JobInstance.name job_instance)) in match job with | None -> loop job_instances jobs | Some job -> work_job ctx ~job ~job_instance ) in let* () = loop pending_job_instances jobs in Log.debug (fun m -> m "QUEUE: Finish working queue"); Lwt.return () ) else Lwt.return () let register_jobs _ ~jobs = let jobs_to_register = jobs |> List.map ~f:WorkableJob.of_job in registered_jobs := List.concat [ !registered_jobs; jobs_to_register ]; Lwt.return () let start_queue ctx = Log.debug (fun m -> m "QUEUE: Start job queue"); (* This function run every second, the request context gets created here with each tick *) let scheduled_function () = let jobs = !registered_jobs in if List.length jobs > 0 then ( let job_strings = jobs |> List.map ~f:WorkableJob.name |> String.concat ~sep:", " in Logs.debug (fun m -> m "QUEUE: Run job queue with registered jobs: %s" job_strings); (* Combine all context middleware functions of registered jobs to get the context the jobs run with*) let combined_context_fn = jobs |> List.map ~f:WorkableJob.with_context |> List.fold ~init:Fn.id ~f:Fn.compose in let ctx = combined_context_fn Core.Ctx.empty in work_queue ctx ~jobs ) else ( Logs.debug (fun m -> m "QUEUE: No jobs found to run, trying again later"); Lwt.return () ) in let schedule = Schedule.create Schedule.every_second ~f:scheduled_function ~label:"job_queue" in stop_schedule := Some (ScheduleService.schedule ctx schedule); Lwt.return () let start ctx = Repo.register_migration (); Repo.register_cleaner (); start_queue ctx |> Lwt.map (fun () -> ctx) let stop _ = registered_jobs := []; match !stop_schedule with | Some stop_schedule -> stop_schedule (); Lwt.return () | None -> Log.warn (fun m -> m "QUEUE: Can not stop schedule"); Lwt.return () let lifecycle = Core.Container.Lifecycle.make "queue" ~dependencies:[ ScheduleService.lifecycle; Log.lifecycle ] ~start ~stop end module Repo = Queue_service_repo
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>