Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
oraft.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
open Lwt open Base open State type leader_node = { host : string; port : int } type current_state = { mode : mode; term : int; leader : leader_node option } type t = { conf : Conf.t; process : unit Lwt.t; post_command : string -> bool Lwt.t; current_state : unit -> current_state; } let state (conf : Conf.t) = { persistent_state = PersistentState.load ~state_dir:conf.state_dir; persistent_log = PersistentLog.load ~state_dir:conf.state_dir; volatile_state = VolatileState.create (); } let process ~conf ~logger ~apply_log ~state ~state_exec : unit Lwt.t = let rec loop state_exec = state_exec () >>= fun next -> let next_state_exec = VolatileState.update_mode state.volatile_state ~logger next; match next with | FOLLOWER -> Follower.run (Follower.init ~conf ~apply_log ~state) | CANDIDATE -> Candidate.run (Candidate.init ~conf ~apply_log ~state) | LEADER -> Leader.run (Leader.init ~conf ~apply_log ~state) in loop next_state_exec in loop state_exec let post_command ~(conf : Conf.t) ~logger ~state s = let request_json = let r : Params.client_command_request = { data = s } in Params.client_command_request_to_yojson r in let request = Request_sender.post ~node_id:conf.node_id ~logger ~url_path:"client_command" ~request_json (* Afford to allow a connection timeout to unavailable server *) ~timeout_millis:(conf.request_timeout_millis * 2) ~converter:(fun response_json -> match Params.client_command_response_of_yojson response_json with | Ok param -> Ok (Params.CLIENT_COMMAND_RESPONSE param) | Error _ as err -> err) in match VolatileState.leader_id state.volatile_state with | Some node_id -> let current_leader_node = Conf.peer_node conf ~node_id in request current_leader_node >>= fun result -> Logger.debug logger (Printf.sprintf "Sending command to node(%d) : %s" node_id s); Lwt.return ( match result with | Some (Params.CLIENT_COMMAND_RESPONSE x) -> x.success | Some _ -> Logger.error logger "Shouldn't reach here"; false | None -> false ) | None -> Lwt.return false let start ~conf_file ~apply_log = let conf = Conf.from_file conf_file in let state = state conf in let logger = Logger.create ~node_id:conf.node_id ~mode:None ~output_path:conf.log_file ~level:conf.log_level in let post_command = post_command ~conf ~logger ~state in let initial_state_exec = Follower.run (Follower.init ~conf ~apply_log ~state) in { conf; process = process ~conf ~logger ~apply_log ~state ~state_exec:initial_state_exec; post_command; current_state = (fun () -> let mode = VolatileState.mode state.volatile_state in let term = PersistentState.current_term state.persistent_state in let leader = match VolatileState.leader_id state.volatile_state with | Some x -> let leader = Conf.peer_node conf ~node_id:x in Some { host = leader.host; port = leader.app_port } | None -> None in { mode; term; leader }); }