Skip to content

Commit

Permalink
refactor: more renaming changes
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Dec 12, 2024
1 parent 3956bc0 commit 9c4a65f
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 104 deletions.
75 changes: 35 additions & 40 deletions core/meterer/meterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func (m *Meterer) Start(ctx context.Context) {
for {
select {
case <-ticker.C:
if err := m.OffchainStore.DeleteOldPeriods(ctx, m.CurrentReservationPeriod()-uint32(MinNumPeriods)); err != nil {
now := uint64(time.Now().Unix())
reservationWindow := m.ChainPaymentState.GetReservationWindow()
if err := m.OffchainStore.DeleteOldPeriods(ctx, GetReservationPeriod(now, reservationWindow)-uint32(MinNumPeriods)); err != nil {
m.logger.Error("Failed to prune off-chain state", "error", err)
}
case <-ctx.Done():
Expand Down Expand Up @@ -119,12 +121,12 @@ func (m *Meterer) ServeReservationRequest(ctx context.Context, header core.Payme
return fmt.Errorf("invalid quorum for reservation: %w", err)
}
if !m.ValidateReservationPeriod(header, reservation) {
return fmt.Errorf("invalid bin index for reservation")
return fmt.Errorf("invalid reservation period for reservation")
}

// Update bin usage atomically and check against reservation's data rate as the bin limit
if err := m.IncrementBinUsage(ctx, header, reservation, numSymbols); err != nil {
return fmt.Errorf("bin overflows: %w", err)
// Update reservation period usage atomically and check against reservation's data rate as the period limit
if err := m.IncrementReservatiionPeriodUsage(ctx, header, reservation, numSymbols); err != nil {
return fmt.Errorf("period overflows: %w", err)
}

return nil
Expand All @@ -149,55 +151,48 @@ func (m *Meterer) ValidateQuorum(headerQuorums []uint8, allowedQuorums []uint8)
return nil
}

// ValidateReservationPeriod checks if the provided bin index is valid
// ValidateReservationPeriod checks if the provided reservation period is valid
func (m *Meterer) ValidateReservationPeriod(header core.PaymentMetadata, reservation *core.ActiveReservation) bool {
now := uint64(time.Now().Unix())
reservationWindow := m.ChainPaymentState.GetReservationWindow()
currentReservationPeriod := GetReservationPeriod(now, reservationWindow)
// Valid bin indexes are either the current bin or the previous bin
// Valid reservation periods are either the current period or the previous period
if (header.ReservationPeriod != currentReservationPeriod && header.ReservationPeriod != (currentReservationPeriod-1)) || (GetReservationPeriod(reservation.StartTimestamp, reservationWindow) > header.ReservationPeriod || header.ReservationPeriod > GetReservationPeriod(reservation.EndTimestamp, reservationWindow)) {
return false
}
return true
}

// CurrentReservationPeriod returns the current bin index
func (m *Meterer) CurrentReservationPeriod() uint32 {
now := uint64(time.Now().Unix())
reservationWindow := m.ChainPaymentState.GetReservationWindow()
return GetReservationPeriod(now, reservationWindow)
}

// IncrementBinUsage increments the bin usage atomically and checks for overflow
func (m *Meterer) IncrementBinUsage(ctx context.Context, header core.PaymentMetadata, reservation *core.ActiveReservation, numSymbols uint) error {
// IncrementReservatiionPeriodUsage increments the reservation period usage atomically and checks for overflow
func (m *Meterer) IncrementReservatiionPeriodUsage(ctx context.Context, header core.PaymentMetadata, reservation *core.ActiveReservation, numSymbols uint) error {
symbolsCharged := m.SymbolsCharged(numSymbols)
newUsage, err := m.OffchainStore.UpdateReservationBin(ctx, header.AccountID, uint64(header.ReservationPeriod), uint64(symbolsCharged))
newUsage, err := m.OffchainStore.UpdateReservationPeriod(ctx, header.AccountID, uint64(header.ReservationPeriod), uint64(symbolsCharged))
if err != nil {
return fmt.Errorf("failed to increment bin usage: %w", err)
return fmt.Errorf("failed to increment reservation period usage: %w", err)
}

// metered usage stays within the bin limit
usageLimit := m.GetReservationBinLimit(reservation)
// metered usage stays within the period limit
usageLimit := m.GetReservationPeriodLimit(reservation)
if newUsage <= usageLimit {
return nil
} else if newUsage-uint64(numSymbols) >= usageLimit {
// metered usage before updating the size already exceeded the limit
return fmt.Errorf("bin has already been filled")
return fmt.Errorf("period has already been filled")
}
if newUsage <= 2*usageLimit && header.ReservationPeriod+2 <= GetReservationPeriod(reservation.EndTimestamp, m.ChainPaymentState.GetReservationWindow()) {
_, err := m.OffchainStore.UpdateReservationBin(ctx, header.AccountID, uint64(header.ReservationPeriod+2), newUsage-usageLimit)
_, err := m.OffchainStore.UpdateReservationPeriod(ctx, header.AccountID, uint64(header.ReservationPeriod+2), newUsage-usageLimit)
if err != nil {
return err
}
return nil
}
return fmt.Errorf("overflow usage exceeds bin limit")
return fmt.Errorf("overflow usage exceeds period limit")
}

// GetReservationPeriod returns the current bin index by chunking time by the bin interval;
// bin interval used by the disperser should be public information
func GetReservationPeriod(timestamp uint64, binInterval uint32) uint32 {
return uint32(timestamp) / binInterval
// GetReservationPeriod returns the current reservation period by chunking time by the period interval;
// period interval used by the disperser should be public information
func GetReservationPeriod(timestamp uint64, periodInterval uint32) uint32 {
return uint32(timestamp) / periodInterval
}

// ServeOnDemandRequest handles the rate limiting logic for incoming requests
Expand Down Expand Up @@ -225,8 +220,8 @@ func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentM
return fmt.Errorf("invalid on-demand payment: %w", err)
}

// Update bin usage atomically and check against bin capacity
if err := m.IncrementGlobalBinUsage(ctx, uint64(symbolsCharged)); err != nil {
// Update reservation period usage atomically and check against period capacity
if err := m.IncrementGlobalPeriodUsage(ctx, uint64(symbolsCharged)); err != nil {
//TODO: conditionally remove the payment based on the error type (maybe if the error is store-op related)
err := m.OffchainStore.RemoveOnDemandPayment(ctx, header.AccountID, header.CumulativePayment)
if err != nil {
Expand Down Expand Up @@ -283,33 +278,33 @@ func (m *Meterer) SymbolsCharged(numSymbols uint) uint32 {
return uint32(core.RoundUpDivide(uint(numSymbols), uint(m.ChainPaymentState.GetMinNumSymbols()))) * m.ChainPaymentState.GetMinNumSymbols()
}

// ValidateReservationPeriod checks if the provided bin index is valid
// ValidateReservationPeriod checks if the provided reservation period is valid
func (m *Meterer) ValidateGlobalReservationPeriod(header core.PaymentMetadata) (uint32, error) {
// Deterministic function: local clock -> index (1second intervals)
currentReservationPeriod := uint32(time.Now().Unix())

// Valid bin indexes are either the current bin or the previous bin (allow this second or prev sec)
// Valid reservation periods are either the current period or the previous period (allow this second or prev sec)
if header.ReservationPeriod != currentReservationPeriod && header.ReservationPeriod != (currentReservationPeriod-1) {
return 0, fmt.Errorf("invalid bin index for on-demand request")
return 0, fmt.Errorf("invalid reservation period for on-demand request")
}
return currentReservationPeriod, nil
}

// IncrementBinUsage increments the bin usage atomically and checks for overflow
func (m *Meterer) IncrementGlobalBinUsage(ctx context.Context, symbolsCharged uint64) error {
//TODO: edit globalIndex based on bin interval in a subsequent PR
// IncrementReservatiionPeriodUsage increments the reservation period usage atomically and checks for overflow
func (m *Meterer) IncrementGlobalPeriodUsage(ctx context.Context, symbolsCharged uint64) error {
//TODO: edit globalIndex based on period interval in a subsequent PR
globalIndex := uint64(time.Now().Unix())
newUsage, err := m.OffchainStore.UpdateGlobalBin(ctx, globalIndex, symbolsCharged)
newUsage, err := m.OffchainStore.UpdateGlobalPeriod(ctx, globalIndex, symbolsCharged)
if err != nil {
return fmt.Errorf("failed to increment global bin usage: %w", err)
return fmt.Errorf("failed to increment global reservation period usage: %w", err)
}
if newUsage > m.ChainPaymentState.GetGlobalSymbolsPerSecond() {
return fmt.Errorf("global bin usage overflows")
return fmt.Errorf("global reservation period usage overflows")
}
return nil
}

// GetReservationBinLimit returns the bin limit for a given reservation
func (m *Meterer) GetReservationBinLimit(reservation *core.ActiveReservation) uint64 {
// GetReservationPeriodLimit returns the period limit for a given reservation
func (m *Meterer) GetReservationPeriodLimit(reservation *core.ActiveReservation) uint64 {
return reservation.SymbolsPerSecond * uint64(m.ChainPaymentState.GetReservationWindow())
}
20 changes: 10 additions & 10 deletions core/meterer/meterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestMetererReservations(t *testing.T) {
ctx := context.Background()
paymentChainState.On("GetReservationWindow", testifymock.Anything).Return(uint32(1), nil)
paymentChainState.On("GetGlobalSymbolsPerSecond", testifymock.Anything).Return(uint64(1009), nil)
paymentChainState.On("GetGlobalRateBinInterval", testifymock.Anything).Return(uint64(1), nil)
paymentChainState.On("GetGlobalRatePeriodInterval", testifymock.Anything).Return(uint64(1), nil)
paymentChainState.On("GetMinNumSymbols", testifymock.Anything).Return(uint32(3), nil)

reservationPeriod := meterer.GetReservationPeriod(uint64(time.Now().Unix()), mt.ChainPaymentState.GetReservationWindow())
Expand All @@ -192,14 +192,14 @@ func TestMetererReservations(t *testing.T) {
err := mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
assert.ErrorContains(t, err, "quorum number mismatch")

// overwhelming bin overflow for empty bins
// overwhelming period overflow for empty periods
header = createPaymentHeader(reservationPeriod-1, big.NewInt(0), accountID2)
err = mt.MeterRequest(ctx, *header, 10, quoromNumbers)
assert.NoError(t, err)
// overwhelming bin overflow for empty bins
// overwhelming period overflow for empty periods
header = createPaymentHeader(reservationPeriod-1, big.NewInt(0), accountID2)
err = mt.MeterRequest(ctx, *header, 1000, quoromNumbers)
assert.ErrorContains(t, err, "overflow usage exceeds bin limit")
assert.ErrorContains(t, err, "overflow usage exceeds period limit")

// test non-existent account
unregisteredUser, err := crypto.GenerateKey()
Expand All @@ -211,12 +211,12 @@ func TestMetererReservations(t *testing.T) {
err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2})
assert.ErrorContains(t, err, "failed to get active reservation by account: reservation not found")

// test invalid bin index
// test invalid period
header = createPaymentHeader(reservationPeriod, big.NewInt(0), accountID1)
err = mt.MeterRequest(ctx, *header, 2000, quoromNumbers)
assert.ErrorContains(t, err, "invalid bin index for reservation")
assert.ErrorContains(t, err, "invalid period for reservation")

// test bin usage metering
// test period usage metering
symbolLength := uint(20)
requiredLength := uint(21) // 21 should be charged for length of 20 since minNumSymbols is 3
for i := 0; i < 9; i++ {
Expand All @@ -230,7 +230,7 @@ func TestMetererReservations(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, accountID2.Hex(), item["AccountID"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, strconv.Itoa(int(reservationPeriod)), item["ReservationPeriod"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, strconv.Itoa((i+1)*int(requiredLength)), item["BinUsage"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, strconv.Itoa((i+1)*int(requiredLength)), item["PeriodUsage"].(*types.AttributeValueMemberN).Value)

}
// first over flow is allowed
Expand All @@ -247,13 +247,13 @@ func TestMetererReservations(t *testing.T) {
assert.Equal(t, accountID2.Hex(), item["AccountID"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, strconv.Itoa(int(overflowedReservationPeriod)), item["ReservationPeriod"].(*types.AttributeValueMemberN).Value)
// 25 rounded up to the nearest multiple of minNumSymbols - (200-21*9) = 16
assert.Equal(t, strconv.Itoa(int(16)), item["BinUsage"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, strconv.Itoa(int(16)), item["PeriodUsage"].(*types.AttributeValueMemberN).Value)

// second over flow
header = createPaymentHeader(reservationPeriod, big.NewInt(0), accountID2)
assert.NoError(t, err)
err = mt.MeterRequest(ctx, *header, 1, quoromNumbers)
assert.ErrorContains(t, err, "bin has already been filled")
assert.ErrorContains(t, err, "period has already been filled")
}

func TestMetererOnDemand(t *testing.T) {
Expand Down
44 changes: 22 additions & 22 deletions core/meterer/offchain_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,61 +67,61 @@ func NewOffchainStore(
}, nil
}

func (s *OffchainStore) UpdateReservationBin(ctx context.Context, accountID string, reservationPeriod uint64, size uint64) (uint64, error) {
func (s *OffchainStore) UpdateReservationPeriod(ctx context.Context, accountID string, reservationPeriod uint64, size uint64) (uint64, error) {
key := map[string]types.AttributeValue{
"AccountID": &types.AttributeValueMemberS{Value: accountID},
"ReservationPeriod": &types.AttributeValueMemberN{Value: strconv.FormatUint(reservationPeriod, 10)},
}

res, err := s.dynamoClient.IncrementBy(ctx, s.reservationTableName, key, "BinUsage", size)
res, err := s.dynamoClient.IncrementBy(ctx, s.reservationTableName, key, "PeriodUsage", size)
if err != nil {
return 0, fmt.Errorf("failed to increment bin usage: %w", err)
}

binUsage, ok := res["BinUsage"]
periodUsage, ok := res["PeriodUsage"]
if !ok {
return 0, errors.New("BinUsage is not present in the response")
return 0, errors.New("PeriodUsage is not present in the response")
}

binUsageAttr, ok := binUsage.(*types.AttributeValueMemberN)
periodUsageAttr, ok := periodUsage.(*types.AttributeValueMemberN)
if !ok {
return 0, fmt.Errorf("unexpected type for BinUsage: %T", binUsage)
return 0, fmt.Errorf("unexpected type for PeriodUsage: %T", periodUsage)
}

binUsageValue, err := strconv.ParseUint(binUsageAttr.Value, 10, 32)
periodUsageValue, err := strconv.ParseUint(periodUsageAttr.Value, 10, 32)
if err != nil {
return 0, fmt.Errorf("failed to parse BinUsage: %w", err)
return 0, fmt.Errorf("failed to parse PeriodUsage: %w", err)
}

return binUsageValue, nil
return periodUsageValue, nil
}

func (s *OffchainStore) UpdateGlobalBin(ctx context.Context, reservationPeriod uint64, size uint64) (uint64, error) {
func (s *OffchainStore) UpdateGlobalPeriod(ctx context.Context, reservationPeriod uint64, size uint64) (uint64, error) {
key := map[string]types.AttributeValue{
"ReservationPeriod": &types.AttributeValueMemberN{Value: strconv.FormatUint(reservationPeriod, 10)},
}

res, err := s.dynamoClient.IncrementBy(ctx, s.globalBinTableName, key, "BinUsage", size)
res, err := s.dynamoClient.IncrementBy(ctx, s.globalBinTableName, key, "PeriodUsage", size)
if err != nil {
return 0, err
}

binUsage, ok := res["BinUsage"]
periodUsage, ok := res["PeriodUsage"]
if !ok {
return 0, nil
}

binUsageAttr, ok := binUsage.(*types.AttributeValueMemberN)
periodUsageAttr, ok := periodUsage.(*types.AttributeValueMemberN)
if !ok {
return 0, nil
}

binUsageValue, err := strconv.ParseUint(binUsageAttr.Value, 10, 32)
periodUsageValue, err := strconv.ParseUint(periodUsageAttr.Value, 10, 32)
if err != nil {
return 0, err
}

return binUsageValue, nil
return periodUsageValue, nil
}

func (s *OffchainStore) AddOnDemandPayment(ctx context.Context, paymentMetadata core.PaymentMetadata, symbolsCharged uint32) error {
Expand Down Expand Up @@ -408,23 +408,23 @@ func parseReservationPeriodRecord(bin map[string]types.AttributeValue) (*pb.Rese
return nil, fmt.Errorf("failed to parse ReservationPeriod: %w", err)
}

binUsage, ok := bin["BinUsage"]
periodUsage, ok := bin["PeriodUsage"]
if !ok {
return nil, errors.New("BinUsage is not present in the response")
return nil, errors.New("PeriodUsage is not present in the response")
}

binUsageAttr, ok := binUsage.(*types.AttributeValueMemberN)
periodUsageAttr, ok := periodUsage.(*types.AttributeValueMemberN)
if !ok {
return nil, fmt.Errorf("unexpected type for BinUsage: %T", binUsage)
return nil, fmt.Errorf("unexpected type for PeriodUsage: %T", periodUsage)
}

binUsageValue, err := strconv.ParseUint(binUsageAttr.Value, 10, 32)
periodUsageValue, err := strconv.ParseUint(periodUsageAttr.Value, 10, 32)
if err != nil {
return nil, fmt.Errorf("failed to parse BinUsage: %w", err)
return nil, fmt.Errorf("failed to parse PeriodUsage: %w", err)
}

return &pb.ReservationPeriodRecord{
Index: uint32(reservationPeriodValue),
Usage: uint64(binUsageValue),
Usage: uint64(periodUsageValue),
}, nil
}
Loading

0 comments on commit 9c4a65f

Please sign in to comment.