package picos_mux
Sample schedulers for Picos
Install
Dune Dependency
Authors
Maintainers
Sources
picos-0.5.0.tbz
sha256=862d61383e2df93a876bedcffb1fd1ddc0f96c50b0e9c07943a2aee1f0e182be
sha512=87805379017ef4a7f2c11b954625a3757a0f1431bb9ba59132202de278b3e41adbe0cdc20e3ab23b7c9a8c5a15faeb7ec79348e7d80f2b14274b00df0893b8c0
doc/src/picos_mux.fifo/picos_mux_fifo.ml.html
Source file picos_mux_fifo.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
open Picos let[@inline never] quota_non_positive () = invalid_arg "quota must be positive" (* As a minor optimization, we avoid allocating closures, which take slightly more memory than values of this type. *) type ready = | Spawn of Fiber.t * (Fiber.t -> unit) | Continue of Fiber.t * (unit, unit) Effect.Deep.continuation | Resume of Fiber.t * ((exn * Printexc.raw_backtrace) option, unit) Effect.Deep.continuation | Return of Fiber.Maybe.t * (unit, unit) Effect.Deep.continuation module Mpscq = Picos_aux_mpscq type t = { ready : ready Mpscq.t; needs_wakeup : bool Atomic.t; num_alive_fibers : int Atomic.t; mutex : Mutex.t; condition : Condition.t; resume : Trigger.t -> Fiber.t -> ((exn * Printexc.raw_backtrace) option, unit) Effect.Deep.continuation -> unit; current : ((Fiber.t, unit) Effect.Deep.continuation -> unit) option; yield : ((unit, unit) Effect.Deep.continuation -> unit) option; return : ((unit, unit) Effect.Deep.continuation -> unit) option; discontinue : ((unit, unit) Effect.Deep.continuation -> unit) option; handler : (unit, unit) Effect.Deep.handler; quota : int; mutable fiber : Fiber.Maybe.t; mutable remaining_quota : int; } let rec next t = match Mpscq.pop_exn t.ready with | Spawn (fiber, main) -> t.fiber <- Fiber.Maybe.of_fiber fiber; t.remaining_quota <- t.quota; Effect.Deep.match_with main fiber t.handler | Return (fiber, k) -> t.fiber <- fiber; t.remaining_quota <- t.quota; Effect.Deep.continue k () | Continue (fiber, k) -> t.fiber <- Fiber.Maybe.of_fiber fiber; t.remaining_quota <- t.quota; Fiber.continue fiber k () | Resume (fiber, k) -> t.fiber <- Fiber.Maybe.of_fiber fiber; t.remaining_quota <- t.quota; Fiber.resume fiber k | exception Mpscq.Empty -> t.fiber <- Fiber.Maybe.nothing; if Atomic.get t.num_alive_fibers <> 0 then begin if Atomic.get t.needs_wakeup then begin Mutex.lock t.mutex; match if Atomic.get t.needs_wakeup then (* We assume that there is no poll point after the above [Mutex.lock] and before the below [Condition.wait] is ready to be woken up by a [Condition.broadcast]. *) Condition.wait t.condition t.mutex with | () -> Mutex.unlock t.mutex | exception exn -> Mutex.unlock t.mutex; raise exn end else Atomic.set t.needs_wakeup true; next t end let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main = let quota = match quota with | None -> Int.max_int | Some quota -> if quota <= 0 then quota_non_positive (); quota in Select.check_configured (); let ready = Mpscq.create ~padded:true () and needs_wakeup = Atomic.make false |> Multicore_magic.copy_as_padded and num_alive_fibers = Atomic.make 1 |> Multicore_magic.copy_as_padded and mutex = Mutex.create () and condition = Condition.create () in let rec t = { ready; fiber = Fiber.Maybe.nothing; needs_wakeup; num_alive_fibers; mutex; condition; resume; current; yield; return; discontinue; handler; quota; remaining_quota = quota; } and current = (* The current handler must never propagate cancelation, but it would be possible to continue some other fiber and resume the current fiber later. *) Some (fun k -> let fiber = Fiber.Maybe.to_fiber t.fiber in Effect.Deep.continue k fiber) and yield = Some (fun k -> let fiber = Fiber.Maybe.to_fiber t.fiber in Mpscq.push t.ready (Continue (fiber, k)); next t) and return = Some (fun k -> let remaining_quota = t.remaining_quota - 1 in if 0 < remaining_quota then begin t.remaining_quota <- remaining_quota; Effect.Deep.continue k () end else begin Mpscq.push t.ready (Return (t.fiber, k)); next t end) and discontinue = Some (fun k -> let fiber = Fiber.Maybe.to_fiber t.fiber in Fiber.continue fiber k ()) and handler = { retc; exnc; effc } and[@alert "-handler"] effc : type a. a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = function | Fiber.Current -> (* We handle [Current] first as it is perhaps the most latency sensitive effect. *) t.current | Fiber.Spawn r -> (* We check cancelation status once and then either perform the whole operation or discontinue the fiber. *) let fiber = Fiber.Maybe.to_fiber t.fiber in if Fiber.is_canceled fiber then t.discontinue else begin Atomic.incr t.num_alive_fibers; Mpscq.push t.ready (Spawn (r.fiber, r.main)); t.return end | Fiber.Yield -> t.yield | Computation.Cancel_after r -> begin (* We check cancelation status once and then either perform the whole operation or discontinue the fiber. *) let fiber = Fiber.Maybe.to_fiber t.fiber in if Fiber.is_canceled fiber then t.discontinue else match Select.cancel_after r.computation ~seconds:r.seconds r.exn r.bt with | () -> t.return | exception exn -> let bt = Printexc.get_raw_backtrace () in Some (fun k -> Effect.Deep.discontinue_with_backtrace k exn bt) end | Trigger.Await trigger -> Some (fun k -> let fiber = Fiber.Maybe.to_fiber t.fiber in if Fiber.try_suspend fiber trigger fiber k t.resume then next t else let remaining_quota = t.remaining_quota - 1 in if 0 < remaining_quota then begin t.remaining_quota <- remaining_quota; Fiber.resume fiber k end else begin Mpscq.push t.ready (Resume (fiber, k)); next t end) | _ -> None and retc () = Atomic.decr t.num_alive_fibers; next t and resume trigger fiber k = let resume = Resume (fiber, k) in if Fiber.unsuspend fiber trigger then (* The fiber has not been canceled, so we queue the fiber normally. *) Mpscq.push t.ready resume else (* The fiber has been canceled, so we give priority to it in this scheduler. *) Mpscq.push_head t.ready resume; (* As the trigger might have been signaled from another domain or systhread outside of the scheduler, we check whether the scheduler needs to be woken up and take care of it if necessary. *) if Atomic.get t.needs_wakeup && Atomic.compare_and_set t.needs_wakeup true false then begin begin match Mutex.lock t.mutex with | () -> Mutex.unlock t.mutex | exception Sys_error _ -> (* This should mean that [resume] was called from a signal handler running on the scheduler thread. If the assumption about not having poll points holds, the [Condition.broadcast] should now be able to wake up the [Condition.wait] in the scheduler. *) () end; Condition.broadcast t.condition end in Mpscq.push t.ready (Spawn (fiber, main)); next t let run ?quota ?fatal_exn_handler ?(forbid = false) main = let computation = Computation.create ~mode:`LIFO () in let fiber = Fiber.create ~forbid computation in let main _ = Computation.capture computation main () in run_fiber ?quota ?fatal_exn_handler fiber main; Computation.await computation
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>