package riot
An actor-model multi-core scheduler for OCaml 5
Install
Dune Dependency
Authors
Maintainers
Sources
riot-0.0.5.tbz
sha256=01b7b82ccc656b12b7315960d9df17eb4682b8f1af68e9fee33171fee1f9cf88
sha512=d8831d8a75fe43a7e8d16d2c0bb7d27f6d975133e17c5dd89ef7e575039c59d27c1ab74fbadcca81ddfbc0c74d1e46c35baba35ef825b36ac6c4e49d7a41d0c2
doc/src/riot.core/process.ml.html
Source file process.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
open Util type exit_reason = | Normal | Exit_signal | Bad_link | Link_down of Pid.t | Exception of exn module Messages = struct type monitor = Process_down of Pid.t type Message.t += Monitor of monitor | Exit of Pid.t * exit_reason end type state = | Runnable | Waiting_message | Waiting_io of { syscall : string; mode : [ `r | `rw | `w ]; fd : Fd.t } | Running | Exited of exit_reason | Finalized type process_flags = { trap_exits : bool Atomic.t } type process_flag = Trap_exit of bool let default_flags () = { trap_exits = Atomic.make false } type t = { pid : Pid.t; sid : Scheduler_uid.t; flags : process_flags; state : state Atomic.t; mutable cont : exit_reason Proc_state.t; mailbox : Mailbox.t; save_queue : Mailbox.t; mutable read_save_queue : bool; (** the save queue is a temporary queue used for storing messages during a selective receive *) links : Pid.t list Atomic.t; monitors : Pid.t list Atomic.t; } (** The process descriptor. *) let make sid fn = let cont = Proc_state.make fn Proc_effect.Yield in let pid = Pid.next () in Log.debug (fun f -> f "Making process with pid: %a" Pid.pp pid); let proc = { pid; sid; cont; state = Atomic.make Runnable; links = Atomic.make []; monitors = Atomic.make []; mailbox = Mailbox.create (); save_queue = Mailbox.create (); read_save_queue = false; flags = default_flags (); } in proc let rec pp ppf t = Format.fprintf ppf "Process %a { state = %a; messages = %d; flags = %a }" Pid.pp t.pid pp_state (Atomic.get t.state) (Mailbox.size t.save_queue + Mailbox.size t.mailbox) pp_flags t.flags and pp_state ppf (state : state) = match state with | Runnable -> Format.fprintf ppf "Runnable" | Waiting_message -> Format.fprintf ppf "Waiting_message" | Waiting_io { syscall; mode; fd } -> let mode = match mode with `r -> "r" | `w -> "w" | `rw -> "rw" in Format.fprintf ppf "Waiting_io(%s,%s,%a)" syscall mode Fd.pp fd | Running -> Format.fprintf ppf "Running" | Exited e -> Format.fprintf ppf "Exited(%a)" pp_reason e | Finalized -> Format.fprintf ppf "Finalized" and pp_reason ppf (t : exit_reason) = match t with | Normal -> Format.fprintf ppf "Normal" | Link_down pid -> Format.fprintf ppf "Link_down(%a)" Pid.pp pid | Exit_signal -> Format.fprintf ppf "Exit_signal" | Bad_link -> Format.fprintf ppf "Bad_link" | Exception exn -> Format.fprintf ppf "Exception: %s" (Printexc.to_string exn) and pp_flags ppf (t : process_flags) = Format.fprintf ppf "{ trap_exits=%b }" (Atomic.get t.trap_exits) let cont t = t.cont let pid { pid; _ } = pid let sid { sid; _ } = sid let state t = Atomic.get t.state let monitors t = Atomic.get t.monitors let links t = Atomic.get t.links let is_alive t = match Atomic.get t.state with | Runnable | Waiting_message | Waiting_io _ | Running -> true | Finalized | Exited _ -> false let is_exited t = match Atomic.get t.state with Finalized | Exited _ -> true | _ -> false let is_waiting t = match Atomic.get t.state with | Waiting_io _ | Waiting_message -> true | _ -> false let is_waiting_io t = match Atomic.get t.state with Waiting_io _ -> true | _ -> false let is_runnable t = Atomic.get t.state = Runnable let is_running t = Atomic.get t.state = Running let is_finalized t = Atomic.get t.state = Finalized let has_empty_mailbox t = Mailbox.is_empty t.save_queue && Mailbox.is_empty t.mailbox let has_messages t = not (has_empty_mailbox t) let message_count t = Mailbox.size t.mailbox + Mailbox.size t.save_queue let should_awake t = is_alive t && has_messages t exception Process_reviving_is_forbidden of t let rec mark_as_awaiting_io t syscall mode fd = if is_exited t then raise (Process_reviving_is_forbidden t); let old_state = Atomic.get t.state in if Atomic.compare_and_set t.state old_state (Waiting_io { syscall; mode; fd }) then Log.trace (fun f -> f "Process %a: marked as waiting for io" Pid.pp t.pid) else mark_as_awaiting_io t syscall mode fd let rec mark_as_awaiting_message t = if is_exited t then raise (Process_reviving_is_forbidden t); let old_state = Atomic.get t.state in if Atomic.compare_and_set t.state old_state Waiting_message then Log.trace (fun f -> f "Process %a: marked as waiting for message" Pid.pp t.pid) else mark_as_awaiting_message t let rec mark_as_running t = if is_exited t then raise (Process_reviving_is_forbidden t); let old_state = Atomic.get t.state in if Atomic.compare_and_set t.state old_state Running then Log.trace (fun f -> f "Process %a: marked as running" Pid.pp t.pid) else mark_as_running t let rec mark_as_runnable t = if is_exited t then raise (Process_reviving_is_forbidden t); let old_state = Atomic.get t.state in if Atomic.compare_and_set t.state old_state Runnable then Log.trace (fun f -> f "Process %a: marked as runnable" Pid.pp t.pid) else mark_as_runnable t let rec mark_as_exited t reason = if is_exited t then () else let old_state = Atomic.get t.state in if Atomic.compare_and_set t.state old_state (Exited reason) then Log.trace (fun f -> f "Process %a: marked as exited with reason %a" Pid.pp t.pid pp_reason reason) else mark_as_exited t reason let rec mark_as_finalized t = let old_state = Atomic.get t.state in if Atomic.compare_and_set t.state old_state Finalized then Log.trace (fun f -> f "Process %a: marked as finalized" Pid.pp t.pid) else mark_as_finalized t (** `set_flag` is only called by `Riot.process_flag` which runs only on the current process, which means we already have a lock on it. *) let rec set_flag t flag = match flag with | Trap_exit v -> let old_flag = Atomic.get t.flags.trap_exits in if Atomic.compare_and_set t.flags.trap_exits old_flag v then () else set_flag t flag let set_cont t c = t.cont <- c let rec add_link t link = let old_links = Atomic.get t.links in let new_links = link :: old_links in if Atomic.compare_and_set t.links old_links new_links then ( Log.trace (fun f -> f "Process %a: adding link to %a" Pid.pp t.pid Pid.pp link); ()) else add_link t link let rec add_monitor t monitor = let old_monitors = Atomic.get t.monitors in let new_monitors = monitor :: old_monitors in if Atomic.compare_and_set t.monitors old_monitors new_monitors then ( Log.trace (fun f -> f "Process %a: adding monitor to %a" Pid.pp t.pid Pid.pp monitor); ()) else add_monitor t monitor let next_message t = if t.read_save_queue then ( match Mailbox.next t.save_queue with | Some m -> Log.trace (fun f -> f "Process %a: found message in save queue" Pid.pp t.pid); Some m | None -> t.read_save_queue <- false; None) else match Mailbox.next t.mailbox with | Some m -> Log.trace (fun f -> f "Process %a: found message in mailbox" Pid.pp t.pid); Some m | None -> None let add_to_save_queue t msg = Mailbox.queue t.save_queue msg let read_save_queue t = t.read_save_queue <- true let send_message t msg = if is_alive t then ( let envelope = Message.envelope msg in Mailbox.queue t.mailbox envelope; if is_waiting t then mark_as_runnable t)
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>