package riot
An actor-model multi-core scheduler for OCaml 5
Install
Dune Dependency
Authors
Maintainers
Sources
riot-0.0.5.tbz
sha256=01b7b82ccc656b12b7315960d9df17eb4682b8f1af68e9fee33171fee1f9cf88
sha512=d8831d8a75fe43a7e8d16d2c0bb7d27f6d975133e17c5dd89ef7e575039c59d27c1ab74fbadcca81ddfbc0c74d1e46c35baba35ef825b36ac6c4e49d7a41d0c2
doc/src/riot.scheduler/scheduler.ml.html
Source file scheduler.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
open Core
open Util
open Time
open Net
module Tracer = Tracer
module Uid = Scheduler_uid

(** State owned by a single scheduler: its identifier, a private random state,
    the queue of processes to step, the set of sleeping processes, a timer
    wheel and an I/O table. *)
type t = {
  uid : Uid.t; [@warning "-69"]
  rnd : Random.State.t;
  run_queue : Proc_queue.t;
  sleep_set : Proc_set.t;
  timers : Timer_wheel.t;
  io_tbl : Io.t;
}

(** State shared by all schedulers: the [stop] flag polled by every worker
    loop, the list of schedulers, the process table and the process
    registry. *)
type pool = {
  mutable stop : bool;
  schedulers : t list;
  processes : Proc_table.t;
  registry : Proc_registry.t;
}

module Scheduler = struct
  (** [make ~rnd ()] builds a scheduler with the next available uid, a copy
      of [rnd] (the caller's state is not shared), and newly created run
      queue, sleep set, I/O table and timer wheel. *)
  let make ~rnd () =
    let uid = Uid.next () in
    Log.debug (fun f -> f "Making scheduler with id: %a" Uid.pp uid);
    {
      uid;
      rnd = Random.State.copy rnd;
      run_queue = Proc_queue.create ();
      sleep_set = Proc_set.create ();
      io_tbl = Io.create ();
      timers = Timer_wheel.create ();
    }

  (** Getter/setter pair for the scheduler bound to the current execution
      context. NOTE(review): storage semantics come from [Thread_local.make],
      which is defined elsewhere; presumably one slot per domain. *)
  let get_current_scheduler, set_current_scheduler =
    Thread_local.make ~name:"CURRENT_SCHEDULER"

  (** Getter/setter pair for the pid of the process currently being stepped;
      the setter is called by {!run} right before each step. *)
  let get_current_process_pid, set_current_process_pid =
    Thread_local.make ~name:"CURRENT_PID"

  (** [get_random_scheduler pool] returns the scheduler found at a random
      index of [pool.schedulers], drawing from the current scheduler's random
      state.
      @raise Invalid_argument
        if [pool.schedulers] is empty (from [Random.State.int]). *)
  let get_random_scheduler : pool -> t =
   fun { schedulers = all_schedulers; _ } ->
    let sch = get_current_scheduler () in
    let rnd_idx = Random.State.int sch.rnd (List.length all_schedulers) in
    List.nth all_schedulers rnd_idx

  (** [set_timer sch time mode fn] forwards to [Timer_wheel.make_timer] on the
      timer wheel of [sch]. *)
  let set_timer sch time mode fn =
    Timer_wheel.make_timer sch.timers time mode fn

  (** [add_to_run_queue sch proc] removes [proc] from the sleep set of [sch]
      and enqueues it on the run queue of [sch]. *)
  let add_to_run_queue sch (proc : Process.t) =
    Proc_set.remove sch.sleep_set proc;
    Proc_queue.queue sch.run_queue proc;
    Log.trace (fun f ->
        f "Adding process to run_queue queue[%d]: %a"
          (Proc_queue.size sch.run_queue)
          Pid.pp proc.pid)

  (** [awake_process pool proc] enqueues [proc] on every scheduler of [pool]
      whose uid equals [proc.sid]. Does nothing when no scheduler matches. *)
  let awake_process pool (proc : Process.t) =
    List.iter
      (fun sch ->
        if Scheduler_uid.equal sch.uid proc.sid then add_to_run_queue sch proc)
      pool.schedulers

  (** [handle_receive k proc ref] services a receive request of [proc] and
      resumes through [k].

      - Empty mailbox: the process is marked as awaiting a message and [k]
        gets [Suspend].
      - Otherwise at most [Process.message_count proc] messages are examined.
        When [ref] is [Some r], every message for which
        [Ref.is_newer r msg.uid] holds is moved to the save queue. The first
        message that is not skipped is handed to [k] as [Continue].
      - If the mailbox runs dry, [Process.read_save_queue] is called and [k]
        gets [Delay]; if the budget is exhausted first, [k] gets [Delay]
        without touching the save queue. *)
  let handle_receive k (proc : Process.t) (ref : unit Ref.t option) =
    let open Proc_state in
    Log.trace (fun f ->
        f "Process %a: receiving messages" Pid.pp (Process.pid proc));
    if Process.has_empty_mailbox proc then (
      Log.trace (fun f ->
          f "Process %a is awaiting for new messages" Pid.pp
            (Process.pid proc));
      Process.mark_as_awaiting_message proc;
      k Suspend)
    else
      (* The budget is the message count sampled once, so one call never
         examines more messages than were counted at this point. *)
      let fuel = Process.message_count proc in
      Log.trace (fun f -> f "Skimming mailbox with %d messages" fuel);
      let rec go fuel =
        if fuel = 0 then k Delay
        else
          match (ref, Process.next_message proc) with
          | _, None ->
              Log.trace (fun f ->
                  f "Emptied the queue, will read from save queue next");
              Process.read_save_queue proc;
              k Delay
          | Some ref, Some msg when Ref.is_newer ref msg.uid ->
              Log.trace (fun f ->
                  f "Skipping msg ref=%a msg.uid=%a" Ref.pp ref Ref.pp msg.uid);
              Process.add_to_save_queue proc msg;
              go (fuel - 1)
          | _, Some Message.{ msg; _ } -> k (Continue msg)
      in
      go fuel

  (** [handle_syscall k sch proc syscall mode fd] registers [proc] in the I/O
      table of [sch] for [fd] under [mode], marks the process as awaiting
      I/O, and resumes [k] with [Yield]. *)
  let handle_syscall k sch (proc : Process.t) syscall mode fd =
    let open Proc_state in
    Log.trace (fun f ->
        let mode = match mode with `r -> "r" | `w -> "w" | `rw -> "rw" in
        f "Registering %a for Syscall(%s,%s,%a)" Pid.pp proc.pid syscall mode
          Fd.pp fd);
    Io.register sch.io_tbl proc mode fd;
    Process.mark_as_awaiting_io proc syscall mode fd;
    k Yield

  (** [perform sch proc] builds the effect-handling record used when running
      [proc] on [sch]: syscalls and receives are dispatched to
      {!handle_syscall} and {!handle_receive}, [Yield] resumes with [Yield],
      and any other effect is passed back with [Reperform]. *)
  let perform sch (proc : Process.t) =
    let open Proc_state in
    let open Proc_effect in
    let perform : type a b. (a, b) step_callback =
     fun k eff ->
      match eff with
      | Syscall { name; mode; fd } -> handle_syscall k sch proc name mode fd
      | Receive { ref } -> handle_receive k proc ref
      | Yield ->
          Log.trace (fun f ->
              f "Process %a: yielding" Pid.pp (Process.pid proc));
          k Yield
      (* The binder is deliberately not called [effect]: that identifier is a
         reserved keyword starting with OCaml 5.3. *)
      | unhandled ->
          Log.trace (fun f ->
              f "Process %a: unhandled effect" Pid.pp (Process.pid proc));
          k (Reperform unhandled)
    in
    { perform }

  (** [handle_wait_proc _pool sch proc] steps a process that is waiting for a
      message: if it has messages it is marked runnable and re-enqueued,
      otherwise it is added to the sleep set of [sch]. *)
  let handle_wait_proc _pool sch proc =
    if Process.has_messages proc then (
      Process.mark_as_runnable proc;
      Log.debug (fun f -> f "Waking up process %a" Pid.pp proc.pid);
      add_to_run_queue sch proc)
    else (
      Proc_set.add sch.sleep_set proc;
      Log.debug (fun f -> f "Hibernated process %a" Pid.pp proc.pid);
      Log.trace (fun f -> f "sleep_set: %d" (Proc_set.size sch.sleep_set)))

  (** [handle_exit_proc pool sch proc reason] performs exit bookkeeping for
      [proc]:

      - unregisters it from the I/O table of [sch] and from the registry;
      - sends [Monitor (Process_down pid)] to every monitor that is still in
        the process table and has not exited, and wakes it;
      - for every linked process found in the process table: if it traps
        exits it receives [Exit (pid, reason)] and is woken; else if it has
        already exited nothing happens; otherwise it is marked as exited with
        [Link_down pid] and woken.

      Note that the trap-exits check comes before the already-exited check. *)
  let handle_exit_proc pool sch proc reason =
    Io.unregister_process sch.io_tbl proc;
    Proc_registry.remove pool.registry (Process.pid proc);

    (* send monitors a process-down message *)
    let monitoring_pids = Process.monitors proc in
    Log.debug (fun f ->
        f "notifying %d monitors" (List.length monitoring_pids));
    List.iter
      (fun mon_pid ->
        match Proc_table.get pool.processes mon_pid with
        | None -> ()
        | Some mon_proc when Process.is_exited mon_proc ->
            Log.debug (fun f ->
                f "monitoring process %a is dead, nothing to do" Pid.pp
                  mon_proc.pid)
        | Some mon_proc ->
            Log.debug (fun f ->
                f "notified %a of %a terminating" Pid.pp mon_pid Pid.pp
                  proc.pid);
            let msg = Process.Messages.(Monitor (Process_down proc.pid)) in
            Process.send_message mon_proc msg;
            awake_process pool mon_proc)
      monitoring_pids;

    (* mark linked processes as dead *)
    let linked_pids = Process.links proc in
    Log.debug (fun f ->
        f "terminating %d processes linked to %a"
          (List.length linked_pids)
          Pid.pp proc.pid);
    List.iter
      (fun link_pid ->
        match Proc_table.get pool.processes link_pid with
        | None -> ()
        | Some linked_proc when Atomic.get linked_proc.flags.trap_exits ->
            Log.debug (fun f ->
                f "%a will trap exits" Pid.pp linked_proc.pid);
            let msg = Process.Messages.(Exit (proc.pid, reason)) in
            Process.send_message linked_proc msg;
            awake_process pool linked_proc
        | Some linked_proc when Process.is_exited linked_proc ->
            Log.debug (fun f ->
                f "linked process %a is already dead, nothing to do" Pid.pp
                  linked_proc.pid)
        | Some linked_proc ->
            let reason = Process.(Link_down proc.pid) in
            Process.mark_as_exited linked_proc reason;
            Log.debug (fun f ->
                f "marking linked %a as dead" Pid.pp linked_proc.pid);
            awake_process pool linked_proc)
      linked_pids

  (** [handle_run_proc _pool sch proc] marks [proc] as running, runs its
      continuation with a budget of 100 reductions, stores the resulting
      continuation, and then:

      - finished: marks the process as exited (an [Error exn] result becomes
        [Exception exn]) and re-enqueues it, so that the next step dispatches
        to {!handle_exit_proc};
      - waiting on I/O: leaves it off the run queue; {!poll_io} enqueues it
        again once it is reported by the I/O table;
      - suspended or unhandled: re-enqueues it. *)
  let handle_run_proc _pool sch proc =
    Log.trace (fun f -> f "Running process %a" Process.pp proc);
    (* Local exception used to leave the match and share the exit path. *)
    let exception Terminated_while_running of Process.exit_reason in
    try
      Process.mark_as_running proc;
      let perform = perform sch proc in
      let cont = Proc_state.run ~reductions:100 ~perform (Process.cont proc) in
      Process.set_cont proc cont;
      match cont with
      | Proc_state.Finished reason ->
          let reason =
            match reason with Ok reason -> reason | Error exn -> Exception exn
          in
          raise_notrace (Terminated_while_running reason)
      | _ when Process.is_waiting_io proc ->
          Log.trace (fun f ->
              f "Process %a hibernated (will resume): %a" Pid.pp proc.pid
                Process.pp proc)
      | Proc_state.Suspended _ | Proc_state.Unhandled _ ->
          Log.trace (fun f ->
              f "Process %a suspended (will resume): %a" Pid.pp proc.pid
                Process.pp proc);
          add_to_run_queue sch proc
    with
    (* NOTE(review): presumably raised by [Process.mark_as_running] when the
       process has already exited; re-enqueueing lets the next step handle
       its exit. Confirm against [Process]. *)
    | Process.Process_reviving_is_forbidden _ -> add_to_run_queue sch proc
    | Terminated_while_running reason ->
        Process.mark_as_exited proc reason;
        Log.trace (fun f -> f "Process %a finished" Pid.pp proc.pid);
        add_to_run_queue sch proc

  (** [step_process pool sch proc] invokes the tracer hook and dispatches on
      the state of [proc]: waiting for a message, exited, or
      running/runnable. Processes waiting on I/O are left alone.
      @raise Failure if [proc] is [Finalized]. *)
  let step_process pool sch (proc : Process.t) =
    !Tracer.tracer_proc_run (sch.uid |> Scheduler_uid.to_int) proc;
    match Process.state proc with
    | Finalized -> failwith "finalized processes should never be stepped on"
    | Waiting_io _ -> ()
    | Waiting_message -> handle_wait_proc pool sch proc
    | Exited reason -> handle_exit_proc pool sch proc reason
    | Running | Runnable -> handle_run_proc pool sch proc

  (** [poll_io pool sch] polls the I/O table of [sch]; every reported process
      that is still in a [Waiting_io] state is marked runnable and woken.
      Processes in any other state are ignored. *)
  let poll_io pool (sch : t) =
    Io.poll sch.io_tbl @@ fun (proc, mode) ->
    Log.trace (fun f ->
        f "io_poll(%a): %a" Fd.Mode.pp mode Process.pp proc);
    match Process.state proc with
    | Waiting_io _ ->
        Process.mark_as_runnable proc;
        awake_process pool proc
    | _ -> ()

  (** [tick_timers _pool sch] ticks the timer wheel of [sch]. *)
  let tick_timers _pool (sch : t) = Timer_wheel.tick sch.timers

  (** [run pool sch ()] is the worker loop of [sch]. Until [pool.stop] is set
      it repeatedly steps a batch of at most 10 processes from the run queue,
      polls I/O and ticks the timers. The batch size is bounded by the queue
      size sampled when the batch starts, so processes re-enqueued while
      stepping wait for the next round. *)
  let run pool sch () =
    Log.trace (fun f -> f "> enter worker loop");
    (* Local exception: only the stop check below can trigger the handler. *)
    let exception Exit in
    (try
       while true do
         if pool.stop then raise_notrace Exit;

         (* Starts at 1: a 0-based bound ran one step more than intended
            (11 on a full batch, and a second step on a one-element queue). *)
         for _ = 1 to Int.min (Proc_queue.size sch.run_queue) 10 do
           match Proc_queue.next sch.run_queue with
           | Some proc ->
               set_current_process_pid proc.pid;
               step_process pool sch proc
           | None -> ()
         done;

         poll_io pool sch;
         tick_timers pool sch
       done
     with Exit -> ());
    Log.trace (fun f -> f "< exit worker loop")
end

include Scheduler

module Pool = struct
  (** Getter/setter pair for the pool bound to the current execution context;
      the setter is called at the start of every spawned domain in {!make}. *)
  let get_pool, set_pool = Thread_local.make ~name:"POOL"

  (** [shutdown pool] sets [pool.stop], which every worker loop checks at the
      top of each iteration. *)
  let shutdown pool =
    Log.trace (fun f -> f "shutdown called");
    pool.stop <- true

  (** [register_process pool _scheduler proc] records [proc] in the process
      table of [pool]. The scheduler argument is ignored. *)
  let register_process pool _scheduler proc =
    Proc_table.register_process pool.processes proc

  (** [setup ()] installs process-wide signal handling: SIGPIPE is ignored. *)
  let setup () =
    (* NOTE(@leostera): we want the Net subsystem to be able to write to closed
       sockets and handle that as a regular value rather than as a signal. *)
    Sys.set_signal Sys.sigpipe Sys.Signal_ignore

  (** [make ?rnd ~domains ~main ()] creates [domains] schedulers, builds a
      pool whose scheduler list is [main] followed by them, and spawns one
      domain per created scheduler running {!Scheduler.run}. [main] is not
      spawned here. Returns the pool and the spawned domains.

      If a worker loop raises, the exception and its backtrace are logged and
      the whole pool is shut down.

      @param rnd
        random state copied into each scheduler; self-initialised by
        default. *)
  let make ?(rnd = Random.State.make_self_init ()) ~domains ~main () =
    setup ();

    Log.debug (fun f -> f "Making scheduler pool...");
    let schedulers = List.init domains @@ fun _ -> Scheduler.make ~rnd () in

    let pool =
      {
        stop = false;
        schedulers = main :: schedulers;
        processes = Proc_table.create ();
        registry = Proc_registry.create ();
      }
    in
    let spawn scheduler =
      Stdlib.Domain.spawn (fun () ->
          set_pool pool;
          Scheduler.set_current_scheduler scheduler;
          try
            Scheduler.run pool scheduler ();
            Log.trace (fun f ->
                f "<<< shutting down scheduler #%a" Scheduler_uid.pp
                  scheduler.uid)
          with exn ->
            (* Capture the backtrace before anything else runs: it belongs to
               the most recently raised exception, so code executed by the
               logger could otherwise overwrite it. *)
            let backtrace = Printexc.get_raw_backtrace () in
            Log.error (fun f ->
                f "Scheduler.run exception: %s due to: %s%!"
                  (Printexc.to_string exn)
                  (Printexc.raw_backtrace_to_string backtrace));
            shutdown pool)
    in
    Log.debug (fun f -> f "Created %d schedulers" (List.length schedulers));
    (pool, List.map spawn schedulers)
end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>