package moonpool

  1. Overview
  2. Docs
Pools of threads supported by a pool of domains

Install

Dune Dependency

Authors

Maintainers

Sources

moonpool-0.6.tbz
sha256=3efd095c82a37bba8c7ab6a2532aee3c445ebe1ecaed84ef3ffb560bc65e7633
sha512=e4bcab82e6638299c2d0beb1dbf204f7b43379a5387418c6edff85b9bf90c13ad1bdd8eb44b69cd421268d1bc45bcf918bcf77e1c924348211ac27d6643aac78

doc/src/moonpool/suspend_.ml.html

Source file suspend_.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
(** A suspended computation, represented as a callback that takes the result
    to resume with. As built by [with_suspend], calling it with [Ok ()]
    continues the suspended computation, and calling it with
    [Error (exn, bt)] discontinues it by raising [exn] with backtrace [bt]
    at the suspension point. *)
type suspension = unit Exn_bt.result -> unit

(** A task: a thunk from [unit] to [unit]. Tasks are what the [run] callbacks
    of the handlers below accept. *)
type task = unit -> unit

(** Callback carried by the [Suspend] effect.

    When a computation suspends itself, [with_suspend] calls
    [handle ~run ~resume k] where:
    - [run] accepts a {!task} (the handler side documents it as being used to
      schedule new tasks);
    - [resume] accepts a {!suspension} together with the result to resume it
      with;
    - [k] is the {!suspension} of the computation that just suspended.

    The record has a single field and is unboxed. *)
type suspension_handler = {
  handle:
    run:(task -> unit) ->
    resume:(suspension -> unit Exn_bt.result -> unit) ->
    suspension ->
    unit;
}
[@@unboxed]

(** Set of callbacks given to [with_suspend] by the code that runs a
    computation.

    ['state] is existentially quantified: [with_suspend] obtains a fresh
    ['state] from [on_suspend ()] each time the computation suspends or
    yields, and passes that same value back to [run] and [resume]. *)
type with_suspend_handler =
  | WSH : {
      on_suspend: unit -> 'state;
          (** on_suspend called when [f()] suspends itself. *)
      run: 'state -> task -> unit;  (** run used to schedule new tasks *)
      resume: 'state -> suspension -> unit Exn_bt.result -> unit;
          (** resume run the suspension. Must be called exactly once. *)
    }
      -> with_suspend_handler

[@@@ifge 5.0]
[@@@ocaml.alert "-unstable"]

module A = Atomic_

(** Effects understood by [with_suspend]:
    - [Suspend h]: suspend the current computation and hand its suspension
      to [h.handle];
    - [Yield]: suspend the current computation and immediately pass its
      suspension to the handler's [resume] with [Ok ()].

    Both effects return [unit] to the performer once it is resumed. *)
type _ Effect.t +=
  | Suspend : suspension_handler -> unit Effect.t
  | Yield : unit Effect.t

(** [yield ()] performs the [Yield] effect. It must run under a handler for
    this effect (such as the one installed by [with_suspend]); performing an
    effect with no enclosing handler raises [Effect.Unhandled]. *)
let[@inline] yield () = Effect.perform Yield

(** [suspend h] performs the [Suspend h] effect, handing control to the
    enclosing effect handler, which is expected to call [h.handle] with the
    suspension of the current computation. Same handler requirement as
    [yield]. *)
let[@inline] suspend h = Effect.perform (Suspend h)

(** [with_suspend (WSH { on_suspend; run; resume }) f] evaluates [f ()] under
    a deep effect handler that intercepts [Suspend] and [Yield]; any other
    effect is left to outer handlers.

    Each time one of the two effects is performed, [on_suspend ()] is called
    first to obtain a state, and the captured continuation is wrapped as a
    {!suspension}. Then:
    - for [Suspend h], [h.handle] receives [run state], [resume state] and
      the suspension;
    - for [Yield], the suspension is given directly to [resume state] with
      [Ok ()]. *)
let with_suspend (WSH { on_suspend; run; resume }) (f : unit -> unit) : unit =
  let module E = Effect.Deep in
  (* Wrap a captured continuation as a [suspension]: [Ok ()] continues it,
     [Error (exn, bt)] raises [exn] (with backtrace [bt]) inside it. *)
  let suspension_of (k : (unit, unit) E.continuation) : suspension =
   fun outcome ->
    match outcome with
    | Ok () -> E.continue k ()
    | Error (exn, bt) -> E.discontinue_with_backtrace k exn bt
  in

  (* TODO: discontinue [k] if current fiber (if any) is cancelled? *)
  let handle_suspend (h : suspension_handler) k : unit =
    let state = on_suspend () in
    h.handle ~run:(run state) ~resume:(resume state) (suspension_of k)
  in

  (* TODO: discontinue [k] if current fiber (if any) is cancelled? *)
  let handle_yield k : unit =
    let state = on_suspend () in
    resume state (suspension_of k) (Ok ())
  in

  (* effect handler *)
  let effc : type e. e Effect.t -> ((e, _) E.continuation -> _) option =
    function
    | Suspend h -> Some (handle_suspend h)
    | Yield -> Some handle_yield
    | _ -> None
  in

  E.try_with f () { E.effc }

(* DLA interop *)
(** State shared by the [release] and [await] operations returned by
    [prepare_for_await]. *)
type dla_state =
  | Dla_idle  (** neither [release] nor [await] has acted yet *)
  | Dla_released  (** [release] has been called *)
  | Dla_waiting of (suspension -> unit Exn_bt.result -> unit) * suspension
      (** [await] has parked this suspension, along with the [resume]
          function to wake it up with *)

(** [prepare_for_await ()] returns a fresh pair of [release]/[await]
    operations implemented on top of {!suspend}.

    - [await ()] suspends the current computation until [release ()] is
      called. If [release ()] already happened (or happens concurrently,
      before the suspension is published), the computation is resumed
      immediately instead of being parked forever.
    - [release ()] wakes up the awaiter if there is one; it is idempotent.

    [await] is meant to be called at most once per pair; it performs the
    [Suspend] effect and therefore needs an enclosing handler such as the
    one installed by [with_suspend]. *)
let prepare_for_await () : Dla_.t =
  (* current state *)
  let st : dla_state A.t = A.make Dla_idle in

  let release () : unit =
    (* Unconditionally move to [Dla_released] so that a later (or racing)
       [await] can observe that it must not park. *)
    match A.exchange st Dla_released with
    | Dla_idle | Dla_released -> ()
    | Dla_waiting (resume, k) -> resume k @@ Ok ()
  and await () : unit =
    suspend
      {
        handle =
          (fun ~run:_ ~resume k ->
            (* Only park if nobody released yet. If the CAS fails, the
               state is no longer [Dla_idle] (normally [Dla_released]), so
               nobody will wake us up later: resume right away. Exactly one
               of [release] and this branch resumes [k]. *)
            if not (A.compare_and_set st Dla_idle (Dla_waiting (resume, k)))
            then resume k @@ Ok ());
      }
  in

  { Dla_.release; await }

[@@@ocaml.alert "+unstable"]
[@@@else_]

(* Fallback definitions for the [else_] branch of the [ifge 5.0] conditional
   above: no effect handler is installed here. *)

(** [with_suspend _ f] simply calls [f ()]; the handler callbacks are
    ignored. *)
let[@inline] with_suspend (WSH _) f = f ()

(** [prepare_for_await ()] returns a pair whose [release] and [await] are
    both no-ops ([ignore]); in particular [await ()] returns immediately
    without waiting for [release]. *)
let[@inline] prepare_for_await () = { Dla_.release = ignore; await = ignore }

[@@@endif]
OCaml

Innovation. Community. Security.