Skip to content

Commit

Permalink
continue new raft
Browse files Browse the repository at this point in the history
  • Loading branch information
Willfrit committed May 23, 2019
1 parent 36ae032 commit 65b0948
Showing 1 changed file with 37 additions and 20 deletions.
57 changes: 37 additions & 20 deletions app_test/raft.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ function state_leader_init()
match_index_tmp[i] = 0
end
return {
next_index = {}, -- Array
match_index = {} -- Array
next_index = next_index_tmp, -- Array
match_index = match_index_tmp -- Array
}
end

Expand All @@ -47,7 +47,7 @@ heart_time = nil
local pers_file = io.open("pers.json", "r")
if pers_file ~= nil then
persistent_state = json.decode(pers_file:read("*a"))
pers_file.close()
pers_file:close()
end

volatile_state_leader = state_leader_init()
Expand All @@ -56,26 +56,29 @@ volatile_state_leader = state_leader_init()
function save_persistent_state()
pers_file = io.open("pers.json", "w+")
pers_file:write(json.encode(persistent_state))
pers_file.close()
pers_file:close()
end

function send_vote_request(node)
print("Send request to "..json.encode(node))

last_log_index = #persistent_state.log
last_log_term = 0
if last_log_index > 0 then
last_log_term = persistent_state.log[#persistent_state.log].term
end
local term, vote_granted = urpc.call(node, {
"request_vote", persistent_state.current_term, job.position, last_log_index, last_log_term
"request_vote", persistent_state.current_term, job.position, last_log_index, last_log_term
}, rpc_timeout)
if term == nil then
if term == nil then -- Timeout occur retry
term, vote_granted = send_vote_request(node)
end -- Timeout occur retry
end
return term, vote_granted
end

function send_append_entry(node_index, node, entry)
local next_index = volatile_state_leader.next_index[i]
print("Send append entry to "..json.encode(node).." my volatile leader state "..json.encode(volatile_state_leader))
local next_index = volatile_state_leader.next_index[node_index]
local prev_log_index = next_index - 1
local prev_log_term = 0
if #persistent_state.log > 1 then
Expand All @@ -91,6 +94,7 @@ function send_append_entry(node_index, node, entry)
end

function heartbeat()
-- CRASH POINT 1 2 3 : RECOVERY 0.5 : AFTER 3
for i, n in pairs(job.nodes) do
if i ~= job.position then
events.thread(function () send_append_entry(i, n, nil) end)
Expand All @@ -99,6 +103,8 @@ function heartbeat()
end

function become_leader()
-- cancelled timout election
election_time = misc.time()
heartbeat()
events.periodic(heartbeat_timeout, function() heartbeat() end)
-- No client simulation for now
Expand All @@ -108,26 +114,33 @@ end
function append_entry(term, leader_id, prev_log_index, prev_log_term, entry, leader_commit)
print("APPEND ENTRY FROM "..leader_id.." Term : "..term.." Entry : "..json.encode(entry))
set_election_timeout()
local success = false
if prev_log_index > 1 and then

-- HEARTBEAT
if entry == nil then
return persistent_state.current_term, true
end

local success = false
if success then
save_persistent_state() -- Save persistant state
end
return persistent_state.current_term, success
end

function request_vote(term, candidate_id, prev_log_index, prev_log_term)
function request_vote(term, candidate_id, last_log_index, last_log_term)
print("REQUEST VOTE FROM "..candidate_id.." Term : "..term)
set_election_timeout()
local vote_granted = nil

if success then

if term < persistent_state.current_term then
return persistent_state.current_term, false
end
local vote_granted = false
if ((persistent_state.voted_for == nil or persistent_state.voted_for == candidate_id)
and last_log_index >= #persistent_state.log) or term > persistent_state.current_term then
persistent_state.voted_for = candidate_id
save_persistent_state() -- Save persistant state

vote_granted = true
set_election_timeout()
end
return persistent_state.current_term, persistent_state.voted_for
return persistent_state.current_term, vote_granted
end

-- Timeout functions
Expand All @@ -143,15 +156,17 @@ end

-- Trigger functions
function trigger_election_timeout()
print("Election trigger")
persistent_state.current_term = persistent_state.current_term + 1
persistent_state.voted_for = job.position
save_persistent_state()
set_election_timeout()
local nb_vote = 0
local nb_vote = 1
for i, n in pairs(job.nodes) do
if i ~= job.position then
events.thread(function ()
local term, vote_granted = send_vote_request(node)
local term, vote_granted = send_vote_request(n)
print("vote request result "..term.." : "..json.encode(vote_granted).." from "..json.encode(n))
if vote_granted == true then
nb_vote = nb_vote + 1
if nb_vote > majority_threshold then -- Become the leader
Expand All @@ -171,6 +186,8 @@ events.run(function()

-- Election manage
set_election_timeout()

events.sleep(10)
events.exit()
end)

0 comments on commit 65b0948

Please sign in to comment.