package hack_parallel

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file scheduler.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
(** Copyright (c) 2016-present, Facebook, Inc.
    Modified work Copyright (c) 2018-2019 Rijnard van Tonder
    This source code is licensed under the MIT license found in the
    LICENSE file in the root directory of this source tree. *)

open Hack_parallel_intf.Std

module Daemon = Daemon


type t = {
  workers: Worker.t list;
  number_of_workers: int;
  bucket_multiplier: int;
}


let entry =
  Worker.register_entry_point ~restore:(fun _ -> ())


let create
    ?(number_of_workers = 1)
    ?(bucket_multiplier = 10)
    () =
  let heap_handle = Memory.get_heap_handle () in
  let workers =
    Hack_parallel_intf.Std.Worker.make
      ~saved_state:()
      ~entry
      ~nbr_procs:number_of_workers
      ~heap_handle
      ~gc_control:Memory.worker_garbage_control
  in
  Memory.connect heap_handle;
  { workers; number_of_workers; bucket_multiplier }


let map_reduce
    { workers; number_of_workers; bucket_multiplier }
    ?bucket_size
    ~init
    ~map
    ~reduce
    work =
  let number_of_workers =
    match bucket_size with
    | Some exact_size when exact_size > 0 ->
      (List.length work / exact_size) + 1
    | _ ->
      let bucket_multiplier = Core_kernel.Int.min bucket_multiplier (1 + (List.length work / 400)) in
      number_of_workers * bucket_multiplier
  in
  MultiWorker.call
    (Some workers)
    ~job:map
    ~merge:reduce
    ~neutral:init
    ~next:(Bucket.make ~num_workers:number_of_workers work)


let iter scheduler ~f work =
  map_reduce
    scheduler
    ~init:()
    ~map:(fun _ work -> f work)
    ~reduce:(fun _ _ -> ())
    work


let single_job { workers; _ } ~f work =
  let rec wait_until_ready handle =
    let { Worker.readys; _ } = Worker.select [handle] in
    match readys with
    | [] -> wait_until_ready handle
    | ready :: _ -> ready
  in
  match workers with
  | worker::_ ->
    Worker.call worker f work
    |> wait_until_ready
    |> Worker.get_result
  | [] ->
    failwith "This service contains no workers"


let mock () =
  Memory.get_heap_handle () |> ignore;
  { workers = []; number_of_workers = 1; bucket_multiplier = 1 }


let destroy _ =
  Worker.killall ()
OCaml

Innovation. Community. Security.