chore: rebase to master
hopeyen committed Oct 9, 2024
1 parent 8babd1d commit af1d9fe
Showing 4 changed files with 486 additions and 5 deletions.
2 changes: 1 addition & 1 deletion common/aws/dynamodb/client.go
@@ -224,7 +224,7 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str

// QueryIndexOrderWithLimit returns items in the index that match the given key, up to the given limit
// If forward is true, the items are returned in ascending order
- func (c *Client) QueryIndexOrderWithLimit(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues, forward bool, limit int32) ([]Item, error) {
+ func (c *Client) QueryIndexOrderWithLimit(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues, forward bool, limit int32) ([]Item, error) {
response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String(indexName),
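The collapsed remainder of this hunk hides how the new forward and limit parameters reach DynamoDB. As a hedged, standalone sketch only (not the repository's hidden code; the helper name and wiring below are illustrative), the AWS SDK v2 exposes ordering and limiting on dynamodb.QueryInput through the ScanIndexForward and Limit fields:

package sketch

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// queryOrdered shows how an ordered, limited index query is expressed with the SDK.
func queryOrdered(ctx context.Context, client *dynamodb.Client, table, index, keyCondition string,
	values map[string]types.AttributeValue, forward bool, limit int32) ([]map[string]types.AttributeValue, error) {
	out, err := client.Query(ctx, &dynamodb.QueryInput{
		TableName:                 aws.String(table),
		IndexName:                 aws.String(index),
		KeyConditionExpression:    aws.String(keyCondition),
		ExpressionAttributeValues: values,
		ScanIndexForward:          aws.Bool(forward), // true = ascending by sort key
		Limit:                     aws.Int32(limit),  // caps items returned by this call
	})
	if err != nil {
		return nil, err
	}
	return out.Items, nil
}

ScanIndexForward defaults to ascending when left unset, so passing the forward flag straight through matches the doc comment on the method.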
8 changes: 4 additions & 4 deletions common/aws/dynamodb/client_test.go
@@ -626,7 +626,7 @@ func TestQueryIndexOrderWithLimit(t *testing.T) {
assert.Len(t, unprocessed, 0)

// Test forward order with limit
- queryResult, err := dynamoClient.QueryIndexOrderWithLimit(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
+ queryResult, err := dynamoClient.QueryIndexOrderWithLimit(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpressionValues{
":status": &types.AttributeValueMemberN{Value: "0"},
}, true, 10)
assert.NoError(t, err)
@@ -637,7 +637,7 @@ func TestQueryIndexOrderWithLimit(t *testing.T) {
}

// Test reverse order with limit
- queryResult, err = dynamoClient.QueryIndexOrderWithLimit(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
+ queryResult, err = dynamoClient.QueryIndexOrderWithLimit(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpressionValues{
":status": &types.AttributeValueMemberN{Value: "0"},
}, false, 10)
assert.NoError(t, err)
@@ -648,14 +648,14 @@ func TestQueryIndexOrderWithLimit(t *testing.T) {
}

// Test with a smaller limit
- queryResult, err = dynamoClient.QueryIndexOrderWithLimit(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
+ queryResult, err = dynamoClient.QueryIndexOrderWithLimit(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpressionValues{
":status": &types.AttributeValueMemberN{Value: "0"},
}, true, 5)
assert.NoError(t, err)
assert.Len(t, queryResult, 5)

// Test with a limit larger than the number of items
- queryResult, err = dynamoClient.QueryIndexOrderWithLimit(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
+ queryResult, err = dynamoClient.QueryIndexOrderWithLimit(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpressionValues{
":status": &types.AttributeValueMemberN{Value: "0"},
}, true, 50)
assert.NoError(t, err)
233 changes: 233 additions & 0 deletions core/meterer/offchain_store.go
@@ -0,0 +1,233 @@
package meterer

Check failure on line 1 in core/meterer/offchain_store.go (GitHub Actions / Linter): # github.com/Layr-Labs/eigenda/core/meterer

import (
"context"
"errors"
"fmt"
"strconv"
"time"

commonaws "github.com/Layr-Labs/eigenda/common/aws"
commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

type OffchainStore struct {
dynamoClient *commondynamodb.Client
reservationTableName string
onDemandTableName string
globalBinTableName string
logger logging.Logger
// TODO: add maximum storage for both tables
}

func NewOffchainStore(
cfg commonaws.ClientConfig,
reservationTableName string,
onDemandTableName string,
globalBinTableName string,
logger logging.Logger,
) (OffchainStore, error) {

dynamoClient, err := commondynamodb.NewClient(cfg, logger)
if err != nil {
return OffchainStore{}, err
}

err = dynamoClient.TableCheck(context.Background(), reservationTableName)

Check failure on line 39 in core/meterer/offchain_store.go (GitHub Actions / Linter, Unit Tests): dynamoClient.TableCheck undefined (type *"github.com/Layr-Labs/eigenda/common/aws/dynamodb".Client has no field or method TableCheck)
if err != nil {
return OffchainStore{}, err
}
err = dynamoClient.TableCheck(context.Background(), onDemandTableName)

Check failure on line 43 in core/meterer/offchain_store.go (GitHub Actions / Linter, Unit Tests): dynamoClient.TableCheck undefined (type *"github.com/Layr-Labs/eigenda/common/aws/dynamodb".Client has no field or method TableCheck)
if err != nil {
return OffchainStore{}, err
}
err = dynamoClient.TableCheck(context.Background(), globalBinTableName)

Check failure on line 47 in core/meterer/offchain_store.go (GitHub Actions / Linter, Unit Tests): dynamoClient.TableCheck undefined (type *"github.com/Layr-Labs/eigenda/common/aws/dynamodb".Client has no field or method TableCheck)
if err != nil {
return OffchainStore{}, err
}
//TODO: add a separate thread to periodically clean up the tables
// delete expired reservation bins (<i-1) and old on-demand payments (retain max N payments)
return OffchainStore{
dynamoClient: dynamoClient,
reservationTableName: reservationTableName,
onDemandTableName: onDemandTableName,
globalBinTableName: globalBinTableName,
logger: logger,
}, nil
}

type ReservationBin struct {
AccountID string
BinIndex uint32
BinUsage uint32
UpdatedAt time.Time
}

type PaymentTuple struct {
CumulativePayment uint64
DataLength uint32
}

type GlobalBin struct {
BinIndex uint32
BinUsage uint64
UpdatedAt time.Time
}

func (s *OffchainStore) UpdateReservationBin(ctx context.Context, accountID string, binIndex uint64, size uint64) (uint64, error) {
key := map[string]types.AttributeValue{
"AccountID": &types.AttributeValueMemberS{Value: accountID},
"BinIndex": &types.AttributeValueMemberN{Value: strconv.FormatUint(binIndex, 10)},
}

res, err := s.dynamoClient.IncrementBy(ctx, s.reservationTableName, key, "BinUsage", size)
if err != nil {
return 0, fmt.Errorf("failed to increment bin usage: %w", err)
}

binUsage, ok := res["BinUsage"]
if !ok {
return 0, errors.New("BinUsage is not present in the response")
}

binUsageAttr, ok := binUsage.(*types.AttributeValueMemberN)
if !ok {
return 0, fmt.Errorf("unexpected type for BinUsage: %T", binUsage)
}

binUsageValue, err := strconv.ParseUint(binUsageAttr.Value, 10, 32)
if err != nil {
return 0, fmt.Errorf("failed to parse BinUsage: %w", err)
}

return binUsageValue, nil
}
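
The wrapper's IncrementBy is not part of this diff; only its call sites appear here. As a hedged sketch (an assumption inferred from those call sites, and one that additionally needs the aws and service/dynamodb imports this file does not pull in directly), an atomic counter increment in DynamoDB is typically an UpdateItem with an ADD update expression and ReturnValues set to UPDATED_NEW:

// incrementBy is a hypothetical stand-in for the wrapper method used above.
func incrementBy(ctx context.Context, client *dynamodb.Client, table string,
	key map[string]types.AttributeValue, attr string, value uint64) (map[string]types.AttributeValue, error) {
	out, err := client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
		TableName:                aws.String(table),
		Key:                      key,
		UpdateExpression:         aws.String("ADD #attr :inc"), // creates the attribute if absent
		ExpressionAttributeNames: map[string]string{"#attr": attr},
		ExpressionAttributeValues: map[string]types.AttributeValue{
			":inc": &types.AttributeValueMemberN{Value: strconv.FormatUint(value, 10)},
		},
		ReturnValues: types.ReturnValueUpdatedNew, // return the attribute's new value
	})
	if err != nil {
		return nil, err
	}
	return out.Attributes, nil
}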

func (s *OffchainStore) UpdateGlobalBin(ctx context.Context, binIndex uint64, size uint64) (uint64, error) {
key := map[string]types.AttributeValue{
"BinIndex": &types.AttributeValueMemberN{Value: strconv.FormatUint(binIndex, 10)},
}

res, err := s.dynamoClient.IncrementBy(ctx, s.globalBinTableName, key, "BinUsage", size)
if err != nil {
return 0, err
}

binUsage, ok := res["BinUsage"]
if !ok {
return 0, nil
}

binUsageAttr, ok := binUsage.(*types.AttributeValueMemberN)
if !ok {
return 0, nil
}

binUsageValue, err := strconv.ParseUint(binUsageAttr.Value, 10, 32)
if err != nil {
return 0, err
}

return binUsageValue, nil
}

func (s *OffchainStore) AddOnDemandPayment(ctx context.Context, paymentMetadata core.PaymentMetadata, symbolsCharged uint32) error {

Check failure on line 137 in core/meterer/offchain_store.go (GitHub Actions / Linter, Unit Tests): undefined: core.PaymentMetadata (typecheck)
result, err := s.dynamoClient.GetItem(ctx, s.onDemandTableName,
commondynamodb.Item{
"AccountID": &types.AttributeValueMemberS{Value: paymentMetadata.AccountID},
"CumulativePayments": &types.AttributeValueMemberN{Value: strconv.FormatUint(paymentMetadata.CumulativePayment, 10)},
},
)
if err != nil {
fmt.Println("new payment record: %w", err)
}
if result != nil {
return fmt.Errorf("exact payment already exists")
}
err = s.dynamoClient.PutItem(ctx, s.onDemandTableName,
commondynamodb.Item{
"AccountID": &types.AttributeValueMemberS{Value: paymentMetadata.AccountID},
"CumulativePayments": &types.AttributeValueMemberN{Value: strconv.FormatUint(paymentMetadata.CumulativePayment, 10)},
"DataLength": &types.AttributeValueMemberN{Value: strconv.FormatUint(uint64(symbolsCharged), 10)},
},
)

if err != nil {
return fmt.Errorf("failed to add payment: %w", err)
}
return nil
}

// RemoveOnDemandPayment removes a specific payment from the list for a specific account
func (s *OffchainStore) RemoveOnDemandPayment(ctx context.Context, accountID string, payment uint64) error {
err := s.dynamoClient.DeleteItem(ctx, s.onDemandTableName,
commondynamodb.Key{
"AccountID": &types.AttributeValueMemberS{Value: accountID},
"CumulativePayments": &types.AttributeValueMemberN{Value: strconv.FormatUint(payment, 10)},
},
)

if err != nil {
return fmt.Errorf("failed to remove payment: %w", err)
}

return nil
}

// GetRelevantOnDemandRecords returns the previous cumulative payment, the next cumulative payment, and the data length (blob size) of the next payment
// The two queries are issued sequentially rather than in a single request to keep each lookup efficient; this does not introduce race conditions for honest requests
func (s *OffchainStore) GetRelevantOnDemandRecords(ctx context.Context, accountID string, cumulativePayment uint64) (uint64, uint64, uint32, error) {
// Fetch the largest entry smaller than the given cumulativePayment
smallerResult, err := s.dynamoClient.QueryIndexOrderWithLimit(ctx, s.onDemandTableName, "AccountIDIndex",
"AccountID = :account AND CumulativePayments < :cumulativePayment",
commondynamodb.ExpressionValues{
":account": &types.AttributeValueMemberS{Value: accountID},
":cumulativePayment": &types.AttributeValueMemberN{Value: strconv.FormatUint(cumulativePayment, 10)},
},
false, // Retrieve results in descending order for the largest smaller amount
1,
)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to query smaller payments for account: %w", err)
}

var prevPayment uint64
if len(smallerResult) > 0 {
prevPayment, err = strconv.ParseUint(smallerResult[0]["CumulativePayments"].(*types.AttributeValueMemberN).Value, 10, 64)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to parse previous payment: %w", err)
}
}

// Fetch the smallest entry larger than the given cumulativePayment
largerResult, err := s.dynamoClient.QueryIndexOrderWithLimit(ctx, s.onDemandTableName, "AccountIDIndex",
"AccountID = :account AND CumulativePayments > :cumulativePayment",
commondynamodb.ExpressionValues{
":account": &types.AttributeValueMemberS{Value: accountID},
":cumulativePayment": &types.AttributeValueMemberN{Value: strconv.FormatUint(cumulativePayment, 10)},
},
true, // Retrieve results in ascending order for the smallest greater amount
1,
)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to query the next payment for account: %w", err)
}
var nextPayment uint64
var nextDataLength uint32
if len(largerResult) > 0 {
nextPayment, err = strconv.ParseUint(largerResult[0]["CumulativePayments"].(*types.AttributeValueMemberN).Value, 10, 64)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to parse next payment: %w", err)
}
dataLength, err := strconv.ParseUint(largerResult[0]["DataLength"].(*types.AttributeValueMemberN).Value, 10, 32)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to parse blob size: %w", err)
}
nextDataLength = uint32(dataLength)
}

return prevPayment, nextPayment, nextDataLength, nil
}
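
The check failures above flag two identifiers that do not exist at this commit: dynamoClient.TableCheck and core.PaymentMetadata. The sketches below are assumptions inferred from the call sites in this file, not the eventual implementations. A TableCheck method would plausibly wrap the SDK's DescribeTable call, and PaymentMetadata would need at least the two fields AddOnDemandPayment reads from it:

package sketch

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
)

// tableCheck is a hypothetical stand-in for the missing Client.TableCheck:
// DescribeTable returns an error if the table does not exist or is unreachable.
func tableCheck(ctx context.Context, client *dynamodb.Client, name string) error {
	_, err := client.DescribeTable(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(name),
	})
	if err != nil {
		return fmt.Errorf("table %s is not available: %w", name, err)
	}
	return nil
}

// PaymentMetadata is a hypothetical shape for the missing core.PaymentMetadata,
// inferred from AddOnDemandPayment, which reads AccountID and formats
// CumulativePayment with strconv.FormatUint (hence uint64 here).
type PaymentMetadata struct {
	AccountID         string
	CumulativePayment uint64
}

Whatever the real definitions turn out to be, the file compiles only once both identifiers exist, which is what the Linter and Unit Tests jobs are reporting.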