package oraft

  1. Overview
  2. Docs

Source file oraft.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
open Lwt
open Base
open State

type leader_node = { host : string; port : int }

type current_state = { mode : mode; term : int; leader : leader_node option }

type t = {
  conf : Conf.t;
  process : unit Lwt.t;
  post_command : string -> bool Lwt.t;
  current_state : unit -> current_state;
}

let state (conf : Conf.t) =
  {
    persistent_state = PersistentState.load ~state_dir:conf.state_dir;
    persistent_log = PersistentLog.load ~state_dir:conf.state_dir;
    volatile_state = VolatileState.create ();
  }


let process ~conf ~logger ~apply_log ~state ~state_exec : unit Lwt.t =
  let rec loop state_exec =
    state_exec () >>= fun next ->
    let next_state_exec =
      VolatileState.update_mode state.volatile_state ~logger next;
      match next with
      | FOLLOWER -> Follower.run (Follower.init ~conf ~apply_log ~state)
      | CANDIDATE -> Candidate.run (Candidate.init ~conf ~apply_log ~state)
      | LEADER -> Leader.run (Leader.init ~conf ~apply_log ~state)
    in
    loop next_state_exec
  in
  loop state_exec


let post_command ~(conf : Conf.t) ~logger ~state s =
  let request_json =
    let r : Params.client_command_request = { data = s } in
    Params.client_command_request_to_yojson r
  in
  let request =
    Request_sender.post ~node_id:conf.node_id ~logger ~url_path:"client_command"
      ~request_json
      (* Afford to allow a connection timeout to unavailable server *)
      ~timeout_millis:(conf.request_timeout_millis * 2)
      ~converter:(fun response_json ->
        match Params.client_command_response_of_yojson response_json with
        | Ok param -> Ok (Params.CLIENT_COMMAND_RESPONSE param)
        | Error _ as err -> err)
  in
  match VolatileState.leader_id state.volatile_state with
  | Some node_id ->
      let current_leader_node = Conf.peer_node conf ~node_id in
      request current_leader_node >>= fun result ->
      Logger.debug logger
        (Printf.sprintf "Sending command to node(%d) : %s" node_id s);
      Lwt.return
        ( match result with
        | Some (Params.CLIENT_COMMAND_RESPONSE x) -> x.success
        | Some _ ->
            Logger.error logger "Shouldn't reach here";
            false
        | None -> false
        )
  | None -> Lwt.return false


let start ~conf_file ~apply_log =
  let conf = Conf.from_file conf_file in
  let state = state conf in
  let logger =
    Logger.create ~node_id:conf.node_id ~mode:None ~output_path:conf.log_file
      ~level:conf.log_level
  in
  let post_command = post_command ~conf ~logger ~state in
  let initial_state_exec =
    Follower.run (Follower.init ~conf ~apply_log ~state)
  in
  {
    conf;
    process =
      process ~conf ~logger ~apply_log ~state ~state_exec:initial_state_exec;
    post_command;
    current_state =
      (fun () ->
        let mode = VolatileState.mode state.volatile_state in
        let term = PersistentState.current_term state.persistent_state in
        let leader =
          match VolatileState.leader_id state.volatile_state with
          | Some x ->
              let leader = Conf.peer_node conf ~node_id:x in
              Some { host = leader.host; port = leader.app_port }
          | None -> None
        in
        { mode; term; leader });
  }
OCaml

Innovation. Community. Security.