Combined FSTree versioning and log #3031

Merged
merged 8 commits into from
Nov 28, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -53,6 +53,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- Deprecate peapod substorage (#3013)
- Node does not stop trying to PUT an object if there are more PUT tasks than configured (#3027)
- `morph` configuration section renamed to `fschain` both in IR and SN (#3028)
- FSTree is limited to depth 8 max now (#3031)

### Removed
- Support for node.key configuration (#2959)
11 changes: 3 additions & 8 deletions pkg/local_object_storage/blobstor/blobstor.go
@@ -68,6 +68,9 @@
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
func (b *BlobStor) SetLogger(l *zap.Logger) {
b.log = l
for i := range b.storage {
b.storage[i].Storage.SetLogger(l)
}

Codecov warning (pkg/local_object_storage/blobstor/blobstor.go#L71-L73): added lines not covered by tests.
}

// WithStorages provides sub-blobstors.
@@ -105,11 +108,3 @@
c.compression.UncompressableContentTypes = values
}
}

// SetReportErrorFunc allows to provide a function to be called on disk errors.
// This function MUST be called before Open.
func (b *BlobStor) SetReportErrorFunc(f func(string, error)) {
for i := range b.storage {
b.storage[i].Storage.SetReportErrorFunc(f)
}
}
5 changes: 2 additions & 3 deletions pkg/local_object_storage/blobstor/common/storage.go
@@ -5,6 +5,7 @@ import (

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

// Storage represents key-value object storage.
@@ -16,10 +17,8 @@ type Storage interface {

Type() string
Path() string
SetLogger(*zap.Logger)
SetCompressor(cc *compression.Config)
// SetReportErrorFunc allows to provide a function to be called on disk errors.
// This function MUST be called before Open.
SetReportErrorFunc(f func(string, error))

// GetBytes reads object by address into memory buffer in a canonical NeoFS
// binary format. Returns [apistatus.ObjectNotFound] if object is missing.
57 changes: 57 additions & 0 deletions pkg/local_object_storage/blobstor/fstree/doc.go
@@ -0,0 +1,57 @@
/*
Package fstree implements a storage subsystem that saves objects as files in a FS tree.

The main concept behind it is rather simple: each object is stored as a file
in a directory. Given that handling many files in the same directory is usually
problematic for file systems, objects are put into subdirectories keyed by their
IDs. This directory tree can have different [FSTree.Depth], and each component
of the path (a single directory name) is [DirNameLen] bytes of the ID's string
representation. The file name is then the remainder of the object ID (after
stripping [FSTree.Depth] bytes from it) concatenated with the container ID.

For example, an object with ID WCdSV7F9TnDHFmbgKY7BNCbPs6g7meaVbh6DMNXbytB
from container Hh6qJ2Fa9WzSK7PESResS4mmuoZ2a47Z7F6ZvmR7AEHU will be stored
in the W/C/d/S/V directory under the name 7F9TnDHFmbgKY7BNCbPs6g7meaVbh6DMNXbytB.Hh6qJ2Fa9WzSK7PESResS4mmuoZ2a47Z7F6ZvmR7AEHU
if the depth is 5. The overall structure may look like this (a part of it):

/W/
├── 7
│   └── 2
│   └── F
│   └── 6
│   └── 32vhigDXRPkSwaCTw3FtxWbKjoDGoDvVTJqxBb.J4SkhNifjANvYGr56vh4NbGBRCxvk8PT9YE5EgFSb7cc
├── C
│   └── d
│   └── S
│   └── V
│   └── 7F9TnDHFmbgKY7BNCbPs6g7meaVbh6DMNXbytB.Hh6qJ2Fa9WzSK7PESResS4mmuoZ2a47Z7F6ZvmR7AEHU

The binary file format can differ depending on the FSTree version. The basic
format used from the beginning stores the serialized protobuf representation
of the object as is; in this case the file can be decoded into an object
directly. If compression is configured, the same file can instead contain a
ZSTD-compressed serialized protobuf.

Version 0.44.0 of the node introduced a new file format used for small files,
the so-called "combined" one. It has a special prefix (0x7F) that cannot
collide with protobuf-encoded or ZSTD-compressed data. Files using this prefix
can contain a varying number of objects (limited via [WithCombinedCountLimit])
that all follow the same serialization pattern:
- the first byte is the magic 0x7F described above
- a one-byte version of the subsequent structure (currently only zero)
- the 32-byte OID of the next object
- a 4-byte BE integer length of the next object
- the protobuf-encoded or ZSTD-compressed object data (of the length
specified in the previous field)

Overall the structure is like this:

[0x7F 0x00 [OID A] [uint32 size]][object A][0x7F 0x00 [OID B] [uint32 size]][object B]...

Even though such a file contains several objects, it has a hard link for each
of them in the FS tree, so finding a file containing some object doesn't
require any additional effort. Locating the object within the file, however,
requires reading the prefix described above, comparing the target OID and then
either skipping the number of bytes given by the object length or reading that
many bytes right after the prefix.
*/
package fstree
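
For illustration, a minimal standalone sketch (not part of this diff) that derives an object's on-disk path the way the doc comment above describes it, assuming base58 string forms of both IDs and DirNameLen == 1; the examplePath helper is hypothetical and the real fstree code may differ in details:

package main

import (
	"fmt"
	"path/filepath"
)

// examplePath builds <root>/<one directory per leading ID character, depth levels>/
// <rest of object ID>.<container ID>, mirroring the layout described above.
func examplePath(root string, depth int, objID, cntID string) string {
	elems := make([]string, 0, depth+2)
	elems = append(elems, root)
	for i := 0; i < depth; i++ {
		elems = append(elems, objID[i:i+1]) // DirNameLen == 1 character per level
	}
	elems = append(elems, objID[depth:]+"."+cntID) // file name: ID leftover + container ID
	return filepath.Join(elems...)
}

func main() {
	fmt.Println(examplePath("/storage", 5,
		"WCdSV7F9TnDHFmbgKY7BNCbPs6g7meaVbh6DMNXbytB",
		"Hh6qJ2Fa9WzSK7PESResS4mmuoZ2a47Z7F6ZvmR7AEHU"))
	// /storage/W/C/d/S/V/7F9TnDHFmbgKY7BNCbPs6g7meaVbh6DMNXbytB.Hh6qJ2Fa9WzSK7PESResS4mmuoZ2a47Z7F6ZvmR7AEHU
}

With depth 5 this reproduces the W/C/d/S/V example path from the doc comment.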
58 changes: 38 additions & 20 deletions pkg/local_object_storage/blobstor/fstree/fstree.go
@@ -2,7 +2,6 @@

import (
"bytes"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
@@ -22,13 +21,15 @@
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

// FSTree represents an object storage as a filesystem tree.
type FSTree struct {
Info

*compression.Config
log *zap.Logger
Depth uint64
DirNameLen int
writer writer
@@ -60,14 +61,28 @@
const (
// DirNameLen is how many bytes is used to group keys into directories.
DirNameLen = 1 // in bytes
// MaxDepth is maximum depth of nested directories.
MaxDepth = (sha256.Size - 1) / DirNameLen
// MaxDepth is maximum depth of nested directories. 58^8 is 128e12 of
// directories, enough for a single FSTree.
MaxDepth = 8

// combinedPrefix is the prefix that Protobuf message can't start with,
// it reads as "field number 15 of type 7", but there is no type 7 in
// the system (and we usually don't have 15 fields). ZSTD magic is also
// different.
combinedPrefix = 0x7f

// combinedLenSize is sizeof(uint32), length is a serialized 32-bit BE integer.
Review comment (Member):

Suggested change
-	// combinedLenSize is sizeof(uint32), length is a serialized 32-bit BE integer.
+	// combinedLenSize is sizeof(uint32), length of a serialized 32-bit BE integer.

no?

Member Author: No.

Member Author: OK, to be fixed.

combinedLenSize = 4

// combinedIDOff is the offset from the start of the combined prefix to OID.
combinedIDOff = 2

// combinedLengthOff is the offset from the start of the combined prefix to object length.
combinedLengthOff = combinedIDOff + oid.Size

// combinedDataOff is the offset from the start of the combined prefix to object data.
// It's also the length of the prefix in total.
combinedDataOff = combinedLengthOff + combinedLenSize
)

var _ common.Storage = (*FSTree)(nil)
@@ -350,19 +365,19 @@
return data, nil
}

func extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) {
const (
prefixSize = 1
idSize = sha256.Size
lengthSize = 4

idOff = prefixSize
lengthOff = idOff + idSize
dataOff = lengthOff + lengthSize
)
// parseCombinedPrefix checks the given array for combined data prefix and
// returns a subslice with OID and object length if so (nil and 0 otherwise).
func parseCombinedPrefix(p [combinedDataOff]byte) ([]byte, uint32) {
if p[0] != combinedPrefix || p[1] != 0 { // Only version 0 is supported now.
return nil, 0
}

Codecov warning (pkg/local_object_storage/blobstor/fstree/fstree.go#L372-L373): added lines not covered by tests.
return p[combinedIDOff:combinedLengthOff],
binary.BigEndian.Uint32(p[combinedLengthOff:combinedDataOff])
}

func extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) {
var (
comBuf [dataOff]byte
comBuf [combinedDataOff]byte
data []byte
isCombined bool
)
@@ -378,7 +393,11 @@
}
return nil, err
}
if comBuf[0] != combinedPrefix {
thisOID, l := parseCombinedPrefix(comBuf)
if thisOID == nil {
if isCombined {
return nil, errors.New("malformed combined file")
}

Codecov warning (pkg/local_object_storage/blobstor/fstree/fstree.go#L398-L400): added lines not covered by tests.
st, err := f.Stat()
if err != nil {
return nil, err
@@ -396,8 +415,7 @@
return data, nil
}
isCombined = true
var l = binary.BigEndian.Uint32(comBuf[lengthOff:dataOff])
if bytes.Equal(comBuf[idOff:lengthOff], id[:]) {
if bytes.Equal(thisOID, id[:]) {
data = make([]byte, l)
_, err = io.ReadFull(f, data)
if err != nil {
@@ -473,9 +491,9 @@
t.Config = cc
}

// SetReportErrorFunc implements common.Storage.
func (t *FSTree) SetReportErrorFunc(_ func(string, error)) {
// Do nothing, FSTree can encounter only one error which is returned.
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
func (t *FSTree) SetLogger(l *zap.Logger) {
t.log = l.With(zap.String("substorage", Type))

Codecov warning (pkg/local_object_storage/blobstor/fstree/fstree.go#L495-L496): added lines not covered by tests.
}

// CleanUpTmp removes all temporary files garbage.
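To illustrate the combined format that extractCombinedObject reads, here is a standalone sketch (not part of this diff) that scans a combined file sequentially and lists every stored OID with its payload size. It relies only on the documented layout (0x7F magic, version byte, 32-byte OID, 4-byte BE length, then the payload) and hard-codes those sizes rather than importing the fstree constants:

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os"
)

const (
	prefixByte = 0x7f // same value as combinedPrefix
	headerSize = 38   // 1 (magic) + 1 (version) + 32 (OID) + 4 (BE length), i.e. combinedDataOff
)

// listCombined prints the OID (raw bytes, hex) and payload length of every
// record in a combined file, skipping over the payloads themselves.
func listCombined(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	var hdr [headerSize]byte
	for {
		if _, err := io.ReadFull(f, hdr[:]); err != nil {
			if errors.Is(err, io.EOF) {
				return nil // clean end of file
			}
			return err
		}
		if hdr[0] != prefixByte || hdr[1] != 0 {
			return errors.New("not a combined record (or unsupported version)")
		}
		size := binary.BigEndian.Uint32(hdr[34:38])
		fmt.Printf("OID %x: %d bytes\n", hdr[2:34], size)
		if _, err := f.Seek(int64(size), io.SeekCurrent); err != nil {
			return err
		}
	}
}

func main() {
	if err := listCombined(os.Args[1]); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}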
pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go
@@ -12,6 +12,7 @@

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sys/unix"
)

@@ -51,6 +52,7 @@
}
fd, err := unix.Open(t.RootPath, flags, uint32(t.Permissions))
if err != nil {
t.log.Warn("optimized batching writer is disabled", zap.Error(err))

Codecov warning (pkg/local_object_storage/blobstor/fstree/fstree_write_linux.go#L55): added line not covered by tests.
return nil // Which means that OS-specific writeData can't be created and FSTree should use the generic one.
}
_ = unix.Close(fd) // Don't care about error.
@@ -123,11 +125,11 @@
func (b *syncBatch) write(id oid.ID, p string, data []byte) error {
var (
err error
pref [1 + len(id) + 4]byte
pref [combinedDataOff]byte
)
pref[0] = combinedPrefix
copy(pref[1:], id[:])
binary.BigEndian.PutUint32(pref[1+len(id):], uint32(len(data)))
copy(pref[combinedIDOff:], id[:])
binary.BigEndian.PutUint32(pref[combinedLengthOff:], uint32(len(data)))

n, err := unix.Writev(b.fd, [][]byte{pref[:], data})
if err != nil {
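For comparison with the writev-based syncBatch.write above, the same record can be assembled into one contiguous buffer with encoding/binary. The sketch below is illustrative only: the helper does not exist in the codebase, and the offsets 2/34/38 are the hard-coded equivalents of combinedIDOff, combinedLengthOff and combinedDataOff.

package main

import (
	"encoding/binary"
	"fmt"
)

// appendCombinedRecord appends one combined record to dst:
// [0x7F 0x00 [32-byte OID] [uint32 BE length]][payload].
func appendCombinedRecord(dst []byte, id [32]byte, payload []byte) []byte {
	var pref [38]byte // combinedDataOff bytes of prefix
	pref[0] = 0x7f    // combinedPrefix
	pref[1] = 0x00    // structure version 0
	copy(pref[2:34], id[:])
	binary.BigEndian.PutUint32(pref[34:38], uint32(len(payload)))
	return append(append(dst, pref[:]...), payload...)
}

func main() {
	var a, b [32]byte
	a[31], b[31] = 1, 2
	file := appendCombinedRecord(nil, a, []byte("object A"))
	file = appendCombinedRecord(file, b, []byte("object B"))
	fmt.Printf("% x\n", file[:6]) // 7f 00 00 00 00 00 ...
}

Records appended back to back like this reproduce the overall structure shown in doc.go.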
4 changes: 1 addition & 3 deletions pkg/local_object_storage/blobstor/get_test.go
@@ -28,9 +28,7 @@ func (x *getBytesOnlySubStorage) Close() error { panic("
func (x *getBytesOnlySubStorage) Type() string { panic("must not be called") }
func (x *getBytesOnlySubStorage) Path() string { panic("must not be called") }
func (x *getBytesOnlySubStorage) SetCompressor(cc *compression.Config) { panic("must not be called") }
func (x *getBytesOnlySubStorage) SetReportErrorFunc(f func(string, error)) {
panic("must not be called")
}
func (x *getBytesOnlySubStorage) SetLogger(_ *zap.Logger) { panic("must not be called") }

func (x *getBytesOnlySubStorage) Get(prm common.GetPrm) (common.GetRes, error) {
panic("must not be called")
7 changes: 5 additions & 2 deletions pkg/local_object_storage/blobstor/peapod/peapod.go
@@ -20,6 +20,7 @@
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)

type batch struct {
@@ -48,6 +49,7 @@
flushInterval time.Duration

compress *compression.Config
log *zap.Logger

readOnly bool

@@ -274,8 +276,9 @@
x.compress = cc
}

func (x *Peapod) SetReportErrorFunc(func(string, error)) {
// no-op like FSTree
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
func (x *Peapod) SetLogger(l *zap.Logger) {
x.log = l.With(zap.String("substorage", Type))

Codecov warning (pkg/local_object_storage/blobstor/peapod/peapod.go#L280-L281): added lines not covered by tests.
}

// Get reads data from the underlying database by the given object address.
2 changes: 0 additions & 2 deletions pkg/local_object_storage/shard/shard.go
@@ -132,8 +132,6 @@ func New(opts ...Option) *Shard {
s.reportErrorFunc(s.ID().String(), msg, err)
}

s.blobStor.SetReportErrorFunc(reportFunc)

if c.useWriteCache {
s.writeCache = writecache.New(
append(c.writeCacheOpts,