package rpc_parallel

  1. Overview
  2. Docs

Source file parallel_intf.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
open! Core
open! Async

(** See README for more details *)

(** A [('worker, 'query, 'response) Function.t] is a type-safe function ['query ->
    'response Deferred.t] that can only be run on a ['worker]. Under the hood it
    represents an Async Rpc protocol that we know a ['worker] will implement. *)
module type Function = sig
  type ('worker, 'query, +'response) t

  module Direct_pipe : sig
    type nonrec ('worker, 'query, 'response) t =
      ( 'worker
      , 'query * ('response Rpc.Pipe_rpc.Pipe_message.t -> Rpc.Pipe_rpc.Pipe_response.t)
      , Rpc.Pipe_rpc.Id.t )
        t
  end

  val map : ('worker, 'query, 'a) t -> f:('a -> 'b) -> ('worker, 'query, 'b) t

  val contra_map
    :  ('worker, 'a, 'response) t
    -> f:('b -> 'a)
    -> ('worker, 'b, 'response) t

  (** Common functions that are implemented by all workers *)

  (** This implementation will add another [Log.Output] for [Log.Global] that transfers
      log messages to the returned pipe. You can subscribe to a worker's log more than
      once and from different processes, as each call simply adds a new [Log.Output].
      Closing the pipe will remove the corresponding [Log.Output].

      NOTE: You will never get any log messages before this implementation has run (there
      is no queuing of log messages). As a consequence, you will never get any log
      messages written in a worker's init functions. *)
  val async_log : (_, unit, Log.Message.Stable.V2.t Pipe.Reader.t) t

  (** A given process can have multiple worker servers running (of the same or different
      worker types). This implementation closes the server on which it is run. All
      existing open connections will remain open, but no further connections to this
      worker server will be accepted.

      NOTE: calling [close_server] on a worker process that is only running one worker
      server will leave a stranded worker process if no other cleanup has been setup
      (e.g. setting up [on_client_disconnect] or [Connection.close_finished] handlers) *)
  val close_server : (_, unit, unit) t
end

module type Worker = sig
  type ('worker, 'query, 'response) _function

  (** A [Worker.t] type is defined [with bin_io] so it is possible to create functions
      that take a worker as an argument. *)
  type t [@@deriving bin_io, sexp_of]

  (** A type alias to make the [Connection] signature more readable *)
  type worker = t

  type 'a functions

  (** Accessor for the functions implemented by this worker type *)
  val functions : t functions

  type worker_state_init_arg
  type connection_state_init_arg

  module Id : Identifiable

  val id : t -> Id.t

  (** [serve arg] will start an Rpc server in process implementing all the functions
      of the given worker. *)
  val serve
    :  ?max_message_size:int
    -> ?handshake_timeout:Time.Span.t
    -> ?heartbeat_config:Rpc.Connection.Heartbeat_config.t
    -> worker_state_init_arg
    -> worker Deferred.t

  module Connection : sig
    type t [@@deriving sexp_of]

    (** The [id] of the connected worker *)
    val worker_id : t -> Id.t

    (** Run functions implemented by this worker *)
    val run
      :  t
      -> f:(worker, 'query, 'response) _function
      -> arg:'query
      -> 'response Or_error.t Deferred.t

    val run_exn
      :  t
      -> f:(worker, 'query, 'response) _function
      -> arg:'query
      -> 'response Deferred.t

    (** Connect to a given worker, returning a type wrapped [Rpc.Connection.t] that can be
        used to run functions. *)
    val client : worker -> connection_state_init_arg -> t Or_error.t Deferred.t

    val client_exn : worker -> connection_state_init_arg -> t Deferred.t

    (** [with_client worker init_arg f] connects to the [worker]'s server, initializes the
        connection state with [init_arg]  and runs [f] until an exception is thrown or
        until the returned Deferred is determined.

        NOTE: You should be careful when using this with [Pipe_rpc].
        See [Rpc.Connection.with_close] for more information. *)
    val with_client
      :  worker
      -> connection_state_init_arg
      -> f:(t -> 'a Deferred.t)
      -> 'a Or_error.t Deferred.t


    val close : t -> unit Deferred.t
    val close_finished : t -> unit Deferred.t
    val close_reason : t -> on_close:[ `started | `finished ] -> Info.t Deferred.t
    val is_closed : t -> bool
  end

  module Shutdown_on (M : T1) : sig
    type _ t =
      | Connection_closed
        : (connection_state_init_arg:connection_state_init_arg
           -> Connection.t M.t Deferred.t)
            t
      (** An initial connection to the worker is established. The worker shuts itself down
          when [Rpc.Connection.close_finished] on this connection, which is likely when
          the master process exits or explicitly calls [Rpc.Connection.close], but can
          also result from network problems or long async cycles. *)
      | Heartbeater_connection_timeout : worker M.t Deferred.t t
      (** A "heartbeater" connection is established between the worker and its master. The
          worker shuts itself down when [Rpc.Connection.close_finished] on this
          connection, which is likely when the master process exits, but can also result
          from network problems or long async cycles. *)
      | Called_shutdown_function : worker M.t Deferred.t t
      (** WARNING! Worker's spawned with this variant do not shutdown when the master
          process exits. The worker only shuts itself down on an explicit shutdown
          request. *)
  end

  (** The various [spawn] functions create a new worker process that implements the
      functions specified in the [Worker_spec].

      [name] will be attached to certain error messages and is useful for debugging.

      [env] extends the environment of the spawned worker process.

      [connection_timeout] is used for various internal timeouts. This may need be to
      increased if the init arg is really large (serialization and deserialization
      takes more than [connection_timeout]).

      [cd] changes the current working directory of a spawned worker process.

      [shutdown_on] specifies when a worker should shut itself down.

      [on_failure exn] will be called in the spawning process upon the worker process
      raising a background exception. All exceptions raised before functions return will be
      returned to the caller. [on_failure] will be called in [Monitor.current ()] at the
      time of this spawn call. The worker initiates shutdown upon sending the exception
      to the master process.

      [worker_state_init_arg] (below) will be passed to [init_worker_state] of the given
      [Worker_spec] module. This initializes a persistent worker state for all connections
      to this worker. *)
  type 'a with_spawn_args =
    ?how:How_to_run.t (** default [How_to_run.local] *)
    -> ?name:string
    -> ?env:(string * string) list
    -> ?connection_timeout:Time.Span.t (** default 10 sec *)
    -> ?cd:string (** default / *)
    -> on_failure:(Error.t -> unit)
    -> 'a

  (** The spawned worker process daemonizes. Any initialization errors that wrote to
      stderr (Rpc_parallel internal initialization, not user initialization code) will be
      captured and rewritten to the spawning process's stderr with the prefix
      "[WORKER %NAME% STDERR]".

      [redirect_stdout] and [redirect_stderr] specify stdout and stderr of the worker
      process. *)
  val spawn
    : (?umask:int (** defaults to use existing umask *)
       -> shutdown_on:'a Shutdown_on(Or_error).t
       -> redirect_stdout:Fd_redirection.t
       -> redirect_stderr:Fd_redirection.t
       -> worker_state_init_arg
       -> 'a)
        with_spawn_args

  val spawn_exn
    : (?umask:int (** defaults to use existing umask *)
       -> shutdown_on:'a Shutdown_on(Monad.Ident).t
       -> redirect_stdout:Fd_redirection.t
       -> redirect_stderr:Fd_redirection.t
       -> worker_state_init_arg
       -> 'a)
        with_spawn_args

  module Spawn_in_foreground_result : sig
    type 'a t = ('a * Process.t) Or_error.t
  end

  (** Similar to [spawn] but the worker process does not daemonize. If the process was
      spawned on a remote host, the ssh [Process.t] is returned.

      Remember to call [Process.wait] on the returned [Process.t] to avoid zombie
      processes. *)
  val spawn_in_foreground
    : (shutdown_on:'a Shutdown_on(Spawn_in_foreground_result).t
       -> worker_state_init_arg
       -> 'a)
        with_spawn_args

  module Spawn_in_foreground_exn_result : sig
    type 'a t = 'a * Process.t
  end

  val spawn_in_foreground_exn
    : (shutdown_on:'a Shutdown_on(Spawn_in_foreground_exn_result).t
       -> worker_state_init_arg
       -> 'a)
        with_spawn_args

  (** [shutdown] attempts to connect to a worker. Upon success, [Shutdown.shutdown 0] is
      run in the worker. If you want strong guarantees that a worker did shutdown, consider
      using [spawn_in_foreground] and inspecting the [Process.t]. *)
  val shutdown : t -> unit Or_error.t Deferred.t

  module Deprecated : sig
    (** This is nearly identical to calling [spawn ~shutdown_on:Heartbeater_connection_timeout] and
        then [Connection.client]. The only difference is that this function handles
        shutting down the worker when [Connection.client] returns an error.

        Uses of [spawn_and_connect] that disregard [t] can likely be replaced with [spawn
        ~shutdown_on:Connection_closed]. If [t] is used for reconnecting, then you can use [spawn]
        followed by [Connection.client]. *)
    val spawn_and_connect
      : (?umask:int
         -> redirect_stdout:Fd_redirection.t
         -> redirect_stderr:Fd_redirection.t
         -> connection_state_init_arg:connection_state_init_arg
         -> worker_state_init_arg
         -> (t * Connection.t) Or_error.t Deferred.t)
          with_spawn_args

    val spawn_and_connect_exn
      : (?umask:int
         -> redirect_stdout:Fd_redirection.t
         -> redirect_stderr:Fd_redirection.t
         -> connection_state_init_arg:connection_state_init_arg
         -> worker_state_init_arg
         -> (t * Connection.t) Deferred.t)
          with_spawn_args
  end

  (** This module is used for internal testing of the rpc_parallel library. *)
  module For_internal_testing : sig
    module Spawn_in_foreground_result : sig
      type 'a t =
        ( 'a * Process.t
        , Error.t * [ `Worker_process of Unix.Exit_or_signal.t Deferred.t option ] )
          Result.t
    end

    val spawn_in_foreground
      : (shutdown_on:'a Shutdown_on(Spawn_in_foreground_result).t
         -> worker_state_init_arg
         -> 'a)
          with_spawn_args
  end
end

module type Functions = sig
  type worker
  type worker_state_init_arg
  type worker_state
  type connection_state_init_arg
  type connection_state
  type 'worker functions

  val functions : worker functions

  (** [init_worker_state] is called with the [init_arg] passed to [spawn] or [serve] *)
  val init_worker_state : worker_state_init_arg -> worker_state Deferred.t

  (** [init_connection_state] is called with the [init_arg] passed to [Connection.client]

      [connection] should only be used to register [close_finished] callbacks, not to
      dispatch.  *)
  val init_connection_state
    :  connection:Rpc.Connection.t
    -> worker_state:worker_state
    -> connection_state_init_arg
    -> connection_state Deferred.t
end

module type Creator = sig
  type ('worker, 'query, 'response) _function
  type ('worker, 'query, 'response) _direct
  type worker
  type worker_state
  type connection_state

  (** [create_rpc ?name ~f ~bin_input ~bin_output ()] will create an [Rpc.Rpc.t] with
      [name] if specified and use [f] as an implementation for this Rpc. It returns back a
      [_function], a type-safe Rpc protocol. *)
  val create_rpc
    :  ?name:string
    -> f:(worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'response Deferred.t)
    -> bin_input:'query Bin_prot.Type_class.t
    -> bin_output:'response Bin_prot.Type_class.t
    -> unit
    -> (worker, 'query, 'response) _function

  (** [create_pipe ?name ~f ~bin_input ~bin_output ()] will create an [Rpc.Pipe_rpc.t]
      with [name] if specified. The implementation for this Rpc is a function that creates
      a [Pipe.Reader.t] and a [Pipe.Writer.t], then calls [f arg ~writer] and returns the
      reader.

      Notice that [aborted] is not exposed. The pipe is closed upon aborted. *)
  val create_pipe
    :  ?name:string
    -> f:(worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'response Pipe.Reader.t Deferred.t)
    -> bin_input:'query Bin_prot.Type_class.t
    -> bin_output:'response Bin_prot.Type_class.t
    -> unit
    -> (worker, 'query, 'response Pipe.Reader.t) _function

  (** [create_direct_pipe ?name ~f ~bin_input ~bin_output ()] will create an
      [Rpc.Pipe_rpc.t] with [name] if specified. *)
  val create_direct_pipe
    :  ?name:string
    -> f:(worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'response Rpc.Pipe_rpc.Direct_stream_writer.t
          -> unit Deferred.t)
    -> bin_input:'query Bin_prot.Type_class.t
    -> bin_output:'response Bin_prot.Type_class.t
    -> unit
    -> (worker, 'query, 'response) _direct

  (** [create_one_way ?name ~f ~bin_msg ()] will create an [Rpc.One_way.t] with [name] if
      specified and use [f] as an implementation. *)
  val create_one_way
    :  ?name:string
    -> f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> unit)
    -> bin_input:'query Bin_prot.Type_class.t
    -> unit
    -> (worker, 'query, unit) _function

  (** [create_reverse_pipe ?name ~f ~bin_query ~bin_update ~bin_response ()] generates a
      function allowing you to send a [query] and a pipe of [update]s to a worker. The
      worker will send back a [response]. It is up to you whether to send a [response]
      before or after finishing with the pipe; Rpc_parallel doesn't care. *)
  val create_reverse_pipe
    :  ?name:string
    -> f:(worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'update Pipe.Reader.t
          -> 'response Deferred.t)
    -> bin_query:'query Bin_prot.Type_class.t
    -> bin_update:'update Bin_prot.Type_class.t
    -> bin_response:'response Bin_prot.Type_class.t
    -> unit
    -> (worker, 'query * 'update Pipe.Reader.t, 'response) _function

  (** [create_reverse_pipe ?name ~f ~bin_query ~bin_update ~bin_response ()] generates a
      function allowing you to send a [query] and a direct stream of [update]s to a
      worker. The worker will send back a [response]. It is up to you whether to send a
      [response] before or after finishing with the pipe; Rpc_parallel doesn't care. *)
  val create_reverse_direct_pipe
    :  ?name:string
    -> f:(worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'update Pipe.Reader.t
          -> 'response Deferred.t)
    -> bin_query:'query Bin_prot.Type_class.t
    -> bin_update:'update Bin_prot.Type_class.t
    -> bin_response:'response Bin_prot.Type_class.t
    -> unit
    -> ( worker
       , 'query
         * ('update Rpc.Pipe_rpc.Direct_stream_writer.t -> unit Or_error.t Deferred.t)
       , 'response )
         _function

  (** [of_async_rpc ~f rpc] is the analog to [create_rpc] but instead of creating an Rpc
      protocol, it uses the supplied one *)
  val of_async_rpc
    :  f:(worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'response Deferred.t)
    -> ('query, 'response) Rpc.Rpc.t
    -> (worker, 'query, 'response) _function

  (** [of_async_pipe_rpc ~f rpc] is the analog to [create_pipe] but instead of creating a
      Pipe rpc protocol, it uses the supplied one.

      Notice that [aborted] is not exposed. The pipe is closed upon aborted. *)
  val of_async_pipe_rpc
    :  f:(worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'response Pipe.Reader.t Deferred.t)
    -> ('query, 'response, Error.t) Rpc.Pipe_rpc.t
    -> (worker, 'query, 'response Pipe.Reader.t) _function

  (** [of_async_direct_pipe_rpc ~f rpc] is the analog to [create_direct_pipe] but instead
      of creating a Pipe rpc protocol, it uses the supplied one. *)
  val of_async_direct_pipe_rpc
    :  f:(worker_state:worker_state
          -> conn_state:connection_state
          -> 'query
          -> 'response Rpc.Pipe_rpc.Direct_stream_writer.t
          -> unit Deferred.t)
    -> ('query, 'response, Error.t) Rpc.Pipe_rpc.t
    -> (worker, 'query, 'response) _direct

  (** [of_async_one_way_rpc ~f rpc] is the analog to [create_one_way] but instead of
      creating a One_way rpc protocol, it uses the supplied one *)
  val of_async_one_way_rpc
    :  f:(worker_state:worker_state -> conn_state:connection_state -> 'query -> unit)
    -> 'query Rpc.One_way.t
    -> (worker, 'query, unit) _function
end

(** Specification for the creation of a worker type *)
module type Worker_spec = sig
  type ('worker, 'query, 'response) _function
  type ('worker, 'query, 'response) _direct

  (** A type to encapsulate all the functions that can be run on this worker. Using a
      record type here is often the most convenient and readable. *)
  type 'worker functions

  (** State associated with each [Worker.t]. If this state is mutable, you must
      think carefully when making multiple connections to the same spawned worker. *)
  module Worker_state : sig
    type t
    type init_arg [@@deriving bin_io]
  end

  (** State associated with each connection to a [Worker.t] *)
  module Connection_state : sig
    type t
    type init_arg [@@deriving bin_io]
  end

  (** The functions that can be run on this worker type *)
  module Functions
      (C : Creator
       with type worker_state = Worker_state.t
        and type connection_state = Connection_state.t
        and type ('w, 'q, 'r) _function := ('w, 'q, 'r) _function
        and type ('w, 'q, 'r) _direct := ('w, 'q, 'r) _direct) :
    Functions
    with type worker := C.worker
     and type 'a functions := 'a functions
     and type worker_state := Worker_state.t
     and type worker_state_init_arg := Worker_state.init_arg
     and type connection_state := Connection_state.t
     and type connection_state_init_arg := Connection_state.init_arg
end

module type Parallel = sig
  module Function : Function

  module type Worker = Worker with type ('w, 'q, 'r) _function := ('w, 'q, 'r) Function.t
  module type Functions = Functions

  module type Creator =
    Creator
    with type ('w, 'q, 'r) _function := ('w, 'q, 'r) Function.t
     and type ('w, 'q, 'r) _direct := ('w, 'q, 'r) Function.Direct_pipe.t

  module type Worker_spec =
    Worker_spec
    with type ('w, 'q, 'r) _function := ('w, 'q, 'r) Function.t
     and type ('w, 'q, 'r) _direct := ('w, 'q, 'r) Function.Direct_pipe.t

  (** module Worker = Make(T)

      The [Worker] module has specialized functions to spawn workers and run functions on
      workers. *)
  module Make (S : Worker_spec) :
    Worker
    with type 'a functions := 'a S.functions
     and type worker_state_init_arg := S.Worker_state.init_arg
     and type connection_state_init_arg := S.Connection_state.init_arg

  (** [start_app command] should be called from the top-level in order to start the
      parallel application. This function will parse certain environment variables and
      determine whether to start as a master or a worker.

      [rpc_max_message_size], [rpc_handshake_timeout], [rpc_heartbeat_config] can be used
      to alter the rpc defaults. These rpc settings will be used for all connections.
      This can be useful if you have long async jobs. *)
  val start_app
    :  ?rpc_max_message_size:int
    -> ?rpc_handshake_timeout:Time.Span.t
    -> ?rpc_heartbeat_config:Rpc.Connection.Heartbeat_config.t
    -> Command.t
    -> unit

  (** Use [State.get] to query whether the current process has been initialized as an rpc
      parallel master ([start_app] or [init_master_exn] has been called). We return a
      [State.t] rather than a [bool] so that you can require evidence at the type level.
      If you want to certify, as a precondition, for some function that [start_app] was
      used, require a [State.t] as an argument. If you don't need the [State.t] anymore,
      just pattern match on it. *)
  module State : sig
    type t = private [< `started ]

    val get : unit -> t option
  end

  module For_testing : sig
    (** [initialize [%here]] must be called at the top level of any files that have inline
        or expect tests that use [Rpc_parallel]. Further, these calls must come before the
        definitions of the tests, but after the definitions of any workers used in the
        tests.

        For example:

        {[

          let () = Rpc_parallel.For_testing.initialize [%here]

          let%expect_test "" =
            run_code_with_rpc_parallel ();
            [%expect {| output |}]
          ;;

        ]}
    *)
    val initialize : Source_code_position.t -> unit
  end

  (** If you want more direct control over your executable, you can use the [Expert]
      module instead of [start_app]. If you use [Expert], you are responsible for starting
      the master and worker rpc servers. [worker_command_args] will be the arguments sent
      to each spawned worker. Running your executable with these args must follow a code
      path that calls [worker_init_before_async_exn] and then [start_worker_server_exn].
      An easy way to do this is to use [worker_command]. *)
  module Expert : sig
    (** [start_master_server_exn] must be called in the single master process. It is
        necessary to be able to spawn workers. Raises if the process was spawned.

        If [pass_name] is [false], the [?name] argument to spawned workers will not be
        propagated into the worker's command line. This override is only needed to support
        the "deprecated option" for implementing worker commands described below. *)
    val start_master_server_exn
      :  ?rpc_max_message_size:int
      -> ?rpc_handshake_timeout:Time.Span.t
      -> ?rpc_heartbeat_config:Rpc.Connection.Heartbeat_config.t
      -> ?pass_name:bool (** default: true *)
      -> worker_command_args:string list
      -> unit
      -> unit

    (** You have two options for implementing the worker process, a simple one and a
        deprecated one.

        Simple option: just make sure [worker_command] is somewhere in the command
        hierarchy of the same executable in which [start_master_server_exn] is called,
        with a subcommand path equal to [worker_command_args]. It is possible for multiple
        calls to [start_master_server_exn] to share the same [worker_command_args].

        Deprecated option: implement something at least as complicated yourself using
        [worker_init_before_async_exn] and [start_worker_server_exn]. This option may go
        away in the future. *)

    val worker_command : Command.t

    module Worker_env : sig
      type t
    end

    (** [worker_init_before_async_exn] must be called in a spawned worker process before
        the async scheduler has started. You must not read from stdin before this function
        call.

        This has the side effect of calling [chdir]. *)
    val worker_init_before_async_exn : unit -> Worker_env.t

    (** [start_worker_server_exn] must be called in each spawned process. It is illegal to
        call both [start_master_server_exn] and [start_worker_server_exn] in the same
        process. Raises if the process was not spawned.

        This has the side effect of scheduling a job that completes the daemonization of
        this process (if the process should daemonize). This includes redirecting stdout
        and stderr according to [redirect_stdout] and [redirect_stderr]. All writes to
        stdout before this job runs are blackholed. All writes to stderr before this job
        runs are redirected to the spawning process's stderr. *)
    val start_worker_server_exn : Worker_env.t -> unit
  end
end
OCaml

Innovation. Community. Security.