Skip to content

Commit

Permalink
Final anim raft
Browse files Browse the repository at this point in the history
  • Loading branch information
Willfrit committed Jun 1, 2019
1 parent b6271fc commit 1f0e52f
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 41 deletions.
23 changes: 10 additions & 13 deletions app_test/raft_election_anim.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,17 @@ function stepdown(term)
end

local inc = 0
function uprc_call_timeout(node, data, timeout)
function uprc_call_timeout(node, data, timeout, node_index)
inc = inc + 1 -- Unique name for each rpc
local name = "urpc.call:"..inc
local ok = false
local term, res = nil, false
local function call()
aSendData(node_index, "SEND VOTE REQUEST")
term, res = urpc.call(node, data, timeout*10)
if term ~= nil then
aReceiveData(node_index, "RECEIVED APPEND ENTRY")
end
ok = true
events.fire(name)
end
Expand All @@ -101,26 +105,18 @@ function uprc_call_timeout(node, data, timeout)
end

function send_vote_request(node, node_index)
aSendData(node_index, "SEND VOTE REQUEST")

local term, vote_granted = uprc_call_timeout(node, {
"request_vote", persistent_state.current_term, job.position
}, rpc_timeout)
if term ~= nil then
aReceiveData(node_index, "RECEIVED APPEND ENTRY")
end
}, rpc_timeout, node_index)

return term, vote_granted
end

function send_append_entry(node_index, node, entry)
aSendData(node_index, "SEND APPEND ENTRY")

local term, success = uprc_call_timeout(node, {
"append_entry", persistent_state.current_term, job.position, entry
}, rpc_timeout)
if term ~= nil then
aReceiveData(node_index, "RECEIVED APPEND ENTRY")
end
}, rpc_timeout, node_index)

return term, success
end

Expand Down Expand Up @@ -161,6 +157,7 @@ function append_entry(term, leader_id, entry)
-- reset the election timeout (avoiding new election)
set_election_timeout()
volatile_state.state = "follower" -- if candidate, return in follower state
aUpdateState()

-- HEARTBEAT
if entry == nil then
Expand Down
66 changes: 38 additions & 28 deletions app_test/raft_election_crash_anim.lua
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ end
-- Utils functions
function save_persistent_state()
pers_file = io.open(filename_persistent, "w+")
-- CRASH POINT 1 2 3 4 5 : RECOVERY 0.5 : RANDOM 0.01
pers_file:write(json.encode(persistent_state))
pers_file:close()
end
Expand All @@ -75,38 +74,49 @@ function stepdown(term)
end
end

function send_vote_request(node, node_index)
aSendData(node_index, "SEND VOTE REQUEST")

-- Here not really used because only election
last_log_index = #persistent_state.log
last_log_term = 0
if last_log_index > 0 then
last_log_term = persistent_state.log[#persistent_state.log].term
local inc = 0
function uprc_call_timeout(node, data, timeout, node_index)
inc = inc + 1 -- Unique name for each rpc
local name = "urpc.call:"..inc
local ok = false
local term, res = nil, false
local function call()
aSendData(node_index, "SEND VOTE REQUEST")
term, res = urpc.call(node, data, timeout*10)
if term ~= nil then
aReceiveData(node_index, "RECEIVED APPEND ENTRY")
end
ok = true
events.fire(name)
end
local function timeout_rec()
events.thread(function()
events.sleep(timeout)
if (ok == false) then
call()
timeout_rec()
end
end)
end
call()
timeout_rec()
events.wait(name, timeout*10) -- After 9 retry stop anyway -> return nil, false
return term, res
end

local term, vote_granted = urpc.call(node, {
function send_vote_request(node, node_index)
local term, vote_granted = uprc_call_timeout(node, {
"request_vote", persistent_state.current_term, job.position
}, rpc_timeout)
if term == nil then -- Timeout occur retry
print("RPC VOTE REQUEST Timeout retried - resend")
term, vote_granted = send_vote_request(node, node_index)
end
aReceiveData(node_index, "RECEIVED VOTE REQUEST")
}, rpc_timeout, node_index)

return term, vote_granted
end

function send_append_entry(node_index, node, entry)
aSendData(node_index, "SEND APPEND ENTRY")

local term, success = urpc.call(node, {
local term, success = uprc_call_timeout(node, {
"append_entry", persistent_state.current_term, job.position, entry
}, rpc_timeout)
if term == nil then -- Timeout
print("RPC append entry - Timeout retried - resend")
term, success = send_append_entry(node_index, node, entry)
end
aReceiveData(node_index, "RECEIVED APPEND ENTRY")
}, rpc_timeout, node_index)

return term, success
end

Expand All @@ -116,7 +126,7 @@ function heartbeat()
if i ~= job.position then
events.thread(function ()
term, success = send_append_entry(i, n, nil)
if term > persistent_state.current_term then
if term ~= nil and term > persistent_state.current_term then
stepdown(term)
end
end)
Expand Down Expand Up @@ -148,6 +158,7 @@ function append_entry(term, leader_id, entry)
-- reset the election timeout (avoiding new election)
set_election_timeout()
volatile_state.state = "follower" -- if candidate, return in follower state
aUpdateState()

-- HEARTBEAT
if entry == nil then
Expand Down Expand Up @@ -216,7 +227,6 @@ function trigger_election_timeout()
if i ~= job.position then
events.thread(function ()
local term, vote_granted = send_vote_request(n, i)
print("Vote Request result "..term.." : "..json.encode(vote_granted).." from "..i)
if vote_granted == true then
nb_vote = nb_vote + 1
-- CRASH POINT 1 2 3 4 5 : RECOVERY 0.5 : RANDOM 0.06
Expand All @@ -226,7 +236,7 @@ function trigger_election_timeout()
become_leader()
end
end
if term > persistent_state.current_term then
if term ~= nil and term > persistent_state.current_term then
stepdown(term)
end
end)
Expand Down

0 comments on commit 1f0e52f

Please sign in to comment.