From 758f86cf629a282cb5eec48c2ef14e32736a8420 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 7 Oct 2024 18:06:50 +0300 Subject: [PATCH 1/3] node/engine/put: if a shard cannot handle PUT, add its ID to logs Signed-off-by: Pavel Karpy --- pkg/local_object_storage/engine/put.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 87bc077d64..483e928a17 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -69,6 +69,7 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) er // Second return value is true iff object already exists. func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) { var putSuccess, alreadyExists bool + id := sh.ID() exitCh := make(chan struct{}) @@ -82,7 +83,7 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo if err != nil { e.log.Warn("object put: check object existence", zap.Stringer("addr", addr), - zap.Stringer("shard", sh.ID()), + zap.Stringer("shard", id), zap.Error(err)) if shard.IsErrObjectExpired(err) { @@ -103,14 +104,14 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo _, err = sh.ToMoveIt(toMoveItPrm) if err != nil { e.log.Warn("could not mark object for shard relocation", - zap.Stringer("shard", sh.ID()), + zap.Stringer("shard", id), zap.String("error", err.Error()), ) } } e.log.Debug("object put: object already exists", - zap.Stringer("shard", sh.ID()), + zap.Stringer("shard", id), zap.Stringer("addr", addr)) return @@ -127,7 +128,7 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) || errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) { 
e.log.Warn("could not put object to shard", - zap.Stringer("shard_id", sh.ID()), + zap.Stringer("shard_id", id), zap.String("error", err.Error())) return } @@ -138,7 +139,7 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo putSuccess = true }); err != nil { - e.log.Warn("object put: pool task submitting", zap.Error(err)) + e.log.Warn("object put: pool task submitting", zap.Stringer("shard", id), zap.Error(err)) close(exitCh) } From c54d35304a3d42749abe11be1dc74ddc719b0674 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 7 Oct 2024 20:57:15 +0300 Subject: [PATCH 2/3] node/engine: do not skip objects if shards are busy If every shard's pool is overloaded with routines, choose the best one and try to PUT an object to it 30 seconds. Closes #2871. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/local_object_storage/engine/evacuate.go | 2 +- pkg/local_object_storage/engine/put.go | 52 +++++++++++++++++++-- 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8e199d4a5..92cd1a352c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ attribute, which is used for container domain name in NNS contracts (#2954) - `meta.DB.Open(readOnly)` moves metabase in RO mode (#3000) - Panic in event listener related to inability to switch RPC node (#2970) - Non-container nodes never check placement policy on PUT, SEARCH requests (#3014) +- If shards are overloaded with PUT requests, operation is not skipped but waits for 30 seconds (#2871) ### Changed - `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index b42edad30f..afaac70b90 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -125,7 +125,7 @@ mainLoop: if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - 
putDone, exists := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0) + putDone, exists, _ := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0) if putDone || exists { if putDone { e.log.Debug("object is moved to another shard", diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 483e928a17..130508d6e5 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -2,6 +2,7 @@ package engine import ( "errors" + "time" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" @@ -10,6 +11,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -46,29 +48,44 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) er return err } + var bestShard shardWrapper + var bestPool util.WorkerPool + for i, sh := range e.sortedShards(addr) { e.mtx.RLock() pool, ok := e.shardPools[sh.ID().String()] + if ok && bestPool == nil { + bestShard = sh + bestPool = pool + } e.mtx.RUnlock() if !ok { // Shard was concurrently removed, skip. continue } - putDone, exists := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen) + putDone, exists, _ := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen) if putDone || exists { return nil } } + e.log.Debug("failed to put object to shards, trying the best one more", + zap.Stringer("addr", addr), zap.Stringer("best shard", bestShard.ID())) + + if e.putToShardWithDeadLine(bestShard, 0, bestPool, addr, obj, objBin, hdrLen) { + return nil + } + return errPutShard } // putToShard puts object to sh. // First return value is true iff put has been successfully done. // Second return value is true iff object already exists. 
-func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) { - var putSuccess, alreadyExists bool +// Third return value is true iff object cannot be put because of max concurrent load. +func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool, bool) { + var putSuccess, alreadyExists, overloaded bool id := sh.ID() exitCh := make(chan struct{}) @@ -140,10 +157,37 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo putSuccess = true }); err != nil { e.log.Warn("object put: pool task submitting", zap.Stringer("shard", id), zap.Error(err)) + overloaded = errors.Is(err, ants.ErrPoolOverload) close(exitCh) } <-exitCh - return putSuccess, alreadyExists + return putSuccess, alreadyExists, overloaded +} + +func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) bool { + var deadline = 30 * time.Second + timer := time.NewTimer(deadline) + defer timer.Stop() + + const putCooldown = 100 * time.Millisecond + ticker := time.NewTicker(putCooldown) + defer ticker.Stop() + + for { + select { + case <-timer.C: + e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", deadline)) + return false + case <-ticker.C: + putDone, exists, overloaded := e.putToShard(sh, ind, pool, addr, obj, objBin, hdrLen) + if overloaded { + ticker.Reset(putCooldown) + continue + } + + return putDone || exists + } + } } From 3024773ca246af3329c545168de7e6dc70463557 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 22 Nov 2024 04:39:05 +0300 Subject: [PATCH 3/3] node/engine: make object PUT retry timeout configurable Missing (zero) value just keeps the old behavior (no retries at all). 
Signed-off-by: Pavel Karpy --- CHANGELOG.md | 5 +++++ cmd/neofs-node/config.go | 2 ++ cmd/neofs-node/config/engine/config.go | 8 ++++++++ cmd/neofs-node/config/engine/config_test.go | 2 ++ cmd/neofs-node/config/internal/validate/config.go | 7 ++++--- cmd/neofs-node/storage.go | 1 + config/example/node.env | 1 + config/example/node.json | 1 + config/example/node.yaml | 1 + docs/storage-node-configuration.md | 13 +++++++------ pkg/local_object_storage/engine/engine.go | 13 ++++++++++++- pkg/local_object_storage/engine/put.go | 7 +++---- 12 files changed, 47 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 92cd1a352c..3e9ec8033e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,11 @@ Metabase version has been increased, auto migrating will be performed once a v0.44.0 Storage Node is started with a v0.43.0 metabase. This action can not be undone. No additional work should be done. +The new `storage.put_retry_timeout` config value added. If an object cannot +be PUT to storage, node tries to PUT it to the best shard for it (according to +placement sorting) and only to it for this long before operation error is +returned. + Binary keys are no longer supported by storage node, NEP-6 wallet support was introduced in version 0.22.3 and support for binary keys was removed from other components in 0.33.0 and 0.37.0. 
Please migrate to wallets (see 0.37.0 diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 789e4403ce..18fe5611e4 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -96,6 +96,7 @@ type applicationConfiguration struct { shardPoolSize uint32 shards []storage.ShardCfg isIgnoreUninitedShards bool + objectPutRetryDeadline time.Duration } policer struct { @@ -159,6 +160,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { a.engine.errorThreshold = engineconfig.ShardErrorThreshold(c) a.engine.shardPoolSize = engineconfig.ShardPoolSize(c) a.engine.isIgnoreUninitedShards = engineconfig.IgnoreUninitedShards(c) + a.engine.objectPutRetryDeadline = engineconfig.ObjectPutRetryDeadline(c) // Morph diff --git a/cmd/neofs-node/config/engine/config.go b/cmd/neofs-node/config/engine/config.go index 0d53db3a65..e22a388cdc 100644 --- a/cmd/neofs-node/config/engine/config.go +++ b/cmd/neofs-node/config/engine/config.go @@ -3,6 +3,7 @@ package engineconfig import ( "errors" "strconv" + "time" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" @@ -90,3 +91,10 @@ func ShardErrorThreshold(c *config.Config) uint32 { func IgnoreUninitedShards(c *config.Config) bool { return config.BoolSafe(c.Sub(subsection), "ignore_uninited_shards") } + +// ObjectPutRetryDeadline returns the value of "put_retry_timeout" config parameter from "storage" section. + +// Returns 0 if the value is missing. 
+func ObjectPutRetryDeadline(c *config.Config) time.Duration { + return config.DurationSafe(c.Sub(subsection), "put_retry_timeout") +} diff --git a/cmd/neofs-node/config/engine/config_test.go b/cmd/neofs-node/config/engine/config_test.go index f244ca53d6..e67050a5c1 100644 --- a/cmd/neofs-node/config/engine/config_test.go +++ b/cmd/neofs-node/config/engine/config_test.go @@ -37,6 +37,7 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty)) require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty)) require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode()) + require.Zero(t, engineconfig.ObjectPutRetryDeadline(empty)) }) const path = "../../../../config/example/node" @@ -46,6 +47,7 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c)) require.EqualValues(t, 15, engineconfig.ShardPoolSize(c)) + require.EqualValues(t, 5*time.Second, engineconfig.ObjectPutRetryDeadline(c)) require.EqualValues(t, true, engineconfig.IgnoreUninitedShards(c)) err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error { diff --git a/cmd/neofs-node/config/internal/validate/config.go b/cmd/neofs-node/config/internal/validate/config.go index 5c6d8f609e..0181a3efeb 100644 --- a/cmd/neofs-node/config/internal/validate/config.go +++ b/cmd/neofs-node/config/internal/validate/config.go @@ -117,9 +117,10 @@ type valideConfig struct { } `mapstructure:"object"` Storage struct { - ShardPoolSize int `mapstructure:"shard_pool_size"` - ShardROErrorThreshold int `mapstructure:"shard_ro_error_threshold"` - IgnoreUninitedShards bool `mapstructure:"ignore_uninited_shards"` + ShardPoolSize int `mapstructure:"shard_pool_size"` + ShardROErrorThreshold int `mapstructure:"shard_ro_error_threshold"` + PutRetryTimeout time.Duration `mapstructure:"put_retry_timeout"` + IgnoreUninitedShards bool `mapstructure:"ignore_uninited_shards"` Shard struct { 
Default shardDetails `mapstructure:"default"` ShardList map[string]shardDetails `mapstructure:",remain" prefix:""` diff --git a/cmd/neofs-node/storage.go b/cmd/neofs-node/storage.go index dd6c79deb9..4be499d51d 100644 --- a/cmd/neofs-node/storage.go +++ b/cmd/neofs-node/storage.go @@ -84,6 +84,7 @@ func (c *cfg) engineOpts() []engine.Option { engine.WithLogger(c.log), engine.WithIgnoreUninitedShards(c.engine.isIgnoreUninitedShards), + engine.WithObjectPutRetryTimeout(c.engine.objectPutRetryDeadline), ) if c.shared.basics.ttl > 0 { diff --git a/config/example/node.env b/config/example/node.env index f657b07226..ad421124eb 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -89,6 +89,7 @@ NEOFS_OBJECT_PUT_POOL_SIZE_REMOTE=100 # Storage engine section NEOFS_STORAGE_SHARD_POOL_SIZE=15 +NEOFS_STORAGE_PUT_RETRY_TIMEOUT=5s NEOFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100 NEOFS_STORAGE_IGNORE_UNINITED_SHARDS=true ## 0 shard diff --git a/config/example/node.json b/config/example/node.json index 92054143d7..72fe34fcc2 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -118,6 +118,7 @@ "storage": { "shard_pool_size": 15, "shard_ro_error_threshold": 100, + "put_retry_timeout": "5s", "ignore_uninited_shards": true, "shard": { "0": { diff --git a/config/example/node.yaml b/config/example/node.yaml index 524e77b7eb..23a0df716a 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -106,6 +106,7 @@ object: storage: # note: shard configuration can be omitted for relay node (see `node.relay`) shard_pool_size: 15 # size of per-shard worker pools used for PUT operations + put_retry_timeout: 5s # object PUT retry timeout shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors) ignore_uninited_shards: true # do we need to ignore uninited shards (default: false, fail on any shard failure) diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 
9dde14f3af..589c9a394f 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -153,12 +153,13 @@ morph: Local storage engine configuration. -| Parameter | Type | Default value | Description | -|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------| -| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. | -| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. | -| `ignore_uninited_shards` | `bool` | `false` | Flag that specifies whether uninited shards should be ignored. | -| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. | +| Parameter | Type | Default value | Description | +|----------------------------|------------------------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. | +| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. | +| `ignore_uninited_shards` | `bool` | `false` | Flag that specifies whether uninited shards should be ignored. | +| `put_retry_timeout` | `duration` | `0` | If an object cannot be PUT to storage, node tries to PUT it to the best shard for it (according to placement sorting) and only to it for this long before operation error is returned. Default value does not apply any retry policy at all. 
| +| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. | ## `shard` subsection diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 7e5f47d207..31a53950db 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -4,6 +4,7 @@ import ( "errors" "sync" "sync/atomic" + "time" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" @@ -197,7 +198,8 @@ type cfg struct { metrics MetricRegister - shardPoolSize uint32 + objectPutTimeout time.Duration + shardPoolSize uint32 containerSource container.Source @@ -271,3 +273,12 @@ func WithIgnoreUninitedShards(flag bool) Option { c.isIgnoreUninitedShards = flag } } + +// WithObjectPutRetryTimeout returns an option to specify time for object PUT operation. +// It does not stop any disk operation, only affects retries policy. Zero value +// is acceptable and means no retry on any shard. 
+func WithObjectPutRetryTimeout(t time.Duration) Option { + return func(c *cfg) { + c.objectPutTimeout = t + } +} diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 130508d6e5..2fb42950da 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -73,7 +73,7 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) er e.log.Debug("failed to put object to shards, trying the best one more", zap.Stringer("addr", addr), zap.Stringer("best shard", bestShard.ID())) - if e.putToShardWithDeadLine(bestShard, 0, bestPool, addr, obj, objBin, hdrLen) { + if e.objectPutTimeout > 0 && e.putToShardWithDeadLine(bestShard, 0, bestPool, addr, obj, objBin, hdrLen) { return nil } @@ -167,8 +167,7 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo } func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) bool { - var deadline = 30 * time.Second - timer := time.NewTimer(deadline) + timer := time.NewTimer(e.cfg.objectPutTimeout) defer timer.Stop() const putCooldown = 100 * time.Millisecond @@ -178,7 +177,7 @@ func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool ut for { select { case <-timer.C: - e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", deadline)) + e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", e.cfg.objectPutTimeout)) return false case <-ticker.C: putDone, exists, overloaded := e.putToShard(sh, ind, pool, addr, obj, objBin, hdrLen)