Skip to content

Commit

Permalink
Combined FSTree versioning and log (#3031)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Nov 28, 2024
2 parents edc95f6 + b79dcfa commit 2c087f3
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ attribute, which is used for container domain name in NNS contracts (#2954)
- Deprecate peapod substorage (#3013)
- Node does not stop trying to PUT an object if there are more PUT tasks than configured (#3027)
- `morph` configuration section renamed to `fschain` both in IR and SN (#3028)
- FSTree is limited to depth 8 max now (#3031)

### Removed
- Support for node.key configuration (#2959)
Expand Down
11 changes: 3 additions & 8 deletions pkg/local_object_storage/blobstor/blobstor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func New(opts ...Option) *BlobStor {
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
func (b *BlobStor) SetLogger(l *zap.Logger) {
b.log = l
for i := range b.storage {
b.storage[i].Storage.SetLogger(l)
}
}

// WithStorages provides sub-blobstors.
Expand Down Expand Up @@ -105,11 +108,3 @@ func WithUncompressableContentTypes(values []string) Option {
c.compression.UncompressableContentTypes = values
}
}

// SetReportErrorFunc allows to provide a function to be called on disk errors.
// This function MUST be called before Open.
func (b *BlobStor) SetReportErrorFunc(f func(string, error)) {
	// Hand the callback down to each configured substorage.
	for _, sub := range b.storage {
		sub.Storage.SetReportErrorFunc(f)
	}
}
5 changes: 2 additions & 3 deletions pkg/local_object_storage/blobstor/common/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

// Storage represents key-value object storage.
Expand All @@ -16,10 +17,8 @@ type Storage interface {

Type() string
Path() string
SetLogger(*zap.Logger)
SetCompressor(cc *compression.Config)
// SetReportErrorFunc allows to provide a function to be called on disk errors.
// This function MUST be called before Open.
SetReportErrorFunc(f func(string, error))

// GetBytes reads object by address into memory buffer in a canonical NeoFS
// binary format. Returns [apistatus.ObjectNotFound] if object is missing.
Expand Down
57 changes: 57 additions & 0 deletions pkg/local_object_storage/blobstor/fstree/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Package fstree implements a storage subsystem that saves objects as files in FS tree.
The main concept behind it is rather simple: each object is stored as a file
in a directory. Given that handling many files in the same directory is usually
problematic for file systems, objects are put into subdirectories by their
IDs. This directory tree can have different [FSTree.Depth] and each component
of the path (single directory name) is a [DirNameLen] number of ID bytes from
its string representation. File name then is a leftover of object ID (after
stripping [FSTree.Depth] bytes off of it) concatenated with container ID.
For example, an object with ID of WCdSV7F9TnDHFmbgKY7BNCbPs6g7meaVbh6DMNXbytB
from Hh6qJ2Fa9WzSK7PESResS4mmuoZ2a47Z7F6ZvmR7AEHU container will be stored
in W/C/d/S/V directory and a name of 7F9TnDHFmbgKY7BNCbPs6g7meaVbh6DMNXbytB.Hh6qJ2Fa9WzSK7PESResS4mmuoZ2a47Z7F6ZvmR7AEHU
if the depth is 5. The overall structure may look like this (a part of it):
/W/
├── 7
│   └── 2
│   └── F
│   └── 6
│   └── 32vhigDXRPkSwaCTw3FtxWbKjoDGoDvVTJqxBb.J4SkhNifjANvYGr56vh4NbGBRCxvk8PT9YE5EgFSb7cc
├── C
│   └── d
│   └── S
│   └── V
│   └── 7F9TnDHFmbgKY7BNCbPs6g7meaVbh6DMNXbytB.Hh6qJ2Fa9WzSK7PESResS4mmuoZ2a47Z7F6ZvmR7AEHU
Binary file format can differ depending on the FSTree version. The basic format
that was used from the beginning is storing serialized protobuf representation
of the object as is. In this case file can be decoded into an object directly.
If compression is configured then the same file can have a compressed (ZSTD)
serialized protobuf.
Version 0.44.0 of the node has introduced a new file format that is used for
small files, the so-called "combined" one. It has a special prefix (0x7F) that
cannot collide with protobuf-encoded or ZSTD-compressed data. Files using
this prefix can contain an unspecified number of objects (configured via
[WithCombinedCountLimit]) that all follow the same serialization pattern:
- the first byte is magic 0x7F described above
- one byte version of the subsequent structure (currently zero only)
- 32-byte OID of the next object then
- 4-byte BE integer length of the next object
- followed by protobuf-encoded or ZSTD-compressed object data (of the length
specified in the previous field)
Overall the structure is like this:
[0x7F 0x00 [OID A] [uint32 size]][object A][0x7F 0x00 [OID B] [uint32 size]][object B]...
Even though this file contains several objects it has hard links for all of
them in the FS tree, so finding a file containing some object doesn't require
any additional effort. Finding it in the file contents however requires
reading the prefix described above, comparing the target OID and either
skipping the object length specified there or reading it after the prefix.
*/
package fstree
58 changes: 38 additions & 20 deletions pkg/local_object_storage/blobstor/fstree/fstree.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package fstree

import (
"bytes"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
Expand All @@ -22,13 +21,15 @@ import (
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

// FSTree represents an object storage as a filesystem tree.
type FSTree struct {
Info

*compression.Config
log *zap.Logger
Depth uint64
DirNameLen int
writer writer
Expand Down Expand Up @@ -60,14 +61,28 @@ type writer interface {
const (
// DirNameLen is how many bytes is used to group keys into directories.
DirNameLen = 1 // in bytes
// MaxDepth is maximum depth of nested directories.
MaxDepth = (sha256.Size - 1) / DirNameLen
// MaxDepth is maximum depth of nested directories. 58^8 is 128e12 of
// directories, enough for a single FSTree.
MaxDepth = 8

// combinedPrefix is the prefix that Protobuf message can't start with,
// it reads as "field number 15 of type 7", but there is no type 7 in
// the system (and we usually don't have 15 fields). ZSTD magic is also
// different.
combinedPrefix = 0x7f

// combinedLenSize is sizeof(uint32), length is a serialized 32-bit BE integer.
combinedLenSize = 4

// combinedIDOff is the offset from the start of the combined prefix to OID.
combinedIDOff = 2

// combinedLengthOff is the offset from the start of the combined prefix to object length.
combinedLengthOff = combinedIDOff + oid.Size

// combinedDataOff is the offset from the start of the combined prefix to object data.
// It's also the length of the prefix in total.
combinedDataOff = combinedLengthOff + combinedLenSize
)

var _ common.Storage = (*FSTree)(nil)
Expand Down Expand Up @@ -350,19 +365,19 @@ func getRawObjectBytes(id oid.ID, p string) ([]byte, error) {
return data, nil
}

func extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) {
const (
prefixSize = 1
idSize = sha256.Size
lengthSize = 4

idOff = prefixSize
lengthOff = idOff + idSize
dataOff = lengthOff + lengthSize
)
// parseCombinedPrefix checks the given array for combined data prefix and
// returns a subslice with OID and object length if so (nil and 0 otherwise).
func parseCombinedPrefix(p [combinedDataOff]byte) ([]byte, uint32) {
	// Reject anything that is not a version-0 combined prefix
	// (only version 0 is supported now).
	if p[0] != combinedPrefix || p[1] != 0 {
		return nil, 0
	}
	objID := p[combinedIDOff:combinedLengthOff]
	objLen := binary.BigEndian.Uint32(p[combinedLengthOff:combinedDataOff])
	return objID, objLen
}

func extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) {
var (
comBuf [dataOff]byte
comBuf [combinedDataOff]byte
data []byte
isCombined bool
)
Expand All @@ -378,7 +393,11 @@ func extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) {
}
return nil, err
}
if comBuf[0] != combinedPrefix {
thisOID, l := parseCombinedPrefix(comBuf)
if thisOID == nil {
if isCombined {
return nil, errors.New("malformed combined file")
}
st, err := f.Stat()
if err != nil {
return nil, err
Expand All @@ -396,8 +415,7 @@ func extractCombinedObject(id oid.ID, f *os.File) ([]byte, error) {
return data, nil
}
isCombined = true
var l = binary.BigEndian.Uint32(comBuf[lengthOff:dataOff])
if bytes.Equal(comBuf[idOff:lengthOff], id[:]) {
if bytes.Equal(thisOID, id[:]) {
data = make([]byte, l)
_, err = io.ReadFull(f, data)
if err != nil {
Expand Down Expand Up @@ -473,9 +491,9 @@ func (t *FSTree) SetCompressor(cc *compression.Config) {
t.Config = cc
}

// SetReportErrorFunc implements common.Storage.
func (t *FSTree) SetReportErrorFunc(_ func(string, error)) {
// Do nothing, FSTree can encounter only one error which is returned.
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
func (t *FSTree) SetLogger(l *zap.Logger) {
t.log = l.With(zap.String("substorage", Type))
}

// CleanUpTmp removes all temporary files garbage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sys/unix"
)

Expand Down Expand Up @@ -51,6 +52,7 @@ func newSpecificWriter(t *FSTree) writer {
}
fd, err := unix.Open(t.RootPath, flags, uint32(t.Permissions))
if err != nil {
t.log.Warn("optimized batching writer is disabled", zap.Error(err))
return nil // Which means that OS-specific writeData can't be created and FSTree should use the generic one.
}
_ = unix.Close(fd) // Don't care about error.
Expand Down Expand Up @@ -123,11 +125,11 @@ func (b *syncBatch) wait() error {
func (b *syncBatch) write(id oid.ID, p string, data []byte) error {
var (
err error
pref [1 + len(id) + 4]byte
pref [combinedDataOff]byte
)
pref[0] = combinedPrefix
copy(pref[1:], id[:])
binary.BigEndian.PutUint32(pref[1+len(id):], uint32(len(data)))
copy(pref[combinedIDOff:], id[:])
binary.BigEndian.PutUint32(pref[combinedLengthOff:], uint32(len(data)))

n, err := unix.Writev(b.fd, [][]byte{pref[:], data})
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/local_object_storage/blobstor/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ func (x *getBytesOnlySubStorage) Close() error { panic("
func (x *getBytesOnlySubStorage) Type() string { panic("must not be called") }
func (x *getBytesOnlySubStorage) Path() string { panic("must not be called") }
func (x *getBytesOnlySubStorage) SetCompressor(cc *compression.Config) { panic("must not be called") }
func (x *getBytesOnlySubStorage) SetReportErrorFunc(f func(string, error)) {
panic("must not be called")
}
func (x *getBytesOnlySubStorage) SetLogger(_ *zap.Logger) { panic("must not be called") }

func (x *getBytesOnlySubStorage) Get(prm common.GetPrm) (common.GetRes, error) {
panic("must not be called")
Expand Down
7 changes: 5 additions & 2 deletions pkg/local_object_storage/blobstor/peapod/peapod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)

type batch struct {
Expand Down Expand Up @@ -48,6 +49,7 @@ type Peapod struct {
flushInterval time.Duration

compress *compression.Config
log *zap.Logger

readOnly bool

Expand Down Expand Up @@ -274,8 +276,9 @@ func (x *Peapod) SetCompressor(cc *compression.Config) {
x.compress = cc
}

func (x *Peapod) SetReportErrorFunc(func(string, error)) {
// no-op like FSTree
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
func (x *Peapod) SetLogger(l *zap.Logger) {
x.log = l.With(zap.String("substorage", Type))
}

// Get reads data from the underlying database by the given object address.
Expand Down
2 changes: 0 additions & 2 deletions pkg/local_object_storage/shard/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ func New(opts ...Option) *Shard {
s.reportErrorFunc(s.ID().String(), msg, err)
}

s.blobStor.SetReportErrorFunc(reportFunc)

if c.useWriteCache {
s.writeCache = writecache.New(
append(c.writeCacheOpts,
Expand Down

0 comments on commit 2c087f3

Please sign in to comment.