From 1a1bcd89675e0546fdfa05721d1e0149f9f248ee Mon Sep 17 00:00:00 2001 From: Nozim Mehrubonov Date: Mon, 20 Nov 2023 13:28:48 +0000 Subject: [PATCH 1/5] Add ttl field for dynamodb with tests --- metamorph/store/dynamodb/dynamodb.go | 22 +++++- metamorph/store/dynamodb/ttl_test.go | 112 +++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 metamorph/store/dynamodb/ttl_test.go diff --git a/metamorph/store/dynamodb/dynamodb.go b/metamorph/store/dynamodb/dynamodb.go index 9ec00d4d4..4b7f8e0b4 100644 --- a/metamorph/store/dynamodb/dynamodb.go +++ b/metamorph/store/dynamodb/dynamodb.go @@ -139,7 +139,18 @@ func (ddb *DynamoDB) CreateTransactionsTable(ctx context.Context) error { WriteCapacityUnits: aws.Int64(10), }, }) + if err != nil { + return err + } + ttltx := dynamodb.UpdateTimeToLiveInput{ + TableName: aws.String("transactions"), + TimeToLiveSpecification: &types.TimeToLiveSpecification{ + Enabled: aws.Bool(true), + AttributeName: aws.String("ttl"), + }, + } + _, err = ddb.client.UpdateTimeToLive(ctx, &ttltx) if err != nil { return err } @@ -156,12 +167,21 @@ func (ddb *DynamoDB) CreateBlocksTable(ctx context.Context) error { AttributeName: aws.String("block_hash"), AttributeType: types.ScalarAttributeTypeB, }, + { + AttributeName: aws.String("ttl"), + AttributeType: types.ScalarAttributeTypeS, + }, }, KeySchema: []types.KeySchemaElement{ { AttributeName: aws.String("block_hash"), KeyType: types.KeyTypeHash, - }}, + }, + { + AttributeName: aws.String("ttl"), + KeyType: types.KeyTypeRange, + }, + }, TableName: aws.String("blocks"), ProvisionedThroughput: &types.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(10), diff --git a/metamorph/store/dynamodb/ttl_test.go b/metamorph/store/dynamodb/ttl_test.go new file mode 100644 index 000000000..8626c90af --- /dev/null +++ b/metamorph/store/dynamodb/ttl_test.go @@ -0,0 +1,112 @@ +package dynamodb + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/ory/dockertest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTimeToLive(t *testing.T) { + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + resource, err := pool.Run("amazon/dynamodb-local", "latest", []string{}) + require.NoError(t, err) + + t.Cleanup(func() { + err := pool.Purge(resource) + require.NoError(t, err) + }) + + resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + return aws.Endpoint{ + PartitionID: "aws", + URL: host + resource.GetPort(port), + SigningRegion: regionUsEast1, + }, nil + }) + cfg, err := config.LoadDefaultConfig( + context.TODO(), + config.WithEndpointResolverWithOptions(resolver), + config.WithCredentialsProvider(credentials.StaticCredentialsProvider{ + Value: aws.Credentials{ + AccessKeyID: "dummy", SecretAccessKey: "dummy", SessionToken: "dummy", + Source: "Hard-coded credentials; values are irrelevant for local DynamoDB", + }, + }), + ) + require.NoError(t, err) + + client := dynamodb.NewFromConfig(cfg) + + pool.MaxWait = 60 * time.Second + + err = pool.Retry(func() error { + _, err := client.ListTables(context.Background(), &dynamodb.ListTablesInput{}) + return err + }) + require.NoError(t, err) + + _, err = New(client, hostname) + require.NoError(t, err) + + tables, err := client.ListTables(context.Background(), &dynamodb.ListTablesInput{}) + require.NoError(t, err) + require.ElementsMatch(t, []string{"blocks", "transactions"}, tables.TableNames) + testkey := []byte("testkey") + + //sdata := &store.StoreData{ + // Hash: TX1Hash, + // Status: metamorph_api.Status_SENT_TO_NETWORK, + // CallbackUrl: "http://callback.com", + // CallbackToken: "abcd", + // MerkleProof: false, + // RawTx: TX1RawBytes, + // LockedBy: hostname, + // StoredAt: time.Now(), + //} + //require.NoError(t, repo.Set(context.TODO(), testkey, sdata)) + //_, err = repo.Get(context.TODO(), testkey) + // this test should not pass. Get should return the item that was inserted previously + //assert.Equal(t, store.ErrNotFound, err) + + _, err = client.PutItem(context.TODO(), &dynamodb.PutItemInput{ + TableName: aws.String("transactions"), + Item: map[string]types.AttributeValue{ + "tx_hash": &types.AttributeValueMemberB{Value: testkey}, + "locked_by": &types.AttributeValueMemberS{Value: "testlocked"}, + "ttl": &types.AttributeValueMemberS{Value: "1"}, + }, + }) + require.NoError(t, err) + + getitem, err := client.GetItem(context.TODO(), &dynamodb.GetItemInput{ + TableName: aws.String("transactions"), + Key: map[string]types.AttributeValue{ + "tx_hash": &types.AttributeValueMemberB{Value: testkey}, + }}) + require.NoError(t, err) + + assert.Equal(t, "testlocked", getitem.Item["locked_by"].(*types.AttributeValueMemberS).Value) + + time.Sleep(10 * time.Second) + result, err := client.GetItem(context.TODO(), &dynamodb.GetItemInput{ + TableName: aws.String("transactions"), + Key: map[string]types.AttributeValue{ + "tx_hash": &types.AttributeValueMemberB{Value: testkey}, + }}) + + require.NotNil(t, err) + fmt.Println(result) + +} From 17129759f70ac0ca0271c9690bfc6d32c4fc9b53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Mon, 20 Nov 2023 16:47:48 +0100 Subject: [PATCH 2/5] BAARC-9: DynamoDB ttl setting --- cmd/metamorph.go | 11 +++++++++-- metamorph/store/dynamodb/dynamodb.go | 19 +++++++++---------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/cmd/metamorph.go b/cmd/metamorph.go index 5aa42b4d2..6fc824a78 100644 --- a/cmd/metamorph.go +++ b/cmd/metamorph.go @@ -38,8 +38,9 @@ import ( ) const ( - DbModeBadger = "badger" - DbModeDynamoDB = "dynamodb" + DbModeBadger = "badger" + DbModeDynamoDB = "dynamodb" + dataRetentionPeriodDaysDefault = 14 ) func StartMetamorph(logger utils.Logger) (func(), error) { @@ -357,9 +358,15 @@ func NewStore(dbMode string, folder string) (s store.MetamorphStore, err error) return nil, err } + ttlDays := viper.GetInt("metamorph.dataRetentionPeriodDays") + if ttlDays == 0 { + ttlDays = dataRetentionPeriodDaysDefault + } + s, err = dynamodb.New( awsdynamodb.NewFromConfig(cfg), hostname, + time.Duration(ttlDays)*24*time.Hour, ) if err != nil { return nil, err diff --git a/metamorph/store/dynamodb/dynamodb.go b/metamorph/store/dynamodb/dynamodb.go index 4b7f8e0b4..99c9237c1 100644 --- a/metamorph/store/dynamodb/dynamodb.go +++ b/metamorph/store/dynamodb/dynamodb.go @@ -32,12 +32,14 @@ const ( type DynamoDB struct { client *dynamodb.Client hostname string + ttl time.Duration } -func New(client *dynamodb.Client, hostname string) (*DynamoDB, error) { +func New(client *dynamodb.Client, hostname string, timeToLive time.Duration) (*DynamoDB, error) { repo := &DynamoDB{ client: client, hostname: hostname, + ttl: timeToLive, } err := initialize(context.Background(), repo) @@ -167,20 +169,12 @@ func (ddb *DynamoDB) CreateBlocksTable(ctx context.Context) error { AttributeName: aws.String("block_hash"), AttributeType: types.ScalarAttributeTypeB, }, - { - AttributeName: aws.String("ttl"), - AttributeType: types.ScalarAttributeTypeS, - }, }, KeySchema: []types.KeySchemaElement{ { AttributeName: aws.String("block_hash"), KeyType: types.KeyTypeHash, }, - { - AttributeName: aws.String("ttl"), - KeyType: types.KeyTypeRange, - }, }, TableName: aws.String("blocks"), ProvisionedThroughput: &types.ProvisionedThroughput{ @@ -503,6 +497,7 @@ func (ddb *DynamoDB) UpdateMined(ctx context.Context, hash *chainhash.Hash, bloc type BlockItem struct { Hash []byte `dynamodbav:"block_hash"` ProcessedAt string `dynamodbav:"processed_at"` + Ttl int64 `dynamodbav:"ttl"` } func (ddb *DynamoDB) GetBlockProcessed(ctx context.Context, blockHash *chainhash.Hash) (*time.Time, error) { @@ -558,7 +553,11 @@ func (ddb *DynamoDB) SetBlockProcessed(ctx context.Context, blockHash *chainhash span, _ := opentracing.StartSpanFromContext(ctx, "dynamodb:SetBlockProcessed") defer span.Finish() - blockItem := BlockItem{Hash: blockHash.CloneBytes(), ProcessedAt: time.Now().UTC().Format(time.RFC3339)} + blockItem := BlockItem{ + Hash: blockHash.CloneBytes(), + ProcessedAt: time.Now().UTC().Format(time.RFC3339), + Ttl: time.Now().Add(ddb.ttl).Unix(), + } item, err := attributevalue.MarshalMap(blockItem) if err != nil { span.SetTag(string(ext.Error), true) From 0f19516383d32bb45c88b4154b021232ba28ab72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Mon, 20 Nov 2023 18:11:15 +0100 Subject: [PATCH 3/5] BAARC-9: Enable time to live for blocks and transactions --- metamorph/store/Interface.go | 3 +- metamorph/store/dynamodb/dynamodb.go | 51 +++++++++++++++++++--------- 2 files changed, 37 insertions(+), 17 deletions(-) diff --git a/metamorph/store/Interface.go b/metamorph/store/Interface.go index 3f42554a3..cce2056e4 100644 --- a/metamorph/store/Interface.go +++ b/metamorph/store/Interface.go @@ -26,6 +26,7 @@ type StoreData struct { CallbackToken string `dynamodbav:"callback_token"` RejectReason string `dynamodbav:"reject_reason"` LockedBy string `dynamodbav:"locked_by"` + Ttl int64 `dynamodbav:"ttl"` } func (sd *StoreData) EncodeToBytes() ([]byte, error) { @@ -200,7 +201,7 @@ func DecodeFromBytes(b []byte) (*StoreData, error) { return sd, nil } -var ErrNotFound = errors.New("txid could not be found") +var ErrNotFound = errors.New("key could not be found") type MetamorphStore interface { Get(ctx context.Context, key []byte) (*StoreData, error) diff --git a/metamorph/store/dynamodb/dynamodb.go b/metamorph/store/dynamodb/dynamodb.go index 99c9237c1..ac56fdd70 100644 --- a/metamorph/store/dynamodb/dynamodb.go +++ b/metamorph/store/dynamodb/dynamodb.go @@ -145,14 +145,14 @@ func (ddb *DynamoDB) CreateTransactionsTable(ctx context.Context) error { return err } - ttltx := dynamodb.UpdateTimeToLiveInput{ + ttlInput := dynamodb.UpdateTimeToLiveInput{ TableName: aws.String("transactions"), TimeToLiveSpecification: &types.TimeToLiveSpecification{ Enabled: aws.Bool(true), AttributeName: aws.String("ttl"), }, } - _, err = ddb.client.UpdateTimeToLive(ctx, &ttltx) + _, err = ddb.client.UpdateTimeToLive(ctx, &ttlInput) if err != nil { return err } @@ -185,6 +185,18 @@ func (ddb *DynamoDB) CreateBlocksTable(ctx context.Context) error { return err } + ttlInput := &dynamodb.UpdateTimeToLiveInput{ + TableName: aws.String("blocks"), + TimeToLiveSpecification: &types.TimeToLiveSpecification{ + Enabled: aws.Bool(true), + AttributeName: aws.String("ttl"), + }, + } + _, err := ddb.client.UpdateTimeToLive(ctx, ttlInput) + if err != nil { + return err + } + return nil } @@ -238,6 +250,8 @@ func (ddb *DynamoDB) Set(ctx context.Context, key []byte, value *store.StoreData span, _ := opentracing.StartSpanFromContext(ctx, "dynamodb:Set") defer span.Finish() + ttl := time.Now().Add(ddb.ttl) + value.Ttl = ttl.Unix() value.LockedBy = ddb.hostname // marshal input value for new entry @@ -516,32 +530,35 @@ func (ddb *DynamoDB) GetBlockProcessed(ctx context.Context, blockHash *chainhash response, err := ddb.client.GetItem(ctx, &dynamodb.GetItemInput{ Key: val, TableName: aws.String("blocks"), }) - if err != nil { span.SetTag(string(ext.Error), true) span.LogFields(log.Error(err)) return nil, err } - var blockItem BlockItem - err = attributevalue.UnmarshalMap(response.Item, &blockItem) - if err != nil { - span.SetTag(string(ext.Error), true) - span.LogFields(log.Error(err)) - return nil, err - } - - var processedAtTime time.Time - if blockItem.ProcessedAt != "" { - processedAtTime, err = time.Parse(time.RFC3339, blockItem.ProcessedAt) + if len(response.Item) != 0 { + var blockItem BlockItem + err = attributevalue.UnmarshalMap(response.Item, &blockItem) if err != nil { span.SetTag(string(ext.Error), true) span.LogFields(log.Error(err)) return nil, err } + + var processedAtTime time.Time + if blockItem.ProcessedAt != "" { + processedAtTime, err = time.Parse(time.RFC3339, blockItem.ProcessedAt) + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return nil, err + } + } + + return &processedAtTime, nil } - return &processedAtTime, nil + return nil, store.ErrNotFound } func (ddb *DynamoDB) SetBlockProcessed(ctx context.Context, blockHash *chainhash.Hash) error { @@ -553,10 +570,12 @@ func (ddb *DynamoDB) SetBlockProcessed(ctx context.Context, blockHash *chainhash span, _ := opentracing.StartSpanFromContext(ctx, "dynamodb:SetBlockProcessed") defer span.Finish() + ttl := time.Now().Add(ddb.ttl) + blockItem := BlockItem{ Hash: blockHash.CloneBytes(), ProcessedAt: time.Now().UTC().Format(time.RFC3339), - Ttl: time.Now().Add(ddb.ttl).Unix(), + Ttl: ttl.Unix(), } item, err := attributevalue.MarshalMap(blockItem) if err != nil { From 13664daad53bdc7f569df616ab5ce0030ebd0f7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Mon, 20 Nov 2023 18:11:27 +0100 Subject: [PATCH 4/5] BAARC-9: Add test cases --- metamorph/store/dynamodb/dynamodb_test.go | 29 ++++-- metamorph/store/dynamodb/ttl_test.go | 112 ---------------------- 2 files changed, 23 insertions(+), 118 deletions(-) delete mode 100644 metamorph/store/dynamodb/ttl_test.go diff --git a/metamorph/store/dynamodb/dynamodb_test.go b/metamorph/store/dynamodb/dynamodb_test.go index 9c31c45d6..24bdf5b75 100644 --- a/metamorph/store/dynamodb/dynamodb_test.go +++ b/metamorph/store/dynamodb/dynamodb_test.go @@ -3,7 +3,6 @@ package dynamodb import ( "context" "encoding/hex" - "errors" "testing" "time" @@ -86,7 +85,7 @@ func NewDynamoDBIntegrationTestRepo(t *testing.T) (*DynamoDB, *dynamodb.Client) }) require.NoError(t, err) - repo, err := New(client, hostname) + repo, err := New(client, hostname, time.Hour*1) require.NoError(t, err) tables, err := client.ListTables(context.Background(), &dynamodb.ListTablesInput{}) @@ -152,12 +151,10 @@ func TestDynamoDBIntegration(t *testing.T) { _, isAttrErr := err.(*attributevalue.UnmarshalTypeError) require.True(t, isAttrErr) }) - t.Run("set", func(t *testing.T) { + t.Run("set/get", func(t *testing.T) { err := repo.Set(ctx, nil, dataStatusSent) require.NoError(t, err) - }) - t.Run("get", func(t *testing.T) { returnedData, err := repo.Get(ctx, TX1Hash[:]) require.NoError(t, err) require.Equal(t, dataStatusSent, returnedData) @@ -241,6 +238,26 @@ func TestDynamoDBIntegration(t *testing.T) { err := repo.Del(ctx, TX1Hash[:]) require.NoError(t, err) _, err = repo.Get(ctx, TX1Hash[:]) - require.True(t, errors.Is(store.ErrNotFound, err)) + require.ErrorIs(t, err, store.ErrNotFound) + }) + + t.Run("blocks - time to live = -1 hour", func(t *testing.T) { + repo.ttl = time.Minute * -10 + err := repo.SetBlockProcessed(ctx, Block1Hash) + require.NoError(t, err) + + time.Sleep(2 * time.Second) // give DynamoDB time to delete + _, err = repo.GetBlockProcessed(ctx, Block1Hash) + require.ErrorIs(t, err, store.ErrNotFound) + }) + + t.Run("transactions - time to live = -1 hour", func(t *testing.T) { + repo.ttl = time.Minute * -10 + err := repo.Set(ctx, nil, dataStatusSent) + require.NoError(t, err) + + time.Sleep(10 * time.Second) // give DynamoDB time to delete + _, err = repo.Get(ctx, TX1Hash[:]) + require.ErrorIs(t, err, store.ErrNotFound) }) } diff --git a/metamorph/store/dynamodb/ttl_test.go b/metamorph/store/dynamodb/ttl_test.go deleted file mode 100644 index 8626c90af..000000000 --- a/metamorph/store/dynamodb/ttl_test.go +++ /dev/null @@ -1,112 +0,0 @@ -package dynamodb - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/ory/dockertest" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestTimeToLive(t *testing.T) { - pool, err := dockertest.NewPool("") - require.NoError(t, err) - - resource, err := pool.Run("amazon/dynamodb-local", "latest", []string{}) - require.NoError(t, err) - - t.Cleanup(func() { - err := pool.Purge(resource) - require.NoError(t, err) - }) - - resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { - return aws.Endpoint{ - PartitionID: "aws", - URL: host + resource.GetPort(port), - SigningRegion: regionUsEast1, - }, nil - }) - cfg, err := config.LoadDefaultConfig( - context.TODO(), - config.WithEndpointResolverWithOptions(resolver), - config.WithCredentialsProvider(credentials.StaticCredentialsProvider{ - Value: aws.Credentials{ - AccessKeyID: "dummy", SecretAccessKey: "dummy", SessionToken: "dummy", - Source: "Hard-coded credentials; values are irrelevant for local DynamoDB", - }, - }), - ) - require.NoError(t, err) - - client := dynamodb.NewFromConfig(cfg) - - pool.MaxWait = 60 * time.Second - - err = pool.Retry(func() error { - _, err := client.ListTables(context.Background(), &dynamodb.ListTablesInput{}) - return err - }) - require.NoError(t, err) - - _, err = New(client, hostname) - require.NoError(t, err) - - tables, err := client.ListTables(context.Background(), &dynamodb.ListTablesInput{}) - require.NoError(t, err) - require.ElementsMatch(t, []string{"blocks", "transactions"}, tables.TableNames) - testkey := []byte("testkey") - - //sdata := &store.StoreData{ - // Hash: TX1Hash, - // Status: metamorph_api.Status_SENT_TO_NETWORK, - // CallbackUrl: "http://callback.com", - // CallbackToken: "abcd", - // MerkleProof: false, - // RawTx: TX1RawBytes, - // LockedBy: hostname, - // StoredAt: time.Now(), - //} - //require.NoError(t, repo.Set(context.TODO(), testkey, sdata)) - //_, err = repo.Get(context.TODO(), testkey) - // this test should not pass. Get should return the item that was inserted previously - //assert.Equal(t, store.ErrNotFound, err) - - _, err = client.PutItem(context.TODO(), &dynamodb.PutItemInput{ - TableName: aws.String("transactions"), - Item: map[string]types.AttributeValue{ - "tx_hash": &types.AttributeValueMemberB{Value: testkey}, - "locked_by": &types.AttributeValueMemberS{Value: "testlocked"}, - "ttl": &types.AttributeValueMemberS{Value: "1"}, - }, - }) - require.NoError(t, err) - - getitem, err := client.GetItem(context.TODO(), &dynamodb.GetItemInput{ - TableName: aws.String("transactions"), - Key: map[string]types.AttributeValue{ - "tx_hash": &types.AttributeValueMemberB{Value: testkey}, - }}) - require.NoError(t, err) - - assert.Equal(t, "testlocked", getitem.Item["locked_by"].(*types.AttributeValueMemberS).Value) - - time.Sleep(10 * time.Second) - result, err := client.GetItem(context.TODO(), &dynamodb.GetItemInput{ - TableName: aws.String("transactions"), - Key: map[string]types.AttributeValue{ - "tx_hash": &types.AttributeValueMemberB{Value: testkey}, - }}) - - require.NotNil(t, err) - fmt.Println(result) - -} From 0e8608c42c98eafa802d25f5abfe0ade157a758f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= Date: Tue, 21 Nov 2023 15:31:01 +0100 Subject: [PATCH 5/5] BAARC-9: Feedback --- metamorph/store/dynamodb/dynamodb_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metamorph/store/dynamodb/dynamodb_test.go b/metamorph/store/dynamodb/dynamodb_test.go index 24bdf5b75..abb00ce3f 100644 --- a/metamorph/store/dynamodb/dynamodb_test.go +++ b/metamorph/store/dynamodb/dynamodb_test.go @@ -85,7 +85,7 @@ func NewDynamoDBIntegrationTestRepo(t *testing.T) (*DynamoDB, *dynamodb.Client) }) require.NoError(t, err) - repo, err := New(client, hostname, time.Hour*1) + repo, err := New(client, hostname, 1*time.Hour) require.NoError(t, err) tables, err := client.ListTables(context.Background(), &dynamodb.ListTablesInput{})