Skip to content

Commit

Permalink
node/control: add object revive control command
Browse files Browse the repository at this point in the history
Includes API definition extending, RPC implementation, tests of metabase func.
The command requests server's storage engine to revive object by address. It's
purge all removal marks from all metabases and returns revival statuses.

Signed-off-by: Andrey Butusov <[email protected]>
  • Loading branch information
End-rey committed Oct 23, 2024
1 parent d743d42 commit 1c78683
Show file tree
Hide file tree
Showing 10 changed files with 1,300 additions and 258 deletions.
41 changes: 41 additions & 0 deletions pkg/local_object_storage/engine/revive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package engine

import (
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

// ReviveShardStatus contains the Status of the object's revival in the Shard and Shard ID.
type ReviveShardStatus struct {
ID string
Status meta.ReviveStatus
}

// ReviveStatus represents the status of the object's revival in the StorageEngine.
type ReviveStatus struct {
Shards []ReviveShardStatus
}

// ReviveObject forcefully revives object by oid.Address in the StorageEngine.
// Iterate over all shards despite errors and purge all removal marks from all metabases.
func (e *StorageEngine) ReviveObject(address oid.Address) (res ReviveStatus, err error) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
reviveStatus, err := sh.ReviveObject(address)
id := *sh.ID()
res.Shards = append(res.Shards, ReviveShardStatus{
ID: id.String(),
Status: reviveStatus,
})
if err != nil {
e.log.Warn("failed to revive object in shard",
zap.String("shard", id.String()),
zap.String("address", address.EncodeToString()),
zap.Error(err),
)
}

return false
})
return
}
147 changes: 147 additions & 0 deletions pkg/local_object_storage/metabase/revive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package meta

import (
"fmt"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.etcd.io/bbolt"
)

// ErrObjectWasNotRemoved is returned when object neither in the graveyard nor was marked with GC mark.
var ErrObjectWasNotRemoved = logicerr.New("object neither in the graveyard nor was marked with GC mark")

// ErrReviveFromContainerGarbage is returned when the object is in the container that marked with GC mark.
var ErrReviveFromContainerGarbage = logicerr.New("revive from container marked with GC mark")

type reviveStatusType int

const (
// ReviveStatusGraveyard is the type of revival status of an object from a graveyard.
ReviveStatusGraveyard reviveStatusType = iota
// ReviveStatusGarbage is the type of revival status of an object from the garbage bucket.
ReviveStatusGarbage
// ReviveStatusError is the type of status when an error occurs during revive.
ReviveStatusError
)

// ReviveStatus groups the resulting values of ReviveObject operation.
// Contains the type of revival status and message for details.
type ReviveStatus struct {
statusType reviveStatusType
message string
}

// Message returns message of status.
func (s *ReviveStatus) Message() string {
return s.message
}

// StatusType returns the type of revival status.
func (s *ReviveStatus) StatusType() reviveStatusType {
return s.statusType
}

func (s *ReviveStatus) setStatusGraveyard(tomb string) {
s.statusType = ReviveStatusGraveyard
s.message = fmt.Sprintf("successful revival from graveyard, tomb: %s", tomb)
}

func (s *ReviveStatus) setStatusGarbage() {
s.statusType = ReviveStatusGarbage
s.message = "successful revival from garbage bucket"
}

func (s *ReviveStatus) setStatusError(err error) {
s.statusType = ReviveStatusError
s.message = fmt.Sprintf("didn't revive, err: %v", err)
}

// ReviveObject revives object by oid.Address. Removes GCMark/Tombstone records in the corresponding buckets
// and restore metrics.
func (db *DB) ReviveObject(addr oid.Address) (res ReviveStatus, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.ReadOnly() {
res.setStatusError(ErrReadOnlyMode)
return res, ErrReadOnlyMode
} else if db.mode.NoMetabase() {
res.setStatusError(ErrDegradedMode)
return res, ErrDegradedMode
}

currEpoch := db.epochState.CurrentEpoch()

err = db.boltDB.Update(func(tx *bbolt.Tx) error {
garbageObjectsBKT := tx.Bucket(garbageObjectsBucketName)
garbageContainersBKT := tx.Bucket(garbageContainersBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)

buf := make([]byte, addressKeySize)

targetKey := addressKey(addr, buf)

if graveyardBKT == nil || garbageObjectsBKT == nil {
// incorrect metabase state, does not make
// sense to check garbage bucket
return ErrObjectWasNotRemoved
}

val := graveyardBKT.Get(targetKey)
if val != nil {
// object in the graveyard
if err := graveyardBKT.Delete(targetKey); err != nil {
return err
}

var tombAddress oid.Address
if err := decodeAddressFromKey(&tombAddress, val[:addressKeySize]); err != nil {
return err
}
res.setStatusGraveyard(tombAddress.EncodeToString())
} else {
val = garbageContainersBKT.Get(targetKey[:cidSize])
if val != nil {
return ErrReviveFromContainerGarbage
}

val = garbageObjectsBKT.Get(targetKey)
if val != nil {
// object marked with GC mark
res.setStatusGarbage()
} else {
// neither in the graveyard
// nor was marked with GC mark
return ErrObjectWasNotRemoved
}
}

if err := garbageObjectsBKT.Delete(targetKey); err != nil {
return err
}

if obj, err := db.get(tx, addr, buf, false, true, currEpoch); err == nil {
// if object is stored, and it is regular object then update bucket
// with container size estimations
if obj.Type() == object.TypeRegular {
if err := changeContainerSize(tx, addr.Container(), obj.PayloadSize(), true); err != nil {
return err
}
}

// also need to restore logical counter
if err := db.updateCounter(tx, logical, 1, true); err != nil {
return err
}
}

return nil
})
if err != nil {
res.setStatusError(err)
}

return
}
130 changes: 130 additions & 0 deletions pkg/local_object_storage/metabase/revive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package meta_test

import (
"testing"

"github.com/nspcc-dev/neofs-node/pkg/core/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)

func TestDB_ReviveObject(t *testing.T) {
db := newDB(t)

t.Run("from graveyard", func(t *testing.T) {
raw := generateObject(t)
addAttribute(raw, "foo", "bar")

tombstoneID := oidtest.Address()

err := putBig(db, raw)
require.NoError(t, err)

exists, err := metaExists(db, object.AddressOf(raw))
require.NoError(t, err)
require.True(t, exists)

// inhume object with tombstone
err = metaInhume(db, object.AddressOf(raw), tombstoneID)
require.NoError(t, err)

_, err = metaExists(db, object.AddressOf(raw))
require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))

_, err = metaGet(db, object.AddressOf(raw), false)
require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))

// revive object
res, err := db.ReviveObject(object.AddressOf(raw))
require.NoError(t, err)
require.Equal(t, meta.ReviveStatusGraveyard, res.StatusType())

exists, err = metaExists(db, object.AddressOf(raw))
require.NoError(t, err)
require.True(t, exists)
})

t.Run("from GC", func(t *testing.T) {
raw := generateObject(t)
addAttribute(raw, "foo", "bar")

err := putBig(db, raw)
require.NoError(t, err)

exists, err := metaExists(db, object.AddressOf(raw))
require.NoError(t, err)
require.True(t, exists)

// inhume with GC mark
var gcPrm meta.InhumePrm
gcPrm.SetGCMark()
gcPrm.SetAddresses(object.AddressOf(raw))

_, err = db.Inhume(gcPrm)
require.NoError(t, err)

_, err = metaExists(db, object.AddressOf(raw))
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))

_, err = metaGet(db, object.AddressOf(raw), false)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))

// revive object
res, err := db.ReviveObject(object.AddressOf(raw))
require.NoError(t, err)
require.Equal(t, meta.ReviveStatusGarbage, res.StatusType())

exists, err = metaExists(db, object.AddressOf(raw))
require.NoError(t, err)
require.True(t, exists)

obj, err := metaGet(db, object.AddressOf(raw), false)
require.NoError(t, err)
require.NotNil(t, obj)
})

t.Run("revive locked", func(t *testing.T) {
locked := oidtest.Address()

err := db.Lock(locked.Container(), oidtest.ID(), []oid.ID{locked.Object()})
require.NoError(t, err)

var prm meta.InhumePrm
prm.SetAddresses(locked)

_, err = db.Inhume(prm)

require.ErrorIs(t, err, new(apistatus.ObjectLocked))

res, err := db.ReviveObject(locked)
require.ErrorIs(t, err, meta.ErrObjectWasNotRemoved)
require.Equal(t, meta.ReviveStatusError, res.StatusType())
})

t.Run("revive object that not stored in db", func(t *testing.T) {
addr := oidtest.Address()

res, err := db.ReviveObject(addr)
require.ErrorIs(t, err, meta.ErrObjectWasNotRemoved)
require.Equal(t, meta.ReviveStatusError, res.StatusType())
})

t.Run("revive object that not removed", func(t *testing.T) {
raw := generateObject(t)
addAttribute(raw, "foo", "bar")

err := putBig(db, raw)
require.NoError(t, err)

exists, err := metaExists(db, object.AddressOf(raw))
require.NoError(t, err)
require.True(t, exists)

res, err := db.ReviveObject(object.AddressOf(raw))
require.ErrorIs(t, err, meta.ErrObjectWasNotRemoved)
require.Equal(t, meta.ReviveStatusError, res.StatusType())
})
}
21 changes: 21 additions & 0 deletions pkg/local_object_storage/shard/revive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package shard

import (
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// ReviveObject try to revive object in Shard, by remove records from graveyard and garbage.
//
// Returns meta.ReviveStatus of object and error.
func (s *Shard) ReviveObject(addr oid.Address) (meta.ReviveStatus, error) {
s.m.RLock()
defer s.m.RUnlock()

if s.GetMode().ReadOnly() {
return meta.ReviveStatus{}, ErrReadOnlyMode
} else if s.GetMode().NoMetabase() {
return meta.ReviveStatus{}, ErrDegradedMode
}
return s.metaBase.ReviveObject(addr)
}
18 changes: 18 additions & 0 deletions pkg/services/control/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,21 @@ func (w *objectStatusResponseWrapper) FromGRPCMessage(m grpc.Message) error {
w.ObjectStatusResponse = r
return nil
}

type reviveObjectResponseWrapper struct {
*ReviveObjectResponse
}

func (w *reviveObjectResponseWrapper) ToGRPCMessage() grpc.Message {
return w.ReviveObjectResponse
}

func (w *reviveObjectResponseWrapper) FromGRPCMessage(m grpc.Message) error {
r, ok := m.(*ReviveObjectResponse)
if !ok {
return message.NewUnexpectedMessageType(m, (*ReviveObjectResponse)(nil))
}

w.ReviveObjectResponse = r
return nil
}
22 changes: 22 additions & 0 deletions pkg/services/control/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
rpcEvacuateShard = "EvacuateShard"
rpcFlushCache = "FlushCache"
rpcObjectStatus = "ObjectStatus"
rpcReviveObject = "ReviveObject"
)

// HealthCheck executes ControlService.HealthCheck RPC.
Expand Down Expand Up @@ -246,3 +247,24 @@ func ObjectStatus(cli *client.Client, req *ObjectStatusRequest, opts ...client.C

return wResp.ObjectStatusResponse, nil
}

// ReviveObject executes ControlService.ReviveObject RPC.
func ReviveObject(
cli *client.Client,
req *ReviveObjectRequest,
opts ...client.CallOption,
) (*ReviveObjectResponse, error) {
wResp := &reviveObjectResponseWrapper{
new(ReviveObjectResponse),
}

wReq := &requestWrapper{
m: req,
}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcReviveObject), wReq, wResp, opts...)
if err != nil {
return nil, err
}

return wResp.ReviveObjectResponse, nil
}
Loading

0 comments on commit 1c78683

Please sign in to comment.