(* Source file delegationManager.ml *)
open Types
(* Logging function for this module, registered under the name
   "delegationManager". *)
let Log log = Log.mk_log "delegationManager"
(* Sentences are identified by [Stateid.t] values. *)
type sentence_id = Stateid.t
(* A connection to a peer process: [write_value] writes marshalled values on
   [write_to], and the worker machinery reads OCaml values from [read_from].
   Everywhere in this file both fields are set to the same socket. *)
type link = {
write_to : Unix.file_descr;
read_from: Unix.file_descr;
}
(* [write_value link x] marshals [x] (without the [Closures] flag) and writes
   the bytes on [link.write_to], then calls [flush_all ()].
   Raises [Unix.Unix_error] if the write fails. *)
let write_value { write_to; _ } x =
  let data = Marshal.to_bytes x [] in
  let datalength = Bytes.length data in
  (* Per the OCaml Unix documentation, [Unix.write] repeats the write until
     all bytes are written or an error occurs, so a short count is not
     expected here. *)
  let writeno = Unix.write write_to data 0 datalength in
  assert (writeno = datalength);
  flush_all ()
(* [abort_on_unix_error f x] evaluates [f x]. If that raises
   [Unix.Unix_error], the failing syscall, its argument and the error text
   are printed on stderr and the process exits with status 3. *)
let abort_on_unix_error f x =
  match f x with
  | result -> result
  | exception Unix.Unix_error (err, syscall, arg) ->
    Printf.eprintf "Error: %s: %s: %s\n%!" syscall arg (Unix.error_message err);
    exit 3
(* Handle on a delegated job: the (feedback route, sentence) it belongs to,
   and a cell holding the pid of the worker process running it. The cell
   starts as [None] and is set to [Some pid] when a worker is spawned;
   [cancel_job] kills the recorded pid. *)
type job_handle = (Feedback.route_id * sentence_id) * int option ref
(* Description of a kind of job that [MakeWorker] can run in worker
   processes. *)
module type Job = sig

(** The job payload. When workers are started with [Unix.create_process],
    [MakeWorker] sends it to them with [Marshal]. *)
type t

(** Used to name the worker log (["worker." ^ name]) and the command-line
    option carrying the master's address. *)
val name : string

(** Executable started by [Unix.create_process] when workers are not
    forked. *)
val binary_name : string

(** Initial number of worker slots; [MakeWorker] asserts it is [>= 1]. *)
val initial_pool_size : int

(** Updates that workers send back to the master. *)
type update_request

(** [appendFeedback (route, sentence) (level, loc, quickfixes, msg)] wraps a
    feedback message as an update. [MakeWorker] uses it both to forward
    worker feedback and to report workers that failed to spawn. *)
val appendFeedback : Feedback.route_id * sentence_id -> (Feedback.level * Loc.t option * Quickfix.t list * Pp.t) -> update_request
end
(* Build the handle of a job attached to [(route, sentence)]; no worker pid
   is recorded yet. *)
let mk_job_handle (route, sentence) : job_handle =
  let pid_cell = ref None in
  ((route, sentence), pid_cell)
(* [cancel_job handle] kills the worker process recorded in [handle], if any,
   with [Sys.sigkill], then forgets the pid. Forgetting it means a later
   cancellation cannot signal an unrelated process that reused the pid.
   A worker that is already gone ([ESRCH]) counts as cancelled; any other
   [Unix.Unix_error] propagates and leaves the recorded pid in place. *)
let cancel_job (_, pid_cell) =
  match !pid_cell with
  | None -> ()
  | Some pid ->
    (match Unix.kill pid Sys.sigkill with
     | () -> ()
     | exception Unix.Unix_error (Unix.ESRCH, _, _) -> ());
    pid_cell := None
(* [install_feedback send] registers a message feeder through
   [Log.feedback_add_feeder_on_Message]. Each message is handed to [send]
   as [(route, span, (level, loc, quickfixes, msg))]; the feeder's unnamed
   third argument is dropped. The result of the registration is returned
   unchanged. *)
let install_feedback send =
  let forward route span _ level loc quickfixes msg =
    send (route, span, (level, loc, quickfixes, msg))
  in
  Log.feedback_add_feeder_on_Message forward
(* Interface of the worker pools built by [MakeWorker]. A pool spawns worker
   processes for queued jobs and turns their activity into [Sel] events for
   the main loop. *)
module type Worker = sig
type job_t
type job_update_request

(** [resize_pool n] sets the number of worker slots to [n]
    ([MakeWorker] asserts [n >= 1]). *)
val resize_pool : int -> unit

(** Events of the main loop that concern this worker pool. *)
type delegation

(** Debug printer for events. *)
val pr_event : delegation -> Pp.t
type events = delegation Sel.Event.t list

(** Handle an event. Handling may produce an update for a sentence in the
    execution state (e.g. when feedback is received), returned together with
    the follow-up events to wait for. *)
val handle_event : delegation -> (job_update_request option * events)

(** Event producing a worker start for a job taken from [jobs]. In
    [MakeWorker] it is built from [jobs] and the pool of free worker slots.
    [fork_action] is what a forked worker runs on the job, with [send_back]
    forwarding updates to the master. [feedback_cleanup] is called in the
    forked worker before feedback forwarding is installed. *)
val worker_available :
jobs:((job_handle * Sel.Event.cancellation_handle * job_t) Queue.t) ->
fork_action:(job_t -> send_back:(job_update_request -> unit) -> unit) ->
feedback_cleanup:(unit -> unit) ->
delegation Sel.Event.t

(** Options read by [parse_options] from a worker's command line. *)
type options

(** Parse the command line of a worker process; returns the options and the
    remaining arguments. *)
val parse_options : string list -> options * string list

(** Worker side: connect back to the master and receive the job. Returns the
    function used to send updates to the master, and the job. *)
val setup_plumbing : options -> ((job_update_request -> unit) * job_t)

(** Logger for worker-side messages. *)
val log : string -> unit
end
(* [MakeWorker (Job)] runs jobs of type [Job.t] in separate worker processes
   and relays their updates to the master through [Sel] events.

   On a [WorkerStart] event, [handle_event] spawns the worker in one of two
   ways:
   - when [Sys.os_type = "Unix"], by forking ([fork_worker]); the child runs
     the job itself and then exits;
   - otherwise, by starting [Job.binary_name] with [Unix.create_process]
     ([create_process_worker]); that process is expected to call
     [parse_options] and [setup_plumbing] to connect back and receive the
     job.

   In both modes the worker connects to a loopback TCP socket opened by the
   master. It writes only [worker_message] values there, which the master
   reads in [worker_progress]. *)
module MakeWorker (Job : Job) = struct

  type job_t = Job.t
  type job_update_request = Job.update_request

  (** Values a worker writes to the master. The master unmarshals every value
      on the socket as a [worker_message] (see [worker_progress]), so a worker
      must never write a bare [Job.update_request]. *)
  type worker_message =
    | Job_update of Job.update_request
    | DebugMessage of Log.event

  let Log log_worker = Log.mk_log ("worker." ^ Job.name)

  (** Worker side: call [feedback_cleanup], then forward every feedback
      message to the master, wrapped as [Job_update]. *)
  let install_feedback_worker ~feedback_cleanup link =
    feedback_cleanup ();
    ignore
      (install_feedback (fun (rid, id, fb) ->
           write_value link (Job_update (Job.appendFeedback (rid, id) fb))))

  type feedback_cleanup = unit -> unit

  (** Events handled by [handle_event]. [WorkerStart] packs a job with the
      action that runs it; the job type is existential. *)
  type delegation =
    | WorkerStart : feedback_cleanup * job_handle * 'job
                    * ('job -> send_back:(Job.update_request -> unit) -> unit)
                    * string -> delegation
    | WorkerProgress of { link : link; update_request : worker_message }
    | WorkerEnd of (int * Unix.process_status)
    | WorkerIOError of exn

  let pr_event = function
    | WorkerEnd _ -> Pp.str "WorkerEnd"
    | WorkerIOError _ -> Pp.str "WorkerIOError"
    | WorkerProgress _ -> Pp.str "WorkerProgress"
    | WorkerStart _ -> Pp.str "WorkerStart"

  (** Worker side: forward log events to the master as [DebugMessage]s. *)
  let install_debug_worker link =
    Log.worker_initialization_done
      ~fwd_event:(fun e -> write_value link (DebugMessage e))

  type events = delegation Sel.Event.t list

  type role = Master | Worker of link

  (* One [()] token per free worker slot. *)
  let pool = Queue.create ()

  let () =
    assert (Job.initial_pool_size >= 1);
    (* Exactly [initial_pool_size] tokens, matching [current_pool_size]. *)
    for _i = 1 to Job.initial_pool_size do Queue.push () pool done

  let current_pool_size = ref Job.initial_pool_size

  (* Give a slot back to the pool without exceeding the configured size. *)
  let release_slot () =
    if Queue.length pool < !current_pool_size then Queue.push () pool

  let resize_pool new_pool_size =
    assert (new_pool_size >= 1);
    let delta = !current_pool_size - new_pool_size in
    current_pool_size := new_pool_size;
    if delta < 0 then
      for _i = 1 to -delta do Queue.push () pool done
    else
      (* Slots held by running workers cannot be taken here; [release_slot]
         will not hand them back while the pool is already full. *)
      for _i = 1 to delta do ignore (Queue.take_opt pool) done

  (** Event producing a [WorkerStart] for a job taken from [jobs]. It is
      built with [Sel.On.queues] over [jobs] and the pool of free slots;
      presumably it fires once both are non-empty (confirm against Sel). *)
  let worker_available ~jobs ~fork_action ~feedback_cleanup : delegation Sel.Event.t =
    Sel.On.queues jobs pool (fun (job_handle, _, job) () ->
        WorkerStart (feedback_cleanup, job_handle, job, fork_action, Job.binary_name))

  (** Event carrying the termination status of worker [pid]. *)
  let worker_ends pid : delegation Sel.Event.t =
    Sel.On.death_of ~pid (fun reason -> WorkerEnd (pid, reason))

  (** Event for the next [worker_message] read from [link]. *)
  let worker_progress link : delegation Sel.Event.t =
    Sel.On.ocaml_value link.read_from (function
      | Error e -> WorkerIOError e
      | Ok update_request -> WorkerProgress { link; update_request })

  (** Wait at most [timeout] seconds (default 2.0) for a connection on the
      listening socket [sr]; [None] on timeout. *)
  let accept_timeout ?(timeout = 2.0) sr =
    let r, _, _ = Unix.select [sr] [] [] timeout in
    if r = [] then None else Some (Unix.accept sr)

  (** Fork a worker. The parent listens on a loopback socket, records the
      child's pid in [cancellation_handle] and waits for the child to connect
      back, returning [Master] with the events to watch. The child redirects
      stdin/stdout to /dev/null, connects, installs feedback and debug
      forwarding and returns [Worker link]. Unix errors in the parent become
      [Error]; in the child they abort the process, since returning [Error]
      would resume master code in the child. *)
  let fork_worker : feedback_cleanup:feedback_cleanup -> int option ref -> (role * events, string * events) result = fun ~feedback_cleanup cancellation_handle ->
    let open Unix in
    try
      let chan = socket PF_INET SOCK_STREAM 0 in
      bind chan (ADDR_INET (Unix.inet_addr_loopback, 0));
      listen chan 1;
      let address = getsockname chan in
      log @@ "forking...";
      flush_all ();
      let null = openfile "/dev/null" [O_RDWR] 0o640 in
      let pid = fork () in
      if pid = 0 then
        abort_on_unix_error (fun () ->
            dup2 null stdin;
            dup2 null stdout;
            if null <> stdin && null <> stdout then close null;
            close chan;
            Log.worker_initialization_begins ();
            let chan = socket PF_INET SOCK_STREAM 0 in
            connect chan address;
            let link = { write_to = chan; read_from = chan } in
            install_feedback_worker ~feedback_cleanup link;
            install_debug_worker link;
            log_worker @@ "borning...";
            Ok (Worker link, [])) ()
      else begin
        (* Only the child uses the null device; don't leak one fd per worker. *)
        close null;
        cancellation_handle := Some pid;
        match accept_timeout chan with
        | None ->
          close chan;
          log @@ Printf.sprintf "forked pid %d did not connect back" pid;
          kill pid Sys.sigkill;
          Error ("worker did not connect back", [worker_ends pid])
        | Some (worker, _worker_addr) ->
          close chan;
          log @@ Printf.sprintf "forked pid %d called back" pid;
          let link = { write_to = worker; read_from = worker } in
          Ok (Master, [worker_progress link; worker_ends pid])
      end
    with Unix_error (e, f, p) ->
      Error (f ^ ": " ^ p ^ ": " ^ error_message e, [])

  let option_name = "-" ^ Str.global_replace (Str.regexp_string " ") "." Job.name ^ "_master_address"

  (** Start [procname] as a worker process, passing it [option_name] and the
      port of a loopback socket to connect back to. The pid is recorded in
      [cancellation_handle]. Once the worker connects, [job] is sent as the
      first value on the socket. Unix errors become [Error]. *)
  let create_process_worker procname cancellation_handle job =
    let open Unix in
    try
      let chan = socket PF_INET SOCK_STREAM 0 in
      bind chan (ADDR_INET (Unix.inet_addr_loopback, 0));
      listen chan 1;
      let port =
        match getsockname chan with
        | ADDR_INET (_, port) -> port
        | _ -> assert false
      in
      (* This path is used when [Sys.os_type <> "Unix"]; on native Windows
         the null device is "NUL", not "/dev/null". *)
      let null = openfile (if Sys.win32 then "NUL" else "/dev/null") [O_RDWR] 0o640 in
      let extra_flags = if CDebug.get_flags () = "all" then [| "-debug" |] else [||] in
      let args = Array.append [| procname; option_name; string_of_int port |] extra_flags in
      (* Like the forked worker, keep the worker's stdout off the master's. *)
      let pid = create_process procname args null null stderr in
      close null;
      cancellation_handle := Some pid;
      log @@ Printf.sprintf "created worker %d, waiting on port %d" pid port;
      match accept_timeout chan with
      | Some (worker, _worker_addr) ->
        close chan;
        let link = { write_to = worker; read_from = worker } in
        (* Feedback/debug forwarding is installed by the worker itself in
           [setup_plumbing], not here in the master. *)
        log @@ "sending job";
        write_value link job;
        flush_all ();
        log @@ "sent";
        Ok [worker_progress link; worker_ends pid]
      | None ->
        close chan;
        log @@ Printf.sprintf "child process %d did not connect back" pid;
        kill pid Sys.sigkill;
        Error ("worker did not connect back", [worker_ends pid])
    with Unix_error (e, f, p) ->
      Error (f ^ ": " ^ p ^ ": " ^ error_message e, [])

  let handle_event = function
    | WorkerIOError e ->
      log @@ "worker IO Error: " ^ Printexc.to_string e;
      release_slot ();
      (None, [])
    | WorkerEnd (pid, _status) ->
      log @@ Printf.sprintf "worker %d went on holidays" pid;
      release_slot ();
      (None, [])
    | WorkerProgress { link; update_request = DebugMessage d } ->
      Log.handle_event d;
      (None, [worker_progress link])
    | WorkerProgress { link; update_request = Job_update u } ->
      log "worker progress";
      (Some u, [worker_progress link])
    | WorkerStart (feedback_cleanup, (feedback_route, cancellation_handle), job, action, procname) ->
      log "worker starts";
      let spawn_failed msg cleanup_events =
        log @@ "worker did not spawn: " ^ msg;
        (Some (Job.appendFeedback feedback_route (Feedback.Error, None, [], Pp.str msg)), cleanup_events)
      in
      if Sys.os_type = "Unix" then begin
        match fork_worker ~feedback_cleanup cancellation_handle with
        | Ok (Master, events) ->
          log "worker spawned (fork)";
          (None, events)
        | Ok (Worker link, _) ->
          (* In the child: run the job, then exit. [write_value link] is the
             function given to [abort_on_unix_error], so that the write itself
             (not just a partial application) is guarded. *)
          action job ~send_back:(fun j -> abort_on_unix_error (write_value link) (Job_update j));
          exit 0
        | Error (msg, cleanup_events) -> spawn_failed msg cleanup_events
      end else begin
        match create_process_worker procname cancellation_handle job with
        | Ok events ->
          log "worker spawned (create_process)";
          (None, events)
        | Error (msg, cleanup_events) -> spawn_failed msg cleanup_events
      end

  type options = int

  (** Worker side (create_process mode): connect to the master on loopback
      [port] and receive the job. Then install feedback and debug forwarding
      and return the function that sends updates to the master, together
      with the job. Any failure is logged and the process exits with
      status 1.
      NOTE(review): the fork path also calls
      [Log.worker_initialization_begins] before connecting; confirm whether
      the worker binary does so before calling this. *)
  let setup_plumbing port =
    try
      let open Unix in
      let chan = socket PF_INET SOCK_STREAM 0 in
      let address = ADDR_INET (inet_addr_loopback, port) in
      log_worker @@ "connecting to " ^ string_of_int port;
      connect chan address;
      let read_from = chan in
      let link = { read_from; write_to = chan } in
      (* The job is the first value the master writes on the socket. *)
      (match Sel.(pop Todo.(add empty [Sel.On.ocaml_value read_from (fun x -> x)])) with
       | Ok (job : Job.t), _ ->
         install_feedback_worker ~feedback_cleanup:(fun () -> ()) link;
         install_debug_worker link;
         ((fun u -> write_value link (Job_update u)), job)
       | Error exn, _ ->
         log_worker @@ "error receiving job: " ^ Printexc.to_string exn;
         exit 1)
    with Unix.Unix_error (code, syscall, param) ->
      log_worker @@ Printf.sprintf "error starting: %s: %s: %s" syscall param (Unix.error_message code);
      exit 1

  (** Expects exactly [[option_name; port]]; anything else is reported on
      stderr and the process exits with status 2. *)
  let parse_options extra_args =
    match extra_args with
    | [ o; port ] when o = option_name -> int_of_string port, []
    | _ ->
      Printf.eprintf "unknown arguments: %s" (String.concat " " extra_args);
      exit 2

  let log = log_worker
end