-
Notifications
You must be signed in to change notification settings - Fork 38
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
cmd: add tool to migrate objects from Peapod to FSTree
Add `cmd/peapod-to-fstree` application which accepts YAML configuration file of the storage node and, for each configured shard, overtakes data from Peapod to FSTree that has already been configured or created in the parent directory if it is not configured. The tool is going to be used for phased and safe rejection of the Peapod and the transition to FSTree. Closes #2924. Signed-off-by: Andrey Butusov <[email protected]>
- Loading branch information
Showing
2 changed files
with
310 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,291 @@ | ||
package main | ||
|
||
import ( | ||
"errors" | ||
"flag" | ||
"fmt" | ||
"io/fs" | ||
"log" | ||
"os" | ||
"path/filepath" | ||
"slices" | ||
"strings" | ||
|
||
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" | ||
engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine" | ||
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" | ||
fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" | ||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" | ||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" | ||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" | ||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" | ||
"gopkg.in/yaml.v3" | ||
) | ||
|
||
func main() { | ||
nodeCfgPath := flag.String("config", "", "Path to storage node's YAML configuration file") | ||
|
||
flag.Parse() | ||
|
||
if *nodeCfgPath == "" { | ||
log.Fatal("missing storage node config flag") | ||
} | ||
|
||
appCfg := config.New(config.Prm{}, config.WithConfigFile(*nodeCfgPath)) | ||
|
||
err := engineconfig.IterateShards(appCfg, false, func(sc *shardconfig.Config) error { | ||
log.Println("processing shard...") | ||
|
||
var ppd, fstr common.Storage | ||
var ppdPerm fs.FileMode | ||
storagesCfg := sc.BlobStor().Storages() | ||
|
||
for i := range storagesCfg { | ||
switch storagesCfg[i].Type() { | ||
case fstree.Type: | ||
sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) | ||
|
||
fstr = fstree.New( | ||
fstree.WithPath(storagesCfg[i].Path()), | ||
fstree.WithPerm(storagesCfg[i].Perm()), | ||
fstree.WithDepth(sub.Depth()), | ||
fstree.WithNoSync(sub.NoSync()), | ||
fstree.WithCombinedCountLimit(sub.CombinedCountLimit()), | ||
fstree.WithCombinedSizeLimit(sub.CombinedSizeLimit()), | ||
fstree.WithCombinedSizeThreshold(sub.CombinedSizeThreshold()), | ||
fstree.WithCombinedWriteInterval(storagesCfg[i].FlushInterval()), | ||
) | ||
|
||
case peapod.Type: | ||
ppdPerm = storagesCfg[i].Perm() | ||
ppd = peapod.New(storagesCfg[i].Path(), ppdPerm, storagesCfg[i].FlushInterval()) | ||
default: | ||
return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type()) | ||
} | ||
} | ||
|
||
if ppd == nil { | ||
log.Println("Peapod is not configured for the current shard, going to next one...") | ||
return nil | ||
} | ||
|
||
ppdPath := ppd.Path() | ||
if !filepath.IsAbs(ppdPath) { | ||
log.Fatalf("Peapod path '%s' is not absolute, make it like this in the config file first\n", ppdPath) | ||
} | ||
|
||
if fstr == nil { | ||
fstr = fstree.New( | ||
fstree.WithPath(filepath.Join(filepath.Dir(ppdPath), "blobstore")), | ||
fstree.WithPerm(ppdPerm), | ||
) | ||
} | ||
|
||
var compressCfg compression.Config | ||
compressCfg.Enabled = sc.Compress() | ||
compressCfg.UncompressableContentTypes = sc.UncompressableContentTypes() | ||
|
||
err := compressCfg.Init() | ||
if err != nil { | ||
log.Fatal("init compression config for the current shard: ", err) | ||
} | ||
|
||
ppd.SetCompressor(&compressCfg) | ||
fstr.SetCompressor(&compressCfg) | ||
|
||
log.Printf("migrating data from Peapod '%s' to Fstree '%s'...\n", ppd.Path(), fstr.Path()) | ||
|
||
err = common.Copy(fstr, ppd) | ||
if err != nil { | ||
log.Fatal("migration failed: ", err) | ||
} | ||
|
||
log.Println("data successfully migrated in the current shard, going to the next one...") | ||
|
||
return nil | ||
}) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
srcPath := *nodeCfgPath | ||
ss := strings.Split(srcPath, ".") | ||
ss[0] += "_fstree" | ||
|
||
dstPath := strings.Join(ss, ".") | ||
|
||
log.Printf("data successfully migrated in all shards, migrating configuration to '%s' file...\n", dstPath) | ||
|
||
err = migrateConfigToFstree(dstPath, srcPath) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
} | ||
|
||
func migrateConfigToFstree(dstPath, srcPath string) error { | ||
fData, err := os.ReadFile(srcPath) | ||
if err != nil { | ||
return fmt.Errorf("read source config config file: %w", err) | ||
} | ||
|
||
var mConfig map[any]any | ||
|
||
err = yaml.Unmarshal(fData, &mConfig) | ||
if err != nil { | ||
return fmt.Errorf("decode config from YAML: %w", err) | ||
} | ||
|
||
v, ok := mConfig["storage"] | ||
if !ok { | ||
return errors.New("missing 'storage' section") | ||
} | ||
|
||
mStorage, ok := v.(map[string]any) | ||
if !ok { | ||
return fmt.Errorf("unexpected 'storage' section type: %T instead of %T", v, mStorage) | ||
} | ||
|
||
v, ok = mStorage["shard"] | ||
if !ok { | ||
return errors.New("missing 'storage.shard' section") | ||
} | ||
|
||
mShards, ok := v.(map[any]any) | ||
if !ok { | ||
return fmt.Errorf("unexpected 'storage.shard' section type: %T instead of %T", v, mShards) | ||
} | ||
|
||
replacePeapodWithFstree := func(mShard map[string]any, shardDesc any) error { | ||
v, ok := mShard["blobstor"] | ||
if !ok { | ||
return fmt.Errorf("missing 'blobstor' section in shard '%v' config", shardDesc) | ||
} | ||
|
||
sBlobStor, ok := v.([]any) | ||
if !ok { | ||
return fmt.Errorf("unexpected 'blobstor' section type in shard '%v': %T instead of %T", shardDesc, v, sBlobStor) | ||
} | ||
|
||
var ppdSubStorage map[string]any | ||
var ppdSubStorageIndex int | ||
var fstreeExist bool | ||
|
||
for i := range sBlobStor { | ||
mSubStorage, ok := sBlobStor[i].(map[string]any) | ||
if !ok { | ||
return fmt.Errorf("unexpected sub-storage #%d type in shard '%v': %T instead of %T", i, shardDesc, v, mStorage) | ||
} | ||
|
||
v, ok := mSubStorage["type"] | ||
if ok { | ||
typ, ok := v.(string) | ||
if !ok { | ||
return fmt.Errorf("unexpected type of sub-storage name: %T instead of %T", v, typ) | ||
} | ||
|
||
if typ == peapod.Type { | ||
ppdSubStorage = mSubStorage | ||
ppdSubStorageIndex = i | ||
} | ||
|
||
if typ == fstree.Type { | ||
fstreeExist = true | ||
} | ||
|
||
continue | ||
} | ||
|
||
// in 'default' section 'type' may be missing | ||
|
||
_, withDepth := mSubStorage["depth"] | ||
_, withNoSync := mSubStorage["no_sync"] | ||
_, withCountLimit := mSubStorage["combined_count_limit"] | ||
_, withSizeLimit := mSubStorage["combined_size_limit"] | ||
_, withSizeThreshold := mSubStorage["combined_size_threshold"] | ||
|
||
if withDepth || withNoSync || withCountLimit || withSizeLimit || withSizeThreshold { | ||
fstreeExist = true | ||
} else { | ||
ppdSubStorage = mSubStorage | ||
ppdSubStorageIndex = i | ||
} | ||
} | ||
|
||
if ppdSubStorage == nil { | ||
log.Printf("peapod is not configured for the shard '%v', skip\n", shardDesc) | ||
return nil | ||
} | ||
|
||
if fstreeExist { | ||
mShard["blobstor"] = slices.Delete(sBlobStor, ppdSubStorageIndex, ppdSubStorageIndex+1) | ||
return nil | ||
} | ||
|
||
for k := range ppdSubStorage { | ||
switch k { | ||
default: | ||
delete(ppdSubStorage, k) | ||
case "type", "path", "perm": | ||
} | ||
} | ||
|
||
ppdSubStorage["type"] = fstree.Type | ||
|
||
v, ok = ppdSubStorage["path"] | ||
if ok { | ||
path, ok := v.(string) | ||
if !ok { | ||
return fmt.Errorf("unexpected sub-storage path type: %T instead of %T", v, path) | ||
} | ||
|
||
ppdSubStorage["path"] = filepath.Join(filepath.Dir(path), fmt.Sprintf("blobstore%v", shardDesc)) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
v, ok = mShards["default"] | ||
if ok { | ||
mShard, ok := v.(map[string]any) | ||
if !ok { | ||
return fmt.Errorf("unexpected 'storage.shard.default' section type: %T instead of %T", v, mShard) | ||
} | ||
|
||
err = replacePeapodWithFstree(mShard, "default") | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
for i := 0; ; i++ { | ||
v, ok = mShards[i] | ||
if !ok { | ||
if i == 0 { | ||
return errors.New("missing numbered shards") | ||
} | ||
break | ||
} | ||
|
||
mShard, ok := v.(map[string]any) | ||
if !ok { | ||
return fmt.Errorf("unexpected 'storage.shard.%d' section type: %T instead of %T", i, v, mStorage) | ||
} | ||
|
||
err = replacePeapodWithFstree(mShard, i) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
data, err := yaml.Marshal(mConfig) | ||
if err != nil { | ||
return fmt.Errorf("encode modified config into YAML: %w", err) | ||
} | ||
|
||
err = os.WriteFile(dstPath, data, 0o640) | ||
if err != nil { | ||
return fmt.Errorf("write resulting config to the destination file: %w", err) | ||
} | ||
|
||
return nil | ||
} |