package parany

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file parany.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439

open Printf
module A = Array
module Fn = Filename
module Ht = Hashtbl

exception End_of_input

(* Message passing between processes: values are marshaled to temporary
   files and only the file names travel over a datagram socket pair. *)
module Shm = struct

  (* create a queue: a pair of connected UNIX datagram sockets *)
  let init () =
    Unix.(socketpair PF_UNIX SOCK_DGRAM 0)

  (* read back the value marshaled into file [fn];
     the channel is closed even if unmarshaling raises *)
  let unmarshal_from_file fn =
    let input = open_in_bin fn in
    match Marshal.from_channel input with
    | res -> close_in input; res
    | exception e -> close_in_noerr input; raise e

  (* marshal [v] (closures allowed, sharing not preserved) into file [fn];
     the channel is closed even if marshaling raises *)
  let marshal_to_file fn v =
    let out = open_out_bin fn in
    match Marshal.to_channel out v Marshal.[No_sharing; Closures] with
    | () -> close_out out
    | exception e -> close_out_noerr out; raise e

  (* send the first [n] bytes of [buff] over [sock] as one message,
     retrying for as long as the system reports ENOBUFS *)
  let rec send_loop sock buff n =
    try
      let sent = Unix.send sock buff 0 n [] in
      assert(sent = n)
    with Unix.Unix_error(Unix.ENOBUFS, _, _) ->
      (* send on a UDP socket never blocks on Mac OS X
         and probably several of the BSDs *)
      (* eprintf "sleep\n%!"; *)
      let _ = Unix.select [] [] [] 0.001 in (* wait *)
      (* We should use nanosleep for precision, if only it
         was provided by the Unix module... *)
      send_loop sock buff n

  (* send string [str] as is over [sock] *)
  let raw_send sock str =
    let n = String.length str in
    (* the buffer is only handed to Unix.send; nothing here mutates it *)
    let buff = Bytes.unsafe_of_string str in
    send_loop sock buff n

  (* marshal [to_send] into file [fn], then send the name [fn] over [queue] *)
  let send fn queue to_send =
    marshal_to_file fn to_send;
    raw_send queue fn

  (* receive one message from [sock] into [buff] and return it as a string;
     at most [Bytes.length buff] bytes are read *)
  let raw_receive sock buff =
    let n = Bytes.length buff in
    let received = Unix.recv sock buff 0 n [] in
    assert(received > 0);
    Bytes.sub_string buff 0 received

  (* receive one file name from [queue], return the value stored in that
     file and delete the file.
     Raises [End_of_input] if the message received is "EOF". *)
  let receive queue buff =
    let fn = raw_receive queue buff in
    if fn = "EOF" then
      raise End_of_input
    else
      let res = unmarshal_from_file fn in
      Sys.remove fn;
      res

end

(* feeder process loop: pull items out of [demux], group them into chunks
   of [csize] items and send each chunk over [queue]; once [demux] raises
   [End_of_input], send the last (incomplete) chunk if there is one,
   then one "EOF" per worker ([ncores] of them) *)
let feed_them_all tmp csize ncores demux queue =
  (* this temp file only reserves a unique prefix for the chunk files *)
  let prfx = Filename.temp_file ~temp_dir:tmp "iparany_" "" in
  let chunk_id = ref 0 in
  let pending = ref [] in
  (* ship the chunk under construction into the next numbered file *)
  let flush_pending () =
    Shm.send (sprintf "%s_%d" prfx !chunk_id) queue !pending in
  (* stack [remaining] more items onto the chunk under construction *)
  let rec fill remaining =
    if remaining > 0 then
      begin
        pending := (demux ()) :: !pending;
        fill (remaining - 1)
      end in
  (* only terminates via an exception *)
  let rec pump () =
    fill csize;
    flush_pending ();
    pending := [];
    incr chunk_id;
    pump () in
  try
    pump ()
  with End_of_input ->
    (* jobs left over (< csize) still have to go out *)
    if !pending <> [] then flush_pending ();
    (* each worker must receive its own EOF *)
    for _ = 1 to ncores do
      Shm.raw_send queue "EOF"
    done;
    Sys.remove prfx;
    Unix.close queue

(* worker process loop: receive chunks of jobs from [jobs_queue], apply
   [work] to each job and send each chunk of results over [results_queue],
   in files named after [prfx]; returns once "EOF" is received *)
let go_to_work prfx jobs_queue work results_queue =
  (* let pid = Unix.getpid () in
   * eprintf "worker(%d) started\n%!" pid; *)
  try
    let out_count = ref 0 in
    (* Each message is the full path of a temporary file, received as a
       single datagram: bytes beyond the buffer size are lost, so the buffer
       must be large enough even when the temp dir path is long. *)
    let buff = Bytes.create 4096 in
    while true do
      let xs = Shm.receive jobs_queue buff in
      let ys = List.rev_map work xs in
      (* eprintf "worker(%d) did one\n%!" pid; *)
      let fn = sprintf "%s_%d" prfx !out_count in
      Shm.send fn results_queue ys;
      incr out_count
    done
  with End_of_input ->
    (* resource cleanup was moved to an at_exit-registered function,
       so that cleanup is done even in the case of an uncaught exception
       and the muxer doesn't enter an infinite loop *)
    ()

(* run [f ()] in a forked child process which then exits with code 0;
   the calling process returns right after the fork *)
let fork_out f =
  let pid = Unix.fork () in
  if pid = -1 then
    failwith "Parany.fork_out: fork failed"
  else if pid = 0 then
    begin
      (* child *)
      (f (): unit);
      exit 0
    end
  else
    (* father: nothing to do *)
    ()

(* demux and index items: the k-th item successfully pulled out of [demux]
   is paired with index k (starting from 0) *)
let idemux (demux: unit -> 'a) =
  let next_index = ref 0 in
  fun () ->
    (* if [demux] raises, the index is left untouched *)
    let item = demux () in
    let i = !next_index in
    next_index := i + 1;
    (i, item)

(* apply [work] to the item of an (index, item) pair; the index is
   passed through unchanged *)
let iwork (work: 'a -> 'b) (indexed: int * 'a): int * 'b =
  let (rank, item) = indexed in
  let result = work item in
  (rank, result)

(* mux items in the right order: given (index, result) pairs arriving in
   any order, call [mux] on the results by increasing index, starting from
   index 0. A result which arrives too early waits in a table until all
   the results before it have been muxed.
   An exception raised by [mux] is never caught here. *)
let imux (mux: 'b -> unit) =
  let mux_count = ref 0 in
  (* weak type variable avoidance *)
  let wait_list = Ht.create 11 in
  (* unpile as much as possible; only the table lookup is protected, so
     that a [Not_found] raised by [mux] itself is not mistaken for
     "no more or index hole" and silently dropped *)
  let rec unpile () =
    match Ht.find wait_list !mux_count with
    | exception Not_found -> () (* no more or index hole *)
    | next ->
      Ht.remove wait_list !mux_count;
      mux next;
      incr mux_count;
      unpile () in
  function (i, res) ->
    if !mux_count = i then
      begin
        mux res;
        incr mux_count;
        if Ht.length wait_list > 0 then
          unpile ()
      end
    else
      (* put somewhere into the pile *)
      Ht.add wait_list i res

(* rank of the current process: -1 until it gets initialized,
   in [0:ncores-1] afterwards *)
let my_rank = ref (-1)

(* current value of [my_rank] *)
let get_rank () =
  my_rank.contents

(* [run ?init ?finalize ?preserve ?core_pin ?csize nprocs ~demux ~work ~mux]
   pulls items out of [demux] until it raises [End_of_input], applies [work]
   to each item and hands each result to [mux].
   - if [nprocs <= 1], everything is done sequentially in the calling
     process and the optional parameters are not used;
   - otherwise, one feeder process (running [demux]) and [nprocs] worker
     processes (running [work]) are forked, while [mux] runs in the calling
     process. [csize] is the number of items per chunk sent to a worker.
     [init] is called with the worker's rank at the start of each worker and
     [finalize] is registered with [at_exit] in each worker.
     If [core_pin], each worker is passed to [Cpu.setcore] with its rank.
     If [preserve], [mux] sees the results in input order.
     [csize >= 1] and [nprocs <= Cpu.numcores ()] are enforced by assert. *)
let run ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
    ?(preserve = false) ?(core_pin = false) ?(csize = 1) nprocs
    ~demux ~work ~mux =
  (* the (type a b) annotation unfortunately implies OCaml >= 4.03.0 *)
  let demux_work_mux (type a b)
      ~(demux: unit -> a) ~(work: a -> b) ~(mux: b -> unit): unit =
    (* create queues *)
    let jobs_in, jobs_out = Shm.init () in
    let res_in, res_out = Shm.init () in
    (* start feeder *)
    (* eprintf "father(%d) starting feeder\n%!" pid; *)
    flush_all (); (* prevent duplicated I/O *)
    Gc.compact (); (* like parmap: reclaim memory prior to forking *)
    let tmp = Filename.get_temp_dir_name () in
    fork_out (fun () -> feed_them_all tmp csize nprocs demux jobs_in);
    (* start workers *)
    for worker_rank = 0 to nprocs - 1 do
      (* assigned before forking: the worker forked below starts with it *)
      my_rank := worker_rank;
      (* eprintf "father(%d) starting a worker\n%!" pid; *)
      fork_out (fun () ->
          init worker_rank; (* per-process optional setup *)
          at_exit finalize; (* register optional finalize fun *)
          (* parmap also does core pinning _after_ having called
             the per-process init function *)
          if core_pin then Cpu.setcore worker_rank;
          let prfx = Filename.temp_file ~temp_dir:tmp (sprintf "oparany%d_" worker_rank) "" in
          at_exit (fun () ->
              (* tell collector to stop *)
              (* eprintf "worker(%d) finished\n%!" pid; *)
              Shm.raw_send res_in "EOF";
              Unix.close res_in;
              Sys.remove prfx
            );
          go_to_work prfx jobs_out work res_in
        )
    done;
    (* collect results *)
    let finished = ref 0 in
    (* Each message is the full path of a temporary file, received as a
       single datagram: bytes beyond the buffer size are lost, so the buffer
       must be large enough even when the temp dir path is long. *)
    let buff = Bytes.create 4096 in
    (* one EOF is expected from each worker *)
    while !finished < nprocs do
      try
        while true do
          let xs = Shm.receive res_out buff in
          (* eprintf "father(%d) collecting one\n%!" pid; *)
          List.iter mux xs
        done
      with End_of_input -> incr finished
    done;
    (* free resources *)
    List.iter Unix.close [jobs_in; jobs_out; res_in; res_out]
  in
  if nprocs <= 1 then
    (* sequential version *)
    try
      while true do
        mux (work (demux ()))
      done
    with End_of_input -> ()
  else
    begin
      (* parallel version *)
      assert(csize >= 1);
      let max_cores = Cpu.numcores () in
      assert(nprocs <= max_cores);
      (* let pid = Unix.getpid () in
       * eprintf "father(%d) started\n%!" pid; *)
      (if preserve then
         (* In some cases, it is necessary for the user to preserve the
            input order in the output. In this case, we still compute things
            potentially out of order (for parallelization efficiency);
            but we will order back the results in input order
            (for user's convenience) *)
         demux_work_mux
           ~demux:(idemux demux) ~work:(iwork work) ~mux:(imux mux)
       else
         (* by default, to maximize parallel efficiency we don't care about the
            order in which jobs are computed. *)
         demux_work_mux ~demux ~work ~mux
      );
      (* eprintf "father(%d) finished\n%!" pid; *)
    end

(* Wrapper for near-compatibility with Parmap *)
module Parmap = struct

  (* demux function handing out the elements of [l] one per call, from the
     head of the list; raises [End_of_input] once [l] is exhausted *)
  let demux_of_list l =
    let remaining = ref l in
    fun () ->
      match !remaining with
      | [] -> raise End_of_input
      | x :: xs ->
        remaining := xs;
        x

  (* demux function handing out 0, 1, ..., n-1, one per call;
     raises [End_of_input] once n values were handed out *)
  let demux_of_indexes n =
    let next = ref 0 in
    fun () ->
      let i = !next in
      if i = n then
        raise End_of_input
      else
        begin
          incr next;
          i
        end

  (* List.map, without growing the stack *)
  let tail_rec_map f l =
    l |> List.rev_map f |> List.rev

  (* List.mapi, without growing the stack *)
  let tail_rec_mapi f l =
    let _, rev_res =
      List.fold_left (fun (i, acc) x ->
          let y = f i x in
          (i + 1, y :: acc)
        ) (0, []) l in
    List.rev rev_res

  (* map [f] over [l] using [ncores] processes (sequentially if
     [ncores <= 1]); the result is in input order only if [preserve] *)
  let parmap ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(preserve = false) ?(core_pin = false) ?(csize = 1) ncores f l =
    if ncores <= 1 then tail_rec_map f l
    else
      begin
        let collected = ref [] in
        (* parallel work *)
        run ~init ~finalize ~preserve ~core_pin ~csize ncores
          ~demux:(demux_of_list l) ~work:f
          ~mux:(fun y -> collected := y :: !collected);
        (* results were stacked: the last one muxed is at the head *)
        if preserve then List.rev !collected else !collected
      end

  (* same as [parmap], but [f] also receives the position of the element
     in [l] as its first argument *)
  let parmapi ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(preserve = false) ?(core_pin = false) ?(csize = 1) ncores f l =
    if ncores <= 1 then tail_rec_mapi f l
    else
      begin
        let next_elt = demux_of_list l in
        let rank = ref 0 in
        let demux () =
          let x = next_elt () in
          let j = !rank in
          incr rank;
          (j, x) in
        let collected = ref [] in
        (* parallel work *)
        run ~init ~finalize ~preserve ~core_pin ~csize ncores
          ~demux ~work:(fun (j, x) -> f j x)
          ~mux:(fun y -> collected := y :: !collected);
        if preserve then List.rev !collected else !collected
      end

  (* apply [f] to each element of [l] using [ncores] processes
     (sequentially if [ncores <= 1]); results of [f] are discarded *)
  let pariter ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(preserve = false) ?(core_pin = false) ?(csize = 1) ncores f l =
    if ncores <= 1 then List.iter f l
    else
      (* parallel work *)
      run ~init ~finalize ~preserve ~core_pin ~csize ncores
        ~demux:(demux_of_list l) ~work:f ~mux:ignore

  (* apply [f] to each element of [l] using [ncores] processes and fold the
     results into an accumulator with [g], starting from [init_acc] *)
  let parfold ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
      ?(preserve = false) ?(core_pin = false) ?(csize = 1) ncores
      f g init_acc l =
    if ncores <= 1 then
      List.fold_left (fun acc x -> g acc (f x)) init_acc l
    else
      begin
        let acc = ref init_acc in
        (* parallel work *)
        run ~init ~finalize ~preserve ~core_pin ~csize ncores
          ~demux:(demux_of_list l) ~work:f
          ~mux:(fun y -> acc := g !acc y);
        !acc
      end

  (* preserves array input order *)
  let array_parmap
      ?(init = fun (_rank: int) -> ())
      ?(finalize = fun () -> ())
      ?(core_pin = false) ?(csize = 1) ncores f init_acc a =
    let n = A.length a in
    (* [init_acc] only fills the result array before results come in *)
    let res = A.make n init_acc in
    run ~init ~finalize
      ~preserve:false (* input-order is preserved explicitly below *)
      ~core_pin ~csize ncores
      ~demux:(demux_of_indexes n)
      ~work:(fun i -> (i, f (A.unsafe_get a i)))
      ~mux:(fun (i, y) -> A.unsafe_set res i y);
    res

  (* apply [f] to each element of array [a] *)
  let array_pariter
      ?(init = fun (_rank: int) -> ())
      ?(finalize = fun () -> ())
      ?(core_pin = false) ?(csize = 1) ncores f a =
    run ~init ~finalize
      ~preserve:false
      ~core_pin ~csize ncores
      ~demux:(demux_of_indexes (A.length a))
      ~work:(fun i -> f (A.unsafe_get a i))
      ~mux:(fun () -> ())

  (* apply [f] to each index of array [a] and the element at that index *)
  let array_pariteri
      ?(init = fun (_rank: int) -> ())
      ?(finalize = fun () -> ())
      ?(core_pin = false) ?(csize = 1) ncores f a =
    run ~init ~finalize
      ~preserve:false
      ~core_pin ~csize ncores
      ~demux:(demux_of_indexes (A.length a))
      ~work:(fun i -> f i (A.unsafe_get a i))
      ~mux:(fun () -> ())

  (* let parfold_compat
   *     ?(init = fun (_rank: int) -> ()) ?(finalize = fun () -> ())
   *     ?(ncores: int option) ?(chunksize: int option) (f: 'a -> 'b -> 'b)
   *     (l: 'a list) (init_acc: 'b) (acc_fun: 'b -> 'b -> 'b): 'b =
   *   let nprocs = match ncores with
   *     | None -> 1 (\* if the user doesn't know the number of cores to use,
   *                    we don't know better *\)
   *     | Some x -> x in
   *   let csize = match chunksize with
   *     | None -> 1
   *     | Some x -> x in
   *   if nprocs <= 1 then
   *     List.fold_left (fun acc x -> f x acc) init_acc l
   *   else
   *     let input = ref l in
   *     let demux () = match !input with
   *       | [] -> raise End_of_input
   *       | _ ->
   *         let this_chunk, rest = BatList.takedrop csize !input in
   *         input := rest;
   *         this_chunk in
   *     let work xs =
   *       List.fold_left (fun acc x -> f x acc) init_acc xs in
   *     let output = ref init_acc in
   *     let mux x =
   *       output := acc_fun !output x in
   *     (\* parallel work *\)
   *     run ~init ~finalize
   *       (\* leave csize=1 bellow *\)
   *       ~preserve:false ~core_pin:false ~csize:1 nprocs ~demux ~work ~mux;
   *     !output *)

end
OCaml

Innovation. Community. Security.