Skip to content

Commit

Permalink
node/grpc: Change max recv msg size on MaxObjectSize setting update
Browse files Browse the repository at this point in the history
Since storage node serves `ObjectService.Replicate` RPC, the gRPC server
must be able to accept the biggest allowed object. Previously, node
calculated global message limit for the gRPC server once on startup.
With this behavior, when network setting `MaxObjectSize` was increased,
the node stopped accepting write objects larger than the previous limit.
This manifested itself in a denial of replication service.

From now storage node updates max received gRPC message size (if
needed) on each refresh of the `MaxObjectSize` setting cache and via
Netmap contract's polling done once per minute.

Refs #2910.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Aug 8, 2024
1 parent 8d0ac07 commit 19f39df
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 38 deletions.
26 changes: 21 additions & 5 deletions cmd/neofs-node/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,14 +446,30 @@ type ttlMaxObjectSizeCache struct {
lastUpdated time.Time
lastSize uint64
src putsvc.MaxSizeSource
onChange func(uint64)
}

func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource) putsvc.MaxSizeSource {
func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource, onChange func(uint64)) *ttlMaxObjectSizeCache {

Check warning on line 452 in cmd/neofs-node/cache.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/cache.go#L452

Added line #L452 was not covered by tests
return &ttlMaxObjectSizeCache{
src: src,
src: src,
onChange: onChange,

Check warning on line 455 in cmd/neofs-node/cache.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/cache.go#L454-L455

Added lines #L454 - L455 were not covered by tests
}
}

func (c *ttlMaxObjectSizeCache) updateLastSize(sz uint64) {
if c.lastSize != sz {
c.onChange(sz)

Check warning on line 461 in cmd/neofs-node/cache.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/cache.go#L459-L461

Added lines #L459 - L461 were not covered by tests
}
c.lastSize = sz
c.lastUpdated = time.Now()

Check warning on line 464 in cmd/neofs-node/cache.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/cache.go#L463-L464

Added lines #L463 - L464 were not covered by tests
}

func (c *ttlMaxObjectSizeCache) handleNewMaxObjectPayloadSize(sz uint64) {
c.mtx.Lock()
c.updateLastSize(sz)
c.mtx.Unlock()

Check warning on line 470 in cmd/neofs-node/cache.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/cache.go#L467-L470

Added lines #L467 - L470 were not covered by tests
}

func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 {
const ttl = time.Second * 30

Expand All @@ -469,9 +485,9 @@ func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 {
c.mtx.Lock()
size = c.lastSize
if !c.lastUpdated.After(prevUpdated) {
size = c.src.MaxObjectSize()
c.lastSize = size
c.lastUpdated = time.Now()
newSize := c.src.MaxObjectSize()
c.updateLastSize(newSize)
size = newSize

Check warning on line 490 in cmd/neofs-node/cache.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/cache.go#L488-L490

Added lines #L488 - L490 were not covered by tests
}
c.mtx.Unlock()

Expand Down
2 changes: 2 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,8 @@ type cfgGRPC struct {
maxChunkSize uint64

maxAddrAmount uint64

maxRecvMsgSize atomic.Value // int
}

type cfgMorph struct {
Expand Down
84 changes: 54 additions & 30 deletions cmd/neofs-node/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,27 @@ func initGRPC(c *cfg) {
maxObjSize, err := c.nCli.MaxObjectSize()
fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err)

maxRecvSize := maxObjSize
// don't forget about meta fields: object header + other ObjectService.Replicate
// request fields. For the latter, less is needed now, but it is still better to
// take with a reserve for potential protocol extensions. Anyway, 1 KB is
// nothing IRL.
const maxMetadataSize = object.MaxHeaderLen + 1<<10
if maxRecvSize < uint64(math.MaxUint64-maxMetadataSize) { // just in case, always true in practice
maxRecvSize += maxMetadataSize
} else {
maxRecvSize = math.MaxUint64
}

var maxRecvMsgSizeOpt grpc.ServerOption
if maxRecvSize > maxMsgSize { // do not decrease default value
if maxRecvSize > math.MaxInt {
// ^2GB for 32-bit systems which is currently enough in practice. If at some
// point this is not enough, we'll need to expand the option
fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d",
maxRecvSize, math.MaxInt))
}
maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize))
c.log.Debug("limit max recv gRPC message size to fit max stored objects",
zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize))
maxRecvSize, overflowed := calculateMaxReplicationRequestSize(maxObjSize)
if maxRecvSize < 0 {

Check warning on line 32 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L31-L32

Added lines #L31 - L32 were not covered by tests
// ^2GB for 32-bit systems which is currently enough in practice. If at some
// point this is not enough, we'll need to expand the option
fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d",
overflowed, math.MaxInt))

Check warning on line 36 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L35-L36

Added lines #L35 - L36 were not covered by tests
}
c.cfgGRPC.maxRecvMsgSize.Store(maxRecvSize)

Check warning on line 38 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L38

Added line #L38 was not covered by tests
// TODO(@cthulhu-rider): the setting can be server-global only now, support
// per-RPC limits
maxRecvMsgSizeOpt := grpc.MaxRecvMsgSizeFunc(func() int {
return c.cfgGRPC.maxRecvMsgSize.Load().(int) // initialized above, so safe
})
c.log.Info("limit max recv gRPC message size to fit max stored objects",
zap.Uint64("max object size", maxObjSize), zap.Int("max recv msg", maxRecvSize))

Check warning on line 45 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L41-L45

Added lines #L41 - L45 were not covered by tests

var successCount int
grpcconfig.IterateEndpoints(c.cfgReader, func(sc *grpcconfig.Config) {
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(maxMsgSize),
}
if maxRecvMsgSizeOpt != nil {
// TODO(@cthulhu-rider): the setting can be server-global only now, support
// per-RPC limits
// TODO(@cthulhu-rider): max object size setting may change in general,
// but server configuration is static now
serverOpts = append(serverOpts, maxRecvMsgSizeOpt)
maxRecvMsgSizeOpt,

Check warning on line 51 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L51

Added line #L51 was not covered by tests
}

tlsCfg := sc.TLS()
Expand Down Expand Up @@ -157,3 +142,42 @@ func stopGRPC(name string, s *grpc.Server, l *zap.Logger) {

l.Info("gRPC server stopped successfully")
}

// calculates approximation for max size of the ObjectService.Replicate request
// with given object payload limit. Second value is returned when calculation
// result overflows int type. In this case, first return is negative.
func calculateMaxReplicationRequestSize(maxObjPayloadSize uint64) (int, uint64) {
res := maxObjPayloadSize
// don't forget about meta fields: object header + other ObjectService.Replicate
// request fields. For the latter, less is needed now, but it is still better to
// take with a reserve for potential protocol extensions. Anyway, 1 KB is
// nothing IRL.
const maxMetadataSize = object.MaxHeaderLen + 1<<10
if res < uint64(math.MaxUint64-maxMetadataSize) { // just in case, always true in practice
res += maxMetadataSize
} else {
res = math.MaxUint64
}
if res > math.MaxInt {
return -1, res
}
if res < maxMsgSize { // do not decrease default value
return maxMsgSize, 0
}
return int(res), 0
}

func (c *cfg) handleNewMaxObjectPayloadSize(maxObjPayloadSize uint64) {
maxRecvSize, overflowed := calculateMaxReplicationRequestSize(maxObjPayloadSize)
if maxRecvSize < 0 {
// unlike a startup, we don't want to stop a running service. Moreover, this is
// just a limit: even if it has become incredibly large, most data is expected
// to be of smaller degrees
c.log.Info("max gRPC recv msg size re-calculated for new max object payload size overflows int type, fallback to max int",
zap.Uint64("calculated limit", overflowed))
maxRecvSize = math.MaxInt
}
c.cfgGRPC.maxRecvMsgSize.Store(maxRecvSize)
c.log.Info("updated max gRPC recv msg size limit after max object payload size has been changed",
zap.Uint64("new object limit", maxObjPayloadSize), zap.Int("new gRPC limit", maxRecvSize))
}
50 changes: 50 additions & 0 deletions cmd/neofs-node/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"math"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func Test_calculateMaxReplicationRequestSize(t *testing.T) {
limit, _ := calculateMaxReplicationRequestSize(64 << 20)
require.EqualValues(t, 67126272, limit)
t.Run("int overflow", func(t *testing.T) {
limit, overflow := calculateMaxReplicationRequestSize(math.MaxInt - 17<<10 + 1)
require.Negative(t, limit)
require.EqualValues(t, uint64(math.MaxInt)+1, overflow)
})
t.Run("uint64 overflow", func(t *testing.T) {
limit, overflow := calculateMaxReplicationRequestSize(math.MaxUint64 - 17<<10 + 1)
require.Negative(t, limit)
require.EqualValues(t, uint64(math.MaxUint64), overflow)
})
t.Run("smaller than gRPC default", func(t *testing.T) {
limit, _ := calculateMaxReplicationRequestSize(0)
require.EqualValues(t, 4<<20, limit)
limit, _ = calculateMaxReplicationRequestSize(4<<20 - 17<<10 - 1)
require.EqualValues(t, 4<<20, limit)
})
}

func Test_cfg_handleNewMaxObjectPayloadSize(t *testing.T) {
var c cfg
c.log = zap.NewNop()
c.cfgGRPC.maxRecvMsgSize.Store(0) // any

c.handleNewMaxObjectPayloadSize(100 << 20)
require.EqualValues(t, 100<<20+17<<10, c.cfgGRPC.maxRecvMsgSize.Load())
c.handleNewMaxObjectPayloadSize(64 << 20)
require.EqualValues(t, 64<<20+17<<10, c.cfgGRPC.maxRecvMsgSize.Load())
// int overflow
c.handleNewMaxObjectPayloadSize(math.MaxInt - 17<<10 + 1)
require.EqualValues(t, math.MaxInt, c.cfgGRPC.maxRecvMsgSize.Load())
// uint64 overflow
c.handleNewMaxObjectPayloadSize(math.MaxUint64 - 17<<10 + 1)
require.EqualValues(t, math.MaxInt, c.cfgGRPC.maxRecvMsgSize.Load())
// smaller than gRPC default
c.handleNewMaxObjectPayloadSize(4<<20 - 17<<10 - 1)
require.EqualValues(t, 4<<20, c.cfgGRPC.maxRecvMsgSize.Load())
}
25 changes: 25 additions & 0 deletions cmd/neofs-node/netmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package main

import (
"bytes"
"context"
"errors"
"fmt"
"sync/atomic"
"time"

netmapGRPC "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
Expand Down Expand Up @@ -445,3 +447,26 @@ func (n *netInfo) Dump(ver version.Version) (*netmapSDK.NetworkInfo, error) {

return &ni, nil
}

func listenMaxObjectPayloadSizeChanges(ctx context.Context, cli *nmClient.Client, lg *zap.Logger, f func(uint64)) {

Check warning on line 451 in cmd/neofs-node/netmap.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/netmap.go#L451

Added line #L451 was not covered by tests
// config rarely changes, but when it does - we do not want to wait long.
// Notification events would help https://github.com/nspcc-dev/neofs-contract/issues/427
const pollInterval = time.Minute
t := time.NewTimer(pollInterval)
defer t.Stop()
for {
select {
case <-ctx.Done():
lg.Debug("stop max object payload size net config poller by context", zap.Error(ctx.Err()))
return
case <-t.C:
lg.Info("rereading max object payload size net config by timer", zap.Duration("interval", pollInterval))
if sz, err := cli.MaxObjectSize(); err == nil {
f(sz)
} else {
lg.Error("failed to read max object payload size net config", zap.Error(err))

Check warning on line 467 in cmd/neofs-node/netmap.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/netmap.go#L454-L467

Added lines #L454 - L467 were not covered by tests
}
t.Reset(pollInterval)

Check warning on line 469 in cmd/neofs-node/netmap.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/netmap.go#L469

Added line #L469 was not covered by tests
}
}
}
5 changes: 4 additions & 1 deletion cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,13 @@ func initObjectService(c *cfg) {
searchsvcV2.WithKeyStorage(keyStorage),
)

cachedMaxObjPayloadSizeSrc := newCachedMaxObjectSizeSource(c, c.handleNewMaxObjectPayloadSize)
go listenMaxObjectPayloadSizeChanges(c.ctx, c.nCli, c.log, cachedMaxObjPayloadSizeSrc.handleNewMaxObjectPayloadSize)

Check warning on line 260 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L259-L260

Added lines #L259 - L260 were not covered by tests

sPut := putsvc.NewService(&transport{clients: putConstructor},
putsvc.WithKeyStorage(keyStorage),
putsvc.WithClientConstructor(putConstructor),
putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
putsvc.WithMaxSizeSource(cachedMaxObjPayloadSizeSrc),

Check warning on line 265 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L265

Added line #L265 was not covered by tests
putsvc.WithObjectStorage(storageEngine{engine: ls}),
putsvc.WithContainerSource(c.cfgObject.cnrSource),
putsvc.WithNetworkMapSource(c.netMapSource),
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,5 @@ retract (
v1.22.1 // Contains retraction only.
v1.22.0 // Published accidentally.
)

replace google.golang.org/grpc => github.com/cthulhu-rider/grpc-go v0.0.0-20240808123512-00d000d30657
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb/go.mod h
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cthulhu-rider/grpc-go v0.0.0-20240808123512-00d000d30657 h1:khwEiSUz2Pi0dPqIPittz466RG/gFyzvTHrZHDiTcWg=
github.com/cthulhu-rider/grpc-go v0.0.0-20240808123512-00d000d30657/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down Expand Up @@ -273,8 +275,6 @@ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3j
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY=
google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk=
google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down

0 comments on commit 19f39df

Please sign in to comment.