Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added version information to peers storage. #532

Merged
merged 1 commit into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 90 additions & 16 deletions pkg/node/peer_manager/storage/cbor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strconv"
"sync"
"time"

"github.com/fxamacker/cbor/v2"
"github.com/pkg/errors"
"go.uber.org/zap"
)

const (
peersStorageDir = "peers_storage"
// if you change peers storage data format, you have to increment peersStorageCurrentVersion
peersStorageCurrentVersion = 1
peersStorageDir = "peers_storage"
)

type CBORStorage struct {
Expand All @@ -27,10 +31,10 @@ type CBORStorage struct {

func NewCBORStorage(baseDir string, now time.Time) (*CBORStorage, error) {
storageDir := filepath.Join(baseDir, peersStorageDir)
return newCBORStorageInDir(storageDir, now)
return newCBORStorageInDir(storageDir, now, peersStorageCurrentVersion)
}

func newCBORStorageInDir(storageDir string, now time.Time) (*CBORStorage, error) {
func newCBORStorageInDir(storageDir string, now time.Time, currVersion int) (*CBORStorage, error) {
if err := os.MkdirAll(storageDir, os.ModePerm); err != nil {
return nil, errors.Wrapf(err, "failed to create peers storage directory %q", storageDir)
}
Expand All @@ -39,30 +43,50 @@ func newCBORStorageInDir(storageDir string, now time.Time) (*CBORStorage, error)
if err := createFileIfNotExist(knownFile); err != nil {
return nil, errors.Wrap(err, "failed to create known peers storage file")
}

suspendedFile := suspendedFilePath(storageDir)
if err := createFileIfNotExist(suspendedFile); err != nil {
return nil, errors.Wrap(err, "failed to create suspended peers storage file")
}

known := knownPeers{}
if err := unmarshalCborFromFile(knownFile, &known); err != nil && err != io.EOF {
return nil, errors.Wrapf(err, "failed to load known peers from file %q", knownFile)
}

suspended := suspendedPeers{}
if err := unmarshalCborFromFile(suspendedFile, &suspended); err != nil && err != io.EOF {
return nil, errors.Wrapf(err, "failed to load suspended peers from file %q", suspendedFile)
}

storage := &CBORStorage{
storageDir: storageDir,
suspended: suspended,
suspended: suspendedPeers{},
suspendedFilePath: suspendedFile,
known: known,
known: knownPeers{},
knownFilePath: knownFile,
}

versionFile := storageVersionFilePath(storageDir)
oldVersion, err := getPeersStorageVersion(versionFile)
switch {
case err == io.EOF:
// Empty version file, set new version
if err := storage.invalidateStorageAndUpdateVersion(versionFile, currVersion, oldVersion); err != nil {
return nil, errors.Wrap(err, "failed set version to new peers storage")
}
case err != nil:
return nil, errors.Wrap(err, "failed to validate peers storage version")
}

if oldVersion != currVersion {
// Invalidating old peers storage
zap.S().Debugf(
"Detected different peers storage versions: old='%d', current='%d'. Removing old peers storage.",
oldVersion,
currVersion,
)
if err := storage.invalidateStorageAndUpdateVersion(versionFile, currVersion, oldVersion); err != nil {
return nil, errors.Wrap(err, "failed invalidate storage and set new version to peers storage")
}
}

if err := unmarshalCborFromFile(knownFile, &storage.known); err != nil && err != io.EOF {
return nil, errors.Wrapf(err, "failed to load known peers from file %q", knownFile)
}
if err := unmarshalCborFromFile(suspendedFile, &storage.suspended); err != nil && err != io.EOF {
return nil, errors.Wrapf(err, "failed to load suspended peers from file %q", suspendedFile)
}

if len(storage.suspended) != 0 {
// Remove expired peers
if err := storage.RefreshSuspended(now); err != nil {
Expand Down Expand Up @@ -274,6 +298,24 @@ func (bs *CBORStorage) DropStorage() error {
return nil
}

func (bs *CBORStorage) invalidateStorageAndUpdateVersion(versionFile string, currVersion, oldVersion int) error {
if err := bs.DropStorage(); err != nil {
return errors.Wrapf(err,
"failed to drop peers storage in case of different versions, old='%d', current='%d'",
oldVersion,
currVersion,
)
}
if err := updatePeersStorageVersion(versionFile, currVersion); err != nil {
return errors.Wrapf(err,
"failed to update peers storage file, old='%d', current='%d'",
oldVersion,
currVersion,
)
}
return nil
}

func (bs *CBORStorage) unsafeSyncKnown(newEntries []KnownPeer, backup knownPeers) error {
if err := marshalToCborAndSyncToFile(bs.knownFilePath, bs.known); err != nil {
// In case of failure restore initial state from backup
Expand Down Expand Up @@ -391,6 +433,10 @@ func suspendedFilePath(storageDir string) string {
return filepath.Join(storageDir, "peers_suspended.cbor")
}

func storageVersionFilePath(storageDir string) string {
return filepath.Join(storageDir, "peers_storage_version.txt")
}

func createFileIfNotExist(path string) (err error) {
knownFile, err := os.OpenFile(path, os.O_RDONLY|os.O_CREATE, 0644)
if err != nil {
Expand All @@ -403,3 +449,31 @@ func createFileIfNotExist(path string) (err error) {
}()
return nil
}

func updatePeersStorageVersion(storageVersionFile string, newVersion int) error {
stringVersion := strconv.Itoa(newVersion)
err := ioutil.WriteFile(storageVersionFile, []byte(stringVersion), 0644)
if err != nil {
return errors.Wrapf(err, "failed to write data in file %q", storageVersionFile)
}
return nil
}

func getPeersStorageVersion(storageVersionFile string) (int, error) {
if err := createFileIfNotExist(storageVersionFile); err != nil {
return 0, errors.Wrap(err, "failed to create if not exists storage version file")
}
versionData, err := ioutil.ReadFile(storageVersionFile)
if err != nil {
return 0, errors.Wrapf(err, "failed to read from file %q", storageVersionFile)
}
if len(versionData) == 0 {
// it's a new peers storage
return 0, io.EOF
}
oldVersion, err := strconv.Atoi(string(versionData))
if err != nil {
return 0, errors.Wrapf(err, "failed to parse peers storage version from file %q", storageVersionFile)
}
return oldVersion, nil
}
33 changes: 30 additions & 3 deletions pkg/node/peer_manager/storage/cbor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (s *binaryStorageCborSuite) SetupTest() {
}
}()
now := time.Now()
storage, err := newCBORStorageInDir(tmpdir, now)
storage, err := newCBORStorageInDir(tmpdir, now, peersStorageCurrentVersion)
require.NoError(s.T(), err)

s.storage = storage
Expand Down Expand Up @@ -297,7 +297,7 @@ func (s *binaryStorageCborSuite) TestCBORStorageSuspended() {
}()

newNow := now.Add(suspendDuration)
storage, err := newCBORStorageInDir(s.storage.storageDir, newNow)
storage, err := newCBORStorageInDir(s.storage.storageDir, newNow, peersStorageCurrentVersion)
require.NoError(s.T(), err)
s.storage = storage

Expand Down Expand Up @@ -333,7 +333,7 @@ func (s *binaryStorageCborSuite) TestCBORStorageSuspended() {
})
}

func (s *binaryStorageCborSuite) TestCBORStorageDrops() {
func (s *binaryStorageCborSuite) TestCBORStorageDropsAndVersioning() {
suspendDuration := time.Minute * 5
now := s.now.Truncate(time.Millisecond)
suspended := []SuspendedPeer{
Expand Down Expand Up @@ -422,4 +422,31 @@ func (s *binaryStorageCborSuite) TestCBORStorageDrops() {
checkSuspendedStorageFile()
checkKnownStorageFile()
})

s.Run("drop peers storage in case of different version", func() {
versionFilePath := storageVersionFilePath(s.storage.storageDir)
defer func() {
storage, err := newCBORStorageInDir(s.storage.storageDir, s.now, peersStorageCurrentVersion)
require.NoError(s.T(), err)
s.storage = storage

version, err := getPeersStorageVersion(versionFilePath)
require.NoError(s.T(), err)
require.Equal(s.T(), peersStorageCurrentVersion, version)

require.NoError(s.T(), s.storage.AddSuspended(suspended))
require.NoError(s.T(), s.storage.AddKnown(known))
}()

storage, err := newCBORStorageInDir(s.storage.storageDir, s.now, -1)
require.NoError(s.T(), err)
s.storage = storage

version, err := getPeersStorageVersion(versionFilePath)
require.NoError(s.T(), err)
require.Equal(s.T(), -1, version)

checkSuspendedStorageFile()
checkKnownStorageFile()
})
}