Source file cluster_worker.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
open Lwt.Infix
open Capnp_rpc_lwt
(* Make [Log_data] and [Process] available as submodules of this module. *)
module Log_data = Log_data
module Process = Process
(* Prometheus metrics exported by the worker. Every metric lives under the
   [ocluster] namespace and the [worker] subsystem. *)
module Metrics = struct
  let namespace = "ocluster"
  let subsystem = "worker"

  (* Counter incremented each time a job is taken from the queue. *)
  let jobs_accepted =
    Prometheus.Counter.v
      ~help:"Number of jobs accepted in total"
      ~namespace ~subsystem "jobs_accepted_total"

  (* Summary of job durations; takes the value of the [result] label. *)
  let job_time =
    Prometheus.Summary.v_label ~label_name:"result"
      ~help:"Time jobs ran for"
      ~namespace ~subsystem "job_time_seconds"

  (* Summary of the time spent in [docker_push]. *)
  let docker_push_time =
    Prometheus.Summary.v
      ~help:"Time uploading to Docker Hub"
      ~namespace ~subsystem "docker_push_time_seconds"

  (* Summary of the time spent running the docker prune commands. *)
  let docker_prune_time =
    Prometheus.Summary.v
      ~help:"Time spent pruning Docker cache"
      ~namespace ~subsystem "docker_prune_time_seconds"

  (* Gauge tracking the number of jobs in progress. *)
  let running_jobs =
    Prometheus.Gauge.v
      ~help:"Number of jobs currently running"
      ~namespace ~subsystem "running_jobs"

  (* Gauge holding the duration of the most recent health check. *)
  let healthcheck_time =
    Prometheus.Gauge.v
      ~help:"Time to perform last healthcheck"
      ~namespace ~subsystem "healthcheck_time_seconds"

  (* Gauge set to 1.0 after a failed health check and 0.0 after a good one. *)
  let unhealthy =
    Prometheus.Gauge.v
      ~help:"Number of unhealthy workers"
      ~namespace ~subsystem "unhealthy"
end
(* The process environment, captured once when this module is initialised,
   with [DOCKER_BUILDKIT=1] placed in front of the inherited entries. *)
let buildkit_env =
  Array.append [| "DOCKER_BUILDKIT=1" |] (Unix.environment ())
(* [x >>!= f] is [Lwt_result.bind x f]: continue with [f] only if [x]
   resolves to [Ok]. *)
let ( >>!= ) = Lwt_result.bind
(* [dir / name] joins two path components with [Filename.concat].
   NOTE: from here on this shadows the integer division operator. *)
let ( / ) = Filename.concat
(* Lock held by [docker_push] around its whole login/tag/push sequence, so
   that at most one push is in progress in this process at a time. *)
let docker_push_lock = Lwt_mutex.create ()
(* [read_file path] returns the whole contents of [path], opened in binary
   mode. The channel is closed whether or not reading succeeds. *)
let read_file path =
  let ic = open_in_bin path in
  let slurp () = really_input_string ic (in_channel_length ic) in
  Fun.protect slurp ~finally:(fun () -> close_in ic)
(* Minimum time, in seconds, between the start of one connection attempt to
   the scheduler and the start of the next one (used by [run]). *)
let min_reconnect_time = 10.0
(* What a build function is asked to build:
   - [`Docker]: a Dockerfile, given either inline ([`Contents]) or as a path
     ([`Path]), together with the Docker build options;
   - [`Obuilder]: an OBuilder spec given inline as a string;
   - [`Custom]: a custom job received from the scheduler. *)
type job_spec = [
| `Docker of [ `Contents of string | `Path of string ] * Cluster_api.Docker.Spec.options
| `Obuilder of [ `Contents of string ]
| `Custom of Cluster_api.Custom.recv
]
(* The type of a function that performs one build.
   - [switch]: a per-job switch; [loop] turns it off once the job has finished.
   - [log]: where the job's output is written.
   - [src]: the directory handed over by [Context.with_build_context].
   - [secrets]: [(id, value)] pairs taken from the job description.
   On success the result is a string (for the Docker case of [default_build],
   the trimmed contents of the image-id file). Failure is either [`Cancelled]
   or [`Msg] with an explanation. *)
type build =
switch:Lwt_switch.t ->
log:Log_data.t ->
src:string ->
secrets:(string * string) list ->
job_spec ->
(string, [`Cancelled | `Msg of string]) Lwt_result.t
(* The state of one worker. *)
type t = {
(* Name of this worker; [loop] writes it at the start of every job log. *)
name : string;
(* Passed to [Context.with_build_context] to obtain the source directory. *)
context : Context.t;
(* The function used to carry out builds (see [default_build]). *)
build : build;
(* Minimum number of seconds between health checks; 0.0 disables them. *)
healthcheck_period : float;
(* If set, prune when the percentage of free blocks or free files on the
   Docker partition drops below this value. *)
prune_threshold : float option;
(* If set (and [prune_threshold] is not), prune when the size reported by
   [docker system df] exceeds this many GB. *)
docker_max_df_size : float option;
(* Reference used by [run] to connect to the scheduler. *)
registration_service : Cluster_api.Raw.Client.Registration.t Sturdy_ref.t;
(* [loop] does not request a new job while [in_use >= capacity]. *)
capacity : int;
(* Number of accepted jobs that have not yet finished. *)
mutable in_use : int;
(* Broadcast whenever a job finishes, and when the worker's switch is
   turned off. *)
cond : unit Lwt_condition.t;
(* Cancels the pending request for a new job, if any; [loop] replaces it
   every time it issues a new request. *)
mutable cancel : unit -> unit;
(* Repositories that [docker_push] is permitted to push to. *)
allow_push : string list;
}
(* [docker_push ~switch ~log t hash target] tags the local image [hash] as the
   requested target and pushes it.

   The push is refused with [`Msg] unless the target repository is listed in
   [t.allow_push]. Otherwise, while holding [docker_push_lock] and using a
   fresh temporary directory as the docker [--config] directory, it:
   - logs in with the given credentials, if any (password sent on stdin);
   - runs [docker tag] and [docker push];
   - reads the image's RepoDigests and returns the entry that starts with the
     target repository followed by [@].

   Returns [Error (`Msg _)] if login, tagging, pushing or the digest lookup
   fails, and [Error `Cancelled] if a command is cancelled. *)
let docker_push ~switch ~log t hash { Cluster_api.Docker.Spec.target; auth } =
let repo = Cluster_api.Docker.Image_id.repo target in
let target = Cluster_api.Docker.Image_id.to_string target in
(* Only the user name (first component of [auth]) is ever printed. *)
let pp_auth = Fmt.option (Fmt.using fst (Fmt.fmt " as user %S")) in
Log.info (fun f -> f "Push %S to %S%a" hash target pp_auth auth);
Log_data.info log "Pushing %S to %S%a" hash target pp_auth auth;
if not (List.mem repo t.allow_push) then
Lwt_result.fail (`Msg (Fmt.str "To allow pushing to this repository, start the worker with --allow-push %S" repo))
else (
Lwt_mutex.with_lock docker_push_lock @@ fun () ->
Lwt_io.with_temp_dir ~prefix:"build-worker-" ~suffix:"-docker" @@ fun config_dir ->
(* Build a docker command line that uses the temporary config directory. *)
let docker args = "docker" :: "--config" :: config_dir :: args in
let tag_and_push () =
Process.check_call ~label:"docker-tag" ~switch ~log @@ docker ["tag"; "--"; hash; target] >>!= fun () ->
Process.check_call ~label:"docker-push" ~switch ~log @@ docker ["push"; "--"; target] >>!= fun () ->
(* NOTE(review): this inspect call does not go through [docker], so it does
   not use the temporary config directory. *)
Lwt_process.pread_line ("", [| "docker"; "image"; "inspect"; "-f"; "{{ range index .RepoDigests }}{{ . }} {{ end }}"; "--"; target |]) >>= function
| "" -> Lwt_result.fail (`Msg "Failed to read RepoDigests for newly-pushed image!")
| ids ->
let open Astring in
(* The digests are space-separated; pick the one for our repository. *)
match List.find_opt (String.is_prefix ~affix:(repo ^ "@")) (String.cuts ~sep:" " ids) with
| Some repo_id -> Lwt_result.return repo_id
| None -> Lwt_result.fail (`Msg (Fmt.str "Can't find target repository '%s@…' in list %S!" repo ids)) in
match auth with
| None -> tag_and_push ()
| Some (user, password) ->
let login_cmd = docker ["login"; "--password-stdin"; "--username"; user] in
Process.exec ~label:"docker-login" ~switch ~log ~stdin:password ~stderr:`Keep login_cmd >>= function
| Error (`Exit_code _) ->
Lwt_result.fail (`Msg (Fmt.str "Failed to docker-login as %S" user))
| Error (`Msg _ | `Cancelled as e) -> Lwt_result.fail e
| Ok () -> tag_and_push ()
)
(* [build ~switch ~log t descr] runs the job described by [descr].

   The action is decoded with [Cluster_api.Submission.get_action], the build
   context is set up with [Context.with_build_context], and the work is handed
   to [t.build]. For a Docker build with a push target, the image is pushed
   (and the push timed) while still inside the build-context callback; without
   a push target the successful output is the empty string.

   The promise resolves to a pair [(outcome, label)]:
   - [(Ok output, ok)] on success;
   - [(Error (`Msg _), cancelled)] if the job was cancelled;
   - [(Error (`Msg _), fail)] on any other reported failure.
   The detailed failure message is written to [log]; the returned error
   carries only a generic message. The label is used by [loop] for the
   [job_time] metric. *)
let build ~switch ~log t descr =
let module R = Cluster_api.Raw.Reader.JobDescr in
let cache_hint = R.cache_hint_get descr in
(* Turn the secrets in the job description into [(id, value)] pairs. *)
let secrets = R.secrets_get_list descr |> List.map (fun t -> Cluster_api.Raw.Reader.Secret.(id_get t, value_get t)) in
begin match Cluster_api.Submission.get_action descr with
| Docker_build { dockerfile; options; push_to } ->
Log.info (fun f ->
match dockerfile with
| `Contents contents -> f "Got request to build (%s):\n%s" cache_hint (String.trim contents)
| `Path path -> f "Got request to build %S (%s)" path cache_hint
);
begin
Context.with_build_context t.context ~log descr @@ fun src ->
t.build ~switch ~log ~src ~secrets (`Docker (dockerfile, options)) >>!= fun hash ->
match push_to with
| None -> Lwt_result.return ""
| Some target ->
Prometheus.Summary.time Metrics.docker_push_time Unix.gettimeofday
(fun () -> docker_push ~switch ~log t hash target)
end
| Obuilder_build { spec = `Contents spec } ->
Log.info (fun f ->
f "Got request to build (%s):\n%s" cache_hint (String.trim spec)
);
Context.with_build_context t.context ~log descr @@ fun src ->
t.build ~switch ~log ~src ~secrets (`Obuilder (`Contents spec))
| Custom_build c ->
Log.info (fun f -> f "Got request to build a job of kind \"%s\"" (Cluster_api.Custom.kind c));
Context.with_build_context t.context ~log descr @@ fun src ->
t.build ~switch ~log ~src ~secrets (`Custom c)
end
(* Translate the result into the outcome reported to the client plus a
   metric label, recording the details in the job log. *)
>|= function
| Error `Cancelled ->
Log_data.write log "Job cancelled\n";
Log.info (fun f -> f "Job cancelled");
Error (`Msg "Build cancelled"), "cancelled"
| Ok output ->
Log_data.write log "Job succeeded\n";
Log.info (fun f -> f "Job succeeded");
Ok output, "ok"
| Error (`Msg msg) ->
Log_data.write log (msg ^ "\n");
Log.info (fun f -> f "Job failed: %s" msg);
Error (`Msg "Build failed"), "fail"
(* [convert_memory_string s] parses a human-readable size such as [1.5GB],
   [500MB], [12kB] or [42B] and returns it in gigabytes (decimal units:
   1 GB = 1000 MB).

   Surrounding whitespace is ignored. Recognised suffixes are [TB], [GB],
   [MB], [kB] (also accepted as [KB]) and a plain [B] preceded by a digit.
   Returns [None] when the suffix is not recognised, when the numeric part
   does not parse as a float, or when the input is too short to hold both a
   number and a unit (this includes the empty string). Never raises. *)
let convert_memory_string s =
  let s = String.trim s in
  let length = String.length s in
  (* Parse everything before a suffix of [suffix_len] characters and convert
     the resulting number to gigabytes with [to_gb]. *)
  let parse ~suffix_len to_gb =
    String.sub s 0 (length - suffix_len)
    |> float_of_string_opt
    |> Option.map to_gb
  in
  (* The match below indexes the last two characters, so shorter inputs must
     be rejected first. *)
  if length < 2 then None
  else
    match s.[length - 2], s.[length - 1] with
    | 'T', 'B' -> parse ~suffix_len:2 (fun tb -> tb *. 1_000.)
    | 'G', 'B' -> parse ~suffix_len:2 Fun.id
    | 'M', 'B' -> parse ~suffix_len:2 (fun mb -> mb /. 1_000.)
    | ('k' | 'K'), 'B' -> parse ~suffix_len:2 (fun kb -> kb /. 1_000_000.)
    (* A digit directly before [B] means the value is in plain bytes. *)
    | '0' .. '9', 'B' -> parse ~suffix_len:1 (fun b -> b /. 1_000_000_000.)
    | _ -> None
(* [check_docker_partition t] decides whether Docker's storage needs pruning.

   - With neither limit configured, no check is made and the result is [Ok ()].
   - If [t.prune_threshold] is set it takes precedence: the Docker root
     directory is looked up with [docker info] and the result is
     [Error `Disk_space_low] when the smaller of the free-blocks and
     free-files percentages of that partition is below the threshold.
   - Otherwise [t.docker_max_df_size] is compared with the first size line
     printed by [docker system df]; the result is [Error `Disk_space_low]
     when that size (in GB) exceeds the limit. If the size cannot be parsed
     this is logged and the check passes. *)
let check_docker_partition t =
  match t.prune_threshold, t.docker_max_df_size with
  | None, None -> Lwt_result.return ()
  | Some prune_threshold, _ ->
    Lwt_process.pread_line ("", [| "docker"; "info"; "-f"; "{{.DockerRootDir}}" |]) >|= fun line ->
    let docker_root = String.trim line in
    let free_blocks = Df.free_space_percent docker_root in
    let free_files = Df.free_files_percent docker_root in
    Log.info (fun f -> f "Docker partition: %.0f%% free blocks, %.0f%% free files" free_blocks free_files);
    if min free_blocks free_files < prune_threshold then Error `Disk_space_low
    else Ok ()
  | None, Some max_df_size ->
    Lwt_process.pread_line ("", [| "docker"; "system"; "df"; "--format"; "{{.Size}}" |]) >|= fun line ->
    begin match convert_memory_string line with
      | None ->
        Log.info (fun f -> f "Failed to calculate max df size from %s" line);
        Ok ()
      | Some gb ->
        (* The value is a size in GB, not a percentage. *)
        Log.info (fun f -> f "Docker images take up %.2fGB" gb);
        if max_df_size < gb then Error `Disk_space_low
        else Ok ()
    end
(* [maybe_prune t queue] prunes Docker's storage if [check_docker_partition]
   reports that space is low; otherwise it returns immediately.

   When space is low it:
   - marks the queue inactive;
   - waits until no job is running ([t.in_use = 0]);
   - runs [docker system prune -af] and, if that succeeds,
     [docker builder prune -af], timing both with the prune metric;
   - re-checks the partition and re-activates the queue if all is well.
   If a prune command fails, or space is still low afterwards, it waits one
   hour and starts again from the top; the queue stays inactive meanwhile. *)
let rec maybe_prune t queue =
  (* Wait with [Lwt_unix.sleep] rather than [Unix.sleep]: a blocking sleep
     would freeze the whole Lwt event loop for the hour, not just this
     thread. *)
  let retry_in_one_hour () =
    Lwt_unix.sleep 3600.0 >>= fun () ->
    maybe_prune t queue
  in
  check_docker_partition t >>= function
  | Ok () -> Lwt.return_unit
  | Error `Disk_space_low ->
    Log.info (fun f -> f "Disk-space low. Will finish current jobs and then prune.");
    Cluster_api.Queue.set_active queue false >>= fun () ->
    (* [t.cond] is broadcast each time a job finishes. *)
    let rec drain () =
      if t.in_use = 0 then Lwt.return_unit
      else Lwt_condition.wait t.cond >>= drain
    in
    drain () >>= fun () ->
    Log.info (fun f -> f "All jobs finished. Pruning…");
    Prometheus.Summary.time Metrics.docker_prune_time Unix.gettimeofday
      (fun () ->
         Lwt_process.exec ("", [| "docker"; "system"; "prune"; "-af" |]) >>= function
         | Unix.WEXITED 0 ->
           Lwt_process.exec ("", [| "docker"; "builder"; "prune"; "-af" |])
         | e -> Lwt.return e
      )
    >>= function
    | Unix.WEXITED 0 ->
      begin
        check_docker_partition t >>= function
        | Ok () ->
          Log.info (fun f -> f "Prune complete. Re-activating queue…");
          Cluster_api.Queue.set_active queue true
        | Error `Disk_space_low ->
          Log.warn (fun f -> f "Disk-space still low after pruning! Will retry in one hour.");
          retry_in_one_hour ()
      end
    | _ ->
      Log.warn (fun f -> f "docker prune command failed! Will retry in one hour.");
      retry_in_one_hour ()
(* [healthcheck obuilder] runs the OBuilder health check and passes its
   result through unchanged. As a side effect it records how long the check
   took and sets the [unhealthy] gauge to 0.0 on success or 1.0 on failure. *)
let healthcheck obuilder =
  let started = Unix.gettimeofday () in
  Obuilder_build.healthcheck obuilder >>= fun result ->
  let finished = Unix.gettimeofday () in
  Prometheus.Gauge.set Metrics.healthcheck_time (finished -. started);
  let unhealthy =
    match result with
    | Ok _ -> 0.0
    | Error _ -> 1.0
  in
  Prometheus.Gauge.set Metrics.unhealthy unhealthy;
  Lwt.return result
(* [check_health t ~last_healthcheck ~queue obuilder] runs a health check if
   one is due.

   Nothing happens when there is no OBuilder, when [t.healthcheck_period] is
   0.0, or when less than [t.healthcheck_period] seconds have passed since
   [!last_healthcheck]. Otherwise the check is run and [last_healthcheck] is
   updated. On failure the queue is paused and the check is repeated every 60
   seconds until it passes, at which point the queue is re-activated. *)
let check_health t ~last_healthcheck ~queue = function
  | None -> Lwt.return_unit
  | Some obuilder ->
    (* [queue_active] tells whether the queue is still accepting jobs, i.e.
       whether no earlier check in this round has failed. *)
    let rec poll ~queue_active =
      healthcheck obuilder >>= fun outcome ->
      last_healthcheck := Unix.gettimeofday ();
      match outcome with
      | Ok () ->
        if queue_active then Lwt.return_unit
        else begin
          Log.info (fun f -> f "System is healthy again; unpausing");
          Cluster_api.Queue.set_active queue true
        end
      | Error (`Msg m) ->
        Log.warn (fun f -> f "Health check failed: %s" m);
        let pause_queue =
          if queue_active then Cluster_api.Queue.set_active queue false
          else Lwt.return_unit
        in
        pause_queue >>= fun () ->
        Lwt_unix.sleep 60.0 >>= fun () ->
        poll ~queue_active:false
    in
    let enabled = not (t.healthcheck_period = 0.0) in
    if enabled && not (!last_healthcheck +. t.healthcheck_period > Unix.gettimeofday ())
    then poll ~queue_active:true
    else Lwt.return_unit
(* [loop ~switch ?obuilder t queue] is the worker's main loop: it keeps
   requesting jobs from [queue] and running them in the background.

   Each iteration:
   - returns [`Cancelled] if [switch] is [Some] switch that has been turned off;
   - waits on [t.cond] if [t.in_use >= t.capacity];
   - otherwise runs [maybe_prune] and [check_health], then asks the queue for
     a job and waits for one to arrive.
   Once a job arrives it is counted in [t.in_use], started with [Lwt.async],
   and the loop immediately goes round again. The function therefore only
   returns normally with [`Cancelled]; any other exit is via an exception. *)
let loop ~switch ?obuilder t queue =
let last_healthcheck = ref (Unix.gettimeofday ()) in
let rec loop () =
match switch with
| Some switch when not (Lwt_switch.is_on switch) ->
Log.info (fun f -> f "Builder shutting down (switch turned off)");
Lwt.return `Cancelled
| _ ->
if t.in_use >= t.capacity then (
Log.info (fun f -> f "At capacity. Waiting for a build to finish before requesting more…");
Lwt_condition.wait t.cond >>= loop
) else (
maybe_prune t queue >>= fun () ->
check_health t ~last_healthcheck ~queue obuilder >>= fun () ->
(* [outcome] is resolved with the job's result once the build is done. *)
let outcome, set_outcome = Lwt.wait () in
let log = Log_data.create () in
Log.info (fun f -> f "Requesting a new job…");
(* A fresh per-job switch; it shadows the [switch] argument from here on. *)
let switch = Lwt_switch.create () in
let pop =
Capability.with_ref (Cluster_api.Job.local ~switch ~outcome ~stream_log_data:(Log_data.stream log)) @@ fun job ->
Cluster_api.Queue.pop queue job
in
(* Let [run]'s shutdown hook cancel the pending request. *)
t.cancel <- (fun () -> Lwt.cancel pop);
pop >>= fun request ->
t.in_use <- t.in_use + 1;
Prometheus.Gauge.set Metrics.running_jobs (float_of_int t.in_use);
Prometheus.Counter.inc_one Metrics.jobs_accepted;
(* Run the build in the background so that more jobs can be requested. *)
Lwt.async (fun () ->
Lwt.finalize
(fun () ->
let t0 = Unix.gettimeofday () in
Lwt.try_bind
(fun () ->
Log_data.info log "Building on %s" t.name;
build ~switch ~log t request
)
(* The build returned (successfully or not): record its duration under the
   label it reported and resolve [outcome]. *)
(fun (outcome, metric_label) ->
let t1 = Unix.gettimeofday () in
Prometheus.Summary.observe (Metrics.job_time metric_label) (t1 -. t0);
Log_data.close log;
Lwt.wakeup set_outcome outcome;
Lwt.return_unit)
(* The build raised: record it under the error label, note the exception
   in the job log and fail [outcome] with it. *)
(fun ex ->
let t1 = Unix.gettimeofday () in
Prometheus.Summary.observe (Metrics.job_time "error") (t1 -. t0);
Log.warn (fun f -> f "Build failed: %a" Fmt.exn ex);
Log_data.write log (Fmt.str "Uncaught exception: %a@." Fmt.exn ex);
Log_data.close log;
Lwt.wakeup_exn set_outcome ex;
Lwt.return_unit)
)
(* Always release the slot, turn off the per-job switch and wake anything
   waiting on [t.cond]. *)
(fun () ->
t.in_use <- t.in_use - 1;
Prometheus.Gauge.set Metrics.running_jobs (float_of_int t.in_use);
Lwt_switch.turn_off switch >>= fun () ->
Lwt_condition.broadcast t.cond ();
Lwt.return_unit)
);
loop ()
)
in
loop ()
(* [error_msg fmt ...] formats a message like [Fmt.str] and wraps it as
   [Error (`Msg message)]. *)
let error_msg fmt =
  Fmt.kstr (fun message -> Error (`Msg message)) fmt
(* [check_contains ~path src] checks that the relative path [path] names a
   regular file reachable from the directory [src] by descending through real
   directories only. [Unix.lstat] is used, so a symbolic link at any position
   is rejected.

   Returns [Ok full_path] (that is, [src] joined with each segment of the
   normalised path) or [Error (`Msg _)] if the path does not parse, is
   absolute, is empty, contains an empty or relative segment, names something
   missing, or passes through something that is not a directory. *)
let check_contains ~path src =
  match Fpath.of_string path with
  | Error (`Msg m) -> Error (`Msg m)
  | Ok path ->
    let path = Fpath.normalize path in
    (* Descend one segment at a time, with [dir] the directory reached so far. *)
    let rec descend dir segments =
      match segments with
      | [] -> error_msg "Empty path!"
      | seg :: _ when Fpath.is_rel_seg seg -> error_msg "Relative segment in %a" Fpath.pp path
      | "" :: _ -> error_msg "Empty segment in %a!" Fpath.pp path
      | seg :: rest ->
        let candidate = Filename.concat dir seg in
        begin match (Unix.lstat candidate).Unix.st_kind, rest with
          | Unix.S_DIR, _ -> descend candidate rest
          | Unix.S_REG, [] -> Ok candidate
          | _ -> error_msg "%S is not a directory (in %a)" seg Fpath.pp path
          | exception Unix.Unix_error (Unix.ENOENT, _, _) ->
            error_msg "%S does not exist (in %a)" seg Fpath.pp path
        end
    in
    if Fpath.is_abs path then error_msg "%a is an absolute path!" Fpath.pp path
    else descend src (Fpath.segs path)
(* [write_to_file ~path data] writes the whole of [data] to [path], creating
   the file if necessary and truncating any existing contents. *)
let write_to_file ~path data =
  let flags = [Unix.O_TRUNC; Unix.O_CREAT; Unix.O_RDWR] in
  let write_all ch =
    Lwt_io.write_from_string_exactly ch data 0 (String.length data)
  in
  Lwt_io.with_file ~mode:Lwt_io.output ~flags path write_all
(* [create_secret_file value] stores [value] in a newly created temporary
   file and returns the path of that file. Removing it is up to the caller. *)
let create_secret_file value =
  let secret_path = Filename.temp_file "build-worker-" ".secret" in
  write_to_file ~path:secret_path value >>= fun () ->
  Lwt.return secret_path
(* [try_unlink file] removes [file] if it exists and does nothing otherwise. *)
let try_unlink file =
  match Sys.file_exists file with
  | true -> Lwt_unix.unlink file
  | false -> Lwt.return_unit
(* [default_build ?obuilder ~switch ~log ~src ~secrets spec] is the build
   function used by [run] when the caller does not supply one.

   - [`Docker]: writes each secret value to its own temporary file, works out
     which Dockerfile to use (inline contents are written to [src/Dockerfile];
     a path must pass [check_contains] against [src], and [-] is rejected),
     then runs [docker build] on [src]. On success the result is the trimmed
     contents of the image-id file that docker was asked to write. The
     image-id file and the secret files are removed afterwards in all cases.
   - [`Obuilder]: parses the spec as an S-expression and passes it to
     [Obuilder_build.build]. Raises [Failure] if no [obuilder] was given.
     NOTE(review): a malformed spec presumably makes the parsing functions
     raise rather than return an error; confirm.
   - [`Custom]: not supported here; logs a warning and returns an error. *)
let default_build ?obuilder ~switch ~log ~src ~secrets = function
| `Docker (dockerfile, options) ->
let iid_file = Filename.temp_file "build-worker-" ".iid" in
(* One temporary file per secret, created concurrently; the result pairs
   each secret id with the path of its file. *)
Lwt_list.map_p (fun (id, value) -> create_secret_file value >|= fun fname -> id, fname) secrets >>= fun secret_files ->
Lwt.finalize
(fun () ->
begin
match dockerfile with
| `Contents contents ->
let path = src / "Dockerfile" in
write_to_file ~path contents >>= fun () ->
Lwt_result.return path
| `Path "-" -> Lwt_result.fail (`Msg "Path cannot be '-'!")
| `Path path ->
match check_contains ~path src with
| Ok path -> Lwt_result.return path
| Error e -> Lwt_result.fail e
end >>!= fun dockerpath ->
(* [include_git] is not used when building the command line. *)
let { Cluster_api.Docker.Spec.build_args; squash; buildkit; include_git = _ } = options in
let args =
List.concat_map (fun x -> ["--build-arg"; x]) build_args
@ (if squash then ["--squash"] else [])
@ (List.map (fun (id, fname) -> ["--secret"; Fmt.str "id=%s,src=%s" id fname]) secret_files |> List.flatten)
@ ["--pull"; "--iidfile"; iid_file; "-f"; dockerpath; src]
in
Log.info (fun f -> f "docker build @[%a@]" Fmt.(list ~sep:sp (quote string)) args);
(* With [buildkit] set, run docker with DOCKER_BUILDKIT=1 in its environment. *)
let env = if buildkit then Some buildkit_env else None in
Process.check_call ~label:"docker-build" ?env ~switch ~log ("docker" :: "build" :: args) >>!= fun () ->
Lwt_result.return (String.trim (read_file iid_file))
)
(* Clean up the temporary files whether or not the build succeeded. *)
(fun () -> try_unlink iid_file >>= fun () ->
secret_files |> List.map snd |> Lwt_list.iter_p try_unlink
)
| `Obuilder (`Contents spec) -> begin
let spec = Obuilder.Spec.t_of_sexp (Sexplib.Sexp.of_string spec) in
match obuilder with
| None -> Fmt.failwith "This worker is not configured for use with OBuilder!"
| Some builder -> Obuilder_build.build builder ~switch ~log ~spec ~src_dir:src ~secrets
end
| `Custom c ->
Log.warn (fun f -> f "The default cluster_worker build does not support any custom jobs (kind: %s)" (Cluster_api.Custom.kind c));
Lwt.return @@ Error (`Msg "Unsupported custom job")
(* [collect_external_metric uri] fetches [uri] over HTTP.

   On a 200 response that carries a content-type header, the result is
   [Ok (content_type, body)]. A missing header, any other status code, or an
   exception raised while fetching gives [Error (`Msg _)]; the last two cases
   are also logged as warnings. *)
let collect_external_metric uri =
  let fetch () =
    Cohttp_lwt_unix.Client.get uri >>= fun (resp, body) ->
    match Cohttp.Response.status resp with
    | `OK ->
      let headers = Cohttp.Response.headers resp in
      begin match Cohttp.Header.get headers "content-type" with
        | None ->
          Lwt.return (Fmt.error_msg "Missing Content-Type in HTTP response from prometheus-node-exporter")
        | Some content_type ->
          Cohttp_lwt.Body.to_string body >>= fun text ->
          Lwt.return (Ok (content_type, text))
      end
    | code ->
      let reason = Cohttp.Code.string_of_status code in
      Log.warn (fun f -> f "prometheus-node-exporter: %s" reason);
      Lwt.return (Fmt.error_msg "prometheus-node-exporter: %s" reason)
  in
  let on_exception ex =
    Log.warn (fun f -> f "Failed to connect to prometheus-node-exporter: %a" Fmt.exn ex);
    Lwt.return (Fmt.error_msg "Failed to connect to prometheus-node-exporter")
  in
  Lwt.catch fetch on_exception
(* [metrics source] returns [(content_type, data)] for the requested source:
   - [`Agent]: this process's default Prometheus registry, rendered in the
     0.0.4 text format;
   - [`Host]: whatever is served at http://127.0.0.1:9100/metrics. *)
let metrics source =
  match source with
  | `Agent ->
    Prometheus.CollectorRegistry.(collect default) >|= fun snapshot ->
    let body = Fmt.to_to_string Prometheus_app.TextFormat_0_0_4.output snapshot in
    Ok ("text/plain; version=0.0.4; charset=utf-8", body)
  | `Host ->
    let node_exporter = Uri.of_string "http://127.0.0.1:9100/metrics" in
    collect_external_metric node_exporter
(* [collect_additional_metrics metrics s] looks up [s] in the association
   list [metrics]. If it is absent the result is [Ok None]. Otherwise the
   associated URI is fetched with [collect_external_metric]: a successful
   fetch gives [Ok (Some (content_type, data))] and a failed one passes the
   error through. *)
let collect_additional_metrics metrics s =
  match List.assoc_opt s metrics with
  | None -> Lwt.return (Ok None)
  | Some uri ->
    collect_external_metric uri >|= fun fetched ->
    Result.map Option.some fetched
(* [self_update ~update t] performs a self-update.

   It calls [update ()] to obtain a [finish] function, waits until no jobs
   are running, pauses for a second, calls [finish ()], pauses for another
   second and then terminates the process with exit status 1. If any step
   raises, the exception is turned into [Error (`Msg _)] instead. *)
let self_update ~update t =
  (* [t.cond] is broadcast each time a job finishes. *)
  let rec wait_until_idle () =
    match t.in_use with
    | 0 -> Lwt.return_unit
    | _ -> Lwt_condition.wait t.cond >>= wait_until_idle
  in
  let perform () =
    Log.info (fun f -> f "Self-update requested.");
    update () >>= fun finish ->
    Log.info (fun f -> f "Waiting for all existing jobs to complete.");
    wait_until_idle () >>= fun () ->
    Log.info (fun f -> f "All jobs finished. Updating…");
    Lwt_unix.sleep 1.0 >>= fun () ->
    finish () >>= fun () ->
    Lwt_unix.sleep 1.0 >>= fun () ->
    exit 1
  in
  let report ex = Lwt.return (Error (`Msg (Printexc.to_string ex))) in
  Lwt.catch perform report
(* [run ... registration_service] starts a worker and keeps it connected to
   the scheduler.

   - [switch]: if given, turning it off cancels the pending job request, wakes
     anything waiting on the worker's condition, and makes [run] return.
   - [build]: the build function; defaults to [default_build ?obuilder].
   - [allow_push]: repositories that may be pushed to (default: none).
   - [healthcheck_period], [prune_threshold], [docker_max_df_size],
     [capacity], [name]: stored in the worker state (see type [t]).
   - [obuilder_prune_threshold], [obuilder]: used to create the OBuilder
     instance, if an OBuilder configuration is given.
   - [additional_metrics]: association list searched by
     [collect_additional_metrics].
   - [update]: passed to [self_update] when an update is requested.
   - [state_dir]: used to create the build [Context].

   After connecting and registering, [loop] is run. If it fails while the
   queue capability reports a problem, or if connecting fails, the worker
   reconnects, starting attempts at least [min_reconnect_time] seconds apart.
   An exception from [loop] that is not explained by a queue problem or by
   the switch being off is re-raised to the caller.

   @raise Invalid_argument if [prune_threshold] is outside 0 to 100. *)
let run ?switch ?build ?(allow_push=[]) ~healthcheck_period ?prune_threshold ?docker_max_df_size ?obuilder_prune_threshold ?obuilder ?(additional_metrics=[]) ~update ~capacity ~name ~state_dir registration_service =
(* Report or validate the pruning configuration before doing anything else. *)
begin match prune_threshold, docker_max_df_size with
| None, None -> Log.info (fun f -> f "Prune threshold not set and docker max df size is not. Will not check for low disk-space!")
| None, Some size -> Log.info (fun f -> f "Pruning docker whenever the memory used exceeds %3.2fGB" size)
| Some frac, _ when frac < 0.0 || frac > 100.0 -> Fmt.invalid_arg "prune_threshold must be in the range 0 to 100"
| Some _, _ -> ()
end;
begin match obuilder with
| None -> Lwt.return_none
| Some config -> Obuilder_build.create ?prune_threshold:obuilder_prune_threshold config >|= Option.some
end >>= fun obuilder ->
let build =
match build with
| Some x -> x
| None -> default_build ?obuilder
in
let t = {
name;
context = Context.v ~state_dir;
healthcheck_period;
prune_threshold;
docker_max_df_size;
registration_service;
build;
cond = Lwt_condition.create ();
capacity;
in_use = 0;
cancel = ignore;
allow_push;
} in
(* On shutdown, cancel the pending job request and wake any waiters so that
   [loop] can notice the switch is off. *)
Lwt_switch.add_hook_or_exec switch (fun () ->
Log.info (fun f -> f "Switch turned off. Will shut down.");
t.cancel ();
Lwt_condition.broadcast t.cond ();
Lwt.return_unit
)
>>= fun () ->
let rec reconnect () =
let connect_time = Unix.gettimeofday () in
Lwt.catch
(fun () ->
Sturdy_ref.connect_exn t.registration_service >>= fun reg ->
Capability.with_ref reg @@ fun reg ->
let queue =
let additional_metric s = collect_additional_metrics additional_metrics s in
let api = Cluster_api.Worker.local ~additional_metric ~metrics ~self_update:(fun () -> self_update ~update t) () in
let queue = Cluster_api.Registration.register reg ~name ~capacity api in
(* Our own reference to [api] is not needed after registering. *)
Capability.dec_ref api;
queue
in
Capability.with_ref queue @@ fun queue ->
Lwt.catch
(fun () -> loop ~switch ?obuilder t queue)
(* Classify a failure of [loop]: shutdown, lost connection, or crash. *)
(fun ex ->
Lwt.pause () >>= fun () ->
match Capability.problem queue, switch with
| _, Some switch when not (Lwt_switch.is_on switch) -> Lwt.return `Cancelled
| Some problem, _ ->
Log.info (fun f -> f "Worker loop failed (probably because queue connection failed): %a" Fmt.exn ex);
(* Failing here hands control to the outer handler, which reconnects. *)
Lwt.fail (Failure (Fmt.to_to_string Capnp_rpc.Exception.pp problem))
| None, _ ->
Lwt.return (`Crash ex)
)
)
(fun ex ->
(* Wait out the rest of [min_reconnect_time], measured from the start of
   this attempt, before trying again. *)
let delay = max 0.0 (connect_time +. min_reconnect_time -. Unix.gettimeofday ()) in
Log.info (fun f -> f "Lost connection to scheduler (%a). Will retry in %.1fs…" Fmt.exn ex delay);
Lwt_unix.sleep delay >>= reconnect
)
in
reconnect () >>= function
| `Cancelled -> Lwt.return_unit
| `Crash ex -> Lwt.fail ex
(* Alias giving access to [Obuilder_build.Config] through this module. *)
module Obuilder_config = Obuilder_build.Config