Skip to content

Commit

Permalink
Raft election, small change
Browse files Browse the repository at this point in the history
  • Loading branch information
Willfrit committed May 31, 2019
1 parent fdd7ad4 commit b6271fc
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 35 deletions.
45 changes: 32 additions & 13 deletions app_test/raft_election.lua
Original file line number Diff line number Diff line change
Expand Up @@ -69,38 +69,57 @@ function stepdown(term)
end
end

local inc = 0
function uprc_call_timeout(node, data, timeout)
inc = inc + 1 -- Unique name for each rpc
local name = "urpc.call:"..inc
local ok = false
local term, res = nil, false
local function call()
term, res = urpc.call(node, data, timeout*10)
ok = true
events.fire(name)
end
local function timeout_rec()
events.thread(function()
events.sleep(timeout)
if (ok == false) then
call()
timeout_rec()
end
end)
end
call()
timeout_rec()
events.wait(name, timeout*10) -- After 9 retry stop anyway -> return nil, false
return term, res
end

function send_vote_request(node, node_index)
print("Send vote request to node "..json.encode(node_index))

local term, vote_granted = urpc.call(node, {
local term, vote_granted = uprc_call_timeout(node, {
"request_vote", persistent_state.current_term, job.position
}, rpc_timeout)
if term == nil then -- Timeout occur retry
print("RPC vote request - Timeout retried - resend")
term, vote_granted = send_vote_request(node, node_index)
end
return term, vote_granted
end

function send_append_entry(node_index, node, entry)
print("Send append entry ("..json.encode(entry)..") to node "..node_index)

local term, success = urpc.call(node, {
local term, success = uprc_call_timeout(node, {
"append_entry", persistent_state.current_term, job.position, entry
}, rpc_timeout)
if term == nil then -- Timeout
print("RPC append entry - Timeout retried - resend")
term, success = send_append_entry(node_index, node, entry)
end
return term, success
end

function heartbeat()
-- CRASH POINT 1 2 3 4 5 : RECOVERY 1.0 : AFTER 3
for i, n in pairs(job.nodes) do
if i ~= job.position then
events.thread(function ()
term, success = send_append_entry(i, n, nil)
if term > persistent_state.current_term then
if term ~= nil and term > persistent_state.current_term then
stepdown(term)
end
end)
Expand Down Expand Up @@ -192,15 +211,15 @@ function trigger_election_timeout()
if i ~= job.position then
events.thread(function ()
local term, vote_granted = send_vote_request(n, i)
print("Vote Request result "..term.." : "..json.encode(vote_granted).." from "..i)
print("Vote Request result "..json.encode(term)..", "..json.encode(vote_granted).." from "..i)
if vote_granted == true then
nb_vote = nb_vote + 1
-- If the majority grant the vote -> become the leader
if nb_vote > majority_threshold and volatile_state.state == "candidate" then
become_leader()
end
end
if term > persistent_state.current_term then
if term ~= nil and term > persistent_state.current_term then
stepdown(term)
end
end)
Expand Down
56 changes: 35 additions & 21 deletions app_test/raft_election_anim.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,38 +74,53 @@ function stepdown(term)
end
end

local inc = 0
function uprc_call_timeout(node, data, timeout)
inc = inc + 1 -- Unique name for each rpc
local name = "urpc.call:"..inc
local ok = false
local term, res = nil, false
local function call()
term, res = urpc.call(node, data, timeout*10)
ok = true
events.fire(name)
end
local function timeout_rec()
events.thread(function()
events.sleep(timeout)
if (ok == false) then
call()
timeout_rec()
end
end)
end
call()
timeout_rec()
events.wait(name, timeout*10) -- After 9 retry stop anyway -> return nil, false
return term, res
end

function send_vote_request(node, node_index)
aSendData(node_index, "SEND VOTE REQUEST")

-- Here not really used because only election
last_log_index = #persistent_state.log
last_log_term = 0
if last_log_index > 0 then
last_log_term = persistent_state.log[#persistent_state.log].term
end

local term, vote_granted = urpc.call(node, {
local term, vote_granted = uprc_call_timeout(node, {
"request_vote", persistent_state.current_term, job.position
}, rpc_timeout)
if term == nil then -- Timeout occur retry
print("RPC VOTE REQUEST Timeout retried - resend")
term, vote_granted = send_vote_request(node, node_index)
end
aReceiveData(node_index, "RECEIVED VOTE REQUEST")
if term ~= nil then
aReceiveData(node_index, "RECEIVED APPEND ENTRY")
end
return term, vote_granted
end

function send_append_entry(node_index, node, entry)
aSendData(node_index, "SEND APPEND ENTRY")

local term, success = urpc.call(node, {
local term, success = uprc_call_timeout(node, {
"append_entry", persistent_state.current_term, job.position, entry
}, rpc_timeout)
if term == nil then -- Timeout
print("RPC append entry - Timeout retried - resend")
term, success = send_append_entry(node_index, node, entry)
if term ~= nil then
aReceiveData(node_index, "RECEIVED APPEND ENTRY")
end
aReceiveData(node_index, "RECEIVED APPEND ENTRY")
return term, success
end

Expand All @@ -114,7 +129,7 @@ function heartbeat()
if i ~= job.position then
events.thread(function ()
term, success = send_append_entry(i, n, nil)
if term > persistent_state.current_term then
if term ~= nil and term > persistent_state.current_term then
stepdown(term)
end
end)
Expand Down Expand Up @@ -213,15 +228,14 @@ function trigger_election_timeout()
if i ~= job.position then
events.thread(function ()
local term, vote_granted = send_vote_request(n, i)
print("Vote Request result "..term.." : "..json.encode(vote_granted).." from "..i)
if vote_granted == true then
nb_vote = nb_vote + 1
-- If the majority grant the vote -> become the leader
if nb_vote > majority_threshold and volatile_state.state == "candidate" then
become_leader()
end
end
if term > persistent_state.current_term then
if term ~= nil and term > persistent_state.current_term then
stepdown(term)
end
end)
Expand Down
File renamed without changes
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<edge int_idx="16" int_src="6" int_dst="5" specs="DaemonRouter"/>
</edges>
<specs>
<RouterRouter dbl_kbps="15000" int_delayms="600"/>
<RouterRouter dbl_kbps="15000" int_delayms="200"/>
<DaemonRouter dbl_kbps="1000" int_delayms="15"/>
</specs>
</topology>

0 comments on commit b6271fc

Please sign in to comment.