注意:设计文档里面代码都是伪代码
- 投票请求
message RequestVoteReq
{
bool prevote;
int64 term;
int64 candidtate_id;
int64 last_log_idx;
int64 last_log_term;
}
- 投票响应
message RequestVoteResp
{
bool prevote;
int64 request_term;
int64 term;
bool vote_granted;
}
- 日志项
enum EntryType
{
NORMAL
JOIN_NO_VOTE_NODE
JOIN_NODE
LEAVE_NODE
NO_OP
}
message Entry
{
int64 term;
int64 id;
entry_type type
int64 data_size;
bytes[] data;
}
- 追加日志请求
message AppendEntriesReq
{
int64 leader_id;
int64 message_index;
int64 term;
int64 prev_log_idx;
int64 prev_log_term;
int64 leader_commit;
int64 entries_size;
entry[] enrires;
}
- 追加日志的响应
message AppendEntriesResp
{
string message_token;
int64 term;
bool success
int64 curr_index;
}
- 发送快照的请求
message SnapshotBlock
{
int64 offset;
bytes[] data;
int64 block_size;
bool is_last_chunk;
}
message SnapshotReq
{
int64 term;
int64 leader_id;
string message_token;
int64 snapshot_index;
int64 snapshot_term;
snapshot_block block;
}
- 发送快照的响应
message SnapshotResp
{
int64 term;
string message_token;
int64 offset;
bool success;
bool is_last_chunk;
}
- Slot 定义
enum SlotStatus
{
RUNNING
MIGRATING
IMPORTING
UNSET
}
message Slot
{
slot_status status
int64 id
}
- 服务定义
enum ServerStatus
{
RUNNING
DOWN
}
message Server
{
string id
string addr
server_status status
}
- 集群分片定义
message ShardGroup
{
int64 id;
slot[] slots;
Server[] servers;
}
- 增删节点到分片的请求和响应
enum ShardMembersChangeType
{
ADD_SERVER
REMOVE_SERVER
}
message ShardGroupMembersChangeReq
{
ShardMembersChangeType req_type;
int64 shard_id;
Server server;
int64 config_version;
}
message SharhgroupMembersChangeResp
{
bool success;
err_code err;
ShardGroup shard;
int64 config_version;
}
- 增删分片到集群的请求和响应
enum ClusterShardChangeType
{
ADD_SHARDGROUP
REMOVE_SHRADGROUP
}
message ClusterShardGroupChangeReq
{
cluster_shard_change_type req_type;
int64 shard_id;
shard_group shard;
int64 config_version;
}
message ClusterShardgroupChangeResp
{
bool success;
err_code err;
shard_group[] shards
int64 config_version;
}
- 客户数据读写请求
enum OpType
{
PUT;
GET;
DEL;
SCAN;
}
message KvOpPair
{
OpType op_t;
string key;
string val;
}
message ClientOperationReq
{
string client_token;
uint64 timestamp;
kv_op_pair[] kvs;
}
message ClientOperationResp
{
bool success;
error_code err;
kv_op_pair[] kvs;
int64 cursor;
}
service EraftKv {
// raft 内部
service RequestVote(RequestVoteReq, RequestVoteResp)
service AppendEntries(AppendEntriesReq, AppendEntriesResp)
service Snapshot(SnapshotReq, SnapshotResp);
// 客户端
service ProcessRwOperation(ClientOperationReq, ClientOperationResp);
service ProcessShardgroupMembersChange(ShardgroupMembersChangeReq, SharhgroupMembersChangeResp);
service ProcessClusterShardgroupChange(ClusterShardgroupChangeReq,ClusterShardgroupChangeResp);
}
- raft_server 核心
StateEnum
{
FOLLOWER
PRECANDIDATE
CANDIDATE
LEADER
}
// server 上下文
class RaftServer
{
// 成员变量
int64 current_term;
int64 voted_for;
int64 commit_idx;
int64 last_applied_idx;
int64 last_applied_term;
StateEnum state;
int64 leader_id;
// 用于读优化的
int64 message_index;
int64 last_acked_message_index;
NetworkInterface ntface;
StorageInterface stface;
RaftLog log_;
...
//主要方法
RaftServer(RaftLog* logImpl);
Status SaveMetaData(int64 term, int64 vote);
Status LoadMetaData();
Node* JoinNode(int64 id, bool is_self);
Status RemoveNode(Node* node);
Status RunCycles();
Status HandleRequestVoteReq(Node *from_node, RequestVoteReq* req, RequestVoteResp* resp)
Status HandleRequestVoteResp(Node *from_node, RequestVoteResp* resp);
Status HandleAppendEntriesReq(Node *from_node,
AppendEntriesReq* req, AppendEntriesResp* resp);
Status HandleAppendEntriesResp(Node *from_node,
AppendEntriesResp* resp);
Status HandleSnapshotReq(Node *from_node,
SnapshotReq* req, SnapshotResp* resp);
Status HandleSnapshotResp(Node *from_node,
SnapshotResp* resp);
Entry* ProposeEntry(Entry* ety);
Status BecomeLeader();
Status BecomeFollower();
Status BcomeCandidate();
Status BcomePreCandidate();
Status BeginSnapshot();
Status EndSnapshot();
bool SnapshotRunning();
Entry* GetLastAppliedEntry();
int64 GetFirstEntryIdx();
Status RestoreSnapshotAfterRestart();
Status BeginLoadSnapshot(int64 last_included_term, int64 last_included_index);
Status EndLoadSnapshot();
Status ProposeReadReq();
int64 GetLogsCountCanSnapshot();
Status RestoreLog();
}
// 待应用层实现网络消息发送的接口
interface RaftNetworkInterface
{
Status SendRequestVote(RaftServer* raft, Node* target_node, request_vote_req* req);
Status SendAppendEntries(RaftServer* raft, Node* target_node, AppendEntriesReq* req);
Status SendSnapshot(RaftServer* raft, Node* target_node, SnapshotReq* req);
Status ConfChangeEvent(RaftServer* raft, Node* node, Entry* entry, EventType t);
void StateChangeEvent(RaftServer* raft, StateEnum* state);
...
}
class GRpcRaftNetwork : public RaftNetworkInterface {
// grpc 实现
grpc::Channel* chan_;
}
// 待应用层实现,快照存储相关的接口
interface RaftStorageInterface
{
Status LoadSnapshot(RaftServer* raft, int64 snapshot_index, int64 snapshot_term);
Status GetSnapshotBlock(RaftServer* raft, Node* node, int64 offset, SnapshotBlock* block);
Status StoreSnapshotBlock(RaftServer* raft, int64 snapshot_index, int64 offset, SnapshotBlock* block);
Status ClearSnapshot(RaftServer* raft);
Status SaveMeta(int64 term, int64 vote);
Vec<Entry*> GetEntsToSend(RaftServer* raft, Node* node, int64 idx, int64 count);
Status CreateSnapshot(string snapshot_dir);
...
}
class RocksDBRaftStorage : public RaftStorageInterface {
// rocksdb 实现存储
rocksdb::DB* kv_db_;
}
class RaftLog
{
uint64 count_;
uint64 node_id_;
// 日志项缓存
Vec<LogEntry*> ents_;
LogStoreInterface* firstLogStore;
LogStoreInterface* secLogStore;
...
// 方法
}
// 待应用层实现的 log 具体存储的接口
interface LogStoreInterface
{
void Init(RaftServer* raft)
void Free();
void Reset(int64 index, int64 term);
Status Append(LogEntry *ety);
Status EraseBefore(int64 firstIndex);
Status EraseAfter(int64 fromIndex);
LogEntry* Get(int64 index);
Vec<LogEntry*> Gets(int64 firstIdx, int64 entCount)
int64 FirstIndex();
int64 LastIndex();
int64 Count();
Status SyncLogDB();
}
// 内存版本实现
class MemoryLogStorage : public LogStoreInterface {
Vec<LogEntry*> ents_;
}
// rocksdb 版实现
class RocksDBLogStorage : public LogStoreInterface {
int64 node_id_;
// 前一个 log_db 最后一个日志的任期
int64 prev_log_term_;
// 前一个 log_db 最后一个日志的索引号
int64 prev_log_index_;
int64 log_entry_size_;
// 当前 db 中最后一个日志的索引号
int64 last_entry_index_;
string log_db_path_;
rocksdb::DB* log_db_;
}
struct ERaftKvServerConfig {
string svr_version;
string svr_addr;
string kv_db_path;
string log_db_path;
int64 tick_interval;
int64 request_timeout;
int64 election_timeout;
int64 response_timeout;
int64 ae_max_count;
int64 ae_max_size;
int64 snap_max_count;
int64 snap_max_size;
int64 grpc_max_recv_msg_size;
int64 grpc_max_send_msg_size;
}
class ERaftKvServer : public grpc::EraftKv::Service
{
RaftServer raft_context_;
ERaftKvServerConfig config_;
Status RequestVote(RequestVoteReq, RequestVoteResp)
Status AppendEntries(AppendEntriesReq, AppendEntriesResp)
Status Snapshot(SnapshotReq, SnapshotResp);
Status ProcessRwOperation(ClientOperationReq, ClientOperationResp);
Status ProcessShardgroupMembersChange(ShardgroupMembersChangeReq, SharhgroupMembersChangeResp);
Status ProcessClusterShardgroupChange(ClusterShardgroupChangeReq,ClusterShardgroupChangeResp);
Status InitTicker();
Status BuildAndRunRpcServer();
Status RunRaftCycle();
}
struct ERaftMetaServerConfig {
string svr_version;
string svr_addr;
string kv_db_path;
string log_db_path;
int64 tick_interval;
int64 request_timeout;
int64 election_timeout;
int64 response_timeout;
int64 ae_max_count;
int64 ae_max_size;
int64 snap_max_count;
int64 snap_max_size;
int64 grpc_max_recv_msg_size;
int64 grpc_max_send_msg_size;
}
class ERaftMetaServer
{
RaftServer raft_context_;
ERaftMetaServerConfig config_;
Status RequestVote(RequestVoteReq, RequestVoteResp)
Status AppendEntries(AppendEntriesReq, AppendEntriesResp)
Status Snapshot(SnapshotReq, SnapshotResp);
Status ProcessRwOperation(ClientOperationReq, ClientOperationResp);
Status ProcessShardgroupMembersChange(ShardgroupMembersChangeReq, SharhgroupMembersChangeResp);
Status ProcessClusterShardgroupChange(ClusterShardgroupChangeReq,ClusterShardgroupChangeResp);
Status InitTicker();
Status BuildAndRunRpcServer();
Status RunRaftCycle();
}