package hack_parallel

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file multiWorker.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
(**
 * Copyright (c) 2015, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the "hack" directory of this source tree. An additional grant
 * of patent rights can be found in the PATENTS file in the same directory.
 *
*)

open Hack_core

(** A [Hack_bucket.next] whose payload is a list of ['a]. *)
type 'a nextlist = 'a list Hack_bucket.next

(** Re-export of [Hack_bucket.bucket], so that its constructors can be used
    unqualified in this module. As used by the functions in this file:
    - [Job x]: a unit of work [x] to hand to the job function;
    - [Wait]: no work is available right now (the single threaded driver
      treats this as fatal, the multi threaded driver collects results from
      pending handles and tries again);
    - [Done]: no more work will be produced. *)
type 'a bucket = 'a Hack_bucket.bucket =
  | Job of 'a
  | Wait
  | Done

(** [single_threaded_call job merge neutral next] processes, sequentially and
    in the calling process, every bucket produced by [next].

    Each [Job l] is run as [job neutral l]; its result [res] is folded into
    the accumulator with [merge res acc]. The accumulator starts at [neutral]
    and is returned as soon as [next] yields [Done].

    @raise Failure with message "stuck!" if [next] yields [Wait]. *)
let single_threaded_call job merge neutral next =
  (* The first bucket is requested before the serialization check below. *)
  let first_bucket = next () in
  (* Sanity check only: make sure the job can be serialized, so that code
   * which works in single threaded mode also works in parallel mode.
   * The marshalled string itself is not needed.
  *)
  ignore (Marshal.to_string job [Marshal.Closures] : string);
  let rec drain acc = function
    | Done -> acc
    | Wait ->
        (* Nobody else can ever produce more work in single threaded mode,
           so waiting would never end. *)
        failwith "stuck!"
    | Job l ->
        let res = job neutral l in
        let acc = merge res acc in
        drain acc (next ())
  in
  drain neutral first_bucket

(** [multi_threaded_call workers job merge neutral next] hands the buckets
    produced by [next] to [workers] through [Worker.call] and merges the
    results.

    Every bucket is run as [job neutral bucket] by a worker. Results are
    fetched with [Worker.get_result] from the handles that [Worker.select]
    reports as ready, and folded into the accumulator with [merge result acc],
    starting from [neutral]. The final accumulator is returned once there is
    neither an available worker nor a pending handle left. *)
let multi_threaded_call
    (type a) (type b) (type c)
    workers (job: c -> a -> b)
    (merge: b -> c -> c)
    (neutral: c)
    (next: a Hack_bucket.next) =
  (* [idle]    : workers currently available.
     [pending] : handles of jobs that have been sent out.
     [acc]     : results merged so far. *)
  let rec dispatch idle pending acc =
    match idle, pending with
    | [], [] ->
        (* Nothing left to send and nothing left to wait for. *)
        acc
    | [], _ :: _ ->
        (* No worker available: wait for some workers to finish. *)
        collect [] pending acc
    | worker :: rest, _ ->
        (* At least one worker is available: ask for the next bucket. *)
        begin match next () with
        | Job bucket ->
            (* Send the bucket to the worker and remember its handle. *)
            let handle =
              Worker.call worker
                (fun xl -> job neutral xl)
                bucket in
            dispatch rest (handle :: pending) acc
        | Wait ->
            (* No bucket for now: collect results, keeping every available
               worker. *)
            collect idle pending acc
        | Done ->
            (* No more buckets will come: drop the available workers and
               only gather what is still pending. *)
            dispatch [] pending acc
        end
  and collect idle pending acc =
    let { Worker.readys; waiters } = Worker.select pending in
    (* Workers behind the ready handles become available again, ahead of
       the ones that were already available. *)
    let freed = List.map ~f:Worker.get_worker readys in
    let idle = freed @ idle in
    (* Merge the results of the ready handles into the accumulator. *)
    let merge_ready acc handle = merge (Worker.get_result handle) acc in
    let acc = List.fold_left ~f:merge_ready ~init:acc readys in
    (* Carry on with the handles that are not ready yet. *)
    dispatch idle waiters acc in
  dispatch workers [] neutral

(** [call workers ~job ~merge ~neutral ~next] runs [job] over the buckets
    produced by [next] and merges the results, starting from [neutral].
    With [Some pool] the work goes through [multi_threaded_call pool];
    with [None] it goes through [single_threaded_call]. *)
let call workers ~job ~merge ~neutral ~next =
  match workers with
  | Some pool -> multi_threaded_call pool job merge neutral next
  | None -> single_threaded_call job merge neutral next

(** [next workers] is [Hack_bucket.make] partially applied to [~num_workers]:
    the length of the worker list when [workers] is [Some _], and [1] when it
    is [None]. The remaining arguments of [Hack_bucket.make] are left to the
    caller. *)
let next workers =
  let num_workers =
    match workers with
    | None -> 1
    | Some pool -> List.length pool
  in
  Hack_bucket.make ~num_workers
OCaml

Innovation. Community. Security.