Skip to content

Commit

Permalink
continue new raft
Browse files Browse the repository at this point in the history
  • Loading branch information
Willfrit committed May 23, 2019
1 parent 66eeab9 commit 36ae032
Showing 1 changed file with 87 additions and 26 deletions.
113 changes: 87 additions & 26 deletions app_test/raft.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,47 +10,118 @@ majority_threshold = misc.size(job.nodes) / 2

volatile_state = {
commit_index = 0,
last_applied = 0
}
volatile_state_leader = {
next_index = {}, -- Array
match_index = {} -- Array
last_applied = 0,
state = "follower" -- follower, candidate or leader
}

function state_leader_init()
local next_index_tmp = {}
local match_index_tmp = {}
for i, n in pairs(job.nodes) do
next_index_tmp[i] = #persistent_state.log + 1
match_index_tmp[i] = 0
end
return {
next_index = {}, -- Array
match_index = {} -- Array
}
end

-- Save in storage before rpc response (in file - it is a trick)
persistent_state = {
current_term = 0,
voted_for = nil,
log = {}
log = {} -- Array of {term = associated_term, data = <change_state>}
}

-- Minimal timeout for each purpose in second
election_timeout = 1.5
rpc_timeout = 0.2
heartbeat_timeout = 0.6

-- Timeout variable (to check if timeout has been canceled)
rpc_time = {}
election_time = nil
heart_time = nil

local pers_file = io.open("pers.json", "r")
if pers_file ~= nil then
persistent_state = json.decode(pers_file:read("*a"))
pers_file.close()
end

volatile_state_leader = state_leader_init()

-- Utils functions
function save_persistent_state()
pers_file = io.open("pers.json", "w+")
pers_file:write(json.encode(persistent_state))
pers_file.close()
end

function send_vote_request(node)
last_log_index = #persistent_state.log
last_log_term = 0
if last_log_index > 0 then
last_log_term = persistent_state.log[#persistent_state.log].term
end
local term, vote_granted = urpc.call(node, {
"request_vote", persistent_state.current_term, job.position, last_log_index, last_log_term
}, rpc_timeout)
if term == nil then
term, vote_granted = send_vote_request(node)
end -- Timeout occur retry
return term, vote_granted
end

function send_append_entry(node_index, node, entry)
local next_index = volatile_state_leader.next_index[i]
local prev_log_index = next_index - 1
local prev_log_term = 0
if #persistent_state.log > 1 then
local prev_log_term = persistent_state.log[next_index - 1].term
end
local term, success = urpc.call(node, {
"append_entry", job.position, prev_log_index, prev_log_term, entry, volatile_state.commit_index
}, rpc_timeout)
if term == nil then -- Timeout
term, success = send_append_entry(node_index, node, entry)
end
return term, success
end

function heartbeat()
for i, n in pairs(job.nodes) do
if i ~= job.position then
events.thread(function () send_append_entry(i, n, nil) end)
end
end
end

function become_leader()
heartbeat()
events.periodic(heartbeat_timeout, function() heartbeat() end)
-- No client simulation for now
end

-- RCP functions
function append_entry(term, leader_id, prev_log_index, prev_log_term, entry, leader_commit)
print("APPEND ENTRY FROM "..leader_id)
print("APPEND ENTRY FROM "..leader_id.." Term : "..term.." Entry : "..json.encode(entry))
set_election_timeout()
local success = false
if prev_log_index > 1 and then


end

if success then
save_persistent_state() -- Save persistant state
end
return persistent_state.current_term, success
end

function request_vote(term, leader_id, prev_log_index, prev_log_term)
print("REQUEST VOTE")
function request_vote(term, candidate_id, prev_log_index, prev_log_term)
print("REQUEST VOTE FROM "..candidate_id.." Term : "..term)
set_election_timeout()
local vote_granted = nil

if success then
Expand All @@ -59,26 +130,14 @@ function request_vote(term, leader_id, prev_log_index, prev_log_term)
return persistent_state.current_term, persistent_state.voted_for
end

-- Minimal timeout for each purpose in second
election_timeout = 1.5
rpc_timeout = 0.2
heartbeat_timeout = 0.6

-- Timeout variable (to check if timeout has been canceled)
rpc_time = {}
election_time = nil
heart_time = nil

-- Timeout functions
function set_election_timeout()
election_time = misc.time()
local time = election_time
events.thread(function ()
events.sleep(((math.random() + 1.0) * election_timeout))
-- if the timeout is not cancelled
if (time == election_time) then
trigger_election_timeout()
end
if (time == election_time) then trigger_election_timeout() end
end)
end

Expand All @@ -92,9 +151,12 @@ function trigger_election_timeout()
for i, n in pairs(job.nodes) do
if i ~= job.position then
events.thread(function ()
res = urpc.call(n, {"append_entry", 5, job.position})
if res == true then
local term, vote_granted = send_vote_request(node)
if vote_granted == true then
nb_vote = nb_vote + 1
if nb_vote > majority_threshold then -- Become the leader
become_leader()
end
end
end)
end
Expand All @@ -107,7 +169,6 @@ urpc.server(job.me)
-- Main
events.run(function()


-- Election manage
set_election_timeout()
events.exit()
Expand Down

0 comments on commit 36ae032

Please sign in to comment.