Skip to content

Commit

Permalink
Raft Clean!
Browse files Browse the repository at this point in the history
  • Loading branch information
Willfrit committed Apr 22, 2019
1 parent 33544df commit bba61bf
Showing 1 changed file with 16 additions and 30 deletions.
46 changes: 16 additions & 30 deletions app_test/raft.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,20 @@ job.me = {ip = '127.0.0.1', port= tonumber(arg[1])}
--job.nodes = {{ip= '127.0.0.1', port= 15001 }, {ip= '127.0.0.1', port= 15002 },{ip= '127.0.0.1', port= 15003 },{ip= '127.0.0.1', port= 15004 },{ip= '127.0.0.1', port= 15005 }}
job.nodes = {{ip= '127.0.0.1', port= 15001 }, {ip= '127.0.0.1', port= 15002 }, {ip= '127.0.0.1', port= 15003 }} ]]

print("Begin Raft : I am "..job.me.ip..":"..job.me.port)
print("RAFT.LUA START on "..job.me.ip..":"..job.me.port.." ("..job.position..")")

require("splay.base")
local math = require("math")
local net = require("splay.net")
local misc = require("splay.misc")

-- Index in the list of nodes -> Send to other of nodes.
job_index = -1
for k, n in pairs(job.nodes) do
if (job.me.ip == n.ip and job.me.port == n.port) then
job_index = k
break
end
end
if (job_index == -1) then
error("Can't find me in the nodes array")
end
print("I am the job number "..job_index)
job_index = job.position

-- Minimal timeout for each purpose in second
election_timeout = 2.0
rpc_timeout = 0.3
heartbeat_timeout = 0.8
election_timeout = 1.5
rpc_timeout = 0.2
heartbeat_timeout = 0.6

-- Constant msg send and receive between nodes
vote_msg = {req = "VOTEREQ", rep = "VOTEREP"}
Expand All @@ -57,22 +47,18 @@ state = {
-- sockets table of each connected node (nil == cot connected to)
sockets = {}

-- Timeout variable (if x_time < misc.time() then trigger the corresponded timeout)
-- Timeout variable (to check if timeout has been canceled)
rpc_time = {}
election_time = nil
heart_time = nil

-- helper functions
function is_leader(job_index)
return state.leader == job_index
end

function set_contains(set, key)
return set[key] ~= nil
end

function stepdown(term)
print("Stepdown")
print("STEPDOWN : "..term.." > "..state.term)
state.term = tonumber(term)
state.state = "follower"
state.voteFor = nil
Expand Down Expand Up @@ -110,7 +96,7 @@ function set_heart_timeout()
events.thread(function ()
events.sleep(heartbeat_timeout)
-- if the timeout is not cancelled
if (time == heart_time and is_leader(job_index)) then
if (time == heart_time and state.leader == job_index) then
trigger_heart_timeout()
end
end)
Expand Down Expand Up @@ -182,7 +168,7 @@ function receive(s)
end
rpc_time[s.job_index] = nil
if misc.size(state.votes) > misc.size(job.nodes) /2 then
print("I become the leader")
print("I become the LEADER")
state.state = "leader"
state.leader = job_index
trigger_heart_timeout()
Expand All @@ -203,8 +189,9 @@ function receive(s)
-- HEARBEAT
set_election_timeout()
state.term = tonumber(table_d[2])
state.voteFor = nil
else
print("Warning : unkown message"..table_d[1])
print("Warning : unkown message -> "..table_d[1])
end
end
end
Expand All @@ -220,14 +207,14 @@ function init(s, connect)

s.job_index = tonumber(d)

print("connection to: "..ip..":"..port.." - index = "..s.job_index)
print("connection to: "..ip..":"..port.." - job_index = "..s.job_index)
else
local d = s:receive()
s:send(job_index.."\n")

s.job_index = tonumber(d)

print("connection from: "..ip..":"..port.." - index = "..s.job_index)
print("connection from: "..ip..":"..port.." - job_index = "..s.job_index)
end
if sockets[s.job_index] ~= nil then
print("I am already connect to "..s.job_index )
Expand Down Expand Up @@ -264,9 +251,8 @@ events.run(function()
-- Election manage
set_election_timeout()

-- Stop after 20 seconds
events.sleep(20)
print("Raft exit")
-- Stop after 10 seconds
events.sleep(10)
print("RAFT.LUA EXIT on"..job.me.ip..":"..job.me.port)
events.exit()
end)

0 comments on commit bba61bf

Please sign in to comment.