From 224d36e1ed0794fdc0b1ddc6104e772e1e768a19 Mon Sep 17 00:00:00 2001 From: Willfrit Date: Tue, 4 Jun 2019 10:39:37 +0200 Subject: [PATCH] Adding info for animation raft election. --- app_test/raft_election_anim.lua | 17 +- app_test/raft_election_crash_anim.lua | 17 +- app_test/raft_election_fail_anim.lua | 257 ++++++++++++++++++++++++++ 3 files changed, 273 insertions(+), 18 deletions(-) create mode 100644 app_test/raft_election_fail_anim.lua diff --git a/app_test/raft_election_anim.lua b/app_test/raft_election_anim.lua index 901c531..681b40c 100644 --- a/app_test/raft_election_anim.lua +++ b/app_test/raft_election_anim.lua @@ -81,10 +81,10 @@ function uprc_call_timeout(node, data, timeout, node_index) local ok = false local term, res = nil, false local function call() - aSendData(node_index, "SEND VOTE REQUEST") + aSendData(node_index, "SEND "..data[1]) term, res = urpc.call(node, data, timeout*10) if term ~= nil then - aReceiveData(node_index, "RECEIVED APPEND ENTRY") + aReceiveData(node_index, "RECEIVED RESULT "..data[1].." - "..term.." "..json.encode(res)) end ok = true events.fire(name) @@ -150,7 +150,7 @@ end -- Append Entry RPC function used by leader for the heartbeat (avoiding new election) - entry == nil means heartbeat -- Also normally used for log replication (not present here) function append_entry(term, leader_id, entry) - aReceiveData(leader_id, " RCP APPEND ENTRY") + aReceiveData(leader_id, " RCP append_entry : "..json.encode(entry)) if term > persistent_state.current_term then stepdown(term) end @@ -161,22 +161,22 @@ function append_entry(term, leader_id, entry) -- HEARTBEAT if entry == nil then - aSendData(leader_id, "SEND RESULT APPEND ENTRY") + aSendData(leader_id, "SEND RESULT append_entry") return persistent_state.current_term, true else -- NORMAL Entry (Log replication feature - not present here) - aSendData(leader_id, "SEND RESULT APPEND ENTRY") + aSendData(leader_id, "SEND RESULT append_entry") return persistent_state.current_term, false end end -- Vote Request RPC function, called by candidate to get votes function request_vote(term, candidate_id) - aReceiveData(candidate_id, " RCP REQUEST VOTE") + aReceiveData(candidate_id, " RCP request_vote") -- It the candidate is late - don't grant the vote if term < persistent_state.current_term then - aSendData(candidate_id, "SEND RESULT REQUEST VOET") + aSendData(candidate_id, "SEND RESULT request_vote") return persistent_state.current_term, false elseif term > persistent_state.current_term then stepdown(term) @@ -194,7 +194,7 @@ function request_vote(term, candidate_id) vote_granted = true set_election_timeout() -- reset the election timeout end - aSendData(candidate_id, "SEND RESULT REQUEST VOET") + aSendData(candidate_id, "SEND RESULT request_vote") return persistent_state.current_term, vote_granted end @@ -212,7 +212,6 @@ end -- Trigger function function trigger_election_timeout() - print("Election Trigger -> term+1, become candidate") volatile_state.state = "candidate" aUpdateState() persistent_state.current_term = persistent_state.current_term + 1 diff --git a/app_test/raft_election_crash_anim.lua b/app_test/raft_election_crash_anim.lua index c9d0c99..24f44d2 100644 --- a/app_test/raft_election_crash_anim.lua +++ b/app_test/raft_election_crash_anim.lua @@ -81,10 +81,10 @@ function uprc_call_timeout(node, data, timeout, node_index) local ok = false local term, res = nil, false local function call() - aSendData(node_index, "SEND VOTE REQUEST") + aSendData(node_index, "SEND "..data[1]) term, res = urpc.call(node, data, timeout*10) if term ~= nil then - aReceiveData(node_index, "RECEIVED APPEND ENTRY") + aReceiveData(node_index, "RECEIVED RESULT "..data[1].." - "..term.." "..json.encode(res)) end ok = true events.fire(name) @@ -151,7 +151,7 @@ end -- Append Entry RPC function used by leader for the heartbeat (avoiding new election) - entry == nil means heartbeat -- Also normally used for log replication (not present here) function append_entry(term, leader_id, entry) - aReceiveData(leader_id, " RCP APPEND ENTRY") + aReceiveData(leader_id, " RCP append_entry : "..json.encode(entry)) if term > persistent_state.current_term then stepdown(term) end @@ -162,11 +162,11 @@ function append_entry(term, leader_id, entry) -- HEARTBEAT if entry == nil then - aSendData(leader_id, "SEND RESULT APPEND ENTRY") + aSendData(leader_id, "SEND RESULT append_entry") return persistent_state.current_term, true else -- NORMAL Entry (Log replication feature - not present here) - aSendData(leader_id, "SEND RESULT APPEND ENTRY") + aSendData(leader_id, "SEND RESULT append_entry") return persistent_state.current_term, false end end @@ -174,11 +174,11 @@ end -- Vote Request RPC function, called by candidate to get votes function request_vote(term, candidate_id) -- CRASH POINT 1 2 3 4 5 : RECOVERY 0.5 : RANDOM 0.05 - aReceiveData(candidate_id, " RCP REQUEST VOTE") + aReceiveData(candidate_id, " RCP request_vote") -- It the candidate is late - don't grant the vote if term < persistent_state.current_term then - aSendData(candidate_id, "SEND RESULT REQUEST VOET") + aSendData(candidate_id, "SEND RESULT request_vote") return persistent_state.current_term, false elseif term > persistent_state.current_term then stepdown(term) @@ -196,7 +196,7 @@ function request_vote(term, candidate_id) vote_granted = true set_election_timeout() -- reset the election timeout end - aSendData(candidate_id, "SEND RESULT REQUEST VOET") + aSendData(candidate_id, "SEND RESULT request_vote") return persistent_state.current_term, vote_granted end @@ -214,7 +214,6 @@ end -- Trigger function function trigger_election_timeout() - print("Election Trigger -> term+1, become candidate") volatile_state.state = "candidate" aUpdateState() persistent_state.current_term = persistent_state.current_term + 1 diff --git a/app_test/raft_election_fail_anim.lua b/app_test/raft_election_fail_anim.lua new file mode 100644 index 0000000..c54bf6b --- /dev/null +++ b/app_test/raft_election_fail_anim.lua @@ -0,0 +1,257 @@ +--[[ +Raft implementation - Only the leader election sub-problem +Helped with https://web.stanford.edu/~ouster/cgi-bin/papers/raft-atc14 +--]] +require("splay.base") +local json = require("json") +local urpc = require("splay.urpc") + +print("ANIM START "..job.position) + +function aUpdateState() + print("ANIM STATE "..job.position.." : "..volatile_state.state.." : "..persistent_state.current_term) +end + +function aReceiveData(from, data) + print("ANIM RDATA "..job.position.." <- "..math.floor(from).." : uuid : "..data) +end + +function aSendData(to, data) + print("ANIM SDATA "..job.position.." -> "..math.floor(to).." : uuid : "..data) +end + +majority_threshold = #job.nodes / 2 +thread_heartbeat = nil + +volatile_state = { + state = "follower", -- follower, candidate or leader + commit_index -- used for log replication +} + +-- Not persistent - buggy code +voted_for = nil + +-- Save in storage before rpc response (in file - it is a trick for persistency) +persistent_state = { + current_term = 0, + log = {} -- Array of {term = associated_term, data = } +} + +-- Timeout for each purpose in second +election_timeout = 1.5 -- random: [election_timeout, 2 * election_timeout] +rpc_timeout = 0.5 +heartbeat_timeout = 0.6 + +-- Timeout variable (to check if timeout has been canceled) +election_time = nil + +-- File to save the persistent state +filename_persistent = "pers"..job.ref..".json" +local pers_file = io.open(filename_persistent, "r") +if pers_file ~= nil then + persistent_state = json.decode(pers_file:read("*a")) + pers_file:close() +end + +-- Utils functions +function save_persistent_state() + pers_file = io.open(filename_persistent, "w+") + pers_file:write(json.encode(persistent_state)) + pers_file:close() +end + +-- If someone have a bigger term -> stepdown (request or response) +function stepdown(term) + print("Stepdown : "..term.." > "..persistent_state.current_term) + persistent_state.current_term = tonumber(term) + voted_for = nil + save_persistent_state() + volatile_state.state = "follower" + aUpdateState() + set_election_timeout() + + -- If I was leader but obviously not anymore - remove pediodic heartbeat + if thread_heartbeat ~= nil then + events.kill(thread_heartbeat) + end +end + +local inc = 0 +function uprc_call_timeout(node, data, timeout, node_index) + inc = inc + 1 -- Unique name for each rpc + local name = "urpc.call:"..inc + local ok = false + local term, res = nil, false + local function call() + aSendData(node_index, "SEND "..data[1]) + term, res = urpc.call(node, data, timeout*10) + if term ~= nil then + aReceiveData(node_index, "RECEIVED RESULT "..data[1].." - "..term.." "..json.encode(res)) + end + ok = true + events.fire(name) + end + local function timeout_rec() + events.thread(function() + events.sleep(timeout) + if (ok == false) then + call() + timeout_rec() + end + end) + end + call() + timeout_rec() + events.wait(name, timeout*10) -- After 9 retry stop anyway -> return nil, false + return term, res +end + +function send_vote_request(node, node_index) + local term, vote_granted = uprc_call_timeout(node, { + "request_vote", persistent_state.current_term, job.position + }, rpc_timeout, node_index) + + return term, vote_granted +end + +function send_append_entry(node_index, node, entry) + local term, success = uprc_call_timeout(node, { + "append_entry", persistent_state.current_term, job.position, entry + }, rpc_timeout, node_index) + + return term, success +end + +function heartbeat() + for i, n in pairs(job.nodes) do + if i ~= job.position then + events.thread(function () + term, success = send_append_entry(i, n, nil) + if term ~= nil and term > persistent_state.current_term then + stepdown(term) + end + end) + end + end +end + +function become_leader() + volatile_state.state = "leader" + aUpdateState() + -- cancelled timout election + election_time = misc.time() + -- trigger the heartbeart directly and periodically + heartbeat() + thread_heartbeat = events.periodic(heartbeat_timeout, function() heartbeat() end) + + -- No client simulation for now (because no replication log) +end + +-- RCP functions + +-- Append Entry RPC function used by leader for the heartbeat (avoiding new election) - entry == nil means heartbeat +-- Also normally used for log replication (not present here) +function append_entry(term, leader_id, entry) + aReceiveData(leader_id, " RCP append_entry : "..json.encode(entry)) + if term > persistent_state.current_term then + stepdown(term) + end + -- reset the election timeout (avoiding new election) + set_election_timeout() + volatile_state.state = "follower" -- if candidate, return in follower state + aUpdateState() + + -- HEARTBEAT + if entry == nil then + aSendData(leader_id, "SEND RESULT append_entry") + return persistent_state.current_term, true + else + -- NORMAL Entry (Log replication feature - not present here) + aSendData(leader_id, "SEND RESULT append_entry") + return persistent_state.current_term, false + end +end + +-- Vote Request RPC function, called by candidate to get votes +function request_vote(term, candidate_id) + aReceiveData(candidate_id, " RCP request_vote") + + -- It the candidate is late - don't grant the vote + if term < persistent_state.current_term then + aSendData(candidate_id, "SEND RESULT request_vote") + return persistent_state.current_term, false + elseif term > persistent_state.current_term then + stepdown(term) + end + + local vote_granted = false + + -- Condition to grant the vote : + -- (If the node doesn't already grant the vote to and other) and + -- (log of the candidate is updated - not usefull if only election) + if voted_for == nil or voted_for == candidate_id then + -- Save the candidate vote + voted_for = candidate_id + vote_granted = true + set_election_timeout() -- reset the election timeout + end + aSendData(candidate_id, "SEND RESULT request_vote") + return persistent_state.current_term, vote_granted +end + +-- Timeout function +function set_election_timeout() + election_time = misc.time() + local time = election_time + events.thread(function () + -- Randomize the sleeping time = [election_timeout, 2 * election_timeout] + events.sleep(((math.random() + 1.0) * election_timeout)) + -- if the timeout is not cancelled -> trigger election + if (time == election_time) then trigger_election_timeout() end + end) +end + +-- Trigger function +function trigger_election_timeout() + volatile_state.state = "candidate" + aUpdateState() + persistent_state.current_term = persistent_state.current_term + 1 + voted_for = job.position + save_persistent_state() + -- If conflict in this election, and new election can be trigger + set_election_timeout() + local nb_vote = 1 + for i, n in pairs(job.nodes) do + if i ~= job.position then + events.thread(function () + local term, vote_granted = send_vote_request(n, i) + if vote_granted == true then + nb_vote = nb_vote + 1 + -- If the majority grant the vote -> become the leader + if nb_vote > majority_threshold and volatile_state.state == "candidate" then + become_leader() + end + end + if term ~= nil and term > persistent_state.current_term then + stepdown(term) + end + end) + end + end +end + +-- UDP RPC server +urpc.server(job.me) + +-- Main +events.run(function() + + aUpdateState() + + -- Election manage + set_election_timeout() + + -- After 100 second the node will exit no matter what + events.sleep(100) + events.exit() +end) \ No newline at end of file