Bucket level metric #542

Merged · 13 commits · May 29, 2024
17 changes: 11 additions & 6 deletions common/ratelimit.go
@@ -15,17 +15,22 @@ import (
// ID is the authenticated Account ID. For retrieval requests, the requester ID will be the requester's IP address.
type RequesterID = string

// RequesterName is the friendly name of the party making the request. In the case
// of a rollup making a dispersal request, the RequesterName is the name of the rollup.
type RequesterName = string

type RequestParams struct {
RequesterID RequesterID
BlobSize uint
Rate RateParam
Info interface{}
RequesterID RequesterID
RequesterName RequesterName
BlobSize uint
Rate RateParam
Info interface{}
}

type RateLimiter interface {
// AllowRequest checks whether the request should be allowed. If the request is allowed, the function returns true.
// If the request is not allowed, the function returns false and the RequestParams of the request that was not allowed.
// In order to for the request to be allowed, all of the requests represented by the RequestParams slice must be allowed.
// In order for the request to be allowed, all of the requests represented by the RequestParams slice must be allowed.
// Each RequestParams object represents a single request. Each request is subjected to the same GlobalRateParams, but the
// individual parameters of the request can differ.
//
@@ -37,7 +42,7 @@ type RateLimiter interface {

type GlobalRateParams struct {
// BucketSizes are the time scales at which the rate limit is enforced.
// For each time scale, the rate limiter will make sure that the give rate (possibly subject to a relaxation given
// For each time scale, the rate limiter will make sure that the given rate (possibly subject to a relaxation given
// by one of the Multipliers) is observed when the request bandwidth is averaged at this time scale.
// In terms of implementation, the rate limiter uses a set of "time buckets". A time bucket, i, is filled to a maximum of
// `BucketSizes[i]` at a rate of 1, and emptied by an amount equal to `(size of request)/RateParam` each time a
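The bucket mechanics described in this comment (the rest of which is collapsed in this view) can be captured in a small sketch. The body of getBucketLevel is not part of this diff, so the clamping below is an assumption reconstructed from the comment and the call site in limiter.go, not the project's actual implementation:

package ratelimit

import "time"

// Assumed sketch of the update rule: each bucket refills at a rate of 1
// (one unit of level per unit of time elapsed since the last request),
// is capped at its bucket size, and is then drained by the precomputed
// deduction for the current request (roughly requestSize/RateParam,
// possibly relaxed by one of the Multipliers).
func getBucketLevel(prevLevel, bucketSize, interval, deduction time.Duration) time.Duration {
	level := prevLevel + interval
	if level > bucketSize {
		level = bucketSize
	}
	return level - deduction
}

A request is then allowed only if every bucket level stays above zero after the deduction, which is exactly the allowed = allowed && bucketParams.BucketLevels[i] > 0 check in limiter.go below.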
25 changes: 23 additions & 2 deletions common/ratelimit/limiter.go
@@ -2,10 +2,13 @@ package ratelimit

import (
"context"
"strconv"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type BucketStore = common.KVStore[common.RateBucketParams]
@@ -15,13 +18,20 @@ type rateLimiter struct {
bucketStore BucketStore

logger logging.Logger

// Prometheus metrics
bucketLevels *prometheus.GaugeVec
}

func NewRateLimiter(rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter {
func NewRateLimiter(reg prometheus.Registerer, rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter {
return &rateLimiter{
globalRateParams: rateParams,
bucketStore: bucketStore,
logger: logger.With("component", "RateLimiter"),
bucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "rate_limiter_bucket_levels",
Help: "Current level of each bucket for rate limiting",
}, []string{"requester_id", "requester_name", "bucket_index"}),
}
}

@@ -109,7 +119,18 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar
bucketParams.BucketLevels[i] = getBucketLevel(bucketParams.BucketLevels[i], size, interval, deduction)
allowed = allowed && bucketParams.BucketLevels[i] > 0

d.logger.Debug("Bucket level", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed)
d.logger.Debug("Bucket level updated", "key", params.RequesterID, "name", params.RequesterName, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed)

// Update metrics only if the requester name is provided. We assume the
// requester name is only provided for authenticated requests, which keeps
// the cardinality of the requester_id label bounded.
if params.RequesterName != "" {
d.bucketLevels.With(prometheus.Labels{
"requester_id": params.RequesterID,
"requester_name": params.RequesterName,
"bucket_index": strconv.Itoa(i),
}).Set(float64(bucketParams.BucketLevels[i]))
}
}

return allowed, bucketParams
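The comment above is the key design point: unauthenticated requesters are keyed by IP address, so emitting one gauge series per RequesterID would make the metric's cardinality unbounded. A standalone illustration of the same guard, using a local gauge and made-up requester values (none of this is project code):

package main

import (
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	levels := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "rate_limiter_bucket_levels",
		Help: "Current level of each bucket for rate limiting",
	}, []string{"requester_id", "requester_name", "bucket_index"})

	record := func(id, name string, bucket int, level time.Duration) {
		if name == "" {
			return // unauthenticated requester: skip to keep series cardinality bounded
		}
		levels.WithLabelValues(id, name, strconv.Itoa(bucket)).Set(float64(level))
	}

	record("198.51.100.7:0-throughput", "", 0, time.Second)           // IP-keyed request: no series created
	record("0xabc123:0-throughput", "example-rollup", 0, time.Second) // named requester: one series created
}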
3 changes: 2 additions & 1 deletion common/ratelimit/ratelimit_test.go
@@ -9,6 +9,7 @@ import (
"github.com/Layr-Labs/eigenda/common/ratelimit"
"github.com/Layr-Labs/eigenda/common/store"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
)

@@ -25,7 +26,7 @@ func makeTestRatelimiter() (common.RateLimiter, error) {
return nil, err
}

ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logging.NewNoopLogger())
ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logging.NewNoopLogger())

return ratelimiter, nil

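With the registry injected, a follow-up test could assert that the gauge actually gets populated. This is only a sketch: globalParams and bucketStore are assumed to be set up as in makeTestRatelimiter above, the request values are invented, and the (bool, *RequestParams, error) return shape of AllowRequest is inferred from the interface's doc comment rather than shown in this diff.

// Sketch only; testutil is github.com/prometheus/client_golang/prometheus/testutil.
reg := prometheus.NewRegistry()
ratelimiter := ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logging.NewNoopLogger())

params := []common.RequestParams{{
	RequesterID:   "example-account:0-throughput", // hypothetical key
	RequesterName: "example-rollup",               // non-empty, so a series should be recorded
	BlobSize:      1024,
	Rate:          1024,
}}
allowed, _, err := ratelimiter.AllowRequest(context.Background(), params) // return shape assumed
assert.NoError(t, err)
_ = allowed // allowed or denied depends on the configured bucket sizes; the gauge is updated either way

count, err := testutil.GatherAndCount(reg, "rate_limiter_bucket_levels")
assert.NoError(t, err)
assert.Greater(t, count, 0) // one series per bucket index for this requester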
36 changes: 23 additions & 13 deletions disperser/apiserver/server.go
@@ -355,11 +355,14 @@ func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, qu
rates.BlobRate = rateInfo.BlobRate
}

if len(rateInfo.Name) > 0 {
rates.Name = rateInfo.Name
}

break
}

return rates, key, nil

}

// Enum of rateTypes for the limiterInfo struct
@@ -446,6 +449,9 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context
s.metrics.HandleInternalFailureRpcRequest(apiMethodName)
return api.NewInternalError(err.Error())
}

// Note: There's an implicit assumption that an empty name means the account
// is not in the allow list.
requesterName = accountRates.Name

// Update the quorum rate
@@ -458,9 +464,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context
// System Level
key := fmt.Sprintf("%s:%d-%s", systemAccountKey, param.QuorumID, SystemThroughputType.Plug())
requestParams = append(requestParams, common.RequestParams{
RequesterID: key,
BlobSize: encodedSize,
Rate: globalRates.TotalUnauthThroughput,
RequesterID: key,
RequesterName: systemAccountKey,
[Review thread on RequesterName: systemAccountKey]

Contributor: shouldn't these be requesterName?

Contributor Author: Hmm I see how it's confusing but in this case we are calculating the system rate limit metric so we want to make sure the metric that is being updated corresponds to the name "system".

Collaborator: I think to Ian's point, this might be a good reason to name these something like MetricsId and MetricsFlag, so that the intended usage is clear to the library user. Not super critical though.

BlobSize: encodedSize,
Rate: globalRates.TotalUnauthThroughput,
Info: limiterInfo{
RateType: SystemThroughputType,
QuorumID: param.QuorumID,
@@ -469,9 +476,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context

key = fmt.Sprintf("%s:%d-%s", systemAccountKey, param.QuorumID, SystemBlobRateType.Plug())
requestParams = append(requestParams, common.RequestParams{
RequesterID: key,
BlobSize: blobRateMultiplier,
Rate: globalRates.TotalUnauthBlobRate,
RequesterID: key,
RequesterName: systemAccountKey,
BlobSize: blobRateMultiplier,
Rate: globalRates.TotalUnauthBlobRate,
Info: limiterInfo{
RateType: SystemBlobRateType,
QuorumID: param.QuorumID,
@@ -481,9 +489,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context
// Account Level
key = fmt.Sprintf("%s:%d-%s", accountKey, param.QuorumID, AccountThroughputType.Plug())
requestParams = append(requestParams, common.RequestParams{
RequesterID: key,
BlobSize: encodedSize,
Rate: accountRates.Throughput,
RequesterID: key,
RequesterName: requesterName,
BlobSize: encodedSize,
Rate: accountRates.Throughput,
Info: limiterInfo{
RateType: AccountThroughputType,
QuorumID: param.QuorumID,
@@ -492,9 +501,10 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context

key = fmt.Sprintf("%s:%d-%s", accountKey, param.QuorumID, AccountBlobRateType.Plug())
requestParams = append(requestParams, common.RequestParams{
RequesterID: key,
BlobSize: blobRateMultiplier,
Rate: accountRates.BlobRate,
RequesterID: key,
RequesterName: requesterName,
BlobSize: blobRateMultiplier,
Rate: accountRates.BlobRate,
Info: limiterInfo{
RateType: AccountBlobRateType,
QuorumID: param.QuorumID,
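As the review thread above notes, RequesterName is chosen per rate type: system-level checks intentionally report under the system name, while account-level checks report under the allowlist entry's Name. A rough Go illustration of the label sets this produces (the values are examples only; the real requester_id keys are built from systemAccountKey or the account key, the quorum ID, and the rate type's Plug() suffix):

// Hypothetical label sets, for illustration only.
systemLabels := prometheus.Labels{
	"requester_id":   "system:0-throughput", // example key shape
	"requester_name": "system",              // systemAccountKey, per the thread above
	"bucket_index":   "0",
}
accountLabels := prometheus.Labels{
	"requester_id":   "0x1aa8226f6d354380dDE75eE6B634875c4203e522:0-throughput", // example
	"requester_name": "eigenlabs",                                               // Name from the allowlist entry
	"bucket_index":   "0",
}
_, _ = systemLabels, accountLabels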
9 changes: 7 additions & 2 deletions disperser/apiserver/server_test.go
@@ -20,6 +20,7 @@ import (
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli"

pb "github.com/Layr-Labs/eigenda/api/grpc/disperser"
@@ -641,7 +642,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
if err != nil {
panic("failed to create bucket store")
}
ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logger)
ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logger)

rateConfig := apiserver.RateConfig{
QuorumRateInfos: map[core.QuorumID]apiserver.QuorumRateInfo{
@@ -662,20 +663,24 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
Allowlist: apiserver.Allowlist{
"1.2.3.4": map[uint8]apiserver.PerUserRateInfo{
0: {
Name: "eigenlabs",
Throughput: 100 * 1024,
BlobRate: 5 * 1e6,
},
1: {
Name: "eigenlabs",
Throughput: 1024 * 1024,
BlobRate: 5 * 1e6,
},
},
"0x1aa8226f6d354380dDE75eE6B634875c4203e522": map[uint8]apiserver.PerUserRateInfo{
0: {
Name: "eigenlabs",
Throughput: 100 * 1024,
BlobRate: 5 * 1e6,
},
1: {
Name: "eigenlabs",
Throughput: 1024 * 1024,
BlobRate: 5 * 1e6,
},
@@ -693,7 +698,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
return apiserver.NewDispersalServer(disperser.ServerConfig{
GrpcPort: "51001",
GrpcTimeout: 1 * time.Second,
}, queue, transactor, logger, disperser.NewMetrics("9001", logger), ratelimiter, rateConfig)
}, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig)
}

func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) {
18 changes: 14 additions & 4 deletions disperser/cmd/apiserver/main.go
@@ -10,6 +10,7 @@ import (
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/disperser/apiserver"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
"github.com/prometheus/client_golang/prometheus"

"github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/common/aws/s3"
@@ -91,6 +92,8 @@ func RunDisperserServer(ctx *cli.Context) error {
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
blobStore := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger)

reg := prometheus.NewRegistry()

var ratelimiter common.RateLimiter
if config.EnableRatelimiter {
globalParams := config.RatelimiterConfig.GlobalRateParams
@@ -108,12 +111,19 @@ func RunDisperserServer(ctx *cli.Context) error {
return err
}
}
ratelimiter = ratelimit.NewRateLimiter(globalParams, bucketStore, logger)
ratelimiter = ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger)
}

// TODO: create separate metrics for the batcher
metrics := disperser.NewMetrics(config.MetricsConfig.HTTPPort, logger)
server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, transactor, logger, metrics, ratelimiter, config.RateConfig)
metrics := disperser.NewMetrics(reg, config.MetricsConfig.HTTPPort, logger)
server := apiserver.NewDispersalServer(
config.ServerConfig,
blobStore,
transactor,
logger,
metrics,
ratelimiter,
config.RateConfig,
)

// Enable Metrics Block
if config.MetricsConfig.EnableMetrics {
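Creating reg up front means the rate limiter's gauge and the disperser metrics now share one registry, so a single /metrics endpoint can serve both. How that endpoint is started is elided from this diff; the snippet below is a generic sketch of exposing such a registry with promhttp, not the disperser's actual metrics server:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Generic sketch: serve everything registered on a shared registry
// (rate limiter gauge, process/Go collectors, disperser metrics) from one endpoint.
func serveMetrics(reg *prometheus.Registry, addr string) error {
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	return http.ListenAndServe(addr, mux)
}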
3 changes: 1 addition & 2 deletions disperser/metrics.go
@@ -37,9 +37,8 @@
AccountRateLimitedFailure string = "ratelimited-account" // The request rate limited at account level
)

func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
func NewMetrics(reg *prometheus.Registry, httpPort string, logger logging.Logger) *Metrics {
namespace := "eigenda_disperser"
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

33 changes: 18 additions & 15 deletions node/cmd/main.go
@@ -8,12 +8,13 @@ import (
"time"

"github.com/Layr-Labs/eigenda/common/pubip"
"github.com/Layr-Labs/eigenda/common/ratelimit"
"github.com/Layr-Labs/eigenda/common/store"
"github.com/prometheus/client_golang/prometheus"

"github.com/urfave/cli"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/ratelimit"
"github.com/Layr-Labs/eigenda/common/store"
"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigenda/node/flags"
"github.com/Layr-Labs/eigenda/node/grpc"
@@ -56,18 +57,8 @@ func NodeMain(ctx *cli.Context) error {

pubIPProvider := pubip.ProviderOrDefault(config.PubIPProvider)

// Create the node.
node, err := node.NewNode(config, pubIPProvider, logger)
if err != nil {
return err
}

err = node.Start(context.Background())
if err != nil {
node.Logger.Error("could not start node", "error", err)
return err
}

// Rate limiter
reg := prometheus.NewRegistry()
globalParams := common.GlobalRateParams{
BucketSizes: []time.Duration{bucketDuration},
Multipliers: []float32{bucketMultiplier},
@@ -79,7 +70,19 @@
return err
}

ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logger)
ratelimiter := ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger)

// Create the node.
node, err := node.NewNode(reg, config, pubIPProvider, logger)
if err != nil {
return err
}

err = node.Start(context.Background())
if err != nil {
node.Logger.Error("could not start node", "error", err)
return err
}

// Creates the GRPC server.
server := grpc.NewServer(config, node, logger, ratelimiter)
9 changes: 4 additions & 5 deletions node/node.go
@@ -66,16 +66,15 @@
}

// NewNode creates a new Node with the provided config.
func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) {
func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) {
// Setup metrics
// sdkClients, err := buildSdkClients(config, logger)
// if err != nil {
// return nil, err
// }

promReg := prometheus.NewRegistry()
eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, promReg, logger.With("component", "EigenMetrics"))
rpcCallsCollector := rpccalls.NewCollector(AppName, promReg)
eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, reg, logger.With("component", "EigenMetrics"))
rpcCallsCollector := rpccalls.NewCollector(AppName, reg)

// Generate BLS keys
keyPair, err := core.MakeKeyPairFromString(config.PrivateBls)
Expand Down Expand Up @@ -113,7 +112,7 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger
// Setup Node Api
nodeApi := nodeapi.NewNodeApi(AppName, SemVer, ":"+config.NodeApiPort, logger.With("component", "NodeApi"))

metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst)
metrics := NewMetrics(eigenMetrics, reg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst)

// Make validator
v, err := verifier.NewVerifier(&config.EncoderConfig, false)
2 changes: 1 addition & 1 deletion test/integration_test.go
@@ -168,7 +168,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
}
finalizer := batchermock.NewFinalizer()

disperserMetrics := disperser.NewMetrics("9100", logger)
disperserMetrics := disperser.NewMetrics(prometheus.NewRegistry(), "9100", logger)
txnManager := batchermock.NewTxnManager()

batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, transactor, txnManager, logger, batcherMetrics, handleBatchLivenessChan)