package devkit
Development kit - general purpose library
Install
Dune Dependency
Authors
Maintainers
Sources
devkit-1.3.tbz
sha256=dae965685dceed47ad8e9844f12fe707dafdf2c3bdd46d0431d5b4d1e7754b23
sha512=b94ade804d751db87434042bbaa821fa8e82e233820a76806f910e2da040094b137e88a3579911a1626930912622b064c776ddbcb6991fb7111021ebf6553fdc
doc/src/devkit.core/parallel.ml.html
Source file parallel.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
open Printf
open ExtLib
open Prelude

let log = Log.from "parallel"

(** Types of the values exchanged with worker processes. Both are sent
    through pipes with [Marshal]. *)
module type WorkerT = sig
  type task
  type result
end

(** Interface of a pool of worker processes. *)
module type Workers = sig
  type task
  type result
  type t

  (** [create f n] starts [n] workers, each applying [f] to the tasks it receives *)
  val create : (task -> result) -> int -> t

  (** [perform t ?autoexit tasks finish] feeds [tasks] to the workers and
      calls [finish] on every result received *)
  val perform : t -> ?autoexit:bool -> task Enum.t -> (result -> unit) -> unit

  (** [stop ?wait t] terminates the workers; see {!do_stop} for the meaning of [wait] *)
  val stop : ?wait:int -> t -> unit
end

(** Non-blocking [waitpid] on every pid of [l].

    A pid counts as reaped when [waitpid] returns it, or when [waitpid]
    raises ([ECHILD] silently, any other exception with a warning).

    @return list of reaped and live pids *)
let reap l =
  let open Unix in
  List.partition (fun pid ->
    try
      pid = fst (waitpid [WNOHANG] pid)
    with
    | Unix_error (ECHILD,_,_) -> true (* exited *)
    | exn -> log #warn ~exn "Worker PID %d lost (wait)" pid; true) l

(** Reap [l] once more and send [SIGKILL] to every pid that is still alive.
    [ESRCH] is ignored, other failures are logged. *)
let hard_kill l =
  let open Unix in
  let (_,live) = reap l in
  live |> List.iter begin fun pid ->
    try
      kill pid Sys.sigkill; log #warn "Worker PID %d killed with SIGKILL" pid
    with
    | Unix_error (ESRCH,_,_) -> ()
    | exn -> log #warn ~exn "Worker PID %d (SIGKILL)" pid
  end

(** Send signal [signo] to every pid of [pids]; failures are logged and
    do not interrupt the iteration. *)
let killall signo pids =
  pids |> List.iter begin fun pid ->
    try Unix.kill pid signo with exn -> log #warn ~exn "PID %d lost (trying to send signal %d)" pid signo
  end

(** Send [SIGTERM] to [pids], then reap them in a loop with [Nix.sleep 1.]
    between attempts.

    [wait] is the number of such sleeps allowed before the survivors are
    killed with {!hard_kill}. Without [wait] the loop only ends once every
    pid is reaped. The counter is compared with [0] exactly, so a negative
    [wait] never triggers the kill.

    @return [`Done] when all pids were reaped, [`Killed n] when [n] pids
    were still alive at the deadline *)
let do_stop ?wait pids =
  let rec reap_loop timeout l =
    let (_,live) = reap l in
    match timeout, live with
    | _, [] -> `Done
    | Some 0, l -> hard_kill l; `Killed (List.length l)
    | _, l -> Nix.sleep 1.; reap_loop (Option.map pred timeout) l
  in
  killall Sys.sigterm pids;
  reap_loop wait pids

(** Pool of forked worker processes talking to the parent over pipes. *)
module Forks(T:WorkerT) =
struct

  type task = T.task
  type result = T.result

  (** One worker as seen from the parent: its pid and the channels to it
      ([None] once closed). *)
  type instance = { mutable ch : (in_channel * out_channel) option; pid : int; }

  (** Pool state. [alive] is cleared by {!stop}; [gone] counts the workers
      that {!perform} found to have vanished. *)
  type t = { mutable running : instance list; execute : (task -> result); mutable alive : bool; mutable gone : int; }

  (** Fork one worker process.

      The child loops: wait for input, unmarshal a task, run [execute],
      marshal the result back. It leaves the loop on end of input, on an
      unmarshalling failure or on an uncaught exception, and then exits with
      code 0 in every case.

      @return the parent-side handle of the new worker *)
  let worker (execute : task -> result) =
    let main_read, child_write = Unix.pipe () in
    let child_read, main_write = Unix.pipe () in
    match Nix.fork () with
    | `Child -> (* child *)
      Unix.close main_read; Unix.close main_write;
      Unix.set_close_on_exec child_read;
      Unix.set_close_on_exec child_write;
      let output = Unix.out_channel_of_descr child_write in
      let input = Unix.in_channel_of_descr child_read in
      let rec loop () =
        (* a single descriptor is polled, so exactly one entry is expected back *)
        match Nix.restart (fun () -> ExtUnixAll.(poll [| child_read, Poll.(pollin + pollpri); |] (-1.))) () with
        | [] | _ :: _ :: _ -> assert false
        | [ _fd, revents; ] ->
          assert (not (ExtUnixAll.Poll.(is_set revents pollpri)));
          assert (ExtUnixAll.Poll.(is_inter revents (pollin + pollhup)));
          match (Marshal.from_channel input : task) with
          | exception End_of_file -> () (* parent closed its end: normal termination *)
          | exception exn -> log #error ~exn "Parallel.worker failed to unmarshal task"
          | v ->
            let r = execute v in
            Marshal.to_channel output (r : result) [];
            flush output;
            loop ()
      in
      begin try
        loop ()
      with exn ->
        log #error ~exn ~backtrace:true "Parallel.worker aborting on uncaught exception"
      end;
      close_in_noerr input;
      close_out_noerr output;
      exit 0
    | `Forked pid ->
      Unix.close child_read; Unix.close child_write;
      (* prevent sharing these pipes with other children *)
      (* NOTE(review): close-on-exec only acts on exec; workers forked later
         presumably still inherit these descriptors - confirm *)
      Unix.set_close_on_exec main_write;
      Unix.set_close_on_exec main_read;
      let cout = Unix.out_channel_of_descr main_write in
      let cin = Unix.in_channel_of_descr main_read in
      { ch = Some (cin, cout); pid; }

  (** [create execute n] forks [n] workers running [execute]. *)
  let create execute n =
    let running = List.init n (fun _ -> worker execute) in
    { running; execute; alive=true; gone=0; }

  (** Close both channels of worker [w], ignoring errors; no-op when already closed. *)
  let close_ch w =
    match w.ch with
    | Some (cin,cout) -> w.ch <- None; close_in_noerr cin; close_out_noerr cout
    | None -> ()

  (** Stop all workers: mark the pool as not alive (which also ends a running
      {!perform} loop), close the channels, forget the workers and terminate
      them through {!do_stop}. *)
  let stop ?wait t =
    let gone () = if t.gone = 0 then "" else sprintf " (%d workers vanished)" t.gone in
    log #info "Stopping %d workers%s" (List.length t.running) (gone ());
    t.alive <- false;
    let l = t.running |> List.map (fun w -> close_ch w; w.pid) in
    Nix.sleep 0.1; (* let idle workers detect EOF and exit peacefully (frequent io-in-signal-handler deadlock problem) *)
    t.running <- [];
    match do_stop ?wait l with
    | `Done -> log #info "Stopped %d workers properly%s" (List.length l) (gone ())
    | `Killed killed -> log #info "Timeouted, killing %d (of %d) workers with SIGKILL%s" killed (List.length l) (gone ())

  (** [perform t ?autoexit tasks finish] runs every task of [tasks] and calls
      [finish] on each result, in the order results arrive.

      Without workers the tasks are executed in the calling process.
      Otherwise each worker is given one task, and every time a result comes
      back the same worker is given the next one. The loop ends when no task
      is in flight or when the pool is stopped.

      @param autoexit when [true], the channels of a worker are closed as
      soon as there is no further task for it (default [false]) *)
  let perform t ?(autoexit=false) tasks finish =
    match t.running with
    | [] -> Enum.iter (fun x -> finish (t.execute x)) tasks (* no workers *)
    | _ ->
      (* number of tasks currently in flight *)
      let workers = ref 0 in
      t.running |> List.iter begin fun w ->
        match w.ch with
        | None -> ()
        | Some (_,cout) ->
          match Enum.get tasks with
          | None -> ()
          | Some x -> incr workers; Marshal.to_channel cout (x : task) []; flush cout
      end;
      (* Printf.printf "workers %u\n%!" !workers; *)
      let events = ExtUnixAll.Poll.(pollin + pollpri) in
      while !workers > 0 && t.alive do
        let fds = List.filter_map (function {ch=Some (cin,_); _} -> Some (Unix.descr_of_in_channel cin) | _ -> None) t.running in
        let r = Nix.restart (fun () -> ExtUnixAll.poll (Array.of_list (List.map (fun fd -> fd, events) fds)) (-1.)) () in
        assert (not (List.exists (fun (_fd, revents) -> ExtUnixAll.Poll.(is_set revents pollpri)) r));
        (* map every reported descriptor back to its worker *)
        let channels = r |> List.map (fun (fd, _revents) -> t.running |> List.find (function {ch=Some (cin,_); _} -> Unix.descr_of_in_channel cin = fd | _ -> false)) in
        let answers = channels |> List.filter_map begin fun w ->
          match w.ch with
          | None -> None
          | Some (cin,cout) ->
            match (Marshal.from_channel cin : result) with
            | exception exn ->
              log #warn ~exn "no result from PID %d" w.pid;
              t.gone <- t.gone + 1;
              decr workers;
              (* close pipes and forget dead child, do not reap zombie so that premature exit is visible in process list *)
              close_ch w;
              t.running <- List.filter (fun w' -> w'.pid <> w.pid) t.running;
              None
            | answer ->
              (* [answer] was received in full, so it is always handed to
                 [finish]: a failure to fetch or send the next task only
                 takes this worker out of the in-flight count. Every
                 operation that can raise here runs before [decr workers]
                 in the normal path, so the count drops exactly once. *)
              begin try
                begin match Enum.get tasks with
                | None -> if autoexit then close_ch w; decr workers
                | Some x -> Marshal.to_channel cout (x : task) []; flush cout
                end
              with exn ->
                log #warn ~exn "perform (from PID %d)" w.pid;
                decr workers
              end;
              Some answer
        end
        in
        List.iter finish answers;
      done;
      match t.gone with
      | 0 -> log #info "Finished"
      | n -> log #warn "Finished, %d workers vanished" n

end

(** [invoke f x] starts computing [f x] in a forked child and returns a thunk
    that blocks until the result is available.

    The child marshals either the result or the exception raised by [f]
    through a pipe and exits. The thunk unmarshals it, reaps the child,
    closes the pipe, and returns the result or re-raises the exception.
    When forking fails, [f x] is computed immediately in the calling process
    and the thunk returns that value.

    @raise exn whatever [Marshal.from_channel] raises when no result can be
    read; the pipe is closed and the child reaped before re-raising *)
let invoke (f : 'a -> 'b) x : unit -> 'b =
  let input, output = Unix.pipe() in
  match Nix.fork () with
  | exception _ ->
    Unix.close input; Unix.close output;
    (let v = f x in fun () -> v)
  | `Child ->
    Unix.close input;
    let output = Unix.out_channel_of_descr output in
    Marshal.to_channel output (try `Res(f x) with e -> `Exn e) [];
    close_out output;
    exit 0
  | `Forked pid ->
    Unix.close output;
    let input = Unix.in_channel_of_descr input in
    fun () ->
      match Marshal.from_channel input with
      | exception exn ->
        (* no usable result: release the pipe first, then reap the child, so
           that neither the descriptor nor a zombie is left behind *)
        close_in_noerr input;
        (try ignore (Nix.restart (Unix.waitpid []) pid) with Unix.Unix_error _ -> ());
        raise exn
      | v ->
        ignore (Nix.restart (Unix.waitpid []) pid);
        close_in input;
        match v with `Res x -> x | `Exn e -> raise e

(*

(* example *)
open Printf

module W = Forks(struct type task = string type result = string list end)

let execute s = for i = 1 to 100_000 do Thread.delay 0. done; printf "%u : %s\n%!" (Unix.getpid()) s; [s;s;s;s]

let () =
  let workers = W.create execute 4 in
  print_endline "go";
  let e = Enum.init 100 (sprintf "<%u>") in
  let f l = printf "got [%s]\n%!" (Stre.list Prelude.id l) in
  for i = 1 to 2 do
    W.perform workers (Enum.clone e) f;
    Thread.delay 1.
  done;
  print_endline "Done"
*)

(** Fork one child per list element. The child runs [f x]; it is not
    terminated here, so once [f x] returns the child continues from the
    call site. The parent goes on forking for the remaining elements. *)
let rec launch_forks f = function
| [] -> ()
| x::xs ->
  match Nix.fork () with
  | `Child -> f x
  | `Forked _ -> launch_forks f xs

(** keep the specified number of workers running

    One child is forked per element of [args]; it runs [f] on that element
    (an exception is logged) and exits with code 0. The parent then
    supervises in a loop until [Daemon.should_exit ()] holds, or until no
    worker is left when [revive] is [false].

    @param revive restart a worker on its original argument when it exits
    (default [false])
    @param wait_stop passed as [wait] to {!do_stop} on shutdown *)
let run_forks_simple ?(revive=false) ?wait_stop f args =
  let workers = Hashtbl.create 1 in
  let launch f x =
    match Nix.fork () with
    | `Child ->
      let () = try f x with exn -> log #warn ~exn "worker failed" in
      exit 0
    | `Forked pid -> Hashtbl.add workers pid x; pid
  in
  args |> List.iter (fun x -> let (_:int) = launch f x in ());
  let pids () = Hashtbl.keys workers |> List.of_enum in
  (* [pause] is the delay before the next check: it is halved (not below 1.)
     while nothing dies, and grows by half (up to 10.) when workers are revived *)
  let rec loop pause =
    Nix.sleep pause;
    let total = Hashtbl.length workers in
    if total = 0 && not revive then
      log #info "All workers dead, stopping"
    else
    match Daemon.should_exit () with
    | true ->
      log #info "Stopping %d workers" total;
      begin match do_stop ?wait:wait_stop (pids ()) with
      | `Done -> log #info "Stopped %d workers" total
      | `Killed n -> log #info "Killed %d (of %d) workers with SIGKILL" n total
      end
    | false ->
      let (dead,_live) = reap (pids ()) in
      match dead with
      | [] -> loop (max 1. (pause /. 2.))
      | dead when revive ->
        let pause = min 10. (pause *. 1.5) in
        dead |> List.iter begin fun pid ->
          match Hashtbl.find workers pid with
          | exception Not_found -> log #warn "WUT? Not my worker %d" pid
          | x ->
            Hashtbl.remove workers pid;
            match launch f x with
            | exception exn -> log #error ~exn "restart"
            | pid' -> log #info "worker %d exited, replaced with %d" pid pid';
        end;
        loop pause
      | dead ->
        log #info "%d child workers exited (PIDs: %s)" (List.length dead) (Stre.list string_of_int dead);
        List.iter (Hashtbl.remove workers) dead;
        loop pause
  in
  loop 1.

(** [run_workers_enum workers ?wait_stop f g enum] applies [f] to every
    element of [enum] in a pool of [workers] forked processes and calls [g]
    in the parent on each result.

    [workers] must be positive (checked with [assert]). A handler registered
    with [Nix.handle_sig_exit_with] stops the pool, passing [wait_stop] as
    the timeout. The stop at normal completion passes no timeout. *)
let run_workers_enum workers ?wait_stop (type t) (type u) (f : t -> u) (g : u -> unit) enum =
  assert (workers > 0);
  let module Worker = struct type task = t type result = u end in
  let module W = Forks(Worker) in
  let worker x =
    (* sane signal handler FIXME restore? *)
    Signal.set_exit Daemon.signal_exit;
    f x
  in
  let proc = W.create worker workers in
  Nix.handle_sig_exit_with ~exit:true (fun () -> W.stop ?wait:wait_stop proc); (* FIXME: output in signal handler *)
  W.perform ~autoexit:true proc enum g;
  W.stop proc

(** Same as {!run_workers_enum} for a list of tasks whose results are [unit]. *)
let run_workers workers ?wait_stop (type t) (f : t -> unit) l =
  run_workers_enum workers ?wait_stop f id (List.enum l)

(** Run [f] on every element of [l] in forked processes.

    With [workers] a pool of that size is used ({!run_workers}); otherwise
    one process is forked per element ({!run_forks_simple}). [wait] is used
    as the stop timeout when [wait_stop] is not given. *)
let run_forks ?wait_stop ?revive ?wait ?workers (type t) (f : t -> unit) l =
  let wait_stop = if wait_stop = None then wait else wait_stop in
  match workers with
  | None -> run_forks_simple ?wait_stop ?revive f l
  | Some n -> run_workers n ?wait_stop f l

(** Same as {!run_forks} with default options, except that nothing is forked
    for an empty list and a single element is processed in the calling process. *)
let run_forks' f l =
  match l with
  | [] -> ()
  | [x] -> f x
  | l -> run_forks f l
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>