package picos

  1. Overview
  2. Docs
Pico scheduler framework

Install

Dune Dependency

Authors

Maintainers

Sources

picos-0.1.0.tbz
sha256=0f2dcc67ddd127c68f388f2c36a8725a15723e6aeba7d1ddfcf4e016b54a4674
sha512=bee2a99458a451be285e2f13cc3a9deda8eed4e118bcdfc51c256d2da5bae92eec3386c318fe42dcf451421543b519dc064967158b3f417c9b7b44ce97c5fb75

doc/src/picos.fifos/picos_fifos.ml.html

Source file picos_fifos.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
open Picos
module Queue = Picos_mpsc_queue

(* As a minor optimization, we avoid allocating closures, which take slightly
   more memory than values of this type. *)
type ready =
  | Spawn of Fiber.t * (unit -> unit)
  | Continue of Fiber.t * (unit, unit) Effect.Deep.continuation
  | Resume of Fiber.t * (Exn_bt.t option, unit) Effect.Deep.continuation

type t = {
  ready : ready Queue.t;
  needs_wakeup : bool Atomic.t;
  num_alive_fibers : int Atomic.t;
  mutex : Mutex.t;
  condition : Condition.t;
  resume :
    Trigger.t ->
    Fiber.t ->
    (Exn_bt.t option, unit) Effect.Deep.continuation ->
    unit;
  retc : unit -> unit;
}

let rec spawn t n forbid computation = function
  | [] -> Atomic.fetch_and_add t.num_alive_fibers n |> ignore
  | main :: mains ->
      let fiber = Fiber.create ~forbid computation in
      Queue.push t.ready (Spawn (fiber, main));
      spawn t (n + 1) forbid computation mains

let continue = Some (fun k -> Effect.Deep.continue k ())

let rec next t =
  match Queue.pop_exn t.ready with
  | Spawn (fiber, main) ->
      let current =
        (* The current handler must never propagate cancelation, but it would be
           possible to continue some other fiber and resume the current fiber
           later. *)
        Some (fun k -> Effect.Deep.continue k fiber)
      and yield =
        Some
          (fun k ->
            Queue.push t.ready (Continue (fiber, k));
            next t)
      and discontinue = Some (fun k -> Fiber.continue fiber k ()) in
      let[@alert "-handler"] effc (type a) :
          a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = function
        | Fiber.Current ->
            (* We handle [Current] first as it is perhaps the most latency
               sensitive effect. *)
            current
        | Fiber.Spawn r ->
            (* We check cancelation status once and then either perform the
               whole operation or discontinue the fiber. *)
            if Fiber.is_canceled fiber then discontinue
            else begin
              spawn t 0 r.forbid r.computation r.mains;
              continue
            end
        | Fiber.Yield -> yield
        | Computation.Cancel_after r ->
            (* We check cancelation status once and then either perform the
               whole operation or discontinue the fiber. *)
            if Fiber.is_canceled fiber then discontinue
            else begin
              Picos_select.cancel_after r.computation ~seconds:r.seconds
                r.exn_bt;
              continue
            end
        | Trigger.Await trigger ->
            Some
              (fun k ->
                if Fiber.try_suspend fiber trigger fiber k t.resume then next t
                else Fiber.resume fiber k)
        | _ -> None
      in
      Effect.Deep.match_with main () { retc = t.retc; exnc = raise; effc }
  | Continue (fiber, k) -> Fiber.continue fiber k ()
  | Resume (fiber, k) -> Fiber.resume fiber k
  | exception Queue.Empty ->
      if Atomic.get t.num_alive_fibers <> 0 then begin
        if Atomic.get t.needs_wakeup then begin
          Mutex.lock t.mutex;
          match
            if Atomic.get t.needs_wakeup then Condition.wait t.condition t.mutex
          with
          | () -> Mutex.unlock t.mutex
          | exception exn ->
              Mutex.unlock t.mutex;
              raise exn
        end
        else Atomic.set t.needs_wakeup true;
        next t
      end

let run ~forbid main =
  let ready = Queue.create ()
  and needs_wakeup = Atomic.make false
  and num_alive_fibers = Atomic.make 1
  and mutex = Mutex.create ()
  and condition = Condition.create () in
  let rec t =
    { ready; needs_wakeup; num_alive_fibers; mutex; condition; resume; retc }
  and retc () =
    Atomic.decr t.num_alive_fibers;
    next t
  and resume trigger fiber k =
    let resume = Resume (fiber, k) in
    if Fiber.unsuspend fiber trigger then
      (* The fiber has not been canceled, so we queue the fiber normally. *)
      Queue.push t.ready resume
    else
      (* The fiber has been canceled, so we give priority to it in this
         scheduler. *)
      Queue.push_head t.ready resume;
    (* As the trigger might have been signaled from another domain or systhread
       outside of the scheduler, we check whether the scheduler needs to be
       woken up and take care of it if necessary. *)
    if
      Atomic.get t.needs_wakeup
      && Atomic.compare_and_set t.needs_wakeup true false
    then begin
      Mutex.lock t.mutex;
      Mutex.unlock t.mutex;
      Condition.broadcast t.condition
    end
  in
  let computation = Computation.create () in
  let fiber = Fiber.create ~forbid computation in
  let main = Computation.capture computation main in
  Queue.push t.ready (Spawn (fiber, main));
  next t;
  Computation.await computation
OCaml

Innovation. Community. Security.