package picos_mux

  1. Overview
  2. Docs

Source file picos_mux_random.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
open Picos
module Htbl = Picos_aux_htbl

module Collection = struct
  type 'a t = (int, 'a) Htbl.t

  module Key = struct
    type t = int

    let equal = Int.equal
    let hash = Fun.id
  end

  let create () = Htbl.create ~hashed_type:(module Key) ()

  let rec push t value =
    let key = Random.bits () in
    if not (Htbl.try_add t key value) then push t value

  let rec pop_exn t =
    let key = Htbl.find_random_exn t in
    try Htbl.remove_exn t key with Not_found -> pop_exn t

  let is_empty t =
    match Htbl.find_random_exn t with _ -> false | exception Not_found -> true
end

type ready =
  | Spawn of Fiber.t * (Fiber.t -> unit)
  | Current of Fiber.t * (Fiber.t, unit) Effect.Deep.continuation
  | Continue of Fiber.t * (unit, unit) Effect.Deep.continuation
  | Resume of
      Fiber.t
      * ((exn * Printexc.raw_backtrace) option, unit) Effect.Deep.continuation
  | Raise of
      Fiber.t
      * (unit, unit) Effect.Deep.continuation
      * exn
      * Printexc.raw_backtrace
  | Return of Fiber.t * (unit, unit) Effect.Deep.continuation

type t = {
  ready : ready Collection.t;
  mutable num_waiters_non_zero : bool;
  num_alive_fibers : int Atomic.t;
  mutable resume :
    Trigger.t ->
    Fiber.t ->
    ((exn * Printexc.raw_backtrace) option, unit) Effect.Deep.continuation ->
    unit;
  num_waiters : int ref;
  condition : Condition.t;
  mutex : Mutex.t;
  mutable current : ((Fiber.t, unit) Effect.Deep.continuation -> unit) option;
  mutable yield : ((unit, unit) Effect.Deep.continuation -> unit) option;
  mutable return : ((unit, unit) Effect.Deep.continuation -> unit) option;
  mutable handler : (unit, unit) Effect.Deep.handler;
  mutable run : bool;
}

let fiber_key : Fiber.Maybe.t ref Picos_thread.TLS.t =
  Picos_thread.TLS.create ()

let get () =
  match Picos_thread.TLS.get_exn fiber_key with
  | p -> p
  | exception Picos_thread.TLS.Not_set ->
      let p = ref Fiber.Maybe.nothing in
      Picos_thread.TLS.set fiber_key p;
      p

let[@inline] relaxed_wakeup t ~known_not_empty =
  if
    t.num_waiters_non_zero
    && (known_not_empty || not (Collection.is_empty t.ready))
  then begin
    Mutex.lock t.mutex;
    Mutex.unlock t.mutex;
    Condition.signal t.condition
  end

let exec p t ready =
  begin
    p :=
      match ready with
      | Spawn (fiber, _)
      | Raise (fiber, _, _, _)
      | Return (fiber, _)
      | Current (fiber, _)
      | Continue (fiber, _)
      | Resume (fiber, _) ->
          Fiber.Maybe.of_fiber fiber
  end;
  match ready with
  | Spawn (fiber, main) -> Effect.Deep.match_with main fiber t.handler
  | Raise (_, k, exn, bt) -> Effect.Deep.discontinue_with_backtrace k exn bt
  | Return (_, k) -> Effect.Deep.continue k ()
  | Current (fiber, k) -> Effect.Deep.continue k fiber
  | Continue (fiber, k) -> Fiber.continue fiber k ()
  | Resume (fiber, k) -> Fiber.resume fiber k

let rec next p t =
  match Collection.pop_exn t.ready with
  | ready ->
      relaxed_wakeup t ~known_not_empty:false;
      exec p t ready
  | exception Not_found ->
      p := Fiber.Maybe.nothing;
      if Atomic.get t.num_alive_fibers <> 0 then begin
        Mutex.lock t.mutex;
        let n = !(t.num_waiters) + 1 in
        t.num_waiters := n;
        if n = 1 then t.num_waiters_non_zero <- true;
        if Collection.is_empty t.ready && Atomic.get t.num_alive_fibers <> 0
        then begin
          match Condition.wait t.condition t.mutex with
          | () ->
              let n = !(t.num_waiters) - 1 in
              t.num_waiters := n;
              if n = 0 then t.num_waiters_non_zero <- false;
              Mutex.unlock t.mutex;
              next p t
          | exception async_exn ->
              let n = !(t.num_waiters) - 1 in
              t.num_waiters := n;
              if n = 0 then t.num_waiters_non_zero <- false;
              Mutex.unlock t.mutex;
              raise async_exn
        end
        else begin
          let n = !(t.num_waiters) - 1 in
          t.num_waiters := n;
          if n = 0 then t.num_waiters_non_zero <- false;
          Mutex.unlock t.mutex;
          next p t
        end
      end
      else begin
        Mutex.lock t.mutex;
        Mutex.unlock t.mutex;
        Condition.broadcast t.condition
      end

let default_fatal_exn_handler exn =
  prerr_string "Fatal error: exception ";
  prerr_string (Printexc.to_string exn);
  prerr_char '\n';
  Printexc.print_backtrace stderr;
  flush stderr;
  exit 2

let context ?fatal_exn_handler () =
  Select.check_configured ();
  let exnc =
    match fatal_exn_handler with
    | None -> default_fatal_exn_handler
    | Some handler ->
        fun exn ->
          handler exn;
          raise exn
  in
  let t =
    {
      ready = Collection.create ();
      num_waiters_non_zero = false;
      num_alive_fibers = Atomic.make 1 |> Multicore_magic.copy_as_padded;
      resume = Obj.magic ();
      num_waiters = ref 0 |> Multicore_magic.copy_as_padded;
      condition = Condition.create ();
      mutex = Mutex.create ();
      current = Obj.magic ();
      yield = Obj.magic ();
      return = Obj.magic ();
      handler = Obj.magic ();
      run = false;
    }
  in
  t.resume <-
    (fun trigger fiber k ->
      let resume = Resume (fiber, k) in
      Fiber.unsuspend fiber trigger |> ignore;
      Collection.push t.ready resume;
      let non_zero =
        match Mutex.lock t.mutex with
        | () ->
            let non_zero = t.num_waiters_non_zero in
            Mutex.unlock t.mutex;
            non_zero
        | exception Sys_error _ -> false
      in
      if non_zero then Condition.signal t.condition);
  t.current <-
    Some
      (fun k ->
        let p = Picos_thread.TLS.get_exn fiber_key in
        let fiber = Fiber.Maybe.to_fiber !p in
        Collection.push t.ready (Current (fiber, k));
        next p t);
  t.yield <-
    Some
      (fun k ->
        let p = Picos_thread.TLS.get_exn fiber_key in
        let fiber = Fiber.Maybe.to_fiber !p in
        Collection.push t.ready (Continue (fiber, k));
        next p t);
  t.return <-
    Some
      (fun k ->
        let p = Picos_thread.TLS.get_exn fiber_key in
        let fiber = Fiber.Maybe.to_fiber !p in
        Collection.push t.ready (Return (fiber, k));
        next p t);
  t.handler <-
    {
      retc =
        (fun () ->
          Atomic.decr t.num_alive_fibers;
          let p = Picos_thread.TLS.get_exn fiber_key in
          next p t);
      exnc;
      effc =
        (fun (type a) (e : a Effect.t) :
             ((a, _) Effect.Deep.continuation -> _) option ->
          match e with
          | Fiber.Current -> t.current
          | Fiber.Spawn r ->
              let p = Picos_thread.TLS.get_exn fiber_key in
              let fiber = Fiber.Maybe.to_fiber !p in
              if Fiber.is_canceled fiber then t.yield
              else begin
                Atomic.incr t.num_alive_fibers;
                Collection.push t.ready (Spawn (r.fiber, r.main));
                relaxed_wakeup t ~known_not_empty:true;
                t.return
              end
          | Fiber.Yield -> t.yield
          | Computation.Cancel_after r -> begin
              let p = Picos_thread.TLS.get_exn fiber_key in
              let fiber = Fiber.Maybe.to_fiber !p in
              if Fiber.is_canceled fiber then t.yield
              else
                match
                  Select.cancel_after r.computation ~seconds:r.seconds r.exn
                    r.bt
                with
                | () -> t.return
                | exception exn ->
                    let bt = Printexc.get_raw_backtrace () in
                    Some
                      (fun k ->
                        Collection.push t.ready (Raise (fiber, k, exn, bt));
                        next p t)
            end
          | Trigger.Await trigger ->
              Some
                (fun k ->
                  let p = Picos_thread.TLS.get_exn fiber_key in
                  let fiber = Fiber.Maybe.to_fiber !p in
                  if Fiber.try_suspend fiber trigger fiber k t.resume then
                    next p t
                  else begin
                    Collection.push t.ready (Resume (fiber, k));
                    next p t
                  end)
          | _ -> None);
    };
  t

let runner_on_this_thread t =
  Select.check_configured ();
  next (get ()) t

let rec await t =
  if t.num_waiters_non_zero then begin
    match Condition.wait t.condition t.mutex with
    | () -> await t
    | exception async_exn ->
        Mutex.unlock t.mutex;
        raise async_exn
  end
  else Mutex.unlock t.mutex

let run_fiber ?context:t_opt fiber main =
  let t =
    match t_opt with
    | Some t ->
        Select.check_configured ();
        t
    | None -> context ()
  in
  Mutex.lock t.mutex;
  if t.run then begin
    Mutex.unlock t.mutex;
    invalid_arg "already run"
  end
  else begin
    t.run <- true;
    Mutex.unlock t.mutex;
    get () := Fiber.Maybe.of_fiber fiber;
    Effect.Deep.match_with main fiber t.handler;
    Mutex.lock t.mutex;
    await t
  end

let[@inline never] run ?context fiber main computation =
  run_fiber ?context fiber main;
  Computation.peek_exn computation

let run ?context ?forbid main =
  let forbid = match forbid with None -> false | Some forbid -> forbid in
  let computation = Computation.create ~mode:`LIFO () in
  let fiber = Fiber.create ~forbid computation in
  let main _ = Computation.capture computation main () in
  run ?context fiber main computation

let rec run_fiber_on n fiber main runner_main context =
  if n <= 1 then run_fiber ~context fiber main
  else
    let runner =
      try Domain.spawn runner_main
      with exn ->
        let bt = Printexc.get_raw_backtrace () in
        run ~context Fun.id;
        Printexc.raise_with_backtrace exn bt
    in
    match run_fiber_on (n - 1) fiber main runner_main context with
    | result ->
        begin
          match Domain.join runner with
          | None -> ()
          | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
        end;
        result
    | exception exn ->
        let bt = Printexc.get_raw_backtrace () in
        begin
          match Domain.join runner with
          | None -> ()
          | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
        end;
        Printexc.raise_with_backtrace exn bt

let run_fiber_on ?fatal_exn_handler ~n_domains fiber main =
  if n_domains < 1 then invalid_arg "n_domains must be positive";
  let context = context ?fatal_exn_handler () in
  let runner_main =
    if n_domains = 1 then fun () -> None
    else
      let bt_status = Printexc.backtrace_status () in
      fun () ->
        Printexc.record_backtrace bt_status;
        match runner_on_this_thread context with
        | () -> None
        | exception exn ->
            let bt = Printexc.get_raw_backtrace () in
            Some (exn, bt)
  in
  run_fiber_on n_domains fiber main runner_main context

let[@inline never] run_on ?fatal_exn_handler ~n_domains fiber main computation =
  run_fiber_on ?fatal_exn_handler ~n_domains fiber main;
  Computation.peek_exn computation

let run_on ?fatal_exn_handler ~n_domains ?forbid main =
  let forbid = match forbid with None -> false | Some forbid -> forbid in
  let computation = Computation.create ~mode:`LIFO () in
  let fiber = Fiber.create ~forbid computation in
  let main _ = Computation.capture computation main () in
  run_on ?fatal_exn_handler ~n_domains fiber main computation
OCaml

Innovation. Community. Security.