From 83614cc793ba1eb63b144b93abe4b92c06091cb0 Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Wed, 18 Aug 2021 23:35:46 +0300 Subject: [PATCH] Added version information to peers storage. --- pkg/node/peer_manager/storage/cbor.go | 107 ++++++++++++++++++--- pkg/node/peer_manager/storage/cbor_test.go | 33 ++++++- 2 files changed, 121 insertions(+), 19 deletions(-) diff --git a/pkg/node/peer_manager/storage/cbor.go b/pkg/node/peer_manager/storage/cbor.go index a76afd8c69..5fa88e48f0 100644 --- a/pkg/node/peer_manager/storage/cbor.go +++ b/pkg/node/peer_manager/storage/cbor.go @@ -5,18 +5,23 @@ import ( "io/ioutil" "os" "path/filepath" + "strconv" "sync" "time" "github.com/fxamacker/cbor/v2" "github.com/pkg/errors" + "go.uber.org/zap" ) const ( - peersStorageDir = "peers_storage" + // if you change peers storage data format, you have to increment peersStorageCurrentVersion + peersStorageCurrentVersion = 1 + peersStorageDir = "peers_storage" ) type CBORStorage struct { + version int rwMutex sync.RWMutex storageDir string suspended suspendedPeers @@ -27,10 +32,10 @@ type CBORStorage struct { func NewCBORStorage(baseDir string, now time.Time) (*CBORStorage, error) { storageDir := filepath.Join(baseDir, peersStorageDir) - return newCBORStorageInDir(storageDir, now) + return newCBORStorageInDir(storageDir, now, peersStorageCurrentVersion) } -func newCBORStorageInDir(storageDir string, now time.Time) (*CBORStorage, error) { +func newCBORStorageInDir(storageDir string, now time.Time, currVersion int) (*CBORStorage, error) { if err := os.MkdirAll(storageDir, os.ModePerm); err != nil { return nil, errors.Wrapf(err, "failed to create peers storage directory %q", storageDir) } @@ -39,30 +44,50 @@ func newCBORStorageInDir(storageDir string, now time.Time) (*CBORStorage, error) if err := createFileIfNotExist(knownFile); err != nil { return nil, errors.Wrap(err, "failed to create known peers storage file") } - suspendedFile := suspendedFilePath(storageDir) if err := createFileIfNotExist(suspendedFile); err != nil { return nil, errors.Wrap(err, "failed to create suspended peers storage file") } - known := knownPeers{} - if err := unmarshalCborFromFile(knownFile, &known); err != nil && err != io.EOF { - return nil, errors.Wrapf(err, "failed to load known peers from file %q", knownFile) - } - - suspended := suspendedPeers{} - if err := unmarshalCborFromFile(suspendedFile, &suspended); err != nil && err != io.EOF { - return nil, errors.Wrapf(err, "failed to load suspended peers from file %q", suspendedFile) - } - storage := &CBORStorage{ storageDir: storageDir, - suspended: suspended, + suspended: suspendedPeers{}, suspendedFilePath: suspendedFile, - known: known, + known: knownPeers{}, knownFilePath: knownFile, } + versionFile := storageVersionFilePath(storageDir) + oldVersion, err := getPeersStorageVersion(versionFile) + switch { + case err == io.EOF: + // Empty version file, set new version + if err := storage.invalidateStorageAndUpdateVersion(versionFile, currVersion, oldVersion); err != nil { + return nil, errors.Wrap(err, "failed set version to new peers storage") + } + case err != nil: + return nil, errors.Wrap(err, "failed to validate peers storage version") + } + + if oldVersion != currVersion { + // Invalidating old peers storage + zap.S().Debugf( + "Detected different peers storage versions: old='%d', current='%d'. Removing old peers storage.", + oldVersion, + currVersion, + ) + if err := storage.invalidateStorageAndUpdateVersion(versionFile, currVersion, oldVersion); err != nil { + return nil, errors.Wrap(err, "failed invalidate storage and set new version to peers storage") + } + } + + if err := unmarshalCborFromFile(knownFile, &storage.known); err != nil && err != io.EOF { + return nil, errors.Wrapf(err, "failed to load known peers from file %q", knownFile) + } + if err := unmarshalCborFromFile(suspendedFile, &storage.suspended); err != nil && err != io.EOF { + return nil, errors.Wrapf(err, "failed to load suspended peers from file %q", suspendedFile) + } + if len(storage.suspended) != 0 { // Remove expired peers if err := storage.RefreshSuspended(now); err != nil { @@ -274,6 +299,24 @@ func (bs *CBORStorage) DropStorage() error { return nil } +func (bs *CBORStorage) invalidateStorageAndUpdateVersion(versionFile string, currVersion, oldVersion int) error { + if err := bs.DropStorage(); err != nil { + return errors.Wrapf(err, + "failed to drop peers storage in case of different versions, old='%d', current='%d'", + oldVersion, + currVersion, + ) + } + if err := updatePeersStorageVersion(versionFile, currVersion); err != nil { + return errors.Wrapf(err, + "failed to update peers storage file, old='%d', current='%d'", + oldVersion, + currVersion, + ) + } + return nil +} + func (bs *CBORStorage) unsafeSyncKnown(newEntries []KnownPeer, backup knownPeers) error { if err := marshalToCborAndSyncToFile(bs.knownFilePath, bs.known); err != nil { // In case of failure restore initial state from backup @@ -391,6 +434,10 @@ func suspendedFilePath(storageDir string) string { return filepath.Join(storageDir, "peers_suspended.cbor") } +func storageVersionFilePath(storageDir string) string { + return filepath.Join(storageDir, "peers_storage_version.txt") +} + func createFileIfNotExist(path string) (err error) { knownFile, err := os.OpenFile(path, os.O_RDONLY|os.O_CREATE, 0644) if err != nil { @@ -403,3 +450,31 @@ func createFileIfNotExist(path string) (err error) { }() return nil } + +func updatePeersStorageVersion(storageVersionFile string, newVersion int) error { + stringVersion := strconv.Itoa(newVersion) + err := ioutil.WriteFile(storageVersionFile, []byte(stringVersion), 0644) + if err != nil { + return errors.Wrapf(err, "failed to write data in file %q", storageVersionFile) + } + return nil +} + +func getPeersStorageVersion(storageVersionFile string) (int, error) { + if err := createFileIfNotExist(storageVersionFile); err != nil { + return 0, errors.Wrap(err, "failed to create if not exists storage version file") + } + versionData, err := ioutil.ReadFile(storageVersionFile) + if err != nil { + return 0, errors.Wrapf(err, "failed to read from file %q", storageVersionFile) + } + if len(versionData) == 0 { + // it's a new peers storage + return 0, io.EOF + } + oldVersion, err := strconv.Atoi(string(versionData)) + if err != nil { + return 0, errors.Wrapf(err, "failed to parse peers storage version from file %q", storageVersionFile) + } + return oldVersion, nil +} diff --git a/pkg/node/peer_manager/storage/cbor_test.go b/pkg/node/peer_manager/storage/cbor_test.go index 2d0df4fb02..002d7da167 100644 --- a/pkg/node/peer_manager/storage/cbor_test.go +++ b/pkg/node/peer_manager/storage/cbor_test.go @@ -98,7 +98,7 @@ func (s *binaryStorageCborSuite) SetupTest() { } }() now := time.Now() - storage, err := newCBORStorageInDir(tmpdir, now) + storage, err := newCBORStorageInDir(tmpdir, now, peersStorageCurrentVersion) require.NoError(s.T(), err) s.storage = storage @@ -297,7 +297,7 @@ func (s *binaryStorageCborSuite) TestCBORStorageSuspended() { }() newNow := now.Add(suspendDuration) - storage, err := newCBORStorageInDir(s.storage.storageDir, newNow) + storage, err := newCBORStorageInDir(s.storage.storageDir, newNow, peersStorageCurrentVersion) require.NoError(s.T(), err) s.storage = storage @@ -333,7 +333,7 @@ func (s *binaryStorageCborSuite) TestCBORStorageSuspended() { }) } -func (s *binaryStorageCborSuite) TestCBORStorageDrops() { +func (s *binaryStorageCborSuite) TestCBORStorageDropsAndVersioning() { suspendDuration := time.Minute * 5 now := s.now.Truncate(time.Millisecond) suspended := []SuspendedPeer{ @@ -422,4 +422,31 @@ func (s *binaryStorageCborSuite) TestCBORStorageDrops() { checkSuspendedStorageFile() checkKnownStorageFile() }) + + s.Run("drop peers storage in case of different version", func() { + versionFilePath := storageVersionFilePath(s.storage.storageDir) + defer func() { + storage, err := newCBORStorageInDir(s.storage.storageDir, s.now, peersStorageCurrentVersion) + require.NoError(s.T(), err) + s.storage = storage + + version, err := getPeersStorageVersion(versionFilePath) + require.NoError(s.T(), err) + require.Equal(s.T(), peersStorageCurrentVersion, version) + + require.NoError(s.T(), s.storage.AddSuspended(suspended)) + require.NoError(s.T(), s.storage.AddKnown(known)) + }() + + storage, err := newCBORStorageInDir(s.storage.storageDir, s.now, -1) + require.NoError(s.T(), err) + s.storage = storage + + version, err := getPeersStorageVersion(versionFilePath) + require.NoError(s.T(), err) + require.Equal(s.T(), -1, version) + + checkSuspendedStorageFile() + checkKnownStorageFile() + }) }