package sihl

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file queue_service.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
open Base
open Lwt.Syntax
module Job = Queue_core.Job
module WorkableJob = Queue_core.WorkableJob
module JobInstance = Queue_core.JobInstance
module Sig = Queue_service_sig

(** Jobs known to the queue. Starts out empty; the polling service appends to
    it when jobs are registered and clears it when the service stops. *)
let registered_jobs = ref ([] : WorkableJob.t list)

(** Function that cancels the running polling schedule, or [None] while no
    schedule handle has been stored. *)
let stop_schedule = ref (None : (unit -> unit) option)

(** Queue service that works jobs by polling the repository.

    A schedule created with [Schedule.every_second] asks [Repo.find_workable]
    for job instances and runs them with the matching registered job.
    Registered jobs are kept in the file-level [registered_jobs] reference and
    the function cancelling the schedule in [stop_schedule]. *)
module MakePolling
    (Log : Log.Service.Sig.SERVICE)
    (ScheduleService : Schedule.Service.Sig.SERVICE)
    (Repo : Sig.REPO) : Sig.SERVICE = struct
  (** [dispatch ctx ~job ?delay input] builds a job instance for [job] from
      [input], the optional [delay] and the current time, then hands it to
      [Repo.enqueue]. *)
  let dispatch ctx ~job ?delay input =
    let name = Job.name job in
    Log.debug (fun m -> m "QUEUE: Dispatching job %s" name);
    let now = Ptime_clock.now () in
    let job_instance = JobInstance.create ~input ~delay ~now job in
    Repo.enqueue ctx ~job_instance

  (** [run_job ctx input ~job ~job_instance] runs the handler of [job] on
      [input].

      Exceptions raised by the handler are caught and treated like an [Error]
      result. On failure the error is logged and the failure handler of [job]
      ([WorkableJob.failed]) is run, whose own exceptions are caught the same
      way.

      @return [Some ()] if the handler succeeded, [None] otherwise. *)
  let run_job ctx input ~job ~job_instance =
    let job_instance_id = JobInstance.id job_instance in
    let* result =
      Lwt.catch
        (fun () -> WorkableJob.work job ctx ~input)
        (fun exn ->
          let exn_string = Exn.to_string exn in
          Lwt.return
          @@ Error
               ( "Exception caught while running job, this is a bug in your \
                  job handler, make sure to not throw exceptions " ^ exn_string
               ))
    in
    match result with
    | Error msg -> (
        Log.err (fun m ->
            m "QUEUE: Failure while running job instance %a %s" JobInstance.pp
              job_instance msg);
        (* Give the job a chance to clean up after itself. *)
        let* result =
          Lwt.catch
            (fun () -> WorkableJob.failed job ctx)
            (fun exn ->
              let exn_string = Exn.to_string exn in
              Lwt.return
              @@ Error
                   ( "Exception caught while cleaning up job, this is a bug in \
                      your job failure handler, make sure to not throw \
                      exceptions " ^ exn_string ))
        in
        match result with
        | Error msg ->
            Log.err (fun m ->
                m
                  "QUEUE: Failure while run failure handler for job instance \
                   %a %s"
                  JobInstance.pp job_instance msg);
            Lwt.return None
        | Ok () ->
            Log.err (fun m ->
                m "QUEUE: Clean up job %a" Uuidm.pp job_instance_id);
            Lwt.return None )
    | Ok () ->
        Log.debug (fun m ->
            m "QUEUE: Successfully ran job instance %a" Uuidm.pp job_instance_id);
        Lwt.return @@ Some ()

  (** [update ctx ~job_instance] stores [job_instance] through [Repo.update]. *)
  let update ctx ~job_instance = Repo.update ctx ~job_instance

  (** [work_job ctx ~job ~job_instance] runs [job_instance] if
      [JobInstance.should_run] says it is due at the current time.

      After a run the try counter is incremented and the next run time is
      updated. The instance is marked as succeeded when the run succeeded, and
      as failed when the run failed and the number of tries has reached
      [WorkableJob.max_tries job]. The resulting instance is persisted with
      {!update}. An instance that is not due is only logged. *)
  let work_job ctx ~job ~job_instance =
    let now = Ptime_clock.now () in
    if JobInstance.should_run ~job_instance ~now then
      let input_string = JobInstance.input job_instance in
      let* job_run_status = run_job ctx input_string ~job ~job_instance in
      let job_instance =
        job_instance |> JobInstance.incr_tries
        |> JobInstance.update_next_run_at job
      in
      let job_instance =
        match job_run_status with
        | None ->
            (* Only give up once the configured number of tries is used up. *)
            if JobInstance.tries job_instance >= WorkableJob.max_tries job then
              JobInstance.set_failed job_instance
            else job_instance
        | Some () -> JobInstance.set_succeeded job_instance
      in
      update ctx ~job_instance
    else (
      Log.debug (fun m ->
          m "QUEUE: Not going to run job instance %a" JobInstance.pp
            job_instance);
      Lwt.return () )

  (** [work_queue ctx ~jobs] fetches the workable job instances from the
      repository and works each of them, one after the other, with the job
      from [jobs] that has the same name. Instances without a matching job are
      skipped. *)
  let work_queue ctx ~jobs =
    let* pending_job_instances = Repo.find_workable ctx in
    if List.is_empty pending_job_instances then Lwt.return ()
    else (
      Log.debug (fun m ->
          m "QUEUE: Start working queue of length %d"
            (List.length pending_job_instances));
      let find_job job_instance =
        List.find jobs ~f:(fun job ->
            String.equal (JobInstance.name job_instance) (WorkableJob.name job))
      in
      let rec loop = function
        | [] -> Lwt.return ()
        | job_instance :: rest -> (
            match find_job job_instance with
            | None -> loop rest
            | Some job ->
                (* Keep going after working an instance so that every pending
                   instance is visited in this pass, not just the first one. *)
                let* () = work_job ctx ~job ~job_instance in
                loop rest )
      in
      let* () = loop pending_job_instances in
      Log.debug (fun m -> m "QUEUE: Finish working queue");
      Lwt.return () )

  (** [register_jobs ctx ~jobs] converts [jobs] with [WorkableJob.of_job] and
      appends them to the registered jobs. The context argument is ignored. *)
  let register_jobs _ ~jobs =
    let jobs_to_register = List.map jobs ~f:WorkableJob.of_job in
    registered_jobs := List.append !registered_jobs jobs_to_register;
    Lwt.return ()

  (** [start_queue ctx] schedules the queue worker with
      [Schedule.every_second] under the label ["job_queue"] and stores the
      value returned by [ScheduleService.schedule] in [stop_schedule] so that
      the schedule can be stopped later. *)
  let start_queue ctx =
    Log.debug (fun m -> m "QUEUE: Start job queue");
    (* This function runs on every tick of the schedule; the context the jobs
       run with is created here on each tick. *)
    let scheduled_function () =
      let jobs = !registered_jobs in
      if List.is_empty jobs then (
        Log.debug (fun m ->
            m "QUEUE: No jobs found to run, trying again later");
        Lwt.return () )
      else (
        let job_strings =
          jobs |> List.map ~f:WorkableJob.name |> String.concat ~sep:", "
        in
        Log.debug (fun m ->
            m "QUEUE: Run job queue with registered jobs: %s" job_strings);
        (* Combine the context middleware functions of all registered jobs to
           get the context the jobs run with. *)
        let combined_context_fn =
          jobs
          |> List.map ~f:WorkableJob.with_context
          |> List.fold ~init:Fn.id ~f:Fn.compose
        in
        let job_ctx = combined_context_fn Core.Ctx.empty in
        work_queue job_ctx ~jobs )
    in
    let schedule =
      Schedule.create Schedule.every_second ~f:scheduled_function
        ~label:"job_queue"
    in
    stop_schedule := Some (ScheduleService.schedule ctx schedule);
    Lwt.return ()

  (** [start ctx] registers the repository's migration and cleaner, starts the
      polling schedule and returns [ctx] unchanged. *)
  let start ctx =
    Repo.register_migration ();
    Repo.register_cleaner ();
    start_queue ctx |> Lwt.map (fun () -> ctx)

  (** [stop ctx] forgets all registered jobs and stops the polling schedule.
      If no stop function is stored, a warning is logged instead. The context
      argument is ignored. *)
  let stop _ =
    registered_jobs := [];
    match !stop_schedule with
    | Some stop_fn ->
        (* Drop the handle first so that a repeated [stop] does not call the
           same stop function again. *)
        stop_schedule := None;
        stop_fn ();
        Lwt.return ()
    | None ->
        Log.warn (fun m -> m "QUEUE: Can not stop schedule");
        Lwt.return ()

  (** Lifecycle named ["queue"], depending on the lifecycles of the schedule
      and log services, using {!start} and {!stop}. *)
  let lifecycle =
    Core.Container.Lifecycle.make "queue"
      ~dependencies:[ ScheduleService.lifecycle; Log.lifecycle ]
      ~start ~stop
end

(** Alias exposing [Queue_service_repo] as [Repo].
    NOTE(review): presumably meant as the [Repo] argument of [MakePolling] —
    confirm against callers. *)
module Repo = Queue_service_repo
OCaml

Innovation. Community. Security.