package devkit
Development kit - general purpose library
Install
Dune Dependency
Authors
Maintainers
Sources
devkit-1.20240429.tbz
sha256=222f8ac131b1d970dab7eeb2714bfd6b9338b88b1082e6e01c136ae19e7eaef4
sha512=c9e6d93e3d21e5530c0f4d5baca51bf1f0a5d19248f8af7678d0665bb5cdf295d7aaaaa3e50eb2e44b8720e55097cc675af4dc8ec45acf9da39feb3eae1405d5
doc/src/devkit.core/parallel.ml.html
Source file parallel.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376
open Printf
open ExtLib
open Prelude

let log = Log.from "parallel"

(** Types of the values exchanged with forked workers.
    Both are sent over pipes with [Marshal]. *)
module type WorkerT = sig
  type task
  type result
end

(** Interface of a pool of forked workers (see {!Forks} for the implementation). *)
module type Workers = sig
  type task
  type result
  type t

  (** [create execute n] forks [n] workers, each running [execute] on the tasks it receives. *)
  val create : (task -> result) -> int -> t

  (** [perform t ?autoexit tasks finish] feeds [tasks] to the workers and calls [finish]
      in the calling process on every result received.
      With [~autoexit:true] the channels to a worker are closed once no task is left for it. *)
  val perform : t -> ?autoexit:bool -> task Enum.t -> (result -> unit) -> unit

  (** [stop ?wait t] closes the channels to all workers, sends them SIGTERM and waits for them to exit.
      When [wait] is given, workers still alive after that many one-second polling rounds get SIGKILL. *)
  val stop : ?wait:int -> t -> unit
end

(** Non-blocking [waitpid] on every pid of [l].
    A pid counts as reaped when [waitpid] returns it, or when [waitpid] fails
    (ECHILD silently, any other error with a warning).
    @return list of reaped and live pids *)
let reap l =
  let open Unix in
  List.partition (fun pid ->
    try
      (* with WNOHANG the returned pid differs from [pid] while the child is still running *)
      pid = fst (waitpid [WNOHANG] pid)
    with
    | Unix_error (ECHILD,_,_) -> true (* exited *)
    | exn -> log #warn ~exn "Worker PID %d lost (wait)" pid; true) l

(** Send SIGKILL to [pid]. ESRCH is ignored, other errors are logged; never raises for them. *)
let hard_kill1 pid =
  let open Unix in
  try
    kill pid Sys.sigkill; log #warn "Worker PID %d killed with SIGKILL" pid
  with
  | Unix_error (ESRCH,_,_) -> ()
  | exn -> log #warn ~exn "Worker PID %d (SIGKILL)" pid

(** Reap the pids of [l] that already exited and SIGKILL the remaining ones. *)
let hard_kill l =
  let (_,live) = reap l in
  List.iter hard_kill1 live

(** Send signal [signo] to every pid of [pids], logging (not raising) on failure. *)
let killall signo pids =
  pids |> List.iter begin fun pid ->
    try Unix.kill pid signo with exn -> log #warn ~exn "PID %d lost (trying to send signal %d)" pid signo
  end

(** Send SIGTERM to [pids], then poll once per second until all of them are reaped.
    @param wait number of polling rounds after which the survivors are killed with SIGKILL;
    when absent the function polls until every process is gone.
    @return [`Done] when all processes exited, [`Killed n] when [n] processes had to be SIGKILLed *)
let do_stop ?wait pids =
  let rec reap_loop timeout l =
    let (_,live) = reap l in
    match timeout, live with
    | _, [] -> `Done
    | Some 0, l -> hard_kill l; `Killed (List.length l)
    | _, l -> Nix.sleep 1.; reap_loop (Option.map pred timeout) l
  in
  killall Sys.sigterm pids;
  reap_loop wait pids

(** Pool of forked worker processes, each connected to the parent with a pair of pipes. *)
module Forks(T:WorkerT) =
struct

  type task = T.task
  type result = T.result

  (** One forked worker: its pid and the parent side of its pipes ([None] once closed). *)
  type instance = { mutable ch : (in_channel * out_channel) option; pid : int; }

  type t = {
    mutable running : instance list; (** workers not yet stopped or forgotten *)
    execute : (task -> result);
    mutable alive : bool; (** set to [false] by [stop], ends the [perform] loop *)
    mutable gone : int; (** number of workers whose result could not be read *)
  }

  (** Fork one worker. The child reads marshalled tasks from its pipe, runs [execute]
      and writes the marshalled result back, until EOF or an error, then exits with code 0.
      The parent gets the [instance] holding its side of the pipes. *)
  let worker (execute : task -> result) =
    let main_read, child_write = Unix.pipe () in
    let child_read, main_write = Unix.pipe () in
    match Nix.fork () with
    | exception exn ->
      (* fork failed: release all pipe ends instead of leaking four descriptors *)
      List.iter (fun fd -> try Unix.close fd with Unix.Unix_error _ -> ()) [main_read; child_write; child_read; main_write];
      raise exn
    | `Child -> (* child *)
      Unix.close main_read; Unix.close main_write;
      Unix.set_close_on_exec child_read;
      Unix.set_close_on_exec child_write;
      let output = Unix.out_channel_of_descr child_write in
      let input = Unix.in_channel_of_descr child_read in
      let rec loop () =
        (* block until the parent sends a task or closes the pipe *)
        match Nix.restart (fun () -> ExtUnix.All.(poll [| child_read, Poll.(pollin + pollpri); |] (-1.))) () with
        | [] | _ :: _ :: _ -> assert false
        | [ _fd, revents; ] ->
          assert (not (ExtUnix.All.Poll.(is_set revents pollpri)));
          assert (ExtUnix.All.Poll.(is_inter revents (pollin + pollhup)));
          match (Marshal.from_channel input : task) with
          | exception End_of_file -> ()
          | exception exn -> log #error ~exn "Parallel.worker failed to unmarshal task"
          | v ->
            let r = execute v in
            Marshal.to_channel output (r : result) [];
            flush output;
            loop ()
      in
      begin try
        loop ()
      with exn ->
        log #error ~exn ~backtrace:true "Parallel.worker aborting on uncaught exception"
      end;
      close_in_noerr input;
      close_out_noerr output;
      exit 0
    | `Forked pid ->
      Unix.close child_read; Unix.close child_write;
      (* prevent sharing these pipes with other children *)
      Unix.set_close_on_exec main_write;
      Unix.set_close_on_exec main_read;
      let cout = Unix.out_channel_of_descr main_write in
      let cin = Unix.in_channel_of_descr main_read in
      { ch = Some (cin, cout); pid; }

  (** [create execute n] forks [n] workers running [execute]. *)
  let create execute n =
    let running = List.init n (fun _ -> worker execute) in
    { running; execute; alive=true; gone=0; }

  (** Close both channels of worker [w] (errors ignored); no-op when already closed. *)
  let close_ch w =
    match w.ch with
    | Some (cin,cout) -> w.ch <- None; close_in_noerr cin; close_out_noerr cout
    | None -> ()

  (** Close the channels of all running workers, then terminate them with [do_stop].
      After the call [t.running] is empty and [t.alive] is [false]. *)
  let stop ?wait t =
    let gone () = if t.gone = 0 then "" else sprintf " (%d workers vanished)" t.gone in
    log #info "Stopping %d workers%s" (List.length t.running) (gone ());
    t.alive <- false;
    let l = t.running |> List.map (fun w -> close_ch w; w.pid) in
    Nix.sleep 0.1; (* let idle workers detect EOF and exit peacefully (frequent io-in-signal-handler deadlock problem) *)
    t.running <- [];
    match do_stop ?wait l with
    | `Done -> log #info "Stopped %d workers properly%s" (List.length l) (gone ())
    | `Killed killed -> log #info "Timeouted, killing %d (of %d) workers with SIGKILL%s" killed (List.length l) (gone ())

  (** Run [tasks] on the workers of [t] and call [finish] on each result.
      When [t] has no running workers the tasks are executed in the calling process.
      Otherwise every worker with open channels gets one task, and receives the next one
      each time its result is read, until the enumeration is exhausted.
      @param autoexit close the channels to a worker as soon as there is no task left for it *)
  let perform t ?(autoexit=false) tasks finish =
    match t.running with
    | [] -> Enum.iter (fun x -> finish (t.execute x)) tasks (* no workers *)
    | _ ->
      (* number of workers from which a result is still expected *)
      let workers = ref 0 in
      t.running |> List.iter begin fun w ->
        match w.ch with
        | None -> ()
        | Some (_,cout) ->
          match Enum.get tasks with
          | None -> ()
          | Some x -> incr workers; Marshal.to_channel cout (x : task) []; flush cout
      end;
(*       Printf.printf "workers %u\n%!" !workers; *)
      let events = ExtUnix.All.Poll.(pollin + pollpri) in
      while !workers > 0 && t.alive do
        let fds = List.filter_map (function {ch=Some (cin,_); _} -> Some (Unix.descr_of_in_channel cin) | _ -> None) t.running in
        let r = Nix.restart (fun () -> ExtUnix.All.poll (Array.of_list (List.map (fun fd -> fd, events) fds)) (-1.)) () in
        assert (not (List.exists (fun (_fd, revents) -> ExtUnix.All.Poll.(is_set revents pollpri)) r));
        (* map each reported descriptor back to the worker owning it *)
        let channels = r |> List.map (fun (fd, _revents) -> t.running |> List.find (function {ch=Some (cin,_); _} -> Unix.descr_of_in_channel cin = fd | _ -> false)) in
        let answers = channels |> List.filter_map begin fun w ->
          match w.ch with
          | None -> None
          | Some (cin,cout) ->
            try
              match (Marshal.from_channel cin : result) with
              | exception exn ->
                log #warn ~exn "no result from PID %d" w.pid;
                t.gone <- t.gone + 1;
                decr workers;
                (* close pipes and forget dead child, do not reap zombie so that premature exit is visible in process list *)
                close_ch w;
                t.running <- List.filter (fun w' -> w'.pid <> w.pid) t.running;
                None
              | answer ->
                begin match Enum.get tasks with
                | None -> if autoexit then close_ch w; decr workers
                | Some x -> Marshal.to_channel cout (x : task) []; flush cout;
                end;
                Some answer
            with
            | exn ->
              log #warn ~exn "perform (from PID %d)" w.pid;
              decr workers;
              (* stop polling this worker: with its channel left open the next poll would report
                 the same descriptor again and the worker would be subtracted from the counter twice *)
              close_ch w;
              None
        end in
        List.iter finish answers;
      done;
      match t.gone with
      | 0 -> log #info "Finished"
      | n -> log #warn "Finished, %d workers vanished" n

end

(** [invoke f x] starts computing [f x] in a forked child and returns a function
    that waits for the child and yields its result (or re-raises the exception [f] raised there).
    When the fork fails, [f x] is computed immediately in the calling process instead. *)
let invoke (f : 'a -> 'b) x : unit -> 'b =
  let input, output = Unix.pipe() in
  match Nix.fork () with
  | exception _ ->
    Unix.close input; Unix.close output;
    (let v = f x in fun () -> v)
  | `Child ->
    Unix.close input;
    let output = Unix.out_channel_of_descr output in
    Marshal.to_channel output (try `Res(f x) with e -> `Exn e) [];
    close_out output;
    exit 0
  | `Forked pid ->
    Unix.close output;
    let input = Unix.in_channel_of_descr input in
    fun () ->
      match Marshal.from_channel input with
      | exception exn ->
        (* the child sent no (complete) result: still reap it and release the pipe before failing *)
        (try ignore (Nix.restart (Unix.waitpid []) pid) with Unix.Unix_error _ -> ());
        close_in_noerr input;
        raise exn
      | v ->
        ignore (Nix.restart (Unix.waitpid []) pid);
        close_in input;
        match v with `Res x -> x | `Exn e -> raise e

(*

(* example *)
open Printf

module W = Workers(struct type task = string type result = string list end)

let execute s = for i = 1 to 100_000 do Thread.delay 0. done; printf "%u : %s\n%!" (Unix.getpid()) s; [s;s;s;s]

let () =
  let workers = W.create execute 4 in
  print_endline "go";
  let e = Enum.init 100 (sprintf "<%u>") in
  let f l = printf "got [%s]\n%!" (Stre.list Prelude.id l) in
  for i = 1 to 2 do
    W.perform workers (Enum.clone e) f;
    Thread.delay 1.
  done;
  print_endline "Done"
*)

(** Fork one child per element of the list; each child calls [f] on its element.
    The parent goes on forking and returns once the list is exhausted.
    NOTE(review): the child is not terminated here after [f x] returns, presumably [f] is expected to exit - confirm *)
let rec launch_forks f = function
| [] -> ()
| x::xs ->
  match Nix.fork () with
  | `Child -> f x
  | `Forked _ -> launch_forks f xs

(** keep the specified number of workers running

    Forks one child per element of [args], each running [f] on its element and exiting with code 0.
    The parent then polls: it stops the children when [Daemon.should_exit ()] is true,
    and returns when no child is left (unless [revive] is set).
    @param revive restart a worker (with the same argument) when it exits
    @param wait_stop passed as [wait] to [do_stop] when stopping the children *)
let run_forks_simple ?(revive=false) ?wait_stop f args =
  (* pid -> argument the worker was started with *)
  let workers = Hashtbl.create 1 in
  let launch f x =
    match Nix.fork () with
    | `Child ->
      let () = try f x with exn -> log #error ~exn ~backtrace:true "worker failed" in
      exit 0
    | `Forked pid -> Hashtbl.add workers pid x; pid
  in
  args |> List.iter (fun x -> let (_:int) = launch f x in ());
  let pids () = Hashtbl.keys workers |> List.of_enum in
  let rec loop pause =
    Nix.sleep pause;
    let total = Hashtbl.length workers in
    if total = 0 && not revive then
      log #info "All workers dead, stopping"
    else
    match Daemon.should_exit () with
    | true ->
      log #info "Stopping %d workers" total;
      begin match do_stop ?wait:wait_stop (pids ()) with
      | `Done -> log #info "Stopped %d workers" total
      | `Killed n -> log #info "Killed %d (of %d) workers with SIGKILL" n total
      end
    | false ->
      let (dead,_live) = reap (pids ()) in
      match dead with
      | [] -> loop (max 1. (pause /. 2.))
      | dead when revive ->
        (* poll less often while workers keep dying, up to 10 seconds between rounds *)
        let pause = min 10. (pause *. 1.5) in
        dead |> List.iter begin fun pid ->
          match Hashtbl.find workers pid with
          | exception Not_found -> log #warn "WUT? Not my worker %d" pid
          | x ->
            Hashtbl.remove workers pid;
            match launch f x with
            | exception exn -> log #error ~exn "restart"
            | pid' -> log #info "worker %d exited, replaced with %d" pid pid';
        end;
        loop pause
      | dead ->
        log #info "%d child workers exited (PIDs: %s)" (List.length dead) (Stre.list string_of_int dead);
        List.iter (Hashtbl.remove workers) dead;
        loop pause
  in
  loop 1.

(** Process the elements of [enum] with [f] in a pool of [workers] forked processes
    and call [g] in the calling process on every result.
    The pool is stopped on exit signals and once all tasks are done.
    @param workers number of worker processes, must be positive
    @param wait_stop timeout given to the pool [stop] (see {!do_stop}) *)
let run_workers_enum workers ?wait_stop (type t) (type u) (f : t -> u) (g : u -> unit) enum =
  assert (workers > 0);
  let module Worker = struct type task = t type result = u end in
  let module W = Forks(Worker) in
  let worker x =
    (* sane signal handler FIXME restore? *)
    Signal.set_exit Daemon.signal_exit;
    f x
  in
  let proc = W.create worker workers in
  Nix.handle_sig_exit_with ~exit:true (fun () -> W.stop ?wait:wait_stop proc); (* FIXME: output in signal handler *)
  W.perform ~autoexit:true proc enum g;
  (* honour the caller timeout here as well, same as in the signal handler above *)
  W.stop ?wait:wait_stop proc

(** Same as {!run_workers_enum} for a list of tasks whose processing yields no result. *)
let run_workers workers ?wait_stop (type t) (f : t -> unit) l =
  run_workers_enum workers ?wait_stop f id (List.enum l)

(** Run [f] on the elements of [l] in forked processes.
    Without [workers] one process is forked per element ({!run_forks_simple}),
    otherwise the elements are distributed over a pool of [workers] processes ({!run_workers}).
    @param wait used as [wait_stop] when [wait_stop] is not given *)
let run_forks ?wait_stop ?revive ?wait ?workers (type t) (f : t -> unit) l =
  let wait_stop = if wait_stop = None then wait else wait_stop in
  match workers with
  | None -> run_forks_simple ?wait_stop ?revive f l
  | Some n -> run_workers n ?wait_stop f l

(** Like {!run_forks} with default options, but does not fork for an empty or single-element list. *)
let run_forks' f l =
  match l with
  | [] -> ()
  | [x] -> f x
  | l -> run_forks f l

(** Forked workers running Lwt jobs, with graceful stop and rolling restart. *)
module Services = struct
  type t = {
    mutable pids : int list; (** pids of the workers, the position in the list is the worker index *)
    work : int -> unit Lwt.t; (** job of a worker, given its index *)
  }

  (** [start n work] forks [n] workers; worker number [i] runs [work i] and then exits with code 0. *)
  let start n work =
    let rec start_forked i =
      if i >= n then Lwt.return_nil
      else (
        match Nix.fork () with
        | `Child ->
          let%lwt () = work i in
          exit 0
        | `Forked pid ->
          log#debug "Starting worker %d with pid %d" i pid;
          Lwt.map (fun pids -> pid :: pids) (start_forked (i + 1))
      )
    in
    Lwt.map (fun pids -> { pids; work }) (start_forked 0)

  (** Wait for [pid] to terminate. Resolves to [pid] also when waiting fails
      (ECHILD silently, other errors with a warning). *)
  let wait pid =
    try%lwt
      Lwt.map fst (Lwt_unix.waitpid [] pid)
    with
    | Unix.Unix_error (ECHILD, _, _) -> Lwt.return pid
    | exn -> log#warn ~exn "Worker PID %d lost (wait)" pid; Lwt.return pid

  (** Send SIGTERM to [pid] and wait for it to exit; after [timeout] seconds send SIGKILL instead.
      Resolves when the first of the two completes. *)
  let kill ~timeout pid =
    let graceful =
      Unix.kill pid Sys.sigterm;
      let%lwt _ = wait pid in
      log#debug "Worker PID %d killed with SIGTERM" pid;
      Lwt.return_unit
    in
    let ungraceful =
      let%lwt () = Lwt_unix.sleep timeout in
      hard_kill1 pid;
      Lwt.return_unit
    in
    Lwt.pick [ graceful; ungraceful ]

  (** Restart the workers one at a time: kill a worker (see [kill]), optionally pause,
      fork its replacement running the same job, then move on to the next one.
      @param wait pause in seconds between killing a worker and forking its replacement
      @param timeout delay in seconds before a worker not reacting to SIGTERM gets SIGKILL *)
  let rolling_restart ?wait ~timeout workers =
    let%lwt pids =
      Lwt_list.mapi_s begin fun i pid ->
        log#debug "Restarting worker %d with PID %d\n%!" i pid;
        let%lwt () = kill ~timeout pid in
        (* cooperative sleep: a blocking Unix.sleep would freeze every other Lwt thread of this process *)
        let%lwt () =
          match wait with
          | None -> Lwt.return_unit
          | Some seconds -> Lwt_unix.sleep (float_of_int seconds)
        in
        match Nix.fork () with
        | `Child ->
          let%lwt () = workers.work i in
          exit 0
        | `Forked pid' ->
          log#debug "Worker %d started with PID %d\n%!" i pid';
          Lwt.return pid'
      end workers.pids
    in
    workers.pids <- pids;
    Lwt.return_unit

  (** Kill all workers concurrently (see [kill] for the meaning of [timeout]). *)
  let stop ~timeout { pids; _ } =
    log#info "Stopping workers";
    Lwt_list.iteri_p begin fun i pid ->
      log#debug "Stopping worker %d with PID %d" i pid;
      kill ~timeout pid
    end pids
end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>