Skip to content

Commit

Permalink
init node state sync
Browse files Browse the repository at this point in the history
  • Loading branch information
fanhousanbu committed May 15, 2024
1 parent be15c21 commit 082d58f
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 5 deletions.
17 changes: 12 additions & 5 deletions internal/community/node/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (d *CommunityDelegate) LocalState(join bool) []byte {
if s, err := storage.UnmarshalSnapshot(ss); err == nil {
members := storage.GetAllMembers(s.TotalMembers)
if members != nil {
return members[0].Marshal()
return storage.MarshalMembers(members)
}
}
}
Expand All @@ -52,11 +52,18 @@ func (d *CommunityDelegate) LocalState(join bool) []byte {
func (d *CommunityDelegate) MergeRemoteState(buf []byte, join bool) {
if len(buf) > 0 {
if join {
// TODO: merge partial data by init sync policy from remote
fmt.Print("join: Merge remote state: ", buf)
go func() {
members := storage.UnmarshalMembers(buf)
storage.InitRemoteMember(members)
}()
} else {
// TODO: merge partial data by non-init sync policy from remote
fmt.Print("sync: Merge remote state: ", buf)
go func() {
if members, err := storage.Unmarshal(buf); err != nil {
fmt.Print("sync: Failed to unmarshal remote state: ", err)
} else {
storage.MergeRemoteMember(members)
}
}()
}
}
}
68 changes: 68 additions & 0 deletions internal/community/storage/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,71 @@ func TryFindMember(hashedAccount string) (*Member, error) {
}
}
}

func MarshalMembers(m []Member) []byte {
ret := []byte{}
for _, member := range m {
b := member.Marshal()
sz := uintToBytes(uint16(len(b)))
ret = append(ret, sz...)
ret = append(ret, b...)
}
return ret
}

func UnmarshalMembers(b []byte) []Member {
ret := []Member{}
for len(b) > 0 {
sz := binary.LittleEndian.Uint16(b[:2])
b = b[2:]
m, _ := Unmarshal(b[:sz])
ret = append(ret, *m)
b = b[sz:]
}
return ret
}

func InitRemoteMember(members []Member) {
for _, member := range members {
if err := UpsertMember(member.HashedAccount, member.PublicKey, "", member.RpcAddress, member.RpcPort, &member.Version); err != nil {
fmt.Print("Failed to init remote member: ", err)
}
}
}

func MergeRemoteMember(recv *Member) error {
if stor, err := conf.GetStorage(); err != nil {
return err
} else {
if db, err := leveldb.Open(stor, nil); err != nil {
return err
} else {
defer func() {
stor.Close()
db.Close()
}()

if oldMemberByte, err := db.Get([]byte(memberKey(recv)), nil); err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
if err := db.Put([]byte(memberKey(recv)), recv.Marshal(), nil); err != nil {
return err
} else {
return nil
}
}
return err
} else {
if oldMember, err := Unmarshal(oldMemberByte); err != nil {
return err
} else {
recv = compareAndUpdateMember(oldMember, recv)
if err := db.Put([]byte(memberKey(recv)), recv.Marshal(), nil); err != nil {
return err
} else {
return nil
}
}
}
}
}
}
27 changes: 27 additions & 0 deletions internal/community/storage/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,30 @@ func TestUpsertMember(t *testing.T) {
t.Error("TestUpsertMember failed")
}
}

func TestMarshalMembers(t *testing.T) {
member1 := &Member{
HashedAccount: "HelloWorld",
RpcAddress: "test",
RpcPort: 165,
PublicKey: "Abc Def",
PrivateKeyVault: func() *string { s := "privateKeyVault"; return &s }(),
Version: 22222,
}
member2 := &Member{
HashedAccount: "HelloWorld2",
RpcAddress: "test2",
RpcPort: 166,
PublicKey: "Abc Def2",
PrivateKeyVault: func() *string { s := "privateKeyVault2"; return &s }(),
Version: 22223,
}
members := []Member{*member1, *member2}
marshal := MarshalMembers(members)
unmarshal := UnmarshalMembers(marshal)
if reflect.DeepEqual(members, unmarshal) {
t.Log("TestMarshalMembers passed")
} else {
t.Error("TestMarshalMembers failed")
}
}

0 comments on commit 082d58f

Please sign in to comment.