package rpc_parallel

  1. Overview
  2. Docs

Source file map_reduce.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
open Core
open Poly
open Async
module Heap = Pairing_heap

module Half_open_interval = struct
  module T = struct
    type t = int * int [@@deriving sexp]

    let create_exn l u =
      if l >= u
      then
        failwiths
          ~here:[%here]
          "Lower bound must be less than upper bound"
          (l, u)
          [%sexp_of: int * int];
      l, u
    ;;

    let lbound t = fst t
    let ubound t = snd t
    let intersects t1 t2 = lbound t1 < ubound t2 && lbound t2 < ubound t1

    let compare t1 t2 =
      if t1 = t2
      then 0
      else (
        if intersects t1 t2
        then
          failwiths
            ~here:[%here]
            "Cannot compare unequal intersecting intervals"
            (t1, t2)
            [%sexp_of: t * t];
        Int.compare (lbound t1) (lbound t2))
    ;;
  end

  include T
  include Comparable.Make (T)
end

let append_index reader =
  let index = ref 0 in
  Pipe.map reader ~f:(fun item ->
    let item_index = !index in
    index := !index + 1;
    item, item_index)
;;

type packed_remote = Packed_remote : 'remote Remote_executable.t -> packed_remote

module Config = struct
  type t =
    { local : int
    ; remote : (packed_remote * int) list
    ; cd : string option
    ; redirect_stderr : [ `Dev_null | `File_append of string ]
    ; redirect_stdout : [ `Dev_null | `File_append of string ]
    }

  let default_cores () = (ok_exn Linux_ext.cores) ()

  let create ?(local = 0) ?(remote = []) ?cd ~redirect_stderr ~redirect_stdout () =
    let local, remote =
      if local = 0 && List.is_empty remote
      then default_cores (), remote
      else local, remote
    in
    { local
    ; remote = List.map remote ~f:(fun (remote, n) -> Packed_remote remote, n)
    ; cd
    ; redirect_stderr
    ; redirect_stdout
    }
  ;;
end

(* Wrappers for generic worker *)
module type Worker = sig
  type t
  type param_type
  type run_input_type
  type run_output_type

  val spawn_config_exn : Config.t -> param_type -> t list Deferred.t
  val run_exn : t -> run_input_type -> run_output_type Deferred.t
  val shutdown_exn : t -> unit Deferred.t
end

module type Rpc_parallel_worker_spec = sig
  type state_type

  module Param : Binable
  module Run_input : Binable
  module Run_output : Binable

  val init : Param.t -> state_type Deferred.t
  val execute : state_type -> Run_input.t -> Run_output.t Deferred.t
end

module Make_rpc_parallel_worker (S : Rpc_parallel_worker_spec) = struct
  module Parallel_worker = struct
    module T = struct
      type 'worker functions =
        { execute : ('worker, S.Run_input.t, S.Run_output.t) Parallel.Function.t }

      module Worker_state = struct
        type init_arg = S.Param.t [@@deriving bin_io]
        type t = S.state_type
      end

      module Connection_state = struct
        type init_arg = unit [@@deriving bin_io]
        type t = unit
      end

      module Functions
          (C : Parallel.Creator
           with type worker_state := Worker_state.t
            and type connection_state := Connection_state.t) =
      struct
        let execute =
          C.create_rpc
            ~f:(fun ~worker_state ~conn_state:() -> S.execute worker_state)
            ~bin_input:S.Run_input.bin_t
            ~bin_output:S.Run_output.bin_t
            ()
        ;;

        let functions = { execute }
        let init_worker_state = S.init
        let init_connection_state ~connection:_ ~worker_state:_ () = return ()
      end
    end

    include Parallel.Make (T)
  end

  type t = Parallel_worker.Connection.t
  type param_type = S.Param.t
  type run_input_type = S.Run_input.t
  type run_output_type = S.Run_output.t

  let spawn_exn how param ?cd ~redirect_stderr ~redirect_stdout =
    Parallel_worker.spawn_exn
      ~how
      ?cd
      ~shutdown_on:Connection_closed
      ~redirect_stderr
      ~redirect_stdout
      param
      ~on_failure:Error.raise
      ~connection_state_init_arg:()
  ;;

  let spawn_config_exn
        { Config.local; remote; cd; redirect_stderr; redirect_stdout }
        param
    =
    if local < 0
    then failwiths ~here:[%here] "config.local must be nonnegative" local Int.sexp_of_t;
    (match List.find remote ~f:(fun (_remote, n) -> n < 0) with
     | Some remote ->
       failwiths
         ~here:[%here]
         "remote number of workers must be nonnegative"
         (snd remote)
         Int.sexp_of_t
     | None -> ());
    if local = 0 && not (List.exists remote ~f:(fun (_remote, n) -> n > 0))
    then
      failwiths
        ~here:[%here]
        "total number of workers must be positive"
        (local, List.map remote ~f:snd)
        [%sexp_of: int * int list];
    let spawn_n where n =
      Deferred.List.init n ~f:(fun _i ->
        spawn_exn
          where
          param
          ?cd
          ~redirect_stderr:(redirect_stderr :> Fd_redirection.t)
          ~redirect_stdout:(redirect_stdout :> Fd_redirection.t))
    in
    let%map local_workers, remote_workers =
      Deferred.both
        (spawn_n How_to_run.local local)
        (Deferred.List.concat_map
           ~how:`Parallel
           remote
           ~f:(fun (Packed_remote remote, n) -> spawn_n (How_to_run.remote remote) n))
    in
    local_workers @ remote_workers
  ;;

  let run_exn t input =
    Parallel_worker.Connection.run_exn t ~f:Parallel_worker.functions.execute ~arg:input
  ;;

  let shutdown_exn conn = Parallel_worker.Connection.close conn
end

(* Map *)

module type Map_function = sig
  module Param : Binable
  module Input : Binable
  module Output : Binable

  module Worker :
    Worker
    with type param_type = Param.t
    with type run_input_type = Input.t
    with type run_output_type = Output.t
end

module type Map_function_with_init_spec = sig
  type state_type

  module Param : Binable
  module Input : Binable
  module Output : Binable

  val init : Param.t -> state_type Deferred.t
  val map : state_type -> Input.t -> Output.t Deferred.t
end

module Make_map_function_with_init (S : Map_function_with_init_spec) = struct
  module Param = S.Param
  module Input = S.Input
  module Output = S.Output

  module Worker = Make_rpc_parallel_worker (struct
      type state_type = S.state_type

      module Param = Param
      module Run_input = Input
      module Run_output = Output

      let init = S.init
      let execute = S.map
    end)
end

module type Map_function_spec = sig
  module Input : Binable
  module Output : Binable

  val map : Input.t -> Output.t Deferred.t
end

module Make_map_function (S : Map_function_spec) = Make_map_function_with_init (struct
    type state_type = unit

    module Param = struct
      type t = unit [@@deriving bin_io]
    end

    module Input = S.Input
    module Output = S.Output

    let init = return
    let map () = S.map
  end)

(* Map-combine *)

module type Map_reduce_function = sig
  module Param : Binable
  module Accum : Binable
  module Input : Binable

  module Worker :
    Worker
    with type param_type = Param.t
    with type run_input_type =
           [ `Map of Input.t
           | `Combine of Accum.t * Accum.t
           | `Map_right_combine of Accum.t * Input.t (* combine accum (map input) *)
           ]
    with type run_output_type = Accum.t
end

module type Map_reduce_function_with_init_spec = sig
  type state_type

  module Param : Binable
  module Accum : Binable
  module Input : Binable

  val init : Param.t -> state_type Deferred.t
  val map : state_type -> Input.t -> Accum.t Deferred.t
  val combine : state_type -> Accum.t -> Accum.t -> Accum.t Deferred.t
end

module Make_map_reduce_function_with_init (S : Map_reduce_function_with_init_spec) =
struct
  module Param = S.Param
  module Accum = S.Accum
  module Input = S.Input

  module Worker = Make_rpc_parallel_worker (struct
      type state_type = S.state_type

      module Param = Param

      module Run_input = struct
        type t =
          [ `Map of Input.t
          | `Combine of Accum.t * Accum.t
          | `Map_right_combine of Accum.t * Input.t
          ]
        [@@deriving bin_io]
      end

      module Run_output = Accum

      let init = S.init

      let execute state = function
        | `Map input -> S.map state input
        | `Combine (accum1, accum2) -> S.combine state accum1 accum2
        | `Map_right_combine (accum1, input) ->
          let%bind accum2 = S.map state input in
          S.combine state accum1 accum2
      ;;
    end)
end

module type Map_reduce_function_spec = sig
  module Accum : Binable
  module Input : Binable

  val map : Input.t -> Accum.t Deferred.t
  val combine : Accum.t -> Accum.t -> Accum.t Deferred.t
end

module Make_map_reduce_function (S : Map_reduce_function_spec) =
  Make_map_reduce_function_with_init (struct
    type state_type = unit

    module Param = struct
      type t = unit [@@deriving bin_io]
    end

    module Accum = S.Accum
    module Input = S.Input

    let init = return
    let map () = S.map
    let combine () = S.combine
  end)

let map_unordered (type param a b) config input_reader ~m ~(param : param) =
  let module Map_function = (val m : Map_function
                             with type Param.t = param
                              and type Input.t = a
                              and type Output.t = b)
  in
  let%bind workers = Map_function.Worker.spawn_config_exn config param in
  let input_with_index_reader = append_index input_reader in
  let output_reader, output_writer = Pipe.create () in
  let consumer =
    Pipe.add_consumer input_with_index_reader ~downstream_flushed:(fun () ->
      Pipe.downstream_flushed output_writer)
  in
  let rec map_loop worker =
    match%bind Pipe.read ~consumer input_with_index_reader with
    | `Eof -> Map_function.Worker.shutdown_exn worker
    | `Ok (input, index) ->
      let%bind output = Map_function.Worker.run_exn worker input in
      let%bind () = Pipe.write output_writer (output, index) in
      map_loop worker
  in
  don't_wait_for
    (let%map () = Deferred.all_unit (List.map workers ~f:map_loop) in
     Pipe.close output_writer);
  return output_reader
;;

let map config input_reader ~m ~param =
  let%bind mapped_reader = map_unordered config input_reader ~m ~param in
  let new_reader, new_writer = Pipe.create () in
  let expecting_index = ref 0 in
  let out_of_order_output =
    Heap.create ~cmp:(fun (_, index1) (_, index2) -> Int.compare index1 index2) ()
  in
  (* Pops in-order output until we reach a gap. *)
  let rec write_out_of_order_output () =
    match Heap.top out_of_order_output with
    | Some (output, index) when index = !expecting_index ->
      expecting_index := !expecting_index + 1;
      Heap.remove_top out_of_order_output;
      let%bind () = Pipe.write new_writer output in
      write_out_of_order_output ()
    | _ -> Deferred.unit
  in
  don't_wait_for
    (let%map () =
       Pipe.iter mapped_reader ~f:(fun ((output, index) as output_and_index) ->
         if index = !expecting_index
         then (
           expecting_index := !expecting_index + 1;
           let%bind () = Pipe.write new_writer output in
           write_out_of_order_output ())
         else if index > !expecting_index
         then (
           Heap.add out_of_order_output output_and_index;
           Deferred.unit)
         else assert false)
     in
     Pipe.close new_writer);
  return new_reader
;;

let find_map (type param a b) config input_reader ~m ~(param : param) =
  let module Map_function = (val m : Map_function
                             with type Param.t = param
                              and type Input.t = a
                              and type Output.t = b option)
  in
  let%bind workers = Map_function.Worker.spawn_config_exn config param in
  let found_value = ref None in
  let rec find_loop worker =
    match%bind Pipe.read input_reader with
    | `Eof -> Map_function.Worker.shutdown_exn worker
    | `Ok input ->
      (* Check result and exit early if we've found something. *)
      (match !found_value with
       | Some _ -> Map_function.Worker.shutdown_exn worker
       | None ->
         (match%bind Map_function.Worker.run_exn worker input with
          | Some value ->
            found_value := Some value;
            Map_function.Worker.shutdown_exn worker
          | None -> find_loop worker))
  in
  let%map () = Deferred.all_unit (List.map workers ~f:find_loop) in
  !found_value
;;

let map_reduce_commutative (type param a accum) config input_reader ~m ~(param : param) =
  let module Map_reduce_function = (val m : Map_reduce_function
                                    with type Param.t = param
                                     and type Input.t = a
                                     and type Accum.t = accum)
  in
  let%bind workers = Map_reduce_function.Worker.spawn_config_exn config param in
  let rec map_and_combine_loop worker acc =
    match%bind Pipe.read input_reader with
    | `Eof -> return acc
    | `Ok input ->
      let%bind acc =
        match acc with
        | Some acc ->
          Map_reduce_function.Worker.run_exn worker (`Map_right_combine (acc, input))
        | None -> Map_reduce_function.Worker.run_exn worker (`Map input)
      in
      map_and_combine_loop worker (Some acc)
  in
  let combined_acc = ref None in
  let rec combine_loop worker acc =
    match !combined_acc with
    | Some other_acc ->
      combined_acc := None;
      Map_reduce_function.Worker.run_exn worker (`Combine (other_acc, acc))
      >>= combine_loop worker
    | None ->
      combined_acc := Some acc;
      Map_reduce_function.Worker.shutdown_exn worker
  in
  let%map () =
    Deferred.all_unit
      (List.map workers ~f:(fun worker ->
         match%bind map_and_combine_loop worker None with
         | Some acc -> combine_loop worker acc
         | None -> Map_reduce_function.Worker.shutdown_exn worker))
  in
  !combined_acc
;;

let map_reduce (type param a accum) config input_reader ~m ~(param : param) =
  let module Map_reduce_function = (val m : Map_reduce_function
                                    with type Param.t = param
                                     and type Input.t = a
                                     and type Accum.t = accum)
  in
  let%bind workers = Map_reduce_function.Worker.spawn_config_exn config param in
  let input_with_index_reader = append_index input_reader in
  let module H = Half_open_interval in
  let acc_map = ref H.Map.empty in
  let rec combine_loop
            worker
            key
            acc
            (dir : [ `Left | `Left_nothing_right | `Right | `Right_nothing_left ])
    =
    match dir with
    | (`Left | `Left_nothing_right) as dir' ->
      (match H.Map.closest_key !acc_map `Less_than key with
       | Some (left_key, left_acc) when H.ubound left_key = H.lbound key ->
         (* combine acc_{left_lbound, left_ubound} acc_{this_lbound, this_ubound}
            -> acc_{left_lbound, this_ubound} *)
         (* We need to remove both nodes from the tree to indicate that we are working on
            combining them. *)
         acc_map := H.Map.remove (H.Map.remove !acc_map key) left_key;
         let%bind new_acc =
           Map_reduce_function.Worker.run_exn worker (`Combine (left_acc, acc))
         in
         let new_key = H.create_exn (H.lbound left_key) (H.ubound key) in
         acc_map := H.Map.set !acc_map ~key:new_key ~data:new_acc;
         (* Continue searching in the same direction. (See above comment.) *)
         combine_loop worker new_key new_acc `Left
       | _ ->
         (match dir' with
          | `Left -> combine_loop worker key acc `Right_nothing_left
          | `Left_nothing_right -> Deferred.unit))
    | (`Right | `Right_nothing_left) as dir' ->
      (match H.Map.closest_key !acc_map `Greater_than key with
       | Some (right_key, right_acc) when H.lbound right_key = H.ubound key ->
         (* combine acc_{this_lbound, this_ubound} acc_{right_lbound, right_ubound}
            -> acc_{this_lbound, right_ubound} *)
         acc_map := H.Map.remove (H.Map.remove !acc_map key) right_key;
         let%bind new_acc =
           Map_reduce_function.Worker.run_exn worker (`Combine (acc, right_acc))
         in
         let new_key = H.create_exn (H.lbound key) (H.ubound right_key) in
         acc_map := H.Map.set !acc_map ~key:new_key ~data:new_acc;
         combine_loop worker new_key new_acc `Right
       | _ ->
         (match dir' with
          | `Right -> combine_loop worker key acc `Left_nothing_right
          | `Right_nothing_left -> Deferred.unit))
  in
  let rec map_and_combine_loop worker =
    match%bind Pipe.read input_with_index_reader with
    | `Eof -> Map_reduce_function.Worker.shutdown_exn worker
    | `Ok (input, index) ->
      let key = H.create_exn index (index + 1) in
      let%bind () =
        match H.Map.closest_key !acc_map `Less_than key with
        | Some (left_key, left_acc) when H.ubound left_key = H.lbound key ->
          (* combine acc_{left_lbound, left_ubound} (map a_index)
             -> acc_{left_lbound, index + 1} *)
          acc_map := H.Map.remove !acc_map left_key;
          let%bind acc =
            Map_reduce_function.Worker.run_exn
              worker
              (`Map_right_combine (left_acc, input))
          in
          let key = H.create_exn (H.lbound left_key) (H.ubound key) in
          acc_map := H.Map.set !acc_map ~key ~data:acc;
          combine_loop worker key acc `Left
        | _ ->
          (* map a_index -> acc_{index, index + 1} *)
          let%bind acc = Map_reduce_function.Worker.run_exn worker (`Map input) in
          acc_map := H.Map.set !acc_map ~key ~data:acc;
          combine_loop worker key acc `Left
      in
      map_and_combine_loop worker
  in
  let%map () = Deferred.all_unit (List.map workers ~f:map_and_combine_loop) in
  assert (Map.length !acc_map <= 1);
  Option.map (Map.min_elt !acc_map) ~f:snd
;;
OCaml

Innovation. Community. Security.