package rpc_parallel

  1. Overview
  2. Docs
Type-safe parallel library built on top of Async_rpc

Install

Dune Dependency

Authors

Maintainers

Sources

v0.17.0.tar.gz
sha256=160c3c60b224f3238810858435e8ce5d51376edf6fe2af6cc0ed02edf0166e08

doc/src/rpc_parallel/utils.ml.html

Source file utils.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
open Core
open Poly
open Async

module Worker_id = struct
  let create = Uuid_unix.create

  (* If we do not use the stable sexp serialization, when running
     inline tests, we will create UUIDs that fail tests *)
  module T = Uuid.Stable.V1

  type t = T.t [@@deriving sexp, bin_io]

  include Comparable.Make_binable (T)
  include Hashable.Make_binable (T)
  include Sexpable.To_stringable (T)

  let pp fmt t = String.pp fmt (Sexp.to_string ([%sexp_of: t] t))
end

module Worker_type_id = Unique_id.Int ()

module Internal_connection_state = struct
  type ('worker_state, 'conn_state) t1 =
    { worker_state : 'worker_state
    ; conn_state : 'conn_state
    ; worker_id : Worker_id.t
    }

  type ('worker_state, 'conn_state) t =
    Rpc.Connection.t * ('worker_state, 'conn_state) t1 Set_once.t
end

let try_within ~monitor f =
  let ivar = Ivar.create () in
  Scheduler.within ~monitor (fun () ->
    Monitor.try_with ~run:`Now ~rest:`Raise f
    >>> fun r -> Ivar.fill_exn ivar (Result.map_error r ~f:Error.of_exn));
  Ivar.read ivar
;;

let try_within_exn ~monitor f =
  match%map try_within ~monitor f with
  | Ok x -> x
  | Error e -> Error.raise e
;;

(* Use /proc/PID/exe to get the currently running executable.
   - argv[0] might have been deleted (this is quite common with jenga)
   - `cp /proc/PID/exe dst` works as expected while `cp /proc/self/exe dst` does not *)
let our_binary =
  let our_binary_lazy = lazy (Unix.getpid () |> Pid.to_int |> sprintf "/proc/%d/exe") in
  fun () -> Lazy.force our_binary_lazy
;;

let our_md5 =
  let our_md5_lazy =
    lazy
      (Process.run ~prog:"md5sum" ~args:[ our_binary () ] ()
       >>|? fun our_md5 ->
       let our_md5, _ = String.lsplit2_exn ~on:' ' our_md5 in
       our_md5)
  in
  fun () -> Lazy.force our_md5_lazy
;;

let is_child_env_var = "ASYNC_PARALLEL_IS_CHILD_MACHINE"

let whoami () =
  match Sys.getenv is_child_env_var with
  | Some _ -> `Worker
  | None -> `Master
;;

let clear_env () = Unix.unsetenv is_child_env_var

let validate_env env =
  match List.find env ~f:(fun (key, _) -> key = is_child_env_var) with
  | Some e ->
    Or_error.error
      "Environment variable conflicts with Rpc_parallel machinery"
      e
      [%sexp_of: string * string]
  | None -> Ok ()
;;

(* Don't run tests in the worker if we are running an expect test. A call to
   [Rpc_parallel.For_testing.initialize] will initialize the worker and start the
   Async scheduler. *)
let force_drop_inline_test =
  if Core.am_running_test then [ "FORCE_DROP_INLINE_TEST", "" ] else []
;;

let create_worker_env ~extra =
  let open Or_error.Let_syntax in
  let%map () = validate_env extra in
  extra
  @ force_drop_inline_test
  @ For_testing_internal.worker_environment ()
  @ [ is_child_env_var, "" ]
;;

let to_daemon_fd_redirection = function
  | `Dev_null -> `Dev_null
  | `File_append s -> `File_append s
  | `File_truncate s -> `File_truncate s
;;
OCaml

Innovation. Community. Security.