From 65b09482606f309fa9c2ae35eda80f4641b74408 Mon Sep 17 00:00:00 2001 From: Willfrit Date: Thu, 23 May 2019 22:01:49 +0200 Subject: [PATCH] continue new raft --- app_test/raft.lua | 57 ++++++++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/app_test/raft.lua b/app_test/raft.lua index 26ad760..4d17dcc 100644 --- a/app_test/raft.lua +++ b/app_test/raft.lua @@ -22,8 +22,8 @@ function state_leader_init() match_index_tmp[i] = 0 end return { - next_index = {}, -- Array - match_index = {} -- Array + next_index = next_index_tmp, -- Array + match_index = match_index_tmp -- Array } end @@ -47,7 +47,7 @@ heart_time = nil local pers_file = io.open("pers.json", "r") if pers_file ~= nil then persistent_state = json.decode(pers_file:read("*a")) - pers_file.close() + pers_file:close() end volatile_state_leader = state_leader_init() @@ -56,26 +56,29 @@ volatile_state_leader = state_leader_init() function save_persistent_state() pers_file = io.open("pers.json", "w+") pers_file:write(json.encode(persistent_state)) - pers_file.close() + pers_file:close() end function send_vote_request(node) + print("Send request to "..json.encode(node)) + last_log_index = #persistent_state.log last_log_term = 0 if last_log_index > 0 then last_log_term = persistent_state.log[#persistent_state.log].term end local term, vote_granted = urpc.call(node, { - "request_vote", persistent_state.current_term, job.position, last_log_index, last_log_term + "request_vote", persistent_state.current_term, job.position, last_log_index, last_log_term }, rpc_timeout) - if term == nil then + if term == nil then -- Timeout occur retry term, vote_granted = send_vote_request(node) - end -- Timeout occur retry + end return term, vote_granted end function send_append_entry(node_index, node, entry) - local next_index = volatile_state_leader.next_index[i] + print("Send append entry to "..json.encode(node).." my volatile leader state "..json.encode(volatile_state_leader)) + local next_index = volatile_state_leader.next_index[node_index] local prev_log_index = next_index - 1 local prev_log_term = 0 if #persistent_state.log > 1 then @@ -91,6 +94,7 @@ function send_append_entry(node_index, node, entry) end function heartbeat() + -- CRASH POINT 1 2 3 : RECOVERY 0.5 : AFTER 3 for i, n in pairs(job.nodes) do if i ~= job.position then events.thread(function () send_append_entry(i, n, nil) end) @@ -99,6 +103,8 @@ function heartbeat() end function become_leader() + -- cancelled timout election + election_time = misc.time() heartbeat() events.periodic(heartbeat_timeout, function() heartbeat() end) -- No client simulation for now @@ -108,26 +114,33 @@ end function append_entry(term, leader_id, prev_log_index, prev_log_term, entry, leader_commit) print("APPEND ENTRY FROM "..leader_id.." Term : "..term.." Entry : "..json.encode(entry)) set_election_timeout() - local success = false - if prev_log_index > 1 and then - + -- HEARTBEAT + if entry == nil then + return persistent_state.current_term, true end - + local success = false if success then save_persistent_state() -- Save persistant state end return persistent_state.current_term, success end -function request_vote(term, candidate_id, prev_log_index, prev_log_term) +function request_vote(term, candidate_id, last_log_index, last_log_term) print("REQUEST VOTE FROM "..candidate_id.." Term : "..term) - set_election_timeout() - local vote_granted = nil - - if success then + + if term < persistent_state.current_term then + return persistent_state.current_term, false + end + local vote_granted = false + if ((persistent_state.voted_for == nil or persistent_state.voted_for == candidate_id) + and last_log_index >= #persistent_state.log) or term > persistent_state.current_term then + persistent_state.voted_for = candidate_id save_persistent_state() -- Save persistant state + + vote_granted = true + set_election_timeout() end - return persistent_state.current_term, persistent_state.voted_for + return persistent_state.current_term, vote_granted end -- Timeout functions @@ -143,15 +156,17 @@ end -- Trigger functions function trigger_election_timeout() + print("Election trigger") persistent_state.current_term = persistent_state.current_term + 1 persistent_state.voted_for = job.position save_persistent_state() set_election_timeout() - local nb_vote = 0 + local nb_vote = 1 for i, n in pairs(job.nodes) do if i ~= job.position then events.thread(function () - local term, vote_granted = send_vote_request(node) + local term, vote_granted = send_vote_request(n) + print("vote request result "..term.." : "..json.encode(vote_granted).." from "..json.encode(n)) if vote_granted == true then nb_vote = nb_vote + 1 if nb_vote > majority_threshold then -- Become the leader @@ -171,6 +186,8 @@ events.run(function() -- Election manage set_election_timeout() + + events.sleep(10) events.exit() end)