Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
append_entries_handler.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
open Core
open Cohttp_lwt_unix
open Yojson.Basic
open State

(* Invoked by leader to replicate log entries (§5.3); also used as
 * heartbeat (§5.2).
 *
 * Receiver implementation:
 *  1. Reply false if term < currentTerm (§5.1)
 *  2. Reply false if log does not contain an entry at prevLogIndex
 *     whose term matches prevLogTerm (§5.3)
 *  3. If an existing entry conflicts with a new one (same index
 *     but different terms), delete the existing entry and all that
 *     follow it (§5.3)
 *  4. Append any new entries not already in the log
 *  5. If leaderCommit > commitIndex, set commitIndex =
 *     min(leaderCommit, index of last new entry)
 *
 * Steps 1 and 2 are checked in [handle]; [append_entries] is only called
 * from [handle] once both checks have passed.
 *)

(** [append_entries] processes an AppendEntries request that [handle] has
    already accepted.

    In order, it:
    - records [param.leader_id] through [VolatileState.update_leader_id];
    - calls [cb_newer_term ()] when [PersistentState.detect_newer_term]
      returns [true] for [param.term], or when [handle_same_term_as_newer]
      is set and [PersistentState.detect_same_term] returns [true];
    - when [VolatileState.detect_higher_commit_index] returns [true] for
      [param.leader_commit], updates the commit index to the smaller of
      [param.leader_commit] and [PersistentLog.last_index];
    - when [param.entries] is non-empty, hands the entries to
      [PersistentLog.append] starting at [param.prev_log_index + 1];
    - calls [VolatileState.apply_logs] with a callback that looks up each
      index it is given and forwards the entry to [apply_log].

    @raise exn whatever [PersistentLog.get_exn] raises when an index passed
    to the apply callback is not present in the log. *)
let append_entries ~(conf : Conf.t) ~logger ~state
    ~(param : Params.append_entries_request) ~(apply_log : Base.apply_log)
    ~cb_newer_term ~handle_same_term_as_newer =
  VolatileState.update_leader_id state.volatile_state ~logger param.leader_id;
  let persistent_state = state.persistent_state in
  let persistent_log = state.persistent_log in
  let volatile_state = state.volatile_state in
  if
    PersistentState.detect_newer_term state.persistent_state ~logger
      ~other_term:param.term
  then cb_newer_term ()
  else if
    handle_same_term_as_newer
    && PersistentState.detect_same_term state.persistent_state ~logger
         ~other_term:param.term
  then cb_newer_term ();
  (* If leaderCommit > commitIndex,
   * set commitIndex = min(leaderCommit, index of last new entry)
   *
   * NOTE(review): this runs before the entries of this request are
   * appended, so the bound is the last index of the log as it was on
   * arrival, not the index of the last new entry. Kept in the original
   * order; confirm whether it should move below the append. *)
  if
    VolatileState.detect_higher_commit_index volatile_state ~logger
      ~other:param.leader_commit
  then
    VolatileState.update_commit_index volatile_state
      (min param.leader_commit (PersistentLog.last_index persistent_log));
  let entry_count = List.length param.entries in
  if entry_count > 0 then (
    Logger.debug logger
      (Printf.sprintf
         "This param isn't empty, so appending entries(lentgh: %d)"
         entry_count);
    (* If an existing entry conflicts with a new one (same index
     * but different terms), delete the existing entry and all that
     * follow it (§5.3)
     *
     * Append any new entries not already in the log
     *
     * Both steps are delegated to [PersistentLog.append]. *)
    PersistentLog.append persistent_log
      ~term:(PersistentState.current_term persistent_state)
      ~start:(param.prev_log_index + 1)
      ~entries:param.entries);
  (* All Servers:
   * - If commitIndex > lastApplied: increment lastApplied, apply
   *   log[lastApplied] to state machine (§5.3)
   *)
  VolatileState.apply_logs volatile_state ~logger ~f:(fun i ->
      let log = PersistentLog.get_exn persistent_log i in
      apply_log ~node_id:conf.node_id ~log_index:log.index ~log_data:log.data)

(** [handle] answers one AppendEntries request over HTTP.

    [cb_valid_request ()] is called unconditionally, before any validation.
    The request is then rejected (success = false) when
    [PersistentState.detect_old_leader] returns [true] for [param.term], or
    when the entry stored at [param.prev_log_index] is missing or has a term
    different from [param.prev_log_term]. The second check is skipped when
    [param.prev_log_term = -1] and [param.prev_log_index = 0]. Otherwise the
    request is passed to [append_entries], the state is logged with
    [State.log], and success = true.

    The reply is sent with [Server.respond_string], status [`OK], and a JSON
    object holding [term] (the current term read after processing) and
    [success].

    The rejection branch logs the first and last entries of the request. A
    request with no entries (a heartbeat) is logged with a placeholder
    instead of raising, so it is still answered with success = false. *)
let handle ~conf ~state ~logger ~apply_log ~cb_valid_request ~cb_newer_term
    ~handle_same_term_as_newer ~(param : Params.append_entries_request) =
  let persistent_state = state.persistent_state in
  let persistent_log = state.persistent_log in
  let stored_prev_log =
    PersistentLog.get persistent_log param.prev_log_index
  in
  cb_valid_request ();
  let result =
    if
      PersistentState.detect_old_leader persistent_state ~logger
        ~other_term:param.term
      (* Reply false if term < currentTerm (§5.1) *)
    then false
    else if
      (not (param.prev_log_term = -1 && param.prev_log_index = 0))
      && (match stored_prev_log with
         | Some l -> l.term <> param.prev_log_term
         | None -> true)
    then (
      let entries_size = List.length param.entries in
      (* [param.entries] may be empty here, so the first and last entries
         are looked up as options rather than with [List.nth_exn]. *)
      let show_entry = function
        | Some entry -> PersistentLogEntry.show entry
        | None -> "(none)"
      in
      (* Reply false if log does not contain an entry at prevLogIndex
         whose term matches prevLogTerm (§5.3) *)
      Logger.warn logger
        (Printf.sprintf
           "Received a request that doesn't meet requirement.\nparam:{\n term:%d, leader_id:%d, prev_log_term:%d, prev_log_index:%d, entries_size:%d, leader_commit:%d,\n first_entry:%s,\n last_entry:%s\n},\nstate:%s"
           param.term param.leader_id param.prev_log_term
           param.prev_log_index entries_size param.leader_commit
           (show_entry (List.hd param.entries))
           (show_entry (List.last param.entries))
           (PersistentLog.show persistent_log));
      false)
    else (
      append_entries ~conf ~logger ~state ~param ~apply_log ~cb_newer_term
        ~handle_same_term_as_newer;
      State.log state ~logger;
      true)
  in
  let response_body =
    `Assoc
      [
        ("term", `Int (PersistentState.current_term persistent_state));
        ("success", `Bool result);
      ]
    |> to_string
  in
  Server.respond_string ~status:`OK ~body:response_body ()