package riot

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file scheduler.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
open Core
open Util
open Time
open Net
module Tracer = Tracer
module Uid = Scheduler_uid

type t = {
  uid : Uid.t; [@warning "-69"]
  rnd : Random.State.t;
  run_queue : Proc_queue.t;
  sleep_set : Proc_set.t;
  timers : Timer_wheel.t;
  io_tbl : Io.t;
}

type pool = {
  mutable stop : bool;
  schedulers : t list;
  processes : Proc_table.t;
  registry : Proc_registry.t;
}

module Scheduler = struct
  let make ~rnd () =
    let uid = Uid.next () in
    Log.debug (fun f -> f "Making scheduler with id: %a" Uid.pp uid);
    {
      uid;
      rnd = Random.State.copy rnd;
      run_queue = Proc_queue.create ();
      sleep_set = Proc_set.create ();
      io_tbl = Io.create ();
      timers = Timer_wheel.create ();
    }

  let get_current_scheduler, set_current_scheduler =
    Thread_local.make ~name:"CURRENT_SCHEDULER"

  let get_current_process_pid, set_current_process_pid =
    Thread_local.make ~name:"CURRENT_PID"

  let get_random_scheduler : pool -> t =
   fun { schedulers = all_schedulers; _ } ->
    let sch = get_current_scheduler () in
    let rnd_idx = Random.State.int sch.rnd (List.length all_schedulers) in
    List.nth all_schedulers rnd_idx

  let set_timer sch time mode fn =
    Timer_wheel.make_timer sch.timers time mode fn

  let add_to_run_queue sch (proc : Process.t) =
    Proc_set.remove sch.sleep_set proc;
    Proc_queue.queue sch.run_queue proc;
    Log.trace (fun f ->
        f "Adding process to run_queue queue[%d]: %a"
          (Proc_queue.size sch.run_queue)
          Pid.pp proc.pid)

  let awake_process pool (proc : Process.t) =
    List.iter
      (fun sch ->
        if Scheduler_uid.equal sch.uid proc.sid then add_to_run_queue sch proc)
      pool.schedulers

  let handle_receive k (proc : Process.t) (ref : unit Ref.t option) =
    let open Proc_state in
    Log.trace (fun f ->
        f "Process %a: receiving messages" Pid.pp (Process.pid proc));
    if Process.has_empty_mailbox proc then (
      Log.trace (fun f ->
          f "Process %a is awaiting for new messages" Pid.pp (Process.pid proc));
      Process.mark_as_awaiting_message proc;
      k Suspend)
    else
      let fuel = Process.message_count proc in
      Log.trace (fun f -> f "Skimming mailbox with %d messages" fuel);
      let rec go fuel =
        if fuel = 0 then k Delay
        else
          match (ref, Process.next_message proc) with
          | _, None ->
              Log.trace (fun f ->
                  f "Emptied the queue, will read from save queue next");
              Process.read_save_queue proc;
              k Delay
          | Some ref, Some msg when Ref.is_newer ref msg.uid ->
              Log.trace (fun f ->
                  f "Skipping msg ref=%a msg.uid=%a" Ref.pp ref Ref.pp msg.uid);
              Process.add_to_save_queue proc msg;
              go (fuel - 1)
          | _, Some Message.{ msg; _ } -> k (Continue msg)
      in
      go fuel

  let handle_syscall k sch (proc : Process.t) syscall mode fd =
    let open Proc_state in
    Log.trace (fun f ->
        let mode = match mode with `r -> "r" | `w -> "w" | `rw -> "rw" in
        f "Registering %a for Syscall(%s,%s,%a)" Pid.pp proc.pid syscall mode
          Fd.pp fd);
    Io.register sch.io_tbl proc mode fd;
    Process.mark_as_awaiting_io proc syscall mode fd;
    k Yield

  let perform sch (proc : Process.t) =
    let open Proc_state in
    let open Proc_effect in
    let perform : type a b. (a, b) step_callback =
     fun k eff ->
      match eff with
      | Syscall { name; mode; fd } -> handle_syscall k sch proc name mode fd
      | Receive { ref } -> handle_receive k proc ref
      | Yield ->
          Log.trace (fun f ->
              f "Process %a: yielding" Pid.pp (Process.pid proc));
          k Yield
      | effect ->
          Log.trace (fun f ->
              f "Process %a: unhandled effect" Pid.pp (Process.pid proc));
          k (Reperform effect)
    in
    { perform }

  let handle_wait_proc _pool sch proc =
    if Process.has_messages proc then (
      Process.mark_as_runnable proc;
      Log.debug (fun f -> f "Waking up process %a" Pid.pp proc.pid);
      add_to_run_queue sch proc)
    else (
      Proc_set.add sch.sleep_set proc;
      Log.debug (fun f -> f "Hibernated process %a" Pid.pp proc.pid);
      Log.trace (fun f -> f "sleep_set: %d" (Proc_set.size sch.sleep_set)))

  let handle_exit_proc pool sch proc reason =
    Io.unregister_process sch.io_tbl proc;

    Proc_registry.remove pool.registry (Process.pid proc);

    (* send monitors a process-down message *)
    let monitoring_pids = Process.monitors proc in
    Log.debug (fun f -> f "notifying %d monitors" (List.length monitoring_pids));
    List.iter
      (fun mon_pid ->
        match Proc_table.get pool.processes mon_pid with
        | None -> ()
        | Some mon_proc when Process.is_exited mon_proc ->
            Log.debug (fun f ->
                f "monitoring process %a is dead, nothing to do" Pid.pp
                  mon_proc.pid)
        | Some mon_proc ->
            Log.debug (fun f ->
                f "notified %a of %a terminating" Pid.pp mon_pid Pid.pp proc.pid);
            let msg = Process.Messages.(Monitor (Process_down proc.pid)) in
            Process.send_message mon_proc msg;
            awake_process pool mon_proc)
      monitoring_pids;

    (* mark linked processes as dead *)
    let linked_pids = Process.links proc in
    Log.debug (fun f ->
        f "terminating %d processes linked to %a" (List.length linked_pids)
          Pid.pp proc.pid);
    List.iter
      (fun link_pid ->
        match Proc_table.get pool.processes link_pid with
        | None -> ()
        | Some linked_proc when Atomic.get linked_proc.flags.trap_exits ->
            Log.debug (fun f -> f "%a will trap exits" Pid.pp linked_proc.pid);
            let msg = Process.Messages.(Exit (proc.pid, reason)) in
            Process.send_message linked_proc msg;
            awake_process pool linked_proc
        | Some linked_proc when Process.is_exited linked_proc ->
            Log.debug (fun f ->
                f "linked process %a is already dead, nothing to do" Pid.pp
                  linked_proc.pid)
        | Some linked_proc ->
            let reason = Process.(Link_down proc.pid) in
            Process.mark_as_exited linked_proc reason;
            Log.debug (fun f ->
                f "marking linked %a as dead" Pid.pp linked_proc.pid);
            awake_process pool linked_proc)
      linked_pids

  let handle_run_proc _pool sch proc =
    Log.trace (fun f -> f "Running process %a" Process.pp proc);
    let exception Terminated_while_running of Process.exit_reason in
    try
      Process.mark_as_running proc;
      let perform = perform sch proc in
      let cont = Proc_state.run ~reductions:100 ~perform (Process.cont proc) in
      Process.set_cont proc cont;
      match cont with
      | Proc_state.Finished reason ->
          let reason =
            match reason with Ok reason -> reason | Error exn -> Exception exn
          in
          raise_notrace (Terminated_while_running reason)
      | _ when Process.is_waiting_io proc ->
          Log.trace (fun f ->
              f "Process %a hibernated (will resume): %a" Pid.pp proc.pid
                Process.pp proc)
      | Proc_state.Suspended _ | Proc_state.Unhandled _ ->
          Log.trace (fun f ->
              f "Process %a suspended (will resume): %a" Pid.pp proc.pid
                Process.pp proc);
          add_to_run_queue sch proc
    with
    | Process.Process_reviving_is_forbidden _ -> add_to_run_queue sch proc
    | Terminated_while_running reason ->
        Process.mark_as_exited proc reason;
        Log.trace (fun f -> f "Process %a finished" Pid.pp proc.pid);
        add_to_run_queue sch proc

  let step_process pool sch (proc : Process.t) =
    !Tracer.tracer_proc_run (sch.uid |> Scheduler_uid.to_int) proc;
    match Process.state proc with
    | Finalized -> failwith "finalized processes should never be stepped on"
    | Waiting_io _ -> ()
    | Waiting_message -> handle_wait_proc pool sch proc
    | Exited reason -> handle_exit_proc pool sch proc reason
    | Running | Runnable -> handle_run_proc pool sch proc

  let poll_io pool (sch : t) =
    Io.poll sch.io_tbl @@ fun (proc, mode) ->
    Log.trace (fun f -> f "io_poll(%a): %a" Fd.Mode.pp mode Process.pp proc);
    match Process.state proc with
    | Waiting_io _ ->
        Process.mark_as_runnable proc;
        awake_process pool proc
    | _ -> ()

  let tick_timers _pool (sch : t) = Timer_wheel.tick sch.timers

  let run pool sch () =
    Log.trace (fun f -> f "> enter worker loop");
    let exception Exit in
    (try
       while true do
         if pool.stop then raise_notrace Exit;

         for _ = 0 to Int.min (Proc_queue.size sch.run_queue) 10 do
           match Proc_queue.next sch.run_queue with
           | Some proc ->
               set_current_process_pid proc.pid;
               step_process pool sch proc
           | None -> ()
         done;

         poll_io pool sch;
         tick_timers pool sch
       done
     with Exit -> ());
    Log.trace (fun f -> f "< exit worker loop")
end

include Scheduler

module Pool = struct
  let get_pool, set_pool = Thread_local.make ~name:"POOL"

  let shutdown pool =
    Log.trace (fun f -> f "shutdown called");
    pool.stop <- true

  let register_process pool _scheduler proc =
    Proc_table.register_process pool.processes proc

  let setup () =
    (* NOTE(@leostera): we want the Net subsystem to be able to write to closed
       sockets and handle that as a regular value rather than as a signal. *)
    Sys.set_signal Sys.sigpipe Sys.Signal_ignore

  let make ?(rnd = Random.State.make_self_init ()) ~domains ~main () =
    setup ();

    Log.debug (fun f -> f "Making scheduler pool...");
    let schedulers = List.init domains @@ fun _ -> Scheduler.make ~rnd () in
    let pool =
      {
        stop = false;
        schedulers = [ main ] @ schedulers;
        processes = Proc_table.create ();
        registry = Proc_registry.create ();
      }
    in
    let spawn scheduler =
      Stdlib.Domain.spawn (fun () ->
          set_pool pool;
          Scheduler.set_current_scheduler scheduler;
          try
            Scheduler.run pool scheduler ();
            Log.trace (fun f ->
                f "<<< shutting down scheduler #%a" Scheduler_uid.pp
                  scheduler.uid)
          with exn ->
            Log.error (fun f ->
                f "Scheduler.run exception: %s due to: %s%!"
                  (Printexc.to_string exn)
                  (Printexc.raw_backtrace_to_string
                     (Printexc.get_raw_backtrace ())));
            shutdown pool)
    in
    Log.debug (fun f -> f "Created %d schedulers" (List.length schedulers));
    (pool, List.map spawn schedulers)
end
OCaml

Innovation. Community. Security.