Skip to content

Commit

Permalink
Fix/put on busy shards (#2965)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Nov 22, 2024
2 parents d0b3744 + 3024773 commit 2bb903c
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 20 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- `meta.DB.Open(readOnly)` moves metabase in RO mode (#3000)
- Panic in event listener related to inability to switch RPC node (#2970)
- Non-container nodes never check placement policy on PUT, SEARCH requests (#3014)
- If shards are overloaded with PUT requests, operation is not skipped but waits for 30 seconds (#2871)

### Changed
- `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901)
Expand Down Expand Up @@ -60,6 +61,11 @@ Metabase version has been increased, auto migrating will be performed once
a v0.44.0 Storage Node is started with a v0.43.0 metabase. This action can
not be undone. No additional work should be done.

A new `storage.put_retry_timeout` config value has been added. If an object
cannot be PUT to storage, the node tries to PUT it to the best shard for it
(according to placement sorting), and only to that shard, for this long before
an operation error is returned.

Binary keys are no longer supported by storage node, NEP-6 wallet support was
introduced in version 0.22.3 and support for binary keys was removed from
other components in 0.33.0 and 0.37.0. Please migrate to wallets (see 0.37.0
Expand Down
2 changes: 2 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type applicationConfiguration struct {
shardPoolSize uint32
shards []storage.ShardCfg
isIgnoreUninitedShards bool
objectPutRetryDeadline time.Duration
}

policer struct {
Expand Down Expand Up @@ -159,6 +160,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.engine.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.engine.shardPoolSize = engineconfig.ShardPoolSize(c)
a.engine.isIgnoreUninitedShards = engineconfig.IgnoreUninitedShards(c)
a.engine.objectPutRetryDeadline = engineconfig.ObjectPutRetryDeadline(c)

// Morph

Expand Down
8 changes: 8 additions & 0 deletions cmd/neofs-node/config/engine/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package engineconfig
import (
"errors"
"strconv"
"time"

"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
Expand Down Expand Up @@ -90,3 +91,10 @@ func ShardErrorThreshold(c *config.Config) uint32 {
func IgnoreUninitedShards(c *config.Config) bool {
	storageSection := c.Sub(subsection)
	return config.BoolSafe(storageSection, "ignore_uninited_shards")
}

// ObjectPutRetryDeadline returns the value of "put_retry_timeout" config parameter from "storage" section.
//
// Returns 0 if the value is missing.
func ObjectPutRetryDeadline(c *config.Config) time.Duration {
	return config.DurationSafe(c.Sub(subsection), "put_retry_timeout")
}
2 changes: 2 additions & 0 deletions cmd/neofs-node/config/engine/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
require.Zero(t, engineconfig.ObjectPutRetryDeadline(empty))
})

const path = "../../../../config/example/node"
Expand All @@ -46,6 +47,7 @@ func TestEngineSection(t *testing.T) {

require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
require.EqualValues(t, 5*time.Second, engineconfig.ObjectPutRetryDeadline(c))
require.EqualValues(t, true, engineconfig.IgnoreUninitedShards(c))

err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
Expand Down
7 changes: 4 additions & 3 deletions cmd/neofs-node/config/internal/validate/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ type valideConfig struct {
} `mapstructure:"object"`

Storage struct {
ShardPoolSize int `mapstructure:"shard_pool_size"`
ShardROErrorThreshold int `mapstructure:"shard_ro_error_threshold"`
IgnoreUninitedShards bool `mapstructure:"ignore_uninited_shards"`
ShardPoolSize int `mapstructure:"shard_pool_size"`
ShardROErrorThreshold int `mapstructure:"shard_ro_error_threshold"`
PutRetryTimeout time.Duration `mapstructure:"put_retry_timeout"`
IgnoreUninitedShards bool `mapstructure:"ignore_uninited_shards"`
Shard struct {
Default shardDetails `mapstructure:"default"`
ShardList map[string]shardDetails `mapstructure:",remain" prefix:""`
Expand Down
1 change: 1 addition & 0 deletions cmd/neofs-node/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (c *cfg) engineOpts() []engine.Option {

engine.WithLogger(c.log),
engine.WithIgnoreUninitedShards(c.engine.isIgnoreUninitedShards),
engine.WithObjectPutRetryTimeout(c.engine.objectPutRetryDeadline),
)

if c.shared.basics.ttl > 0 {
Expand Down
1 change: 1 addition & 0 deletions config/example/node.env
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ NEOFS_OBJECT_PUT_POOL_SIZE_REMOTE=100

# Storage engine section
NEOFS_STORAGE_SHARD_POOL_SIZE=15
NEOFS_STORAGE_PUT_RETRY_TIMEOUT=5s
NEOFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
NEOFS_STORAGE_IGNORE_UNINITED_SHARDS=true
## 0 shard
Expand Down
1 change: 1 addition & 0 deletions config/example/node.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
"storage": {
"shard_pool_size": 15,
"shard_ro_error_threshold": 100,
"put_retry_timeout": "5s",
"ignore_uninited_shards": true,
"shard": {
"0": {
Expand Down
1 change: 1 addition & 0 deletions config/example/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ object:
storage:
# note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
put_retry_timeout: 5s # object PUT retry timeout
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
ignore_uninited_shards: true # do we need to ignore uninited shards (default: false, fail on any shard failure)

Expand Down
13 changes: 7 additions & 6 deletions docs/storage-node-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,13 @@ morph:

Local storage engine configuration.

| Parameter | Type | Default value | Description |
|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
| `ignore_uninited_shards` | `bool` | `false` | Flag that specifies whether uninited shards should be ignored. |
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
| Parameter | Type | Default value | Description |
|----------------------------|------------------------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
| `ignore_uninited_shards` | `bool` | `false` | Flag that specifies whether uninited shards should be ignored. |
| `put_retry_timeout`        | `duration`                         | `0`           | If an object cannot be PUT to storage, node tries to PUT it to the best shard for it (according to placement sorting) and only to it for this long before operation error is returned. Default value does not apply any retry policy at all.   |
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |

## `shard` subsection

Expand Down
13 changes: 12 additions & 1 deletion pkg/local_object_storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"sync"
"sync/atomic"
"time"

"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
Expand Down Expand Up @@ -197,7 +198,8 @@ type cfg struct {

metrics MetricRegister

shardPoolSize uint32
objectPutTimeout time.Duration
shardPoolSize uint32

containerSource container.Source

Expand Down Expand Up @@ -271,3 +273,12 @@ func WithIgnoreUninitedShards(flag bool) Option {
c.isIgnoreUninitedShards = flag
}
}

// WithObjectPutRetryTimeout returns an option to specify the total time limit
// for retrying an object PUT operation on an overloaded shard. It does not
// stop any disk operation, it only affects the retry policy. Zero value is
// acceptable and means no retry on any shard.
func WithObjectPutRetryTimeout(t time.Duration) Option {
	return func(c *cfg) {
		c.objectPutTimeout = t
	}
}
2 changes: 1 addition & 1 deletion pkg/local_object_storage/engine/evacuate.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ mainLoop:
if _, ok := shardMap[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0)
putDone, exists, _ := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0)
if putDone || exists {
if putDone {
e.log.Debug("object is moved to another shard",
Expand Down
62 changes: 53 additions & 9 deletions pkg/local_object_storage/engine/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"errors"
"time"

"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/util"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -46,29 +48,45 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) er
return err
}

var bestShard shardWrapper
var bestPool util.WorkerPool

for i, sh := range e.sortedShards(addr) {
e.mtx.RLock()
pool, ok := e.shardPools[sh.ID().String()]
if ok && bestPool == nil {
bestShard = sh
bestPool = pool
}
e.mtx.RUnlock()
if !ok {
// Shard was concurrently removed, skip.
continue
}

putDone, exists := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen)
putDone, exists, _ := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen)
if putDone || exists {
return nil
}
}

e.log.Debug("failed to put object to shards, trying the best one more",
zap.Stringer("addr", addr), zap.Stringer("best shard", bestShard.ID()))

if e.objectPutTimeout > 0 && e.putToShardWithDeadLine(bestShard, 0, bestPool, addr, obj, objBin, hdrLen) {
return nil
}

return errPutShard
}

// putToShard puts object to sh.
// First return value is true iff put has been successfully done.
// Second return value is true iff object already exists.
func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) {
var putSuccess, alreadyExists bool
// Third return value is true iff object cannot be put because of max concurrent load.
func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool, bool) {
var putSuccess, alreadyExists, overloaded bool
id := sh.ID()

exitCh := make(chan struct{})

Expand All @@ -82,7 +100,7 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo
if err != nil {
e.log.Warn("object put: check object existence",
zap.Stringer("addr", addr),
zap.Stringer("shard", sh.ID()),
zap.Stringer("shard", id),
zap.Error(err))

if shard.IsErrObjectExpired(err) {
Expand All @@ -103,14 +121,14 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo
_, err = sh.ToMoveIt(toMoveItPrm)
if err != nil {
e.log.Warn("could not mark object for shard relocation",
zap.Stringer("shard", sh.ID()),
zap.Stringer("shard", id),
zap.String("error", err.Error()),
)
}
}

e.log.Debug("object put: object already exists",
zap.Stringer("shard", sh.ID()),
zap.Stringer("shard", id),
zap.Stringer("addr", addr))

return
Expand All @@ -127,7 +145,7 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo
if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
e.log.Warn("could not put object to shard",
zap.Stringer("shard_id", sh.ID()),
zap.Stringer("shard_id", id),
zap.String("error", err.Error()))
return
}
Expand All @@ -138,11 +156,37 @@ func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPoo

putSuccess = true
}); err != nil {
e.log.Warn("object put: pool task submitting", zap.Error(err))
e.log.Warn("object put: pool task submitting", zap.Stringer("shard", id), zap.Error(err))
overloaded = errors.Is(err, ants.ErrPoolOverload)
close(exitCh)
}

<-exitCh

return putSuccess, alreadyExists
return putSuccess, alreadyExists, overloaded
}

// putToShardWithDeadLine repeatedly retries putting the object to sh while the
// shard's worker pool reports overload, waiting putCooldown between attempts,
// for at most e.cfg.objectPutTimeout in total. It returns true if the object
// was stored or already exists on the shard, and false if the deadline expired
// or the put failed for a reason other than pool overload.
func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) bool {
	timer := time.NewTimer(e.cfg.objectPutTimeout)
	defer timer.Stop()

	const putCooldown = 100 * time.Millisecond
	ticker := time.NewTicker(putCooldown)
	defer ticker.Stop()

	for {
		select {
		case <-timer.C:
			// Overall deadline reached without a successful put.
			e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", e.cfg.objectPutTimeout))
			return false
		case <-ticker.C:
			putDone, exists, overloaded := e.putToShard(sh, ind, pool, addr, obj, objBin, hdrLen)
			if overloaded {
				// Pool is still at capacity: restart the cooldown so the next
				// attempt happens putCooldown after this attempt completed,
				// then keep waiting for either the ticker or the deadline.
				ticker.Reset(putCooldown)
				continue
			}

			// Any non-overload outcome is final: success, "already exists",
			// or a hard error (both flags false).
			return putDone || exists
		}
	}
}

0 comments on commit 2bb903c

Please sign in to comment.