From fdd7ad41b84b51555eab05418ca7c1696ed3b789 Mon Sep 17 00:00:00 2001 From: Willfrit Date: Fri, 31 May 2019 14:00:58 +0200 Subject: [PATCH] Raft for repport --- app_test/raft_election_anim.lua | 52 +++--- app_test/raft_election_crash_anim.lua | 251 ++++++++++++++++++++++++++ topos_raft/raft_topo_fail.xml | 1 - 3 files changed, 274 insertions(+), 30 deletions(-) create mode 100644 app_test/raft_election_crash_anim.lua diff --git a/app_test/raft_election_anim.lua b/app_test/raft_election_anim.lua index 9a8b000..49d5550 100644 --- a/app_test/raft_election_anim.lua +++ b/app_test/raft_election_anim.lua @@ -21,6 +21,7 @@ function aSendData(to, data) end majority_threshold = #job.nodes / 2 +thread_heartbeat = nil volatile_state = { state = "follower", -- follower, candidate or leader @@ -66,6 +67,11 @@ function stepdown(term) volatile_state.state = "follower" aUpdateState() set_election_timeout() + + -- If I was leader but obviously not anymore - remove pediodic heartbeat + if thread_heartbeat ~= nil then + events.kill(thread_heartbeat) + end end function send_vote_request(node, node_index) @@ -79,7 +85,7 @@ function send_vote_request(node, node_index) end local term, vote_granted = urpc.call(node, { - "request_vote", persistent_state.current_term, job.position, last_log_index, last_log_term + "request_vote", persistent_state.current_term, job.position }, rpc_timeout) if term == nil then -- Timeout occur retry print("RPC VOTE REQUEST Timeout retried - resend") @@ -92,19 +98,11 @@ end function send_append_entry(node_index, node, entry) aSendData(node_index, "SEND APPEND ENTRY") - -- Used only for log replication - -- local next_index = volatile_state_leader.next_index[node_index] - local next_index = 1 -- No log replication then hardcoded - local prev_log_index = next_index - 1 - local prev_log_term = 0 - if #persistent_state.log > 1 then - local prev_log_term = persistent_state.log[next_index - 1].term - end - local term, success = urpc.call(node, { - "append_entry", persistent_state.current_term, job.position, prev_log_index, prev_log_term, entry, volatile_state.commit_index + "append_entry", persistent_state.current_term, job.position, entry }, rpc_timeout) if term == nil then -- Timeout + print("RPC append entry - Timeout retried - resend") term, success = send_append_entry(node_index, node, entry) end aReceiveData(node_index, "RECEIVED APPEND ENTRY") @@ -112,17 +110,14 @@ function send_append_entry(node_index, node, entry) end function heartbeat() - -- CRASH POINT 1 2 3 4 5 : RECOVERY 0.5 : AFTER 2 - if volatile_state.state == "leader" then - for i, n in pairs(job.nodes) do - if i ~= job.position then - events.thread(function () - term, success = send_append_entry(i, n, nil) - if term > persistent_state.current_term then - stepdown(term) - end - end) - end + for i, n in pairs(job.nodes) do + if i ~= job.position then + events.thread(function () + term, success = send_append_entry(i, n, nil) + if term > persistent_state.current_term then + stepdown(term) + end + end) end end end @@ -134,7 +129,7 @@ function become_leader() election_time = misc.time() -- trigger the heartbeart directly and periodically heartbeat() - events.periodic(heartbeat_timeout, function() heartbeat() end) + thread_heartbeat = events.periodic(heartbeat_timeout, function() heartbeat() end) -- No client simulation for now (because no replication log) end @@ -143,7 +138,7 @@ end -- Append Entry RPC function used by leader for the heartbeat (avoiding new election) - entry == nil means heartbeat -- Also normally used for log replication (not present here) -function append_entry(term, leader_id, prev_log_index, prev_log_term, entry, leader_commit) +function append_entry(term, leader_id, entry) aReceiveData(leader_id, " RCP APPEND ENTRY") if term > persistent_state.current_term then stepdown(term) @@ -164,7 +159,7 @@ function append_entry(term, leader_id, prev_log_index, prev_log_term, entry, lea end -- Vote Request RPC function, called by candidate to get votes -function request_vote(term, candidate_id, last_log_index, last_log_term) +function request_vote(term, candidate_id) aReceiveData(candidate_id, " RCP REQUEST VOTE") -- It the candidate is late - don't grant the vote @@ -180,8 +175,7 @@ function request_vote(term, candidate_id, last_log_index, last_log_term) -- Condition to grant the vote : -- (If the node doesn't already grant the vote to and other) and -- (log of the candidate is updated - not usefull if only election) - if ((persistent_state.voted_for == nil or persistent_state.voted_for == candidate_id) - and last_log_index >= #persistent_state.log) then + if persistent_state.voted_for == nil or persistent_state.voted_for == candidate_id then -- Save the candidate vote persistent_state.voted_for = candidate_id save_persistent_state() @@ -206,7 +200,7 @@ end -- Trigger function function trigger_election_timeout() - print("Election Trigger - upgrade term and become candidate ") + print("Election Trigger -> term+1, become candidate") volatile_state.state = "candidate" aUpdateState() persistent_state.current_term = persistent_state.current_term + 1 @@ -219,7 +213,7 @@ function trigger_election_timeout() if i ~= job.position then events.thread(function () local term, vote_granted = send_vote_request(n, i) - print("Vote Request result "..term.." : "..json.encode(vote_granted).." from "..json.encode(i).." - "..json.encode(n)) + print("Vote Request result "..term.." : "..json.encode(vote_granted).." from "..i) if vote_granted == true then nb_vote = nb_vote + 1 -- If the majority grant the vote -> become the leader diff --git a/app_test/raft_election_crash_anim.lua b/app_test/raft_election_crash_anim.lua new file mode 100644 index 0000000..512de52 --- /dev/null +++ b/app_test/raft_election_crash_anim.lua @@ -0,0 +1,251 @@ +--[[ +Raft implementation - Only the leader election sub-problem +Helped with https://web.stanford.edu/~ouster/cgi-bin/papers/raft-atc14 +--]] +require("splay.base") +local json = require("json") +local urpc = require("splay.urpc") + +print("ANIM START "..job.position) + +function aUpdateState() + print("ANIM STATE "..job.position.." : "..volatile_state.state.." : "..persistent_state.current_term) +end + +function aReceiveData(from, data) + print("ANIM RDATA "..job.position.." <- "..math.floor(from).." : uuid : "..data) +end + +function aSendData(to, data) + print("ANIM SDATA "..job.position.." -> "..math.floor(to).." : uuid : "..data) +end + +majority_threshold = #job.nodes / 2 +thread_heartbeat = nil + +volatile_state = { + state = "follower", -- follower, candidate or leader + commit_index -- used for log replication +} + +-- Save in storage before rpc response (in file - it is a trick for persistency) +persistent_state = { + current_term = 0, + voted_for = nil, + log = {} -- Array of {term = associated_term, data = } +} + +-- Timeout for each purpose in second +election_timeout = 1.5 -- random: [election_timeout, 2 * election_timeout] +rpc_timeout = 0.5 +heartbeat_timeout = 0.6 + +-- Timeout variable (to check if timeout has been canceled) +election_time = nil + +-- File to save the persistent state +filename_persistent = "pers"..job.ref..".json" +local pers_file = io.open(filename_persistent, "r") +if pers_file ~= nil then + persistent_state = json.decode(pers_file:read("*a")) + pers_file:close() +end + +-- Utils functions +function save_persistent_state() + pers_file = io.open(filename_persistent, "w+") + -- CRASH POINT 1 2 3 4 5 : RECOVERY 0.5 : RANDOM 0.01 + pers_file:write(json.encode(persistent_state)) + pers_file:close() +end + +-- If someone have a bigger term -> stepdown (request or response) +function stepdown(term) + print("Stepdown : "..term.." > "..persistent_state.current_term) + persistent_state.current_term = tonumber(term) + persistent_state.voted_for = nil + save_persistent_state() + volatile_state.state = "follower" + aUpdateState() + set_election_timeout() + + -- If I was leader but obviously not anymore - remove pediodic heartbeat + if thread_heartbeat ~= nil then + events.kill(thread_heartbeat) + end +end + +function send_vote_request(node, node_index) + aSendData(node_index, "SEND VOTE REQUEST") + + -- Here not really used because only election + last_log_index = #persistent_state.log + last_log_term = 0 + if last_log_index > 0 then + last_log_term = persistent_state.log[#persistent_state.log].term + end + + local term, vote_granted = urpc.call(node, { + "request_vote", persistent_state.current_term, job.position + }, rpc_timeout) + if term == nil then -- Timeout occur retry + print("RPC VOTE REQUEST Timeout retried - resend") + term, vote_granted = send_vote_request(node, node_index) + end + aReceiveData(node_index, "RECEIVED VOTE REQUEST") + return term, vote_granted +end + +function send_append_entry(node_index, node, entry) + aSendData(node_index, "SEND APPEND ENTRY") + + local term, success = urpc.call(node, { + "append_entry", persistent_state.current_term, job.position, entry + }, rpc_timeout) + if term == nil then -- Timeout + print("RPC append entry - Timeout retried - resend") + term, success = send_append_entry(node_index, node, entry) + end + aReceiveData(node_index, "RECEIVED APPEND ENTRY") + return term, success +end + +function heartbeat() + -- CRASH POINT 1 2 3 4 5 : RECOVERY 0.5 : AFTER 5 + for i, n in pairs(job.nodes) do + if i ~= job.position then + events.thread(function () + term, success = send_append_entry(i, n, nil) + if term > persistent_state.current_term then + stepdown(term) + end + end) + end + end +end + +function become_leader() + volatile_state.state = "leader" + aUpdateState() + -- cancelled timout election + election_time = misc.time() + -- trigger the heartbeart directly and periodically + heartbeat() + thread_heartbeat = events.periodic(heartbeat_timeout, function() heartbeat() end) + + -- No client simulation for now (because no replication log) +end + +-- RCP functions + +-- Append Entry RPC function used by leader for the heartbeat (avoiding new election) - entry == nil means heartbeat +-- Also normally used for log replication (not present here) +function append_entry(term, leader_id, entry) + aReceiveData(leader_id, " RCP APPEND ENTRY") + if term > persistent_state.current_term then + stepdown(term) + end + -- reset the election timeout (avoiding new election) + set_election_timeout() + volatile_state.state = "follower" -- if candidate, return in follower state + + -- HEARTBEAT + if entry == nil then + aSendData(leader_id, "SEND RESULT APPEND ENTRY") + return persistent_state.current_term, true + else + -- NORMAL Entry (Log replication feature - not present here) + aSendData(leader_id, "SEND RESULT APPEND ENTRY") + return persistent_state.current_term, false + end +end + +-- Vote Request RPC function, called by candidate to get votes +function request_vote(term, candidate_id) + -- CRASH POINT 1 2 3 4 5 : RECOVERY 0.5 : RANDOM 0.05 + aReceiveData(candidate_id, " RCP REQUEST VOTE") + + -- It the candidate is late - don't grant the vote + if term < persistent_state.current_term then + aSendData(candidate_id, "SEND RESULT REQUEST VOET") + return persistent_state.current_term, false + elseif term > persistent_state.current_term then + stepdown(term) + end + + local vote_granted = false + + -- Condition to grant the vote : + -- (If the node doesn't already grant the vote to and other) and + -- (log of the candidate is updated - not usefull if only election) + if persistent_state.voted_for == nil or persistent_state.voted_for == candidate_id then + -- Save the candidate vote + persistent_state.voted_for = candidate_id + save_persistent_state() + vote_granted = true + set_election_timeout() -- reset the election timeout + end + aSendData(candidate_id, "SEND RESULT REQUEST VOET") + return persistent_state.current_term, vote_granted +end + +-- Timeout function +function set_election_timeout() + election_time = misc.time() + local time = election_time + events.thread(function () + -- Randomize the sleeping time = [election_timeout, 2 * election_timeout] + events.sleep(((math.random() + 1.0) * election_timeout)) + -- if the timeout is not cancelled -> trigger election + if (time == election_time) then trigger_election_timeout() end + end) +end + +-- Trigger function +function trigger_election_timeout() + print("Election Trigger -> term+1, become candidate") + volatile_state.state = "candidate" + aUpdateState() + persistent_state.current_term = persistent_state.current_term + 1 + persistent_state.voted_for = job.position + save_persistent_state() + -- If conflict in this election, and new election can be trigger + set_election_timeout() + local nb_vote = 1 + for i, n in pairs(job.nodes) do + if i ~= job.position then + events.thread(function () + local term, vote_granted = send_vote_request(n, i) + print("Vote Request result "..term.." : "..json.encode(vote_granted).." from "..i) + if vote_granted == true then + nb_vote = nb_vote + 1 + -- CRASH POINT 1 2 3 4 5 : RECOVERY 0.5 : RANDOM 0.06 + + -- If the majority grant the vote -> become the leader + if nb_vote > majority_threshold and volatile_state.state == "candidate" then + become_leader() + end + end + if term > persistent_state.current_term then + stepdown(term) + end + end) + end + end +end + +-- UDP RPC server +urpc.server(job.me) + +-- Main +events.run(function() + + aUpdateState() + + -- Election manage + set_election_timeout() + + -- After 100 second the node will exit no matter what + events.sleep(100) + events.exit() +end) \ No newline at end of file diff --git a/topos_raft/raft_topo_fail.xml b/topos_raft/raft_topo_fail.xml index 5b8775f..2cd8d5b 100644 --- a/topos_raft/raft_topo_fail.xml +++ b/topos_raft/raft_topo_fail.xml @@ -1,6 +1,5 @@ -