diff --git a/cmd/metamorph.go b/cmd/metamorph.go index 5aa42b4d2..6fc824a78 100644 --- a/cmd/metamorph.go +++ b/cmd/metamorph.go @@ -38,8 +38,9 @@ import ( ) const ( - DbModeBadger = "badger" - DbModeDynamoDB = "dynamodb" + DbModeBadger = "badger" + DbModeDynamoDB = "dynamodb" + dataRetentionPeriodDaysDefault = 14 ) func StartMetamorph(logger utils.Logger) (func(), error) { @@ -357,9 +358,15 @@ func NewStore(dbMode string, folder string) (s store.MetamorphStore, err error) return nil, err } + ttlDays := viper.GetInt("metamorph.dataRetentionPeriodDays") + if ttlDays == 0 { + ttlDays = dataRetentionPeriodDaysDefault + } + s, err = dynamodb.New( awsdynamodb.NewFromConfig(cfg), hostname, + time.Duration(ttlDays)*24*time.Hour, ) if err != nil { return nil, err diff --git a/metamorph/store/Interface.go b/metamorph/store/Interface.go index 3f42554a3..cce2056e4 100644 --- a/metamorph/store/Interface.go +++ b/metamorph/store/Interface.go @@ -26,6 +26,7 @@ type StoreData struct { CallbackToken string `dynamodbav:"callback_token"` RejectReason string `dynamodbav:"reject_reason"` LockedBy string `dynamodbav:"locked_by"` + Ttl int64 `dynamodbav:"ttl"` } func (sd *StoreData) EncodeToBytes() ([]byte, error) { @@ -200,7 +201,7 @@ func DecodeFromBytes(b []byte) (*StoreData, error) { return sd, nil } -var ErrNotFound = errors.New("txid could not be found") +var ErrNotFound = errors.New("key could not be found") type MetamorphStore interface { Get(ctx context.Context, key []byte) (*StoreData, error) diff --git a/metamorph/store/dynamodb/dynamodb.go b/metamorph/store/dynamodb/dynamodb.go index 9ec00d4d4..ac56fdd70 100644 --- a/metamorph/store/dynamodb/dynamodb.go +++ b/metamorph/store/dynamodb/dynamodb.go @@ -32,12 +32,14 @@ const ( type DynamoDB struct { client *dynamodb.Client hostname string + ttl time.Duration } -func New(client *dynamodb.Client, hostname string) (*DynamoDB, error) { +func New(client *dynamodb.Client, hostname string, timeToLive time.Duration) (*DynamoDB, error) { repo := &DynamoDB{ client: client, hostname: hostname, + ttl: timeToLive, } err := initialize(context.Background(), repo) @@ -139,7 +141,18 @@ func (ddb *DynamoDB) CreateTransactionsTable(ctx context.Context) error { WriteCapacityUnits: aws.Int64(10), }, }) + if err != nil { + return err + } + ttlInput := dynamodb.UpdateTimeToLiveInput{ + TableName: aws.String("transactions"), + TimeToLiveSpecification: &types.TimeToLiveSpecification{ + Enabled: aws.Bool(true), + AttributeName: aws.String("ttl"), + }, + } + _, err = ddb.client.UpdateTimeToLive(ctx, &ttlInput) if err != nil { return err } @@ -161,7 +174,8 @@ func (ddb *DynamoDB) CreateBlocksTable(ctx context.Context) error { { AttributeName: aws.String("block_hash"), KeyType: types.KeyTypeHash, - }}, + }, + }, TableName: aws.String("blocks"), ProvisionedThroughput: &types.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(10), @@ -171,6 +185,18 @@ func (ddb *DynamoDB) CreateBlocksTable(ctx context.Context) error { return err } + ttlInput := &dynamodb.UpdateTimeToLiveInput{ + TableName: aws.String("blocks"), + TimeToLiveSpecification: &types.TimeToLiveSpecification{ + Enabled: aws.Bool(true), + AttributeName: aws.String("ttl"), + }, + } + _, err := ddb.client.UpdateTimeToLive(ctx, ttlInput) + if err != nil { + return err + } + return nil } @@ -224,6 +250,8 @@ func (ddb *DynamoDB) Set(ctx context.Context, key []byte, value *store.StoreData span, _ := opentracing.StartSpanFromContext(ctx, "dynamodb:Set") defer span.Finish() + ttl := time.Now().Add(ddb.ttl) + value.Ttl = ttl.Unix() value.LockedBy = ddb.hostname // marshal input value for new entry @@ -483,6 +511,7 @@ func (ddb *DynamoDB) UpdateMined(ctx context.Context, hash *chainhash.Hash, bloc type BlockItem struct { Hash []byte `dynamodbav:"block_hash"` ProcessedAt string `dynamodbav:"processed_at"` + Ttl int64 `dynamodbav:"ttl"` } func (ddb *DynamoDB) GetBlockProcessed(ctx context.Context, blockHash *chainhash.Hash) (*time.Time, error) { @@ -501,32 +530,35 @@ func (ddb *DynamoDB) GetBlockProcessed(ctx context.Context, blockHash *chainhash response, err := ddb.client.GetItem(ctx, &dynamodb.GetItemInput{ Key: val, TableName: aws.String("blocks"), }) - - if err != nil { - span.SetTag(string(ext.Error), true) - span.LogFields(log.Error(err)) - return nil, err - } - - var blockItem BlockItem - err = attributevalue.UnmarshalMap(response.Item, &blockItem) if err != nil { span.SetTag(string(ext.Error), true) span.LogFields(log.Error(err)) return nil, err } - var processedAtTime time.Time - if blockItem.ProcessedAt != "" { - processedAtTime, err = time.Parse(time.RFC3339, blockItem.ProcessedAt) + if len(response.Item) != 0 { + var blockItem BlockItem + err = attributevalue.UnmarshalMap(response.Item, &blockItem) if err != nil { span.SetTag(string(ext.Error), true) span.LogFields(log.Error(err)) return nil, err } + + var processedAtTime time.Time + if blockItem.ProcessedAt != "" { + processedAtTime, err = time.Parse(time.RFC3339, blockItem.ProcessedAt) + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return nil, err + } + } + + return &processedAtTime, nil } - return &processedAtTime, nil + return nil, store.ErrNotFound } func (ddb *DynamoDB) SetBlockProcessed(ctx context.Context, blockHash *chainhash.Hash) error { @@ -538,7 +570,13 @@ func (ddb *DynamoDB) SetBlockProcessed(ctx context.Context, blockHash *chainhash span, _ := opentracing.StartSpanFromContext(ctx, "dynamodb:SetBlockProcessed") defer span.Finish() - blockItem := BlockItem{Hash: blockHash.CloneBytes(), ProcessedAt: time.Now().UTC().Format(time.RFC3339)} + ttl := time.Now().Add(ddb.ttl) + + blockItem := BlockItem{ + Hash: blockHash.CloneBytes(), + ProcessedAt: time.Now().UTC().Format(time.RFC3339), + Ttl: ttl.Unix(), + } item, err := attributevalue.MarshalMap(blockItem) if err != nil { span.SetTag(string(ext.Error), true) diff --git a/metamorph/store/dynamodb/dynamodb_test.go b/metamorph/store/dynamodb/dynamodb_test.go index 9c31c45d6..abb00ce3f 100644 --- a/metamorph/store/dynamodb/dynamodb_test.go +++ b/metamorph/store/dynamodb/dynamodb_test.go @@ -3,7 +3,6 @@ package dynamodb import ( "context" "encoding/hex" - "errors" "testing" "time" @@ -86,7 +85,7 @@ func NewDynamoDBIntegrationTestRepo(t *testing.T) (*DynamoDB, *dynamodb.Client) }) require.NoError(t, err) - repo, err := New(client, hostname) + repo, err := New(client, hostname, 1*time.Hour) require.NoError(t, err) tables, err := client.ListTables(context.Background(), &dynamodb.ListTablesInput{}) @@ -152,12 +151,10 @@ func TestDynamoDBIntegration(t *testing.T) { _, isAttrErr := err.(*attributevalue.UnmarshalTypeError) require.True(t, isAttrErr) }) - t.Run("set", func(t *testing.T) { + t.Run("set/get", func(t *testing.T) { err := repo.Set(ctx, nil, dataStatusSent) require.NoError(t, err) - }) - t.Run("get", func(t *testing.T) { returnedData, err := repo.Get(ctx, TX1Hash[:]) require.NoError(t, err) require.Equal(t, dataStatusSent, returnedData) @@ -241,6 +238,26 @@ func TestDynamoDBIntegration(t *testing.T) { err := repo.Del(ctx, TX1Hash[:]) require.NoError(t, err) _, err = repo.Get(ctx, TX1Hash[:]) - require.True(t, errors.Is(store.ErrNotFound, err)) + require.ErrorIs(t, err, store.ErrNotFound) + }) + + t.Run("blocks - time to live = -1 hour", func(t *testing.T) { + repo.ttl = time.Minute * -10 + err := repo.SetBlockProcessed(ctx, Block1Hash) + require.NoError(t, err) + + time.Sleep(2 * time.Second) // give DynamoDB time to delete + _, err = repo.GetBlockProcessed(ctx, Block1Hash) + require.ErrorIs(t, err, store.ErrNotFound) + }) + + t.Run("transactions - time to live = -1 hour", func(t *testing.T) { + repo.ttl = time.Minute * -10 + err := repo.Set(ctx, nil, dataStatusSent) + require.NoError(t, err) + + time.Sleep(10 * time.Second) // give DynamoDB time to delete + _, err = repo.Get(ctx, TX1Hash[:]) + require.ErrorIs(t, err, store.ErrNotFound) }) }