package riot

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file supervisor.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
open Runtime

(** Description of a child process a supervisor can start.

    The constructor packs an initial state together with the function that
    starts the child from that state. The ['state] type variable does not
    appear in [child_spec], so specs with different state types can be kept in
    the same list. *)
type child_spec =
  | Child : {
      (* Value handed to [start_link] each time the child is started. *)
      initial_state : 'state;
      (* Starts the child; [Ok pid] on success, an error variant that
         includes at least [`Exit of exn] otherwise. *)
      start_link : 'state -> (Pid.t, [> `Exit of exn ]) result;
    }
      -> child_spec

(** [child_spec ~start_link initial_state] builds a {!child_spec} that will
    start a child by calling [start_link initial_state]. *)
let child_spec ~start_link initial_state =
  Child { initial_state; start_link }

(* Restart strategy selected when the supervisor is created.
   NOTE(review): the constructor names mirror Erlang/OTP supervisor
   strategies, but the restart code in this file does not branch on the
   strategy - confirm which ones are actually honoured. *)
type strategy = One_for_one | One_for_all | Rest_for_one | Simple_one_for_one
(* Point in time as a float; values stored in this file come from
   [Unix.gettimeofday]. *)
type timestamp = float

(** Internal state threaded through the supervisor loop. *)
type state = {
  (* Strategy chosen at creation time. *)
  strategy : strategy;
  (* Number of recorded restarts that may be exceeded before the supervisor
     gives up (compared with [>] in [max_restarts_reached]). *)
  restart_limit : int;
  (* NOTE(review): presumably the length of the window over which restarts
     are counted, in seconds - confirm units and intended semantics. *)
  restart_period : int;
  (* Specs supplied by the caller; each one is started when the supervisor
     boots. *)
  child_specs : child_spec list;
  (* Currently tracked children, each paired with the spec used to start it
     so it can be started again. *)
  children : (Pid.t * child_spec) list;
  (* Timestamps of recorded restarts, most recent first. *)
  restarts : timestamp list;
}
(* Warning 69 is the "unused record field" warning; it is disabled for this
   type. *)
[@@warning "-69"]

(** [init_child spec] starts the child described by [spec] and returns its pid
    paired with [spec], so the same spec can be used to start it again later.

    @raise Invalid_argument
      if the spec's [start_link] returns [Error _] (the result is unwrapped
      with [Result.get_ok]). *)
let init_child spec =
  match spec with
  | Child { start_link; initial_state } ->
      let pid = Result.get_ok (start_link initial_state) in
      Log.info (fun f ->
          f "Supervisor %a started child %a" Pid.pp (self ()) Pid.pp pid);
      (pid, spec)

(** [init_children state] starts one child per entry of [state.child_specs]
    and returns the resulting [(pid, spec)] pairs in the same order. *)
let init_children { child_specs; _ } = List.map init_child child_specs

(** [add_restart state] records that a restart is happening now and returns
    the updated state.

    Only restarts that happened within the last [state.restart_period] are
    kept alongside the new one. Without this pruning the history grows for the
    whole lifetime of the supervisor, so the restart limit would eventually be
    hit no matter how far apart the failures are and [restart_period] would
    have no effect.

    With a non-positive [restart_period] only timestamps that are not older
    than the current time survive, i.e. effectively just the new one. *)
let add_restart state =
  let now = Unix.gettimeofday () in
  (* NOTE(review): the period is interpreted as seconds, the unit of
     [Unix.gettimeofday] - confirm against the supervisor's public docs. *)
  let window = float_of_int state.restart_period in
  let recent = List.filter (fun ts -> now -. ts <= window) state.restarts in
  { state with restarts = now :: recent }

(** [max_restarts_reached state] is [true] when the number of recorded
    restarts is strictly greater than [state.restart_limit]. *)
let max_restarts_reached { restarts; restart_limit; _ } =
  (* Positive result means "list is longer than the limit"; the traversal
     stops as soon as that is known. *)
  List.compare_length_with restarts restart_limit > 0

(** [restart_child pid state] records a restart and, unless the restart limit
    is now exceeded, starts a fresh child from the spec that was registered
    for [pid].

    Returns [`terminate] when the limit is exceeded, otherwise
    [`continue state'] where [pid] has been replaced by the new child at the
    head of [children].

    @raise Not_found if [pid] is not one of [state.children].
    @raise Invalid_argument if the child fails to start (see [init_child]). *)
let restart_child pid state =
  let state = add_restart state in
  match max_restarts_reached state with
  | true -> `terminate
  | false ->
      Log.info (fun f -> f "child %a is down" Pid.pp pid);
      let spec = List.assoc pid state.children in
      let survivors = List.remove_assoc pid state.children in
      let restarted = init_child spec in
      `continue { state with children = restarted :: survivors }

(* Messages used by [children] to ask a running supervisor for its child
   pids. *)
type Message.t +=
  (* Request: [reply] is the pid the answer is sent to, [ref] is echoed back
     in the response. *)
  | List_children_req : { reply : Pid.t; ref : unit Ref.t } -> Message.t
  (* Response: the pids currently tracked by the supervisor plus the [ref]
     taken from the request. *)
  | List_children_res : { children : Pid.t list; ref : unit Ref.t } -> Message.t

(* Main supervisor loop: waits for one message, reacts to it, and recurses
   with the (possibly updated) state.

   - A tracked child exiting with reason [Normal] is forgotten, not restarted.
   - A tracked child exiting with any other reason goes through
     [handle_child_exit].
   - [List_children_req] is answered with the pids of the tracked children.
   - Any other message, including exits of untracked pids, is dropped. *)
let rec loop state =
  Log.debug (fun f -> f "supervisor loop");
  let supervises pid = List.mem_assoc pid state.children in
  match receive () with
  | Process.Messages.Exit (pid, Normal) when supervises pid ->
      Log.info (fun f -> f "child %a stopped normally" Pid.pp pid);
      loop { state with children = List.remove_assoc pid state.children }
  | Process.Messages.Exit (pid, reason) when supervises pid ->
      Log.info (fun f ->
          f "child %a stopped: %a" Pid.pp pid Process.pp_reason reason);
      handle_child_exit pid reason state
  | List_children_req { reply; ref } ->
      let pids = List.map fst state.children in
      send reply (List_children_res { children = pids; ref });
      loop state
  | _ -> loop state

(* Tries to restart the child [pid]. On success the loop continues with the
   new state; when the restart limit is exceeded a message is logged and the
   function returns, which ends the supervisor loop. *)
and handle_child_exit pid _reason state =
  match restart_child pid state with
  | `terminate ->
      Log.info (fun f ->
          let this = self () in
          f "Supervisor %a reached max restarts of %d" Pid.pp this
            state.restart_limit)
  | `continue state -> loop state

(** [start_supervisor state] is the body of the supervisor process: it enables
    exit trapping, starts every child listed in [state.child_specs], and then
    enters the supervisor loop.

    NOTE(review): exit trapping is presumably what makes child exits arrive as
    [Exit] messages in the loop - confirm against the runtime. *)
let start_supervisor state =
  Log.info (fun f ->
      let this = self () in
      f "Initializing supervisor %a with %d child specs" Pid.pp this
        (List.length state.child_specs));
  process_flag (Trap_exit true);
  loop { state with children = init_children state }

(** [start_link ?strategy ?restart_limit ?restart_period ~child_specs ()]
    spawns a supervisor process linked to the caller and returns [Ok pid].

    @param strategy restart strategy, defaults to [One_for_one].
    @param restart_limit defaults to [1].
    @param restart_period defaults to [5].
    @param child_specs children started when the supervisor boots. *)
let start_link ?(strategy = One_for_one) ?(restart_limit = 1)
    ?(restart_period = 5) ~child_specs () =
  (* The supervisor starts with no running children and no restart history;
     children are started inside the spawned process. *)
  let initial =
    {
      strategy;
      restart_limit;
      restart_period;
      child_specs;
      children = [];
      restarts = [];
    }
  in
  Ok (spawn_link (fun () -> start_supervisor initial))

(** [children pid] asks the supervisor [pid] for the pids of its current
    children and waits for the answer.

    A fresh reference is sent with the request; only a response carrying an
    equal reference is accepted. Any other message received while waiting is
    skipped. The call does not return until a matching response arrives. *)
let children pid =
  let ref = Ref.make () in
  send pid (List_children_req { reply = self (); ref });
  let rec await () =
    match receive ~ref () with
    | List_children_res { children = pids; ref = reply_ref }
      when Ref.equal ref reply_ref ->
        pids
    | _ -> await ()
  in
  await ()
OCaml

Innovation. Community. Security.