Skip to content

Commit

Permalink
pkg/auth/badgerauth: add admin service for managing badger records
Browse files Browse the repository at this point in the history
Updates #190

Change-Id: Ia87619a4d325e7b809a0086f7b7ffd4ecd310e76
  • Loading branch information
halkyon authored and Storj Robot committed Jun 12, 2022
1 parent 96a0919 commit 383cb6b
Show file tree
Hide file tree
Showing 17 changed files with 1,264 additions and 35 deletions.
16 changes: 16 additions & 0 deletions pkg/auth/authdb/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
// Invalid is the class of error that is returned for invalid records.
var Invalid = errs.Class("invalid")

// KeyHashError is a class of key hash errors.
var KeyHashError = errs.Class("key hash")

// Record is a key/value store record.
type Record struct {
SatelliteAddress string
Expand All @@ -26,6 +29,19 @@ type Record struct {
// KeyHash is the key portion of the key/value store.
type KeyHash [32]byte

// SetBytes sets the key hash from bytes.
func (kh *KeyHash) SetBytes(v []byte) error {
if len(v) > len(KeyHash{}) {
return KeyHashError.New("v exceeds the acceptable length")
}
*kh = KeyHash{}
copy(kh[:], v)
return nil
}

// Bytes returns the bytes for key hash.
func (kh KeyHash) Bytes() []byte { return kh[:] }

// KV is an abstract key/value store of KeyHash to Records.
type KV interface {
// Put stores the record in the key/value store.
Expand Down
115 changes: 115 additions & 0 deletions pkg/auth/badgerauth/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package badgerauth

import (
"context"
"time"

badger "github.com/outcaste-io/badger/v3"
"github.com/zeebo/errs"

"storj.io/common/rpc/rpcstatus"
"storj.io/gateway-mt/pkg/auth/authdb"
"storj.io/gateway-mt/pkg/auth/badgerauth/pb"
)

// Admin represents a service that allows managing database records directly.
type Admin struct {
db *DB
}

var _ pb.DRPCAdminServiceServer = (*Admin)(nil)

// NewAdmin creates a new instance of Admin.
func NewAdmin(db *DB) *Admin {
return &Admin{db: db}
}

// GetRecord gets a database record.
func (admin *Admin) GetRecord(ctx context.Context, req *pb.GetRecordRequest) (_ *pb.GetRecordResponse, err error) {
defer mon.Task()(&ctx)(&err)

var resp pb.GetRecordResponse

var keyHash authdb.KeyHash
if err = keyHash.SetBytes(req.Key); err != nil {
return nil, errToRPCStatusErr(err)
}

resp.Record, err = admin.db.lookupRecord(ctx, keyHash)
if err != nil {
return nil, errToRPCStatusErr(err)
}

return &resp, nil
}

// InvalidateRecord invalidates a record.
func (admin *Admin) InvalidateRecord(ctx context.Context, req *pb.InvalidateRecordRequest) (_ *pb.InvalidateRecordResponse, err error) {
defer mon.Task()(&ctx)(&err)

var resp pb.InvalidateRecordResponse

if req.Reason == "" {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "missing reason")
}

var keyHash authdb.KeyHash
if err = keyHash.SetBytes(req.Key); err != nil {
return nil, errToRPCStatusErr(err)
}

return &resp, errToRPCStatusErr(admin.db.updateRecord(ctx, keyHash, func(record *pb.Record) {
record.InvalidatedAtUnix = time.Now().Unix()
record.InvalidationReason = req.Reason
}))
}

// UnpublishRecord unpublishes a record.
func (admin *Admin) UnpublishRecord(ctx context.Context, req *pb.UnpublishRecordRequest) (_ *pb.UnpublishRecordResponse, err error) {
defer mon.Task()(&ctx)(&err)

var resp pb.UnpublishRecordResponse

var keyHash authdb.KeyHash
if err = keyHash.SetBytes(req.Key); err != nil {
return nil, errToRPCStatusErr(err)
}

return &resp, errToRPCStatusErr(admin.db.updateRecord(ctx, keyHash, func(record *pb.Record) {
record.Public = false
}))
}

// DeleteRecord deletes a database record.
func (admin *Admin) DeleteRecord(ctx context.Context, req *pb.DeleteRecordRequest) (_ *pb.DeleteRecordResponse, err error) {
defer mon.Task()(&ctx)(&err)

var resp pb.DeleteRecordResponse

var keyHash authdb.KeyHash
if err = keyHash.SetBytes(req.Key); err != nil {
return nil, errToRPCStatusErr(err)
}

return &resp, errToRPCStatusErr(admin.db.deleteRecord(ctx, keyHash))
}

func errToRPCStatusErr(err error) error {
switch {
case err == nil:
return nil
case ProtoError.Has(err),
authdb.KeyHashError.Has(err),
errs.Is(err, badger.ErrInvalidKey),
errs.Is(err, badger.ErrBannedKey),
errs.Is(err, badger.ErrEmptyKey):
return rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
case errs.Is(err, badger.ErrKeyNotFound):
return rpcstatus.Error(rpcstatus.NotFound, err.Error())
default:
return rpcstatus.Error(rpcstatus.Internal, err.Error())
}
}
151 changes: 151 additions & 0 deletions pkg/auth/badgerauth/admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package badgerauth_test

import (
"testing"

"github.com/stretchr/testify/require"

"storj.io/common/rpc/rpcstatus"
"storj.io/common/testcontext"
"storj.io/gateway-mt/pkg/auth/badgerauth"
"storj.io/gateway-mt/pkg/auth/badgerauth/badgerauthtest"
"storj.io/gateway-mt/pkg/auth/badgerauth/pb"
)

func TestNodeAdmin_GetRecord(t *testing.T) {
badgerauthtest.RunSingleNode(t, badgerauth.Config{
ID: badgerauth.NodeID{'a', 'd', 'm', 'g', 'e', 't'},
}, func(ctx *testcontext.Context, t *testing.T, node *badgerauth.Node) {
admin := badgerauth.NewAdmin(node.DB)
records, keys, _ := badgerauthtest.CreateFullRecords(ctx, t, node, 2)

_, err := admin.GetRecord(ctx, &pb.GetRecordRequest{Key: []byte{}})
require.Equal(t, rpcstatus.Code(err), rpcstatus.NotFound)

_, err = admin.GetRecord(ctx, &pb.GetRecordRequest{Key: []byte{'a'}})
require.Equal(t, rpcstatus.Code(err), rpcstatus.NotFound)

_, err = admin.GetRecord(ctx, &pb.GetRecordRequest{Key: make([]byte, 33)})
require.Equal(t, rpcstatus.Code(err), rpcstatus.InvalidArgument)

resp, err := admin.GetRecord(ctx, &pb.GetRecordRequest{Key: keys[0].Bytes()})
require.NoError(t, err)
require.Equal(t, records[keys[0]].EncryptedAccessGrant, resp.Record.EncryptedAccessGrant)

resp, err = admin.GetRecord(ctx, &pb.GetRecordRequest{Key: keys[1].Bytes()})
require.NoError(t, err)
require.Equal(t, records[keys[1]].EncryptedAccessGrant, resp.Record.EncryptedAccessGrant)
})
}

func TestNodeAdmin_InvalidateRecord(t *testing.T) {
badgerauthtest.RunSingleNode(t, badgerauth.Config{
ID: badgerauth.NodeID{'a', 'd', 'm', 'i', 'n', 'v'},
}, func(ctx *testcontext.Context, t *testing.T, node *badgerauth.Node) {
admin := badgerauth.NewAdmin(node.DB)
records, keys, _ := badgerauthtest.CreateFullRecords(ctx, t, node, 2)

_, err := admin.InvalidateRecord(ctx, &pb.InvalidateRecordRequest{Reason: "test"})
require.Equal(t, rpcstatus.Code(err), rpcstatus.NotFound)

_, err = admin.InvalidateRecord(ctx, &pb.InvalidateRecordRequest{
Key: make([]byte, 33),
Reason: "something",
})
require.Equal(t, rpcstatus.Code(err), rpcstatus.InvalidArgument)

_, err = admin.InvalidateRecord(ctx, &pb.InvalidateRecordRequest{
Key: []byte{'a'},
Reason: "something",
})
require.Equal(t, rpcstatus.Code(err), rpcstatus.NotFound)

_, err = admin.InvalidateRecord(ctx, &pb.InvalidateRecordRequest{
Key: keys[0].Bytes(),
Reason: "",
})
require.Equal(t, rpcstatus.Code(err), rpcstatus.InvalidArgument)

_, err = admin.InvalidateRecord(ctx, &pb.InvalidateRecordRequest{
Key: keys[0].Bytes(),
Reason: "your key is disabled",
})
require.NoError(t, err)

resp, err := admin.GetRecord(ctx, &pb.GetRecordRequest{Key: keys[0].Bytes()})
require.NoError(t, err)
require.Equal(t, records[keys[0]].EncryptedAccessGrant, resp.Record.EncryptedAccessGrant)
require.Equal(t, "your key is disabled", resp.Record.InvalidationReason)
require.NotZero(t, resp.Record.InvalidatedAtUnix)

resp, err = admin.GetRecord(ctx, &pb.GetRecordRequest{Key: keys[1].Bytes()})
require.NoError(t, err)
require.Equal(t, records[keys[1]].EncryptedAccessGrant, resp.Record.EncryptedAccessGrant)
require.Equal(t, "", resp.Record.InvalidationReason)
require.Zero(t, resp.Record.InvalidatedAtUnix)
})
}

func TestNodeAdmin_UnpublishRecord(t *testing.T) {
badgerauthtest.RunSingleNode(t, badgerauth.Config{
ID: badgerauth.NodeID{'a', 'd', 'm', 'p', 'u', 'b'},
}, func(ctx *testcontext.Context, t *testing.T, node *badgerauth.Node) {
admin := badgerauth.NewAdmin(node.DB)
records, keys, _ := badgerauthtest.CreateFullRecords(ctx, t, node, 2)

_, err := admin.UnpublishRecord(ctx, &pb.UnpublishRecordRequest{})
require.Equal(t, rpcstatus.Code(err), rpcstatus.NotFound)

_, err = admin.UnpublishRecord(ctx, &pb.UnpublishRecordRequest{Key: make([]byte, 33)})
require.Equal(t, rpcstatus.Code(err), rpcstatus.InvalidArgument)

_, err = admin.UnpublishRecord(ctx, &pb.UnpublishRecordRequest{Key: keys[0].Bytes()})
require.NoError(t, err)

resp, err := admin.GetRecord(ctx, &pb.GetRecordRequest{Key: keys[0].Bytes()})
require.NoError(t, err)
require.Equal(t, records[keys[0]].EncryptedAccessGrant, resp.Record.EncryptedAccessGrant)
require.False(t, resp.Record.Public)

resp, err = admin.GetRecord(ctx, &pb.GetRecordRequest{Key: keys[1].Bytes()})
require.NoError(t, err)
require.Equal(t, records[keys[1]].EncryptedAccessGrant, resp.Record.EncryptedAccessGrant)
require.True(t, resp.Record.Public)
})
}

func TestNodeAdmin_DeleteRecord(t *testing.T) {
badgerauthtest.RunSingleNode(t, badgerauth.Config{
ID: badgerauth.NodeID{'a', 'd', 'm', 'd', 'e', 'l'},
}, func(ctx *testcontext.Context, t *testing.T, node *badgerauth.Node) {
admin := badgerauth.NewAdmin(node.DB)
records, keys, entries := badgerauthtest.CreateFullRecords(ctx, t, node, 2)

badgerauthtest.VerifyReplicationLog{
Entries: entries,
}.Check(ctx, t, node.UnderlyingDB().UnderlyingDB())

_, err := admin.DeleteRecord(ctx, &pb.DeleteRecordRequest{Key: []byte{'a'}})
require.Equal(t, rpcstatus.Code(err), rpcstatus.NotFound)

_, err = admin.DeleteRecord(ctx, &pb.DeleteRecordRequest{Key: keys[0].Bytes()})
require.NoError(t, err)

_, err = admin.DeleteRecord(ctx, &pb.DeleteRecordRequest{Key: make([]byte, 33)})
require.Equal(t, rpcstatus.Code(err), rpcstatus.InvalidArgument)

badgerauthtest.VerifyReplicationLog{
Entries: []badgerauthtest.ReplicationLogEntryWithTTL{entries[1]},
}.Check(ctx, t, node.UnderlyingDB().UnderlyingDB())

_, err = admin.GetRecord(ctx, &pb.GetRecordRequest{Key: keys[0].Bytes()})
require.Equal(t, rpcstatus.Code(err), rpcstatus.NotFound)

resp, err := admin.GetRecord(ctx, &pb.GetRecordRequest{Key: keys[1].Bytes()})
require.NoError(t, err)
require.Equal(t, resp.Record.EncryptedAccessGrant, records[keys[1]].EncryptedAccessGrant)
})
}
2 changes: 1 addition & 1 deletion pkg/auth/badgerauth/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestBackupRestore(t *testing.T) {
},
func(ctx *testcontext.Context, t *testing.T, node *badgerauth.Node) {
node.Backup.Client = &s3Client
expectedRecords, expectedEntries = badgerauthtest.CreateFullRecords(ctx, t, node, 10)
expectedRecords, _, expectedEntries = badgerauthtest.CreateFullRecords(ctx, t, node, 10)
node.Backup.SyncCycle.TriggerWait()
},
)
Expand Down
18 changes: 9 additions & 9 deletions pkg/auth/badgerauth/badgerauthmigration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func (kv *KV) Put(ctx context.Context, keyHash authdb.KeyHash, record *authdb.Re
if err := kv.src.Put(ctx, keyHash, record); err != nil {
return Error.New("failed to write to sqlauth: %w", err)
}
kv.log.Debug("Wrote to sqlauth", zap.Binary("keyHash", keyHash[:]))
kv.log.Debug("Wrote to sqlauth", zap.Binary("keyHash", keyHash.Bytes()))
if err := kv.dst.Put(ctx, keyHash, record); err != nil {
kv.mon.Event("as_badgerauthmigration_destination_put_err")
return Error.New("failed to write to badgerauth: %w", err)
}
kv.log.Debug("Wrote to badgerauth", zap.Binary("keyHash", keyHash[:]))
kv.log.Debug("Wrote to badgerauth", zap.Binary("keyHash", keyHash.Bytes()))
return nil
}

Expand Down Expand Up @@ -146,8 +146,11 @@ func (kv *KV) MigrateToLatest(ctx context.Context) error {
}
if err = dstDB.Update(func(txn *badger.Txn) error {
for _, r := range rows {
keyHash, record := convertRecord(r)
if err = badgerauth.InsertRecord(kv.log, txn, kv.dst.ID(), keyHash, record); err != nil {
var keyHash authdb.KeyHash
if err = keyHash.SetBytes(r.EncryptionKeyHash); err != nil {
return err
}
if err = badgerauth.InsertRecord(kv.log, txn, kv.dst.ID(), keyHash, convertRecord(r)); err != nil {
return err
}
count++
Expand All @@ -170,10 +173,7 @@ func (kv *KV) MigrateToLatest(ctx context.Context) error {
return nil
}

func convertRecord(r *dbx.Record) (authdb.KeyHash, *pb.Record) {
var keyHash authdb.KeyHash
copy(keyHash[:], r.EncryptionKeyHash)

func convertRecord(r *dbx.Record) *pb.Record {
converted := &pb.Record{
CreatedAtUnix: r.CreatedAt.Unix(),
Public: r.Public,
Expand All @@ -193,7 +193,7 @@ func convertRecord(r *dbx.Record) (authdb.KeyHash, *pb.Record) {
converted.InvalidatedAtUnix = r.InvalidAt.Unix()
}

return keyHash, converted
return converted
}

// Close closes the database.
Expand Down
8 changes: 6 additions & 2 deletions pkg/auth/badgerauth/badgerauthtest/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/gateway-mt/pkg/auth/authdb"
Expand All @@ -23,6 +25,7 @@ func CreateFullRecords(
count int,
) (
records map[authdb.KeyHash]*authdb.Record,
keys []authdb.KeyHash,
entries []ReplicationLogEntryWithTTL,
) {
records = make(map[authdb.KeyHash]*authdb.Record)
Expand All @@ -31,7 +34,7 @@ func CreateFullRecords(
marker := testrand.RandAlphaNumeric(32)

var keyHash authdb.KeyHash
copy(keyHash[:], marker)
require.NoError(t, keyHash.SetBytes(marker))

// TODO(artur): make expiresAt configurable or random per record.
expiresAt := time.Unix(time.Now().Add(time.Hour).Unix(), 0)
Expand All @@ -45,6 +48,7 @@ func CreateFullRecords(
Public: testrand.Intn(1) == 0,
}

keys = append(keys, keyHash)
records[keyHash] = record
entries = append(entries, ReplicationLogEntryWithTTL{
Entry: badgerauth.ReplicationLogEntry{
Expand All @@ -63,5 +67,5 @@ func CreateFullRecords(
}.Check(ctx, t, node.UnderlyingDB())
}

return records, entries
return records, keys, entries
}
Loading

0 comments on commit 383cb6b

Please sign in to comment.