package oraft

  1. Overview
  2. Docs

Source file append_entries_handler.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
open Core
open Cohttp_lwt_unix
open Yojson.Basic
open State

(* Invoked by leader to replicate log entries ($B!x(B5.3); also used as
 * heartbeat ($B!x(B5.2).
 *
 * Receiver implementation:
 * 1. Reply false if term < currentTerm ($B!x(B5.1)
 * 2. Reply false if log doesn$B!G(Bt contain an entry at prevLogIndex
 *     whose term matches prevLogTerm ($B!x(B5.3)
 * 3. If an existing entry conflicts with a new one (same index
 *    but different terms), delete the existing entry and all that
 *    follow it ($B!x(B5.3)
 * 4. Append any new entries not already in the log
 * 5. If leaderCommit > commitIndex, set commitIndex =
 *    min(leaderCommit, index of last new entry)
 *)

let append_entries ~(conf : Conf.t) ~logger ~state
    ~(param : Params.append_entries_request) ~(apply_log : Base.apply_log)
    ~cb_newer_term ~handle_same_term_as_newer =
  VolatileState.update_leader_id state.volatile_state ~logger param.leader_id;
  let persistent_state = state.persistent_state in
  let persistent_log = state.persistent_log in
  let volatile_state = state.volatile_state in
  if PersistentState.detect_newer_term state.persistent_state ~logger
       ~other_term:param.term
  then cb_newer_term ()
  else if handle_same_term_as_newer &&
    PersistentState.detect_same_term state.persistent_state ~logger
        ~other_term:param.term
  then cb_newer_term ()
  ;

  (* If leaderCommit > commitIndex,
   * set commitIndex = min(leaderCommit, index of last new entry) *)
  if VolatileState.detect_higher_commit_index volatile_state ~logger
       ~other:param.leader_commit
  then
    VolatileState.update_commit_index volatile_state
      (min param.leader_commit (PersistentLog.last_index persistent_log));
  if List.length param.entries > 0
  then (
    Logger.debug logger
      (Printf.sprintf "This param isn't empty, so appending entries(lentgh: %d)"
         (List.length param.entries));
    (* If an existing entry conflicts with a new one (same index
     *  but different terms), delete the existing entry and all that
     *  follow it ($B!x(B5.3)
     *
     * Append any new entries not already in the log *)
    PersistentLog.append persistent_log
      ~term:(PersistentState.current_term persistent_state)
      ~start:(param.prev_log_index + 1) ~entries:param.entries
  );
  (* All Servers:
   * - If commitIndex > lastApplied: increment lastApplied, apply
   *   log[lastApplied] to state machine ($B!x(B5.3)
   *)
  VolatileState.apply_logs volatile_state ~logger ~f:(fun i ->
      let log = PersistentLog.get_exn persistent_log i in
      apply_log ~node_id:conf.node_id ~log_index:log.index ~log_data:log.data)


let handle ~conf ~state ~logger ~apply_log ~cb_valid_request
    ~cb_newer_term ~handle_same_term_as_newer
    ~(param : Params.append_entries_request) =
  let persistent_state = state.persistent_state in
  let persistent_log = state.persistent_log in
  let stored_prev_log = PersistentLog.get persistent_log param.prev_log_index in
  cb_valid_request ();
  let result =
    if PersistentState.detect_old_leader persistent_state ~logger
         ~other_term:param.term
       (* Reply false if term < currentTerm ($B!x(B5.1) *)
    then false
    else if (not (param.prev_log_term = -1 && param.prev_log_index = 0))
            &&
            match stored_prev_log with
            | Some l -> l.term <> param.prev_log_term
            | None -> true
    then (
      let entries_size = List.length param.entries in
      (* Reply false if log doesn$B!G(Bt contain an entry at prevLogIndex whose term matches prevLogTerm ($B!x(B5.3) *)
      Logger.warn logger
        (Printf.sprintf
          "Received a request that doesn't meet requirement.\nparam:{\n  term:%d, leader_id:%d, prev_log_term:%d, prev_log_index:%d, entries_size:%d, leader_commit:%d,\n  first_entry:%s,\n  last_entry:%s\n},\nstate:%s"
          param.term param.leader_id param.prev_log_term param.prev_log_index
          entries_size
          param.leader_commit
          (PersistentLogEntry.show (List.nth_exn param.entries 0))
          (PersistentLogEntry.show (List.nth_exn param.entries (entries_size - 1)))
          (PersistentLog.show persistent_log));
      false
    )
    else (
      append_entries ~conf ~logger ~state ~param ~apply_log ~cb_newer_term ~handle_same_term_as_newer;
      State.log state ~logger;
      true
    )
  in
  let response_body =
    `Assoc
      [
        ("term", `Int (PersistentState.current_term persistent_state));
        ("success", `Bool result);
      ]
    |> to_string
  in
  Server.respond_string ~status:`OK ~body:response_body ()
OCaml

Innovation. Community. Security.