From 8fad272c4a039e8a859caa027f32690ae3c3e6bd Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 2 May 2024 22:14:02 -0700 Subject: [PATCH 01/13] Add IsAuthenticated to RequestParams --- common/ratelimit.go | 13 +++++++------ disperser/apiserver/server.go | 12 +++++++++++- disperser/apiserver/server_test.go | 4 ++++ disperser/cmd/apiserver/main.go | 10 +++++++++- 4 files changed, 31 insertions(+), 8 deletions(-) diff --git a/common/ratelimit.go b/common/ratelimit.go index 5d4950019e..0f4a10deb2 100644 --- a/common/ratelimit.go +++ b/common/ratelimit.go @@ -16,16 +16,17 @@ import ( type RequesterID = string type RequestParams struct { - RequesterID RequesterID - BlobSize uint - Rate RateParam - Info interface{} + RequesterID RequesterID + BlobSize uint + Rate RateParam + Info interface{} + IsAuthenticated bool } type RateLimiter interface { // AllowRequest checks whether the request should be allowed. If the request is allowed, the function returns true. // If the request is not allowed, the function returns false and the RequestParams of the request that was not allowed. - // In order to for the request to be allowed, all of the requests represented by the RequestParams slice must be allowed. + // In order for the request to be allowed, all of the requests represented by the RequestParams slice must be allowed. // Each RequestParams object represents a single request. Each request is subjected to the same GlobalRateParams, but the // individual parameters of the request can differ. // @@ -37,7 +38,7 @@ type RateLimiter interface { type GlobalRateParams struct { // BucketSizes are the time scales at which the rate limit is enforced. - // For each time scale, the rate limiter will make sure that the give rate (possibly subject to a relaxation given + // For each time scale, the rate limiter will make sure that the given rate (possibly subject to a relaxation given // by one of the Multipliers) is observed when the request bandwidth is averaged at this time scale. // In terms of implementation, the rate limiter uses a set of "time buckets". A time bucket, i, is filled to a maximum of // `BucketSizes[i]` at a rate of 1, and emptied by an amount equal to `(size of request)/RateParam` each time a diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 640758c3a7..7be65e9e1e 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -355,11 +355,14 @@ func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, qu rates.BlobRate = rateInfo.BlobRate } + if len(rateInfo.Name) > 0 { + rates.Name = rateInfo.Name + } + break } return rates, key, nil - } // Enum of rateTypes for the limiterInfo struct @@ -446,6 +449,9 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context s.metrics.HandleInternalFailureRpcRequest(apiMethodName) return api.NewInternalError(err.Error()) } + + // Note: There's an implicit assumption that an empty name means the account + // is not in the allow list. requesterName = accountRates.Name // Update the quorum rate @@ -465,6 +471,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context RateType: SystemThroughputType, QuorumID: param.QuorumID, }, + IsAuthenticated: len(requesterName) > 0, }) key = fmt.Sprintf("%s:%d-%s", systemAccountKey, param.QuorumID, SystemBlobRateType.Plug()) @@ -476,6 +483,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context RateType: SystemBlobRateType, QuorumID: param.QuorumID, }, + IsAuthenticated: len(requesterName) > 0, }) // Account Level @@ -488,6 +496,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context RateType: AccountThroughputType, QuorumID: param.QuorumID, }, + IsAuthenticated: len(requesterName) > 0, }) key = fmt.Sprintf("%s:%d-%s", accountKey, param.QuorumID, AccountBlobRateType.Plug()) @@ -499,6 +508,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context RateType: AccountBlobRateType, QuorumID: param.QuorumID, }, + IsAuthenticated: len(requesterName) > 0, }) } diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index d86beae396..37a5dd9362 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -662,20 +662,24 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { Allowlist: apiserver.Allowlist{ "1.2.3.4": map[uint8]apiserver.PerUserRateInfo{ 0: { + Name: "eigenlabs", Throughput: 100 * 1024, BlobRate: 5 * 1e6, }, 1: { + Name: "eigenlabs", Throughput: 1024 * 1024, BlobRate: 5 * 1e6, }, }, "0x1aa8226f6d354380dDE75eE6B634875c4203e522": map[uint8]apiserver.PerUserRateInfo{ 0: { + Name: "eigenlabs", Throughput: 100 * 1024, BlobRate: 5 * 1e6, }, 1: { + Name: "eigenlabs", Throughput: 1024 * 1024, BlobRate: 5 * 1e6, }, diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index f8a8ac037f..2b39292fdd 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -113,7 +113,15 @@ func RunDisperserServer(ctx *cli.Context) error { // TODO: create a separate metrics for batcher metrics := disperser.NewMetrics(config.MetricsConfig.HTTPPort, logger) - server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, transactor, logger, metrics, ratelimiter, config.RateConfig) + server := apiserver.NewDispersalServer( + config.ServerConfig, + blobStore, + transactor, + logger, + metrics, + ratelimiter, + config.RateConfig, + ) // Enable Metrics Block if config.MetricsConfig.EnableMetrics { From 96847d169535e22da00637dba7618889ee664c8d Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 2 May 2024 22:25:47 -0700 Subject: [PATCH 02/13] Try adding bucket metric --- common/ratelimit/limiter.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index 08387cf446..3ff1aec62e 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -2,10 +2,12 @@ package ratelimit import ( "context" + "fmt" "time" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/prometheus/client_golang/prometheus" ) type BucketStore = common.KVStore[common.RateBucketParams] @@ -15,6 +17,9 @@ type rateLimiter struct { bucketStore BucketStore logger logging.Logger + + // Prometheus metrics + bucketLevels *prometheus.GaugeVec } func NewRateLimiter(rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter { @@ -22,6 +27,10 @@ func NewRateLimiter(rateParams common.GlobalRateParams, bucketStore BucketStore, globalRateParams: rateParams, bucketStore: bucketStore, logger: logger.With("component", "RateLimiter"), + bucketLevels: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "rate_limiter_bucket_levels", + Help: "Current level of each bucket for rate limiting", + }, []string{"requester_id", "bucket_index"}), } } @@ -109,7 +118,19 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar bucketParams.BucketLevels[i] = getBucketLevel(bucketParams.BucketLevels[i], size, interval, deduction) allowed = allowed && bucketParams.BucketLevels[i] > 0 - d.logger.Debug("Bucket level", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) + // Check if the user is authenticated before adding to metrics + requesterLabel := "unauthenticated" // Default or generic label + if params.IsAuthenticated { + requesterLabel = params.RequesterID + } + + d.bucketLevels.With(prometheus.Labels{"requester_id": requesterLabel, "bucket_index": fmt.Sprintf("%d", i)}).Set(float64(bucketParams.BucketLevels[i])) + d.logger.Debug("Bucket level updated", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) + } + + for i, level := range bucketParams.BucketLevels { + d.bucketLevels.With(prometheus.Labels{"requester_id": params.RequesterID, "bucket_index": fmt.Sprintf("%d", i)}).Set(float64(level)) + d.logger.Debug("Bucket level updated", "requester_id", params.RequesterID, "bucket_index", i, "level", level) } return allowed, bucketParams From 62e9e3fd5e91d55aba56b405a7d7ad61a0540a6a Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 2 May 2024 22:35:07 -0700 Subject: [PATCH 03/13] remove loop --- common/ratelimit/limiter.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index 3ff1aec62e..defaf54946 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -128,11 +128,6 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar d.logger.Debug("Bucket level updated", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) } - for i, level := range bucketParams.BucketLevels { - d.bucketLevels.With(prometheus.Labels{"requester_id": params.RequesterID, "bucket_index": fmt.Sprintf("%d", i)}).Set(float64(level)) - d.logger.Debug("Bucket level updated", "requester_id", params.RequesterID, "bucket_index", i, "level", level) - } - return allowed, bucketParams } From 64bda6a01d6f4cd84e87471db3df736299186cd6 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 2 May 2024 22:51:18 -0700 Subject: [PATCH 04/13] register rate limit metrics --- common/ratelimit/limiter.go | 4 ---- common/ratelimit/metrics.go | 10 ++++++++++ disperser/metrics.go | 2 ++ 3 files changed, 12 insertions(+), 4 deletions(-) create mode 100644 common/ratelimit/metrics.go diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index defaf54946..680bc18f6e 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -27,10 +27,6 @@ func NewRateLimiter(rateParams common.GlobalRateParams, bucketStore BucketStore, globalRateParams: rateParams, bucketStore: bucketStore, logger: logger.With("component", "RateLimiter"), - bucketLevels: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "rate_limiter_bucket_levels", - Help: "Current level of each bucket for rate limiting", - }, []string{"requester_id", "bucket_index"}), } } diff --git a/common/ratelimit/metrics.go b/common/ratelimit/metrics.go new file mode 100644 index 0000000000..2f1dd43da2 --- /dev/null +++ b/common/ratelimit/metrics.go @@ -0,0 +1,10 @@ +package ratelimit + +import "github.com/prometheus/client_golang/prometheus" + +func RegisterMetrics(registerer prometheus.Registerer) { + registerer.MustRegister(prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "rate_limiter_bucket_levels", + Help: "Current level of each bucket for rate limiting", + }, []string{"requester_id", "bucket_index"})) +} diff --git a/disperser/metrics.go b/disperser/metrics.go index f9ed4950b1..881d1062bd 100644 --- a/disperser/metrics.go +++ b/disperser/metrics.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" + "github.com/Layr-Labs/eigenda/common/ratelimit" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" @@ -42,6 +43,7 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics { reg := prometheus.NewRegistry() reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) + ratelimit.RegisterMetrics(reg) metrics := &Metrics{ // TODO: revamp this metric -- it'll focus on quorum tracking, which is relevant From a371822bad74d503c3b4f2199f163c93c2a6a923 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 2 May 2024 23:28:09 -0700 Subject: [PATCH 05/13] try this way --- common/ratelimit/limiter.go | 7 ++++++- common/ratelimit/metrics.go | 10 ---------- common/ratelimit/ratelimit_test.go | 3 ++- disperser/apiserver/server_test.go | 5 +++-- disperser/cmd/apiserver/main.go | 8 +++++--- disperser/metrics.go | 5 +---- 6 files changed, 17 insertions(+), 21 deletions(-) delete mode 100644 common/ratelimit/metrics.go diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index 680bc18f6e..c52d667781 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -8,6 +8,7 @@ import ( "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) type BucketStore = common.KVStore[common.RateBucketParams] @@ -22,11 +23,15 @@ type rateLimiter struct { bucketLevels *prometheus.GaugeVec } -func NewRateLimiter(rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter { +func NewRateLimiter(reg prometheus.Registerer, rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter { return &rateLimiter{ globalRateParams: rateParams, bucketStore: bucketStore, logger: logger.With("component", "RateLimiter"), + bucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "rate_limiter_bucket_levels", + Help: "Current level of each bucket for rate limiting", + }, []string{"requester_id", "bucket_index"}), } } diff --git a/common/ratelimit/metrics.go b/common/ratelimit/metrics.go deleted file mode 100644 index 2f1dd43da2..0000000000 --- a/common/ratelimit/metrics.go +++ /dev/null @@ -1,10 +0,0 @@ -package ratelimit - -import "github.com/prometheus/client_golang/prometheus" - -func RegisterMetrics(registerer prometheus.Registerer) { - registerer.MustRegister(prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "rate_limiter_bucket_levels", - Help: "Current level of each bucket for rate limiting", - }, []string{"requester_id", "bucket_index"})) -} diff --git a/common/ratelimit/ratelimit_test.go b/common/ratelimit/ratelimit_test.go index 0555f2e748..969d698dbf 100644 --- a/common/ratelimit/ratelimit_test.go +++ b/common/ratelimit/ratelimit_test.go @@ -9,6 +9,7 @@ import ( "github.com/Layr-Labs/eigenda/common/ratelimit" "github.com/Layr-Labs/eigenda/common/store" "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" ) @@ -25,7 +26,7 @@ func makeTestRatelimiter() (common.RateLimiter, error) { return nil, err } - ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logging.NewNoopLogger()) + ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logging.NewNoopLogger()) return ratelimiter, nil diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 37a5dd9362..b4c90c93de 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -20,6 +20,7 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" "github.com/urfave/cli" pb "github.com/Layr-Labs/eigenda/api/grpc/disperser" @@ -641,7 +642,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { if err != nil { panic("failed to create bucket store") } - ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logger) + ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logger) rateConfig := apiserver.RateConfig{ QuorumRateInfos: map[core.QuorumID]apiserver.QuorumRateInfo{ @@ -697,7 +698,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { return apiserver.NewDispersalServer(disperser.ServerConfig{ GrpcPort: "51001", GrpcTimeout: 1 * time.Second, - }, queue, transactor, logger, disperser.NewMetrics("9001", logger), ratelimiter, rateConfig) + }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig) } func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) { diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index 2b39292fdd..e6fa92df06 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -10,6 +10,7 @@ import ( "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/disperser/apiserver" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" + "github.com/prometheus/client_golang/prometheus" "github.com/Layr-Labs/eigenda/common/aws/dynamodb" "github.com/Layr-Labs/eigenda/common/aws/s3" @@ -91,6 +92,8 @@ func RunDisperserServer(ctx *cli.Context) error { blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) blobStore := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger) + reg := prometheus.NewRegistry() + var ratelimiter common.RateLimiter if config.EnableRatelimiter { globalParams := config.RatelimiterConfig.GlobalRateParams @@ -108,11 +111,10 @@ func RunDisperserServer(ctx *cli.Context) error { return err } } - ratelimiter = ratelimit.NewRateLimiter(globalParams, bucketStore, logger) + ratelimiter = ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger) } - // TODO: create a separate metrics for batcher - metrics := disperser.NewMetrics(config.MetricsConfig.HTTPPort, logger) + metrics := disperser.NewMetrics(reg, config.MetricsConfig.HTTPPort, logger) server := apiserver.NewDispersalServer( config.ServerConfig, blobStore, diff --git a/disperser/metrics.go b/disperser/metrics.go index 881d1062bd..6a762344d9 100644 --- a/disperser/metrics.go +++ b/disperser/metrics.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" - "github.com/Layr-Labs/eigenda/common/ratelimit" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" @@ -38,12 +37,10 @@ const ( AccountRateLimitedFailure string = "ratelimited-account" // The request rate limited at account level ) -func NewMetrics(httpPort string, logger logging.Logger) *Metrics { +func NewMetrics(reg *prometheus.Registry, httpPort string, logger logging.Logger) *Metrics { namespace := "eigenda_disperser" - reg := prometheus.NewRegistry() reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) - ratelimit.RegisterMetrics(reg) metrics := &Metrics{ // TODO: revamp this metric -- it'll focus on quorum tracking, which is relevant From 3d3116cc5ae2c6b3809ec61394e269d9adfa5921 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 2 May 2024 23:41:09 -0700 Subject: [PATCH 06/13] fix node build --- node/cmd/main.go | 33 ++++++++++++++++++--------------- node/node.go | 9 ++++----- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/node/cmd/main.go b/node/cmd/main.go index 1fe3a9564c..00b959213d 100644 --- a/node/cmd/main.go +++ b/node/cmd/main.go @@ -8,12 +8,13 @@ import ( "time" "github.com/Layr-Labs/eigenda/common/pubip" + "github.com/Layr-Labs/eigenda/common/ratelimit" + "github.com/Layr-Labs/eigenda/common/store" + "github.com/prometheus/client_golang/prometheus" "github.com/urfave/cli" "github.com/Layr-Labs/eigenda/common" - "github.com/Layr-Labs/eigenda/common/ratelimit" - "github.com/Layr-Labs/eigenda/common/store" "github.com/Layr-Labs/eigenda/node" "github.com/Layr-Labs/eigenda/node/flags" "github.com/Layr-Labs/eigenda/node/grpc" @@ -56,18 +57,8 @@ func NodeMain(ctx *cli.Context) error { pubIPProvider := pubip.ProviderOrDefault(config.PubIPProvider) - // Create the node. - node, err := node.NewNode(config, pubIPProvider, logger) - if err != nil { - return err - } - - err = node.Start(context.Background()) - if err != nil { - node.Logger.Error("could not start node", "error", err) - return err - } - + // Rate limiter + reg := prometheus.NewRegistry() globalParams := common.GlobalRateParams{ BucketSizes: []time.Duration{bucketDuration}, Multipliers: []float32{bucketMultiplier}, @@ -79,7 +70,19 @@ func NodeMain(ctx *cli.Context) error { return err } - ratelimiter := ratelimit.NewRateLimiter(globalParams, bucketStore, logger) + ratelimiter := ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger) + + // Create the node. + node, err := node.NewNode(reg, config, pubIPProvider, logger) + if err != nil { + return err + } + + err = node.Start(context.Background()) + if err != nil { + node.Logger.Error("could not start node", "error", err) + return err + } // Creates the GRPC server. server := grpc.NewServer(config, node, logger, ratelimiter) diff --git a/node/node.go b/node/node.go index a39f6dcc8b..1f93085162 100644 --- a/node/node.go +++ b/node/node.go @@ -66,16 +66,15 @@ type Node struct { } // NewNode creates a new Node with the provided config. -func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) { +func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) { // Setup metrics // sdkClients, err := buildSdkClients(config, logger) // if err != nil { // return nil, err // } - promReg := prometheus.NewRegistry() - eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, promReg, logger.With("component", "EigenMetrics")) - rpcCallsCollector := rpccalls.NewCollector(AppName, promReg) + eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, reg, logger.With("component", "EigenMetrics")) + rpcCallsCollector := rpccalls.NewCollector(AppName, reg) // Generate BLS keys keyPair, err := core.MakeKeyPairFromString(config.PrivateBls) @@ -113,7 +112,7 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger // Setup Node Api nodeApi := nodeapi.NewNodeApi(AppName, SemVer, ":"+config.NodeApiPort, logger.With("component", "NodeApi")) - metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst) + metrics := NewMetrics(eigenMetrics, reg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst) // Make validator v, err := verifier.NewVerifier(&config.EncoderConfig, false) From 27320c76b85a84104b1e1f9ac3f4af5d36129864 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 2 May 2024 23:44:40 -0700 Subject: [PATCH 07/13] fix test --- test/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration_test.go b/test/integration_test.go index fc5c8c4661..c75afeee0b 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -168,7 +168,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser } finalizer := batchermock.NewFinalizer() - disperserMetrics := disperser.NewMetrics("9100", logger) + disperserMetrics := disperser.NewMetrics(prometheus.NewRegistry(), "9100", logger) txnManager := batchermock.NewTxnManager() batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, transactor, txnManager, logger, batcherMetrics, handleBatchLivenessChan) From 95b651a36bf1b69fd50a5942cb6cb3574e79df6d Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Tue, 7 May 2024 22:54:15 -0700 Subject: [PATCH 08/13] Remove unauth label and add system bucket level --- common/ratelimit/limiter.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index c52d667781..2868b1141f 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -3,6 +3,7 @@ package ratelimit import ( "context" "fmt" + "strings" "time" "github.com/Layr-Labs/eigenda/common" @@ -20,7 +21,8 @@ type rateLimiter struct { logger logging.Logger // Prometheus metrics - bucketLevels *prometheus.GaugeVec + bucketLevels *prometheus.GaugeVec + systemBucketLevels *prometheus.GaugeVec } func NewRateLimiter(reg prometheus.Registerer, rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter { @@ -32,6 +34,10 @@ func NewRateLimiter(reg prometheus.Registerer, rateParams common.GlobalRateParam Name: "rate_limiter_bucket_levels", Help: "Current level of each bucket for rate limiting", }, []string{"requester_id", "bucket_index"}), + systemBucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "rate_limiter_system_bucket_levels", + Help: "Current level of each bucket for system rate limiting", + }, []string{"bucket_index"}), } } @@ -120,12 +126,15 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar allowed = allowed && bucketParams.BucketLevels[i] > 0 // Check if the user is authenticated before adding to metrics - requesterLabel := "unauthenticated" // Default or generic label if params.IsAuthenticated { - requesterLabel = params.RequesterID + requesterLabel := params.RequesterID + d.bucketLevels.With(prometheus.Labels{"requester_id": requesterLabel, "bucket_index": fmt.Sprintf("%d", i)}).Set(float64(bucketParams.BucketLevels[i])) + } + + if strings.HasPrefix(params.RequesterID, "system:") { + d.systemBucketLevels.With(prometheus.Labels{"bucket_index": fmt.Sprintf("%d", i)}).Set(float64(bucketParams.BucketLevels[i])) } - d.bucketLevels.With(prometheus.Labels{"requester_id": requesterLabel, "bucket_index": fmt.Sprintf("%d", i)}).Set(float64(bucketParams.BucketLevels[i])) d.logger.Debug("Bucket level updated", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) } From 1286d98000fb36b41638462f5d1ed8625bb3b167 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Tue, 7 May 2024 23:18:01 -0700 Subject: [PATCH 09/13] Add more labels and fix logic --- common/ratelimit/limiter.go | 40 +++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index 2868b1141f..e3859ff75b 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -2,7 +2,6 @@ package ratelimit import ( "context" - "fmt" "strings" "time" @@ -125,14 +124,39 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar bucketParams.BucketLevels[i] = getBucketLevel(bucketParams.BucketLevels[i], size, interval, deduction) allowed = allowed && bucketParams.BucketLevels[i] > 0 - // Check if the user is authenticated before adding to metrics - if params.IsAuthenticated { - requesterLabel := params.RequesterID - d.bucketLevels.With(prometheus.Labels{"requester_id": requesterLabel, "bucket_index": fmt.Sprintf("%d", i)}).Set(float64(bucketParams.BucketLevels[i])) - } - if strings.HasPrefix(params.RequesterID, "system:") { - d.systemBucketLevels.With(prometheus.Labels{"bucket_index": fmt.Sprintf("%d", i)}).Set(float64(bucketParams.BucketLevels[i])) + requesterParts := strings.Split(params.RequesterID, ":") + if len(requesterParts) == 2 { + systemParts := strings.Split(requesterParts[1], "-") + if len(systemParts) == 2 { + quorum := systemParts[0] + limitType := systemParts[1] + d.systemBucketLevels.With(prometheus.Labels{ + "quorum": quorum, + "type": limitType, + }).Set(float64(bucketParams.BucketLevels[i])) + } + } + } else { + // Check if the user is authenticated before adding to metrics + if params.IsAuthenticated { + requesterParts := strings.Split(params.RequesterID, ":") + if len(requesterParts) == 3 { + accountType := requesterParts[0] + ipOrEthAddress := requesterParts[1] + quorumAndType := strings.Split(requesterParts[2], "-") + if len(quorumAndType) == 2 { + quorum := quorumAndType[0] + limitType := quorumAndType[1] + d.bucketLevels.With(prometheus.Labels{ + "account_type": accountType, + "ip_eth_address": ipOrEthAddress, + "quorum": quorum, + "type": limitType, + }).Set(float64(bucketParams.BucketLevels[i])) + } + } + } } d.logger.Debug("Bucket level updated", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) From 892c925a03f020ae75515cefc9e1365e532b4814 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Tue, 7 May 2024 23:28:21 -0700 Subject: [PATCH 10/13] some fixes --- common/ratelimit/limiter.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index e3859ff75b..58597252fa 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -2,6 +2,7 @@ package ratelimit import ( "context" + "strconv" "strings" "time" @@ -32,11 +33,11 @@ func NewRateLimiter(reg prometheus.Registerer, rateParams common.GlobalRateParam bucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "rate_limiter_bucket_levels", Help: "Current level of each bucket for rate limiting", - }, []string{"requester_id", "bucket_index"}), + }, []string{"account_type", "account_key", "quorum", "type", "bucket_index"}), systemBucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "rate_limiter_system_bucket_levels", Help: "Current level of each bucket for system rate limiting", - }, []string{"bucket_index"}), + }, []string{"quorum", "type", "bucket_index"}), } } @@ -132,8 +133,9 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar quorum := systemParts[0] limitType := systemParts[1] d.systemBucketLevels.With(prometheus.Labels{ - "quorum": quorum, - "type": limitType, + "quorum": quorum, + "type": limitType, + "bucket_index": strconv.Itoa(i), }).Set(float64(bucketParams.BucketLevels[i])) } } @@ -149,10 +151,11 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar quorum := quorumAndType[0] limitType := quorumAndType[1] d.bucketLevels.With(prometheus.Labels{ - "account_type": accountType, - "ip_eth_address": ipOrEthAddress, - "quorum": quorum, - "type": limitType, + "account_type": accountType, + "account_key": ipOrEthAddress, + "quorum": quorum, + "type": limitType, + "bucket_index": strconv.Itoa(i), }).Set(float64(bucketParams.BucketLevels[i])) } } From 5c191ecff3ebfdacdf1099c4a7d5dcd1d21081c7 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 9 May 2024 15:53:54 -0700 Subject: [PATCH 11/13] Try metrics key/params --- common/ratelimit.go | 4 ++ common/ratelimit/limiter.go | 91 +++++++++++++++++++---------------- disperser/apiserver/server.go | 22 +++++++++ 3 files changed, 76 insertions(+), 41 deletions(-) diff --git a/common/ratelimit.go b/common/ratelimit.go index 0f4a10deb2..f880183a6c 100644 --- a/common/ratelimit.go +++ b/common/ratelimit.go @@ -21,6 +21,10 @@ type RequestParams struct { Rate RateParam Info interface{} IsAuthenticated bool + + // If the metrics key is set, the rate limiter will increment the metric that corresponds to the key + MetricsKey string + MetricsParams map[string]string } type RateLimiter interface { diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index 58597252fa..d000780e1f 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -3,7 +3,6 @@ package ratelimit import ( "context" "strconv" - "strings" "time" "github.com/Layr-Labs/eigenda/common" @@ -21,8 +20,8 @@ type rateLimiter struct { logger logging.Logger // Prometheus metrics - bucketLevels *prometheus.GaugeVec - systemBucketLevels *prometheus.GaugeVec + accountBucketLevels *prometheus.GaugeVec + systemBucketLevels *prometheus.GaugeVec } func NewRateLimiter(reg prometheus.Registerer, rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter { @@ -30,10 +29,10 @@ func NewRateLimiter(reg prometheus.Registerer, rateParams common.GlobalRateParam globalRateParams: rateParams, bucketStore: bucketStore, logger: logger.With("component", "RateLimiter"), - bucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "rate_limiter_bucket_levels", - Help: "Current level of each bucket for rate limiting", - }, []string{"account_type", "account_key", "quorum", "type", "bucket_index"}), + accountBucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "rate_limiter_account_bucket_levels", + Help: "Current level of each account bucket for rate limiting", + }, []string{"account_key", "quorum", "type", "bucket_index"}), systemBucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "rate_limiter_system_bucket_levels", Help: "Current level of each bucket for system rate limiting", @@ -125,44 +124,54 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar bucketParams.BucketLevels[i] = getBucketLevel(bucketParams.BucketLevels[i], size, interval, deduction) allowed = allowed && bucketParams.BucketLevels[i] > 0 - if strings.HasPrefix(params.RequesterID, "system:") { - requesterParts := strings.Split(params.RequesterID, ":") - if len(requesterParts) == 2 { - systemParts := strings.Split(requesterParts[1], "-") - if len(systemParts) == 2 { - quorum := systemParts[0] - limitType := systemParts[1] - d.systemBucketLevels.With(prometheus.Labels{ - "quorum": quorum, - "type": limitType, - "bucket_index": strconv.Itoa(i), - }).Set(float64(bucketParams.BucketLevels[i])) + d.logger.Debug("Bucket level updated", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) + // Update the metrics + if params.MetricsKey != "" { + if params.MetricsKey == "rate_limiter_account_bucket_levels" { + // Validate the metrics param map + if _, ok := params.MetricsParams["account_type"]; !ok { + d.logger.Warn("Missing account_type in metrics params") + continue } - } - } else { - // Check if the user is authenticated before adding to metrics - if params.IsAuthenticated { - requesterParts := strings.Split(params.RequesterID, ":") - if len(requesterParts) == 3 { - accountType := requesterParts[0] - ipOrEthAddress := requesterParts[1] - quorumAndType := strings.Split(requesterParts[2], "-") - if len(quorumAndType) == 2 { - quorum := quorumAndType[0] - limitType := quorumAndType[1] - d.bucketLevels.With(prometheus.Labels{ - "account_type": accountType, - "account_key": ipOrEthAddress, - "quorum": quorum, - "type": limitType, - "bucket_index": strconv.Itoa(i), - }).Set(float64(bucketParams.BucketLevels[i])) - } + if _, ok := params.MetricsParams["account_key"]; !ok { + d.logger.Warn("Missing account_key in metrics params") + continue + } + if _, ok := params.MetricsParams["quorum"]; !ok { + d.logger.Warn("Missing quorum in metrics params") + continue + } + if _, ok := params.MetricsParams["type"]; !ok { + d.logger.Warn("Missing type in metrics params") + continue } + d.accountBucketLevels.With(prometheus.Labels{ + "account_key": params.MetricsParams["account_key"], + "quorum": params.MetricsParams["quorum"], + "type": params.MetricsParams["type"], + "bucket_index": strconv.Itoa(i), + }).Set(float64(bucketParams.BucketLevels[i])) + } else if params.MetricsKey == "rate_limiter_system_bucket_levels" { + // Validate the metrics param map + if _, ok := params.MetricsParams["quorum"]; !ok { + d.logger.Warn("Missing quorum in metrics params") + continue + } + if _, ok := params.MetricsParams["type"]; !ok { + d.logger.Warn("Missing type in metrics params") + continue + } + d.systemBucketLevels.With(prometheus.Labels{ + "quorum": params.MetricsParams["quorum"], + "type": params.MetricsParams["type"], + "bucket_index": strconv.Itoa(i), + }).Set(float64(bucketParams.BucketLevels[i])) + } else { + d.logger.Warn("Unknown metrics key", "key", params.MetricsKey) } + } else { + d.logger.Warn("Missing metrics key") } - - d.logger.Debug("Bucket level updated", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) } return allowed, bucketParams diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 7be65e9e1e..7974b3aa5c 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -472,6 +472,11 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context QuorumID: param.QuorumID, }, IsAuthenticated: len(requesterName) > 0, + MetricsKey: "rate_limiter_system_bucket_levels", + MetricsParams: map[string]string{ + "quorum": fmt.Sprint(param.QuorumID), + "type": SystemThroughputType.Plug(), + }, }) key = fmt.Sprintf("%s:%d-%s", systemAccountKey, param.QuorumID, SystemBlobRateType.Plug()) @@ -484,6 +489,11 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context QuorumID: param.QuorumID, }, IsAuthenticated: len(requesterName) > 0, + MetricsKey: "rate_limiter_system_bucket_levels", + MetricsParams: map[string]string{ + "quorum": fmt.Sprint(param.QuorumID), + "type": SystemBlobRateType.Plug(), + }, }) // Account Level @@ -497,6 +507,12 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context QuorumID: param.QuorumID, }, IsAuthenticated: len(requesterName) > 0, + MetricsKey: "rate_limiter_account_bucket_levels", + MetricsParams: map[string]string{ + "account_key": accountKey, + "quorum": fmt.Sprint(param.QuorumID), + "type": AccountThroughputType.Plug(), + }, }) key = fmt.Sprintf("%s:%d-%s", accountKey, param.QuorumID, AccountBlobRateType.Plug()) @@ -509,6 +525,12 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context QuorumID: param.QuorumID, }, IsAuthenticated: len(requesterName) > 0, + MetricsKey: "rate_limiter_account_bucket_levels", + MetricsParams: map[string]string{ + "account_key": accountKey, + "quorum": fmt.Sprint(param.QuorumID), + "type": AccountThroughputType.Plug(), + }, }) } From 532c5748df332ba0ebbdc9cc69619d18589d0ffc Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Fri, 10 May 2024 15:23:51 -0700 Subject: [PATCH 12/13] Update bucket metrics --- common/ratelimit.go | 18 ++++----- common/ratelimit/limiter.go | 71 ++++++++--------------------------- disperser/apiserver/server.go | 54 ++++++++------------------ 3 files changed, 40 insertions(+), 103 deletions(-) diff --git a/common/ratelimit.go b/common/ratelimit.go index f880183a6c..64534ec6fb 100644 --- a/common/ratelimit.go +++ b/common/ratelimit.go @@ -15,16 +15,16 @@ import ( // ID is the authenticated Account ID. For retrieval requests, the requester ID will be the requester's IP address. type RequesterID = string -type RequestParams struct { - RequesterID RequesterID - BlobSize uint - Rate RateParam - Info interface{} - IsAuthenticated bool +// RequesterName is the friendly name of the party making the request. In the case +// of a rollup making a dispersal request, the RequesterName is the name of the rollup. +type RequesterName = string - // If the metrics key is set, the rate limiter will increment the metric that corresponds to the key - MetricsKey string - MetricsParams map[string]string +type RequestParams struct { + RequesterID RequesterID + RequesterName RequesterName + BlobSize uint + Rate RateParam + Info interface{} } type RateLimiter interface { diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index d000780e1f..6aee3017c5 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -20,8 +20,7 @@ type rateLimiter struct { logger logging.Logger // Prometheus metrics - accountBucketLevels *prometheus.GaugeVec - systemBucketLevels *prometheus.GaugeVec + bucketLevels *prometheus.GaugeVec } func NewRateLimiter(reg prometheus.Registerer, rateParams common.GlobalRateParams, bucketStore BucketStore, logger logging.Logger) common.RateLimiter { @@ -29,14 +28,10 @@ func NewRateLimiter(reg prometheus.Registerer, rateParams common.GlobalRateParam globalRateParams: rateParams, bucketStore: bucketStore, logger: logger.With("component", "RateLimiter"), - accountBucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "rate_limiter_account_bucket_levels", - Help: "Current level of each account bucket for rate limiting", - }, []string{"account_key", "quorum", "type", "bucket_index"}), - systemBucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "rate_limiter_system_bucket_levels", - Help: "Current level of each bucket for system rate limiting", - }, []string{"quorum", "type", "bucket_index"}), + bucketLevels: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "rate_limiter_bucket_levels", + Help: "Current level of each bucket for rate limiting", + }, []string{"requester_id", "requester_name", "bucket_index"}), } } @@ -125,52 +120,16 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar allowed = allowed && bucketParams.BucketLevels[i] > 0 d.logger.Debug("Bucket level updated", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) - // Update the metrics - if params.MetricsKey != "" { - if params.MetricsKey == "rate_limiter_account_bucket_levels" { - // Validate the metrics param map - if _, ok := params.MetricsParams["account_type"]; !ok { - d.logger.Warn("Missing account_type in metrics params") - continue - } - if _, ok := params.MetricsParams["account_key"]; !ok { - d.logger.Warn("Missing account_key in metrics params") - continue - } - if _, ok := params.MetricsParams["quorum"]; !ok { - d.logger.Warn("Missing quorum in metrics params") - continue - } - if _, ok := params.MetricsParams["type"]; !ok { - d.logger.Warn("Missing type in metrics params") - continue - } - d.accountBucketLevels.With(prometheus.Labels{ - "account_key": params.MetricsParams["account_key"], - "quorum": params.MetricsParams["quorum"], - "type": params.MetricsParams["type"], - "bucket_index": strconv.Itoa(i), - }).Set(float64(bucketParams.BucketLevels[i])) - } else if params.MetricsKey == "rate_limiter_system_bucket_levels" { - // Validate the metrics param map - if _, ok := params.MetricsParams["quorum"]; !ok { - d.logger.Warn("Missing quorum in metrics params") - continue - } - if _, ok := params.MetricsParams["type"]; !ok { - d.logger.Warn("Missing type in metrics params") - continue - } - d.systemBucketLevels.With(prometheus.Labels{ - "quorum": params.MetricsParams["quorum"], - "type": params.MetricsParams["type"], - "bucket_index": strconv.Itoa(i), - }).Set(float64(bucketParams.BucketLevels[i])) - } else { - d.logger.Warn("Unknown metrics key", "key", params.MetricsKey) - } - } else { - d.logger.Warn("Missing metrics key") + + // Update metrics only if the requester name is provided. We're making + // an assumption that the requester name is only provided for authenticated + // requests so it should limit the cardinality of the requester_id label. + if params.RequesterName != "" { + d.bucketLevels.With(prometheus.Labels{ + "requester_id": params.RequesterID, + "requester_name": params.RequesterName, + "bucket_index": strconv.Itoa(i), + }).Set(float64(bucketParams.BucketLevels[i])) } } diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 7974b3aa5c..e604b2a3ae 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -464,73 +464,51 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context // System Level key := fmt.Sprintf("%s:%d-%s", systemAccountKey, param.QuorumID, SystemThroughputType.Plug()) requestParams = append(requestParams, common.RequestParams{ - RequesterID: key, - BlobSize: encodedSize, - Rate: globalRates.TotalUnauthThroughput, + RequesterID: key, + RequesterName: systemAccountKey, + BlobSize: encodedSize, + Rate: globalRates.TotalUnauthThroughput, Info: limiterInfo{ RateType: SystemThroughputType, QuorumID: param.QuorumID, }, - IsAuthenticated: len(requesterName) > 0, - MetricsKey: "rate_limiter_system_bucket_levels", - MetricsParams: map[string]string{ - "quorum": fmt.Sprint(param.QuorumID), - "type": SystemThroughputType.Plug(), - }, }) key = fmt.Sprintf("%s:%d-%s", systemAccountKey, param.QuorumID, SystemBlobRateType.Plug()) requestParams = append(requestParams, common.RequestParams{ - RequesterID: key, - BlobSize: blobRateMultiplier, - Rate: globalRates.TotalUnauthBlobRate, + RequesterID: key, + RequesterName: systemAccountKey, + BlobSize: blobRateMultiplier, + Rate: globalRates.TotalUnauthBlobRate, Info: limiterInfo{ RateType: SystemBlobRateType, QuorumID: param.QuorumID, }, - IsAuthenticated: len(requesterName) > 0, - MetricsKey: "rate_limiter_system_bucket_levels", - MetricsParams: map[string]string{ - "quorum": fmt.Sprint(param.QuorumID), - "type": SystemBlobRateType.Plug(), - }, }) // Account Level key = fmt.Sprintf("%s:%d-%s", accountKey, param.QuorumID, AccountThroughputType.Plug()) requestParams = append(requestParams, common.RequestParams{ - RequesterID: key, - BlobSize: encodedSize, - Rate: accountRates.Throughput, + RequesterID: key, + RequesterName: requesterName, + BlobSize: encodedSize, + Rate: accountRates.Throughput, Info: limiterInfo{ RateType: AccountThroughputType, QuorumID: param.QuorumID, }, - IsAuthenticated: len(requesterName) > 0, - MetricsKey: "rate_limiter_account_bucket_levels", - MetricsParams: map[string]string{ - "account_key": accountKey, - "quorum": fmt.Sprint(param.QuorumID), - "type": AccountThroughputType.Plug(), - }, }) key = fmt.Sprintf("%s:%d-%s", accountKey, param.QuorumID, AccountBlobRateType.Plug()) requestParams = append(requestParams, common.RequestParams{ - RequesterID: key, - BlobSize: blobRateMultiplier, - Rate: accountRates.BlobRate, + RequesterID: key, + RequesterName: requesterName, + BlobSize: blobRateMultiplier, + Rate: accountRates.BlobRate, Info: limiterInfo{ RateType: AccountBlobRateType, QuorumID: param.QuorumID, }, - IsAuthenticated: len(requesterName) > 0, - MetricsKey: "rate_limiter_account_bucket_levels", - MetricsParams: map[string]string{ - "account_key": accountKey, - "quorum": fmt.Sprint(param.QuorumID), - "type": AccountThroughputType.Plug(), - }, }) } From c59581d56314b7f898f486dde57ada82a41a8ab8 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Tue, 28 May 2024 12:07:24 -0700 Subject: [PATCH 13/13] Add requester name --- common/ratelimit/limiter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index 6aee3017c5..f522f9b64a 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -119,7 +119,7 @@ func (d *rateLimiter) checkAllowed(ctx context.Context, params common.RequestPar bucketParams.BucketLevels[i] = getBucketLevel(bucketParams.BucketLevels[i], size, interval, deduction) allowed = allowed && bucketParams.BucketLevels[i] > 0 - d.logger.Debug("Bucket level updated", "key", params.RequesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) + d.logger.Debug("Bucket level updated", "key", params.RequesterID, "name", params.RequesterName, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) // Update metrics only if the requester name is provided. We're making // an assumption that the requester name is only provided for authenticated