package parany
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file parany.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
open Printf

module A = Array
module Fn = Filename
module Ht = Hashtbl

(* Raised by a user-supplied [demux] to signal that the input is exhausted;
   also raised by [Shm.receive] when it gets an end-of-stream message. *)
exception End_of_input

(* Inter-process queues: a queue is a connected pair of Unix-domain datagram
   sockets.  A message is either the name of a temporary file holding a
   marshalled value, or the string "EOF" meaning end of stream. *)
module Shm = struct

  (* [init ()] returns a connected pair of Unix-domain datagram sockets. *)
  let init () =
    Unix.(socketpair PF_UNIX SOCK_DGRAM 0)

  (* [unmarshal_from_file fn] reads one marshalled value back from [fn].
     The channel is closed even if unmarshalling raises. *)
  let unmarshal_from_file fn =
    let input = open_in_bin fn in
    let res =
      try Marshal.from_channel input
      with exn -> close_in_noerr input; raise exn in
    close_in input;
    res

  (* [marshal_to_file fn v] writes [v] to [fn].  Closures are allowed and
     sharing is not preserved (NOTE(review): per the Marshal docs, cyclic
     values are not supported with No_sharing).  The channel is closed even
     if marshalling raises. *)
  let marshal_to_file fn v =
    let out = open_out_bin fn in
    (try Marshal.to_channel out v Marshal.[No_sharing; Closures]
     with exn -> close_out_noerr out; raise exn);
    close_out out

  (* [send_loop sock buff n] sends the first [n] bytes of [buff] as a single
     datagram, retrying on ENOBUFS (after a short pause) and on EINTR. *)
  let rec send_loop sock buff n =
    match Unix.send sock buff 0 n [] with
    | sent -> assert (sent = n)
    | exception Unix.Unix_error (Unix.EINTR, _, _) ->
      (* a signal interrupted send before anything was transmitted *)
      send_loop sock buff n
    | exception Unix.Unix_error (Unix.ENOBUFS, _, _) ->
      (* send on a UDP socket never blocks on Mac OS X
         and probably several of the BSDs *)
      (* We should use nanosleep for precision, if only it
         was provided by the Unix module... *)
      (try ignore (Unix.select [] [] [] 0.001) (* wait *)
       with Unix.Unix_error (Unix.EINTR, _, _) -> ());
      send_loop sock buff n

  (* [raw_send sock str] sends [str] as one datagram. *)
  let raw_send sock str =
    let n = String.length str in
    let buff = Bytes.unsafe_of_string str in
    send_loop sock buff n

  (* [send fn queue to_send] marshals [to_send] into file [fn], then sends
     the file name on [queue]. *)
  let send fn queue to_send =
    marshal_to_file fn to_send;
    raw_send queue fn

  (* [raw_receive sock buff] receives one datagram into [buff] (retrying on
     EINTR) and returns its contents.  A datagram longer than [buff] is
     silently truncated by the kernel, so callers must size [buff] for the
     longest message (a full temporary-file path). *)
  let rec raw_receive sock buff =
    match Unix.recv sock buff 0 (Bytes.length buff) [] with
    | received ->
      assert (received > 0);
      Bytes.sub_string buff 0 received
    | exception Unix.Unix_error (Unix.EINTR, _, _) ->
      raw_receive sock buff

  (* [receive queue buff] returns the value whose file name arrives on
     [queue] and deletes that file.
     @raise End_of_input when the message is "EOF". *)
  let receive queue buff =
    let fn = raw_receive queue buff in
    if fn = "EOF" then
      raise End_of_input
    else
      let res = unmarshal_from_file fn in
      Sys.remove fn;
      res
end

(* Feeder process loop: repeatedly pulls [csize] items from [demux], marshals
   each chunk to a fresh file named after a temporary prefix created in [tmp]
   and sends that file name on [queue].  Items are consed, so a chunk lists
   them in reverse demux order (workers use List.rev_map, which restores it).
   When [demux] raises [End_of_input], the final partial chunk (if any) is
   sent, then one "EOF" per worker ([ncores] of them); finally the prefix
   file is removed and [queue] is closed. *)
let feed_them_all tmp csize ncores demux queue =
  let in_count = ref 0 in
  let prfx = Filename.temp_file ~temp_dir:tmp "iparany_" "" in
  let to_send = ref [] in
  try
    while true do
      for _ = 1 to csize do
        to_send := demux () :: !to_send
      done;
      let fn = sprintf "%s_%d" prfx !in_count in
      Shm.send fn queue !to_send;
      to_send := [];
      incr in_count
    done
  with End_of_input ->
    begin
      (* if needed, send remaining jobs (< csize) *)
      (match !to_send with
       | [] -> ()
       | remaining ->
         let fn = sprintf "%s_%d" prfx !in_count in
         Shm.send fn queue remaining);
      (* send an EOF to each worker *)
      for _ = 1 to ncores do
        Shm.raw_send queue "EOF"
      done;
      Sys.remove prfx;
      Unix.close queue
    end

(* Worker process loop: receives chunks on [jobs_queue], applies [work] to
   every item and sends each result chunk on [results_queue] through a file
   named [prfx]_<n>.  Returns when an "EOF" is received (or when [work]
   itself raises End_of_input, since the handler covers the whole loop). *)
let go_to_work prfx jobs_queue work results_queue =
  try
    let out_count = ref 0 in
    (* messages are full temporary-file paths; recv silently truncates a
       datagram that does not fit, so leave room for long TMPDIR values
       (4096 is PATH_MAX on Linux) *)
    let buff = Bytes.create 4096 in
    while true do
      let xs = Shm.receive jobs_queue buff in
      let ys = List.rev_map work xs in
      let fn = sprintf "%s_%d" prfx !out_count in
      Shm.send fn results_queue ys;
      incr out_count
    done
  with End_of_input ->
    (* resource cleanup was moved to an at_exit-registered function, so
       that cleanup is done even in the case of an uncaught exception and
       the muxer doesn't enter an infinite loop *)
    ()

(* [fork_out f] runs [f ()] in a forked child process, which then exits via
   [exit] (so at_exit handlers run); the parent returns at once and the
   child's pid is discarded.  If [f] raises, the child reports the exception
   on stderr and exits with code 2 instead of letting the exception escape
   into the caller's code, which in the child is a copy of the parent's
   program.  Unix.fork raises Unix_error on failure, so there is no -1 case
   to check. *)
let fork_out f =
  match Unix.fork () with
  | 0 ->
    let status =
      match f () with
      | () -> 0
      | exception exn ->
        eprintf "Fatal error: exception %s\n%!" (Printexc.to_string exn);
        2 in
    exit status
  | _pid -> ()

(* demux and index items: [idemux demux] pairs every item with its 0-based
   position in the input stream *)
let idemux (demux: unit -> 'a) =
  let demux_count = ref 0 in
  fun () ->
    let res = (!demux_count, demux ()) in
    incr demux_count;
    res

(* work ignoring item index: apply [work] to the payload, keep the index *)
let iwork (work: 'a -> 'b) ((i, x): int * 'a): int * 'b =
  (i, work x)

(* mux items in the right order: [imux mux] passes index-tagged results to
   [mux] in increasing index order starting from 0; results arriving early
   are parked in a hash table until their turn comes *)
let imux (mux: 'b -> unit) =
  let mux_count = ref 0 in
  (* weak type variable avoidance *)
  let wait_list = Ht.create 11 in
  (* unpile as much as possible; only the table lookup may end the loop,
     so an exception raised by [mux] itself reaches the caller *)
  let rec unpile () =
    match Ht.find wait_list !mux_count with
    | exception Not_found -> () (* no more or index hole *)
    | next ->
      Ht.remove wait_list !mux_count;
      mux next;
      incr mux_count;
      unpile () in
  function (i, res) ->
    if !mux_count = i then
      begin
        mux res;
        incr mux_count;
        if Ht.length wait_list > 0 then unpile ()
      end
    else
      (* put somewhere into the pile *)
      Ht.add wait_list i res

(* once initialized, my_rank will be in
[0:ncores-1] within each worker process; the parent and the feeder
   process keep the initial value -1 *)
let my_rank = ref (-1)

(* [get_rank ()] is the rank of the calling worker process, -1 outside of
   worker processes. *)
let get_rank () =
  !my_rank

(* [run ~init ~finalize ~preserve ~core_pin ~csize nprocs ~demux ~work ~mux]
   calls [demux ()] until it raises [End_of_input], applies [work] to every
   item and passes every result to [mux].

   With [nprocs <= 1] everything runs sequentially in the calling process and
   [init]/[finalize] are not called.  Otherwise a feeder process calls [demux]
   and sends chunks of [csize] items to [nprocs] worker processes running
   [work]; [mux] runs in the calling process.  In each worker, [init rank] is
   called first, [finalize] is registered with at_exit, and with [core_pin]
   the worker is pinned to core [rank].  With [preserve], [mux] receives the
   results in input order; otherwise in no particular order.

   In parallel mode, fails an assertion if [csize < 1] or if [nprocs] exceeds
   [Cpu.numcores ()].

   NOTE(review): child processes are never waited for (fork_out discards
   their pids), so they remain zombies until the calling process exits. *)
let run ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
    ?(preserve = false) ?(core_pin = false) ?(csize = 1) nprocs
    ~demux ~work ~mux =
  (* the (type a b) annotation unfortunately implies OCaml >= 4.03.0 *)
  let demux_work_mux (type a b)
      ~(demux: unit -> a) ~(work: a -> b) ~(mux: b -> unit): unit =
    (* create queues *)
    let jobs_in, jobs_out = Shm.init () in
    let res_in, res_out = Shm.init () in
    flush_all (); (* prevent duplicated I/O *)
    Gc.compact (); (* like parmap: reclaim memory prior to forking *)
    let tmp = Filename.get_temp_dir_name () in
    (* start feeder *)
    fork_out (fun () -> feed_them_all tmp csize nprocs demux jobs_in);
    (* start workers *)
    for worker_rank = 0 to nprocs - 1 do
      fork_out (fun () ->
          (* set in the child only: the parent must not end up with the
             rank of the last worker it forked *)
          my_rank := worker_rank;
          init worker_rank; (* per-process optional setup *)
          at_exit finalize; (* register optional finalize fun *)
          (* parmap also does core pinning _after_ having called the
             per-process init function *)
          if core_pin then Cpu.setcore worker_rank;
          let prfx =
            Filename.temp_file ~temp_dir:tmp
              (sprintf "oparany%d_" worker_rank) "" in
          at_exit (fun () ->
              (* tell collector to stop *)
              Shm.raw_send res_in "EOF";
              Unix.close res_in;
              Sys.remove prfx);
          go_to_work prfx jobs_out work res_in)
    done;
    (* collect results; each worker sends exactly one "EOF" when it exits *)
    let finished = ref 0 in
    (* messages are full temporary-file paths; recv silently truncates a
       datagram that does not fit, so leave room for long TMPDIR values
       (4096 is PATH_MAX on Linux) *)
    let buff = Bytes.create 4096 in
    while !finished < nprocs do
      try
        while true do
          let xs = Shm.receive res_out buff in
          List.iter mux xs
        done
      with End_of_input -> incr finished
    done;
    (* free resources *)
    List.iter Unix.close [jobs_in; jobs_out; res_in; res_out]
  in
  if nprocs <= 1 then
    (* sequential version *)
    (try
       while true do
         mux (work (demux ()))
       done
     with End_of_input -> ())
  else
    begin
      (* parallel version *)
      assert (csize >= 1);
      let max_cores = Cpu.numcores () in
      assert (nprocs <= max_cores);
      if preserve then
        (* In some cases, it is necessary for the user to preserve the input
           order in the output. In this case, we still compute things
           potentially out of order (for parallelization efficiency); but we
           will order back the results in input order (for user's
           convenience) *)
        demux_work_mux
          ~demux:(idemux demux) ~work:(iwork work) ~mux:(imux mux)
      else
        (* by default, to maximize parallel efficiency we don't care about
           the order in which jobs are computed. *)
        demux_work_mux ~demux ~work ~mux
    end

(* Wrapper for near-compatibility with Parmap.  Every function runs
   sequentially, without calling [init]/[finalize], when [ncores <= 1]. *)
module Parmap = struct

  (* tail-recursive List.map *)
  let tail_rec_map f l =
    List.rev (List.rev_map f l)

  (* tail-recursive List.mapi; the index is a counter of calls to [f] *)
  let tail_rec_mapi f l =
    let i = ref 0 in
    let res =
      List.rev_map (fun x ->
          let j = !i in
          let y = f j x in
          incr i;
          y
        ) l in
    List.rev res

  (* [parmap ncores f l]: map [f] over [l] with [ncores] processes; the
     result is in input order when [preserve] is true or [ncores <= 1] *)
  let parmap ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(preserve = false) ?(core_pin = false) ?(csize = 1) ncores f l =
    if ncores <= 1 then tail_rec_map f l
    else
      let input = ref l in
      let demux () =
        match !input with
        | [] -> raise End_of_input
        | x :: xs -> (input := xs; x) in
      let output = ref [] in
      let mux x =
        output := x :: !output in
      (* parallel work *)
      run ~init ~finalize ~preserve ~core_pin ~csize ncores
        ~demux ~work:f ~mux;
      if preserve then List.rev !output
      else !output

  (* [parmapi ncores f l]: like [parmap], but [f] also receives the
     element's 0-based index in [l] *)
  let parmapi ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(preserve = false) ?(core_pin = false) ?(csize = 1) ncores f l =
    if ncores <= 1 then tail_rec_mapi f l
    else
      let input = ref l in
      let i = ref 0 in
      let demux () =
        match !input with
        | [] -> raise End_of_input
        | x :: xs ->
          begin
            let j = !i in
            input := xs;
            let res = (j, x) in
            incr i;
            res
          end in
      let output = ref [] in
      let f' (i, x) = f i x in
      let mux x =
        output := x :: !output in
      (* parallel work *)
      run ~init ~finalize ~preserve ~core_pin ~csize ncores
        ~demux ~work:f' ~mux;
      if preserve then List.rev !output
      else !output

  (* [pariter ncores f l]: apply [f] to every element of [l] for its side
     effects, in the worker processes *)
  let pariter ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(preserve = false) ?(core_pin = false) ?(csize = 1) ncores f l =
    if ncores <= 1 then List.iter f l
    else
      let input = ref l in
      let demux () =
        match !input with
        | [] -> raise End_of_input
        | x :: xs -> (input := xs; x) in
      (* parallel work *)
      run ~init ~finalize ~preserve ~core_pin ~csize ncores
        ~demux ~work:f ~mux:ignore

  (* [parfold ncores f g init_acc l]: map [f] over [l] in the workers and
     fold the results with [g], starting from [init_acc], in the calling
     process *)
  let parfold ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(preserve = false) ?(core_pin = false) ?(csize = 1)
      ncores f g init_acc l =
    if ncores <= 1 then
      List.fold_left (fun acc x -> g acc (f x)) init_acc l
    else
      let input = ref l in
      let demux () =
        match !input with
        | [] -> raise End_of_input
        | x :: xs -> (input := xs; x) in
      let output = ref init_acc in
      let mux x =
        output := g !output x in
      (* parallel work *)
      run ~init ~finalize ~preserve ~core_pin ~csize ncores
        ~demux ~work:f ~mux;
      !output

  (* preserves array input order: [array_parmap ncores f init_acc a] maps
     [f] over [a]; [init_acc] only fills the result array before the
     results are written *)
  let array_parmap ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(core_pin = false) ?(csize = 1) ncores f init_acc a =
    let n = A.length a in
    let res = A.make n init_acc in
    run ~init ~finalize
      ~preserve:false (* input-order is preserved explicitly below *)
      ~core_pin ~csize ncores
      ~demux:(
        let in_count = ref 0 in
        fun () ->
          if !in_count = n then raise End_of_input
          else
            let i = !in_count in
            incr in_count;
            i)
      ~work:(fun i -> (i, f (A.unsafe_get a i)))
      ~mux:(fun (i, y) -> A.unsafe_set res i y);
    res

  (* [array_pariter ncores f a]: apply [f] to every element of [a] for its
     side effects, in the worker processes *)
  let array_pariter ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(core_pin = false) ?(csize = 1) ncores f a =
    run ~init ~finalize ~preserve:false ~core_pin ~csize ncores
      ~demux:(
        let n = A.length a in
        let in_count = ref 0 in
        fun () ->
          if !in_count = n then raise End_of_input
          else
            let i = !in_count in
            incr in_count;
            i)
      ~work:(fun i -> f (A.unsafe_get a i))
      ~mux:(fun () -> ())

  (* [array_pariteri ncores f a]: like [array_pariter], but [f] also
     receives the element's index *)
  let array_pariteri ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(core_pin = false) ?(csize = 1) ncores f a =
    run ~init ~finalize ~preserve:false ~core_pin ~csize ncores
      ~demux:(
        let n = A.length a in
        let in_count = ref 0 in
        fun () ->
          if !in_count = n then raise End_of_input
          else
            let i = !in_count in
            incr in_count;
            i)
      ~work:(fun i -> f i (A.unsafe_get a i))
      ~mux:(fun () -> ())

  (* let parfold_compat
   *     ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
   *     ?(ncores: int option) ?(chunksize: int option) (f: 'a -> 'b -> 'b)
   *     (l: 'a list) (init_acc: 'b) (acc_fun: 'b -> 'b -> 'b): 'b =
   *   let nprocs = match ncores with
   *     | None -> 1 (\* if the user doesn't know the number of cores to use,
   *                    we don't know better *\)
   *     | Some x -> x in
   *   let csize = match chunksize with
   *     | None -> 1
   *     | Some x -> x in
   *   if nprocs <= 1 then
   *     List.fold_left (fun acc x -> f x acc) init_acc l
   *   else
   *     let input = ref l in
   *     let demux () = match !input with
   *       | [] -> raise End_of_input
   *       | _ ->
   *         let this_chunk, rest = BatList.takedrop csize !input in
   *         input := rest;
   *         this_chunk in
   *     let work xs =
   *       List.fold_left (fun acc x -> f x acc) init_acc xs in
   *     let output = ref init_acc in
   *     let mux x =
   *       output := acc_fun !output x in
   *     (\* parallel work *\)
   *     run ~init ~finalize
   *       (\* leave csize=1 below *\)
   *       ~preserve:false ~core_pin:false ~csize:1 nprocs ~demux ~work ~mux;
   *     !output *)
end