package rpc_parallel
Type-safe parallel library built on top of Async_rpc
Install
Dune Dependency
Authors
Maintainers
Sources
v0.17.0.tar.gz
sha256=160c3c60b224f3238810858435e8ce5d51376edf6fe2af6cc0ed02edf0166e08
doc/src/rpc_parallel/utils.ml.html
Source file utils.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
open Core open Poly open Async module Worker_id = struct let create = Uuid_unix.create (* If we do not use the stable sexp serialization, when running inline tests, we will create UUIDs that fail tests *) module T = Uuid.Stable.V1 type t = T.t [@@deriving sexp, bin_io] include Comparable.Make_binable (T) include Hashable.Make_binable (T) include Sexpable.To_stringable (T) let pp fmt t = String.pp fmt (Sexp.to_string ([%sexp_of: t] t)) end module Worker_type_id = Unique_id.Int () module Internal_connection_state = struct type ('worker_state, 'conn_state) t1 = { worker_state : 'worker_state ; conn_state : 'conn_state ; worker_id : Worker_id.t } type ('worker_state, 'conn_state) t = Rpc.Connection.t * ('worker_state, 'conn_state) t1 Set_once.t end let try_within ~monitor f = let ivar = Ivar.create () in Scheduler.within ~monitor (fun () -> Monitor.try_with ~run:`Now ~rest:`Raise f >>> fun r -> Ivar.fill_exn ivar (Result.map_error r ~f:Error.of_exn)); Ivar.read ivar ;; let try_within_exn ~monitor f = match%map try_within ~monitor f with | Ok x -> x | Error e -> Error.raise e ;; (* Use /proc/PID/exe to get the currently running executable. - argv[0] might have been deleted (this is quite common with jenga) - `cp /proc/PID/exe dst` works as expected while `cp /proc/self/exe dst` does not *) let our_binary = let our_binary_lazy = lazy (Unix.getpid () |> Pid.to_int |> sprintf "/proc/%d/exe") in fun () -> Lazy.force our_binary_lazy ;; let our_md5 = let our_md5_lazy = lazy (Process.run ~prog:"md5sum" ~args:[ our_binary () ] () >>|? fun our_md5 -> let our_md5, _ = String.lsplit2_exn ~on:' ' our_md5 in our_md5) in fun () -> Lazy.force our_md5_lazy ;; let is_child_env_var = "ASYNC_PARALLEL_IS_CHILD_MACHINE" let whoami () = match Sys.getenv is_child_env_var with | Some _ -> `Worker | None -> `Master ;; let clear_env () = Unix.unsetenv is_child_env_var let validate_env env = match List.find env ~f:(fun (key, _) -> key = is_child_env_var) with | Some e -> Or_error.error "Environment variable conflicts with Rpc_parallel machinery" e [%sexp_of: string * string] | None -> Ok () ;; (* Don't run tests in the worker if we are running an expect test. A call to [Rpc_parallel.For_testing.initialize] will initialize the worker and start the Async scheduler. *) let force_drop_inline_test = if Core.am_running_test then [ "FORCE_DROP_INLINE_TEST", "" ] else [] ;; let create_worker_env ~extra = let open Or_error.Let_syntax in let%map () = validate_env extra in extra @ force_drop_inline_test @ For_testing_internal.worker_environment () @ [ is_child_env_var, "" ] ;; let to_daemon_fd_redirection = function | `Dev_null -> `Dev_null | `File_append s -> `File_append s | `File_truncate s -> `File_truncate s ;;
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>