Skip to content

Commit

Permalink
node sync
Browse files Browse the repository at this point in the history
  • Loading branch information
fanhousanbu committed May 14, 2024
1 parent 71fac3d commit be15c21
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 82 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.2
require (
github.com/gin-gonic/gin v1.9.1
github.com/hashicorp/memberlist v0.5.1
github.com/stretchr/testify v1.9.0
github.com/swaggo/swag v1.16.3
github.com/syndtr/goleveldb v1.0.0
gopkg.in/yaml.v2 v2.4.0
Expand All @@ -16,6 +17,7 @@ require (
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/spec v0.21.0 // indirect
Expand All @@ -32,6 +34,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/miekg/dns v1.1.59 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/sync v0.7.0 // indirect
Expand Down
17 changes: 9 additions & 8 deletions internal/community/node/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@ func (d *CommunityDelegate) GetBroadcasts(overhead, limit int) [][]byte {

// LocalState return the local state data while a remote node joins or sync
func (d *CommunityDelegate) LocalState(join bool) []byte {
if join {
if _, err := storage.GetAllMembers(); err != nil {
return nil
if ss, err := storage.GetSnapshot(); err == nil {
if join {
return ss
} else {

return []byte{1, 2, 3}
if s, err := storage.UnmarshalSnapshot(ss); err == nil {
members := storage.GetAllMembers(s.TotalMembers)
if members != nil {
return members[0].Marshal()
}
}
}
} else {
//TODO: retrive partial data by non-init sync policy form storage and return to joiner
return []byte{4, 5, 6}
}
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions internal/community/node/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package node

import (
"another_node/conf"
"another_node/internal/community/storage"
"log"
"strings"

Expand Down Expand Up @@ -62,6 +63,7 @@ func New(listen *uint16, globalName *string, entrypoints *string, genesis *bool)
}

go node.listen()
go storage.ScheduleSnapshot()

return node, nil
}
Expand Down
24 changes: 1 addition & 23 deletions internal/community/storage/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/syndtr/goleveldb/leveldb"
)
Expand Down Expand Up @@ -42,7 +40,7 @@ type Member struct {
}

// uintToBytes convert uint to bytes in little endian
func uintToBytes[T uint16 | uint32](n T) []byte {
func uintToBytes[T uint16 | uint32 | int64](n T) []byte {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, n)
ret := buf.Bytes()
Expand Down Expand Up @@ -154,24 +152,11 @@ func compareAndUpdateMember(oldMember, newMember *Member) *Member {
}

const MemberPrefix = "member:"
const MemberIndexPrefix = "index:member:"

var memberIndexLock sync.Mutex

func memberKey(member *Member) string {
return fmt.Sprintf("%s%s", MemberPrefix, member.HashedAccount)
}

func memberIndexKey() string {
memberIndexLock.Lock()
defer func() {
// TODO: better way to unlock
time.Sleep(time.Nanosecond * 1)
memberIndexLock.Unlock()
}()
return fmt.Sprintf("%s%d", MemberIndexPrefix, time.Now().UnixNano())
}

// UpsertMember update a member if exists and newer than old by version
func UpsertMember(hashedAccount, publicKey, privateKey, rpcAddress string, rpcPort uint16, version *uint32) error {
if stor, err := conf.GetStorage(); err != nil {
Expand All @@ -180,7 +165,6 @@ func UpsertMember(hashedAccount, publicKey, privateKey, rpcAddress string, rpcPo
if db, err := leveldb.Open(stor, nil); err != nil {
return err
} else {
upserted := false
newMember := &Member{
HashedAccount: hashedAccount,
RpcAddress: rpcAddress,
Expand All @@ -199,18 +183,13 @@ func UpsertMember(hashedAccount, publicKey, privateKey, rpcAddress string, rpcPo
defer func() {
stor.Close()
db.Close()

if upserted {
newMemberIndex(newMember)
}
}()

if oldMemberByte, err := db.Get([]byte(memberKey(newMember)), nil); err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
if err := db.Put([]byte(memberKey(newMember)), newMember.Marshal(), nil); err != nil {
return err
} else {
upserted = true
return nil
}
}
Expand All @@ -223,7 +202,6 @@ func UpsertMember(hashedAccount, publicKey, privateKey, rpcAddress string, rpcPo
if err := db.Put([]byte(memberKey(newMember)), newMember.Marshal(), nil); err != nil {
return err
} else {
upserted = true
return nil
}
}
Expand Down
28 changes: 0 additions & 28 deletions internal/community/storage/member_index.go

This file was deleted.

21 changes: 12 additions & 9 deletions internal/community/storage/member_itor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)

func GetAllMembers() ([]Member, error) {
func GetAllMembers(skip uint32) []Member {
if stor, err := conf.GetStorage(); err != nil {
return nil, err
return nil
} else {
if db, err := leveldb.Open(stor, nil); err != nil {
return nil, err
return nil
} else {
defer func() {
stor.Close()
Expand All @@ -23,20 +23,23 @@ func GetAllMembers() ([]Member, error) {
iter := db.NewIterator(&util.Range{
Start: []byte(MemberPrefix),
}, nil)
i := 1
for iter.Next() {
if m, err := Unmarshal(iter.Value()); err != nil {
return nil, err
} else {
members = append(members, *m)
if i >= int(skip) {
if m, err := Unmarshal(iter.Value()); err != nil {
return nil
} else {
members = append(members, *m)
}
}
}
iter.Release()
err = iter.Error()
if err != nil {
return nil, err
return nil
}

return members, nil
return members
}
}
}
5 changes: 1 addition & 4 deletions internal/community/storage/member_itor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ func TestGetAllMembers(t *testing.T) {
UpsertMember(member2.HashedAccount, member2.PublicKey, "", member2.RpcAddress, member2.RpcPort, &member2.Version)

// Call the GetAllMembers function
members, err := GetAllMembers()
if err != nil {
t.Errorf("Expected no error, but got %v", err)
}
members := GetAllMembers(0)

// Check the returned members
expectedMembers := []Member{*member1, *member2}
Expand Down
10 changes: 0 additions & 10 deletions internal/community/storage/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,6 @@ func TestMarshalToUnmarshalThenMarhsalCompare(t *testing.T) {
}
}

func TestMemberIndex(t *testing.T) {
key1 := memberIndexKey()
key2 := memberIndexKey()
if len(key1) > 1 && len(key2) > 1 && key1 != key2 {
t.Log("TestMemberIndex passed")
} else {
t.Error("TestMemberIndex failed")
}
}

func TestUpsertMember(t *testing.T) {
member := &Member{
HashedAccount: "HelloWorld",
Expand Down
140 changes: 140 additions & 0 deletions internal/community/storage/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package storage

import (
"another_node/conf"
"encoding/binary"
"errors"
"fmt"
"time"

"github.com/syndtr/goleveldb/leveldb"
)

const Version uint8 = 1

type Snapshot struct {
Version uint8
TotalMembers uint32
HashedDigest []byte
}

func (s *Snapshot) Digest() *Snapshot {
s.HashedDigest = []byte{1, 2, 3}
return s
}

func (s *Snapshot) Marshal() []byte {
sizeOfSnapshot := 1 + 4 + len(s.HashedDigest)
buf := make([]byte, sizeOfSnapshot)

offset := 0
buf[offset] = s.Version
offset += 1
copy(buf[offset:offset+4], uintToBytes(s.TotalMembers))
offset += 4
copy(buf[offset:], s.HashedDigest)
return buf
}

func UnmarshalSnapshot(data []byte) (*Snapshot, error) {
if len(data) < 1+4 {
return nil, errors.New("invalid snapshot data")
}

s := &Snapshot{}
offset := 0
s.Version = data[offset]
offset += 1
s.TotalMembers = binary.LittleEndian.Uint32(data[offset : offset+4])
offset += 4
s.HashedDigest = data[offset:]
return s, nil
}

const SnapshotKey = "snapshot"

// GetSnapshot returns the current node state
// node state represent the members of the community snapshot
func GetSnapshot() ([]byte, error) {
if stor, err := conf.GetStorage(); err != nil {
return nil, err
} else {
if db, err := leveldb.Open(stor, nil); err != nil {
return nil, err
} else {
defer func() {
stor.Close()
db.Close()
}()

if ss, err := db.Get([]byte(SnapshotKey), nil); err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
return nil, nil
}
return nil, err
} else {
return ss, nil
}
}
}
}

func memberCounter() (total uint32) {
if stor, err := conf.GetStorage(); err != nil {
return
} else {
if db, err := leveldb.Open(stor, nil); err != nil {
return
} else {
defer func() {
stor.Close()
db.Close()
}()

iter := db.NewIterator(nil, nil)
total = 0
for iter.Next() {
total++
}
iter.Release()
err = iter.Error()
if err != nil {
return
}

return
}
}
}

func ScheduleSnapshot() {
t := time.NewTicker(10 * time.Second)
for range t.C {
if err := updateSnapshot(); err != nil {
// log error
fmt.Println("error updating snapshot")
}
}
}

func updateSnapshot() error {
if stor, err := conf.GetStorage(); err != nil {
return err
} else {
if db, err := leveldb.Open(stor, nil); err != nil {
return err
} else {
defer func() {
stor.Close()
db.Close()
}()

ss := &Snapshot{
Version: Version,
TotalMembers: memberCounter(),
}
marshal := ss.Digest().Marshal()
return db.Put([]byte(SnapshotKey), marshal, nil)
}
}
}
Loading

0 comments on commit be15c21

Please sign in to comment.