Skip to content

Commit

Permalink
Merge pull request #158 from bitcoin-sv/feature/dynamodb-ttl
Browse files Browse the repository at this point in the history
Feature/dynamodb ttl
  • Loading branch information
boecklim authored Nov 21, 2023
2 parents 41f9952 + 0e8608c commit a1df037
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 25 deletions.
11 changes: 9 additions & 2 deletions cmd/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ import (
)

const (
DbModeBadger = "badger"
DbModeDynamoDB = "dynamodb"
DbModeBadger = "badger"
DbModeDynamoDB = "dynamodb"
dataRetentionPeriodDaysDefault = 14
)

func StartMetamorph(logger utils.Logger) (func(), error) {
Expand Down Expand Up @@ -357,9 +358,15 @@ func NewStore(dbMode string, folder string) (s store.MetamorphStore, err error)
return nil, err
}

ttlDays := viper.GetInt("metamorph.dataRetentionPeriodDays")
if ttlDays == 0 {
ttlDays = dataRetentionPeriodDaysDefault
}

s, err = dynamodb.New(
awsdynamodb.NewFromConfig(cfg),
hostname,
time.Duration(ttlDays)*24*time.Hour,
)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion metamorph/store/Interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type StoreData struct {
CallbackToken string `dynamodbav:"callback_token"`
RejectReason string `dynamodbav:"reject_reason"`
LockedBy string `dynamodbav:"locked_by"`
Ttl int64 `dynamodbav:"ttl"`
}

func (sd *StoreData) EncodeToBytes() ([]byte, error) {
Expand Down Expand Up @@ -200,7 +201,7 @@ func DecodeFromBytes(b []byte) (*StoreData, error) {
return sd, nil
}

var ErrNotFound = errors.New("txid could not be found")
var ErrNotFound = errors.New("key could not be found")

type MetamorphStore interface {
Get(ctx context.Context, key []byte) (*StoreData, error)
Expand Down
70 changes: 54 additions & 16 deletions metamorph/store/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ const (
type DynamoDB struct {
client *dynamodb.Client
hostname string
ttl time.Duration
}

func New(client *dynamodb.Client, hostname string) (*DynamoDB, error) {
func New(client *dynamodb.Client, hostname string, timeToLive time.Duration) (*DynamoDB, error) {
repo := &DynamoDB{
client: client,
hostname: hostname,
ttl: timeToLive,
}

err := initialize(context.Background(), repo)
Expand Down Expand Up @@ -139,7 +141,18 @@ func (ddb *DynamoDB) CreateTransactionsTable(ctx context.Context) error {
WriteCapacityUnits: aws.Int64(10),
},
})
if err != nil {
return err
}

ttlInput := dynamodb.UpdateTimeToLiveInput{
TableName: aws.String("transactions"),
TimeToLiveSpecification: &types.TimeToLiveSpecification{
Enabled: aws.Bool(true),
AttributeName: aws.String("ttl"),
},
}
_, err = ddb.client.UpdateTimeToLive(ctx, &ttlInput)
if err != nil {
return err
}
Expand All @@ -161,7 +174,8 @@ func (ddb *DynamoDB) CreateBlocksTable(ctx context.Context) error {
{
AttributeName: aws.String("block_hash"),
KeyType: types.KeyTypeHash,
}},
},
},
TableName: aws.String("blocks"),
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(10),
Expand All @@ -171,6 +185,18 @@ func (ddb *DynamoDB) CreateBlocksTable(ctx context.Context) error {
return err
}

ttlInput := &dynamodb.UpdateTimeToLiveInput{
TableName: aws.String("blocks"),
TimeToLiveSpecification: &types.TimeToLiveSpecification{
Enabled: aws.Bool(true),
AttributeName: aws.String("ttl"),
},
}
_, err := ddb.client.UpdateTimeToLive(ctx, ttlInput)
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -224,6 +250,8 @@ func (ddb *DynamoDB) Set(ctx context.Context, key []byte, value *store.StoreData
span, _ := opentracing.StartSpanFromContext(ctx, "dynamodb:Set")
defer span.Finish()

ttl := time.Now().Add(ddb.ttl)
value.Ttl = ttl.Unix()
value.LockedBy = ddb.hostname

// marshal input value for new entry
Expand Down Expand Up @@ -483,6 +511,7 @@ func (ddb *DynamoDB) UpdateMined(ctx context.Context, hash *chainhash.Hash, bloc
type BlockItem struct {
Hash []byte `dynamodbav:"block_hash"`
ProcessedAt string `dynamodbav:"processed_at"`
Ttl int64 `dynamodbav:"ttl"`
}

func (ddb *DynamoDB) GetBlockProcessed(ctx context.Context, blockHash *chainhash.Hash) (*time.Time, error) {
Expand All @@ -501,32 +530,35 @@ func (ddb *DynamoDB) GetBlockProcessed(ctx context.Context, blockHash *chainhash
response, err := ddb.client.GetItem(ctx, &dynamodb.GetItemInput{
Key: val, TableName: aws.String("blocks"),
})

if err != nil {
span.SetTag(string(ext.Error), true)
span.LogFields(log.Error(err))
return nil, err
}

var blockItem BlockItem
err = attributevalue.UnmarshalMap(response.Item, &blockItem)
if err != nil {
span.SetTag(string(ext.Error), true)
span.LogFields(log.Error(err))
return nil, err
}

var processedAtTime time.Time
if blockItem.ProcessedAt != "" {
processedAtTime, err = time.Parse(time.RFC3339, blockItem.ProcessedAt)
if len(response.Item) != 0 {
var blockItem BlockItem
err = attributevalue.UnmarshalMap(response.Item, &blockItem)
if err != nil {
span.SetTag(string(ext.Error), true)
span.LogFields(log.Error(err))
return nil, err
}

var processedAtTime time.Time
if blockItem.ProcessedAt != "" {
processedAtTime, err = time.Parse(time.RFC3339, blockItem.ProcessedAt)
if err != nil {
span.SetTag(string(ext.Error), true)
span.LogFields(log.Error(err))
return nil, err
}
}

return &processedAtTime, nil
}

return &processedAtTime, nil
return nil, store.ErrNotFound
}

func (ddb *DynamoDB) SetBlockProcessed(ctx context.Context, blockHash *chainhash.Hash) error {
Expand All @@ -538,7 +570,13 @@ func (ddb *DynamoDB) SetBlockProcessed(ctx context.Context, blockHash *chainhash
span, _ := opentracing.StartSpanFromContext(ctx, "dynamodb:SetBlockProcessed")
defer span.Finish()

blockItem := BlockItem{Hash: blockHash.CloneBytes(), ProcessedAt: time.Now().UTC().Format(time.RFC3339)}
ttl := time.Now().Add(ddb.ttl)

blockItem := BlockItem{
Hash: blockHash.CloneBytes(),
ProcessedAt: time.Now().UTC().Format(time.RFC3339),
Ttl: ttl.Unix(),
}
item, err := attributevalue.MarshalMap(blockItem)
if err != nil {
span.SetTag(string(ext.Error), true)
Expand Down
29 changes: 23 additions & 6 deletions metamorph/store/dynamodb/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dynamodb
import (
"context"
"encoding/hex"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -86,7 +85,7 @@ func NewDynamoDBIntegrationTestRepo(t *testing.T) (*DynamoDB, *dynamodb.Client)
})
require.NoError(t, err)

repo, err := New(client, hostname)
repo, err := New(client, hostname, 1*time.Hour)
require.NoError(t, err)

tables, err := client.ListTables(context.Background(), &dynamodb.ListTablesInput{})
Expand Down Expand Up @@ -152,12 +151,10 @@ func TestDynamoDBIntegration(t *testing.T) {
_, isAttrErr := err.(*attributevalue.UnmarshalTypeError)
require.True(t, isAttrErr)
})
t.Run("set", func(t *testing.T) {
t.Run("set/get", func(t *testing.T) {
err := repo.Set(ctx, nil, dataStatusSent)
require.NoError(t, err)
})

t.Run("get", func(t *testing.T) {
returnedData, err := repo.Get(ctx, TX1Hash[:])
require.NoError(t, err)
require.Equal(t, dataStatusSent, returnedData)
Expand Down Expand Up @@ -241,6 +238,26 @@ func TestDynamoDBIntegration(t *testing.T) {
err := repo.Del(ctx, TX1Hash[:])
require.NoError(t, err)
_, err = repo.Get(ctx, TX1Hash[:])
require.True(t, errors.Is(store.ErrNotFound, err))
require.ErrorIs(t, err, store.ErrNotFound)
})

t.Run("blocks - time to live = -1 hour", func(t *testing.T) {
repo.ttl = time.Minute * -10
err := repo.SetBlockProcessed(ctx, Block1Hash)
require.NoError(t, err)

time.Sleep(2 * time.Second) // give DynamoDB time to delete
_, err = repo.GetBlockProcessed(ctx, Block1Hash)
require.ErrorIs(t, err, store.ErrNotFound)
})

t.Run("transactions - time to live = -1 hour", func(t *testing.T) {
repo.ttl = time.Minute * -10
err := repo.Set(ctx, nil, dataStatusSent)
require.NoError(t, err)

time.Sleep(10 * time.Second) // give DynamoDB time to delete
_, err = repo.Get(ctx, TX1Hash[:])
require.ErrorIs(t, err, store.ErrNotFound)
})
}

0 comments on commit a1df037

Please sign in to comment.