Skip to content

Commit

Permalink
Relay rate limits (#906)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Nov 19, 2024
1 parent fbe6d3b commit afd5894
Show file tree
Hide file tree
Showing 12 changed files with 1,224 additions and 54 deletions.
33 changes: 18 additions & 15 deletions relay/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"github.com/Layr-Labs/eigenda/relay/limiter"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
Expand All @@ -12,21 +13,6 @@ import (
)

// Config is the configuration for the relay Server.
//
// Environment variables are mapped into this struct by taking the name of the field in this struct,
// converting to upper case, and prepending "RELAY_". For example, "BlobCacheSize" can be set using the
// environment variable "RELAY_BLOBCACHESIZE".
//
// For nested structs, add the name of the struct variable before the field name, separated by an underscore.
// For example, "Log.Format" can be set using the environment variable "RELAY_LOG_FORMAT".
//
// Slice values can be set using a comma-separated list. For example, "RelayIDs" can be set using the environment
// variable "RELAY_RELAYIDS='1,2,3,4'".
//
// It is also possible to set the configuration using a configuration file. The path to the configuration file should
// be passed as the first argument to the relay binary, e.g. "bin/relay config.yaml". The structure of the config
// file should mirror the structure of this struct, with keys in the config file matching the field names
// of this struct.
type Config struct {

// Log is the configuration for the logger. Default is common.DefaultLoggerConfig().
Expand Down Expand Up @@ -70,6 +56,23 @@ func NewConfig(ctx *cli.Context) (Config, error) {
BlobMaxConcurrency: ctx.Int(flags.BlobMaxConcurrencyFlag.Name),
ChunkCacheSize: ctx.Int(flags.ChunkCacheSizeFlag.Name),
ChunkMaxConcurrency: ctx.Int(flags.ChunkMaxConcurrencyFlag.Name),
RateLimits: limiter.Config{
MaxGetBlobOpsPerSecond: ctx.Float64(flags.MaxGetBlobOpsPerSecondFlag.Name),
GetBlobOpsBurstiness: ctx.Int(flags.GetBlobOpsBurstinessFlag.Name),
MaxGetBlobBytesPerSecond: ctx.Float64(flags.MaxGetBlobBytesPerSecondFlag.Name),
GetBlobBytesBurstiness: ctx.Int(flags.GetBlobBytesBurstinessFlag.Name),
MaxConcurrentGetBlobOps: ctx.Int(flags.MaxConcurrentGetBlobOpsFlag.Name),
MaxGetChunkOpsPerSecond: ctx.Float64(flags.MaxGetChunkOpsPerSecondFlag.Name),
GetChunkOpsBurstiness: ctx.Int(flags.GetChunkOpsBurstinessFlag.Name),
MaxGetChunkBytesPerSecond: ctx.Float64(flags.MaxGetChunkBytesPerSecondFlag.Name),
GetChunkBytesBurstiness: ctx.Int(flags.GetChunkBytesBurstinessFlag.Name),
MaxConcurrentGetChunkOps: ctx.Int(flags.MaxConcurrentGetChunkOpsFlag.Name),
MaxGetChunkOpsPerSecondClient: ctx.Float64(flags.MaxGetChunkOpsPerSecondClientFlag.Name),
GetChunkOpsBurstinessClient: ctx.Int(flags.GetChunkOpsBurstinessClientFlag.Name),
MaxGetChunkBytesPerSecondClient: ctx.Float64(flags.MaxGetChunkBytesPerSecondClientFlag.Name),
GetChunkBytesBurstinessClient: ctx.Int(flags.GetChunkBytesBurstinessClientFlag.Name),
MaxConcurrentGetChunkOpsClient: ctx.Int(flags.MaxConcurrentGetChunkOpsClientFlag.Name),
},
},
}
for i, id := range relayIDs {
Expand Down
119 changes: 119 additions & 0 deletions relay/cmd/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,110 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "CHUNK_MAX_CONCURRENCY"),
Value: 32,
}
// MaxGetBlobOpsPerSecondFlag sets the sustained, relay-wide rate of GetBlob operations.
// NewConfig copies it into limiter.Config.MaxGetBlobOpsPerSecond. Optional; defaults to 1024 ops/s.
MaxGetBlobOpsPerSecondFlag = cli.Float64Flag{
Name: common.PrefixFlag(FlagPrefix, "max-get-blob-ops-per-second"),
Usage: "Max number of GetBlob operations per second",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_BLOB_OPS_PER_SECOND"),
Value: 1024,
}
// GetBlobOpsBurstinessFlag sets the burst size of the GetBlob operation rate limiter.
// NewConfig copies it into limiter.Config.GetBlobOpsBurstiness. Optional; defaults to 1024,
// the same number as the default operations-per-second rate.
GetBlobOpsBurstinessFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-blob-ops-burstiness"),
Usage: "Burstiness of the GetBlob rate limiter",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_BLOB_OPS_BURSTINESS"),
Value: 1024,
}
// MaxGetBlobBytesPerSecondFlag sets the sustained, relay-wide GetBlob bandwidth in bytes per second.
// NewConfig copies it into limiter.Config.MaxGetBlobBytesPerSecond. Optional; defaults to
// 20 * 1024 * 1024 bytes/s (20 MiB/s).
MaxGetBlobBytesPerSecondFlag = cli.Float64Flag{
Name: common.PrefixFlag(FlagPrefix, "max-get-blob-bytes-per-second"),
Usage: "Max bandwidth for GetBlob operations in bytes per second",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_BLOB_BYTES_PER_SECOND"),
Value: 20 * 1024 * 1024,
}
// GetBlobBytesBurstinessFlag sets the burst size, in bytes, of the GetBlob bandwidth rate limiter.
// NewConfig copies it into limiter.Config.GetBlobBytesBurstiness. Optional; defaults to
// 20 * 1024 * 1024 bytes (20 MiB), the same number as the default bandwidth.
// NOTE(review): a token-bucket burst caps how many tokens one request may take, so a blob larger
// than this value presumably can never be admitted — confirm this is the intended maximum blob size.
GetBlobBytesBurstinessFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-blob-bytes-burstiness"),
Usage: "Burstiness of the GetBlob bandwidth rate limiter",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_BLOB_BYTES_BURSTINESS"),
Value: 20 * 1024 * 1024,
}
// MaxConcurrentGetBlobOpsFlag sets how many GetBlob operations may be in flight at once across the relay.
// NewConfig copies it into limiter.Config.MaxConcurrentGetBlobOps. Optional; defaults to 1024.
MaxConcurrentGetBlobOpsFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-concurrent-get-blob-ops"),
Usage: "Max number of concurrent GetBlob operations",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_CONCURRENT_GET_BLOB_OPS"),
Value: 1024,
}
// MaxGetChunkOpsPerSecondFlag sets the sustained rate of GetChunk operations (not per client).
// NewConfig copies it into limiter.Config.MaxGetChunkOpsPerSecond. Optional; defaults to 1024 ops/s.
MaxGetChunkOpsPerSecondFlag = cli.Float64Flag{
Name: common.PrefixFlag(FlagPrefix, "max-get-chunk-ops-per-second"),
Usage: "Max number of GetChunk operations per second",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_CHUNK_OPS_PER_SECOND"),
Value: 1024,
}
// GetChunkOpsBurstinessFlag sets the burst size of the GetChunk operation rate limiter (not per client).
// NewConfig copies it into limiter.Config.GetChunkOpsBurstiness. Optional; defaults to 1024,
// the same number as the default operations-per-second rate.
GetChunkOpsBurstinessFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-chunk-ops-burstiness"),
Usage: "Burstiness of the GetChunk rate limiter",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_CHUNK_OPS_BURSTINESS"),
Value: 1024,
}
// MaxGetChunkBytesPerSecondFlag sets the sustained GetChunk bandwidth in bytes per second (not per client).
// NewConfig copies it into limiter.Config.MaxGetChunkBytesPerSecond. Optional; defaults to
// 20 * 1024 * 1024 bytes/s (20 MiB/s).
MaxGetChunkBytesPerSecondFlag = cli.Float64Flag{
Name: common.PrefixFlag(FlagPrefix, "max-get-chunk-bytes-per-second"),
Usage: "Max bandwidth for GetChunk operations in bytes per second",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_CHUNK_BYTES_PER_SECOND"),
Value: 20 * 1024 * 1024,
}
// GetChunkBytesBurstinessFlag sets the burst size, in bytes, of the GetChunk bandwidth rate limiter
// (not per client). NewConfig copies it into limiter.Config.GetChunkBytesBurstiness. Optional;
// defaults to 20 * 1024 * 1024 bytes (20 MiB), the same number as the default bandwidth.
GetChunkBytesBurstinessFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-chunk-bytes-burstiness"),
Usage: "Burstiness of the GetChunk bandwidth rate limiter",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_CHUNK_BYTES_BURSTINESS"),
Value: 20 * 1024 * 1024,
}
// MaxConcurrentGetChunkOpsFlag sets how many GetChunk operations may be in flight at once (not per client).
// NewConfig copies it into limiter.Config.MaxConcurrentGetChunkOps. Optional; defaults to 1024.
MaxConcurrentGetChunkOpsFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-concurrent-get-chunk-ops"),
Usage: "Max number of concurrent GetChunk operations",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_CONCURRENT_GET_CHUNK_OPS"),
Value: 1024,
}
// MaxGetChunkOpsPerSecondClientFlag sets the sustained rate of GetChunk operations allowed per client.
// NewConfig copies it into limiter.Config.MaxGetChunkOpsPerSecondClient. Optional; defaults to 8 ops/s.
MaxGetChunkOpsPerSecondClientFlag = cli.Float64Flag{
Name: common.PrefixFlag(FlagPrefix, "max-get-chunk-ops-per-second-client"),
Usage: "Max number of GetChunk operations per second per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_CHUNK_OPS_PER_SECOND_CLIENT"),
Value: 8,
}
// GetChunkOpsBurstinessClientFlag sets the burst size of the per-client GetChunk operation rate limiter.
// NewConfig copies it into limiter.Config.GetChunkOpsBurstinessClient. Optional; defaults to 8,
// the same number as the default per-client operations-per-second rate.
GetChunkOpsBurstinessClientFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-chunk-ops-burstiness-client"),
Usage: "Burstiness of the GetChunk rate limiter per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_CHUNK_OPS_BURSTINESS_CLIENT"),
Value: 8,
}
// MaxGetChunkBytesPerSecondClientFlag sets the sustained GetChunk bandwidth, in bytes per second,
// allowed per client. NewConfig copies it into limiter.Config.MaxGetChunkBytesPerSecondClient.
// Optional; defaults to 2 * 1024 * 1024 bytes/s (2 MiB/s).
MaxGetChunkBytesPerSecondClientFlag = cli.Float64Flag{
Name: common.PrefixFlag(FlagPrefix, "max-get-chunk-bytes-per-second-client"),
Usage: "Max bandwidth for GetChunk operations in bytes per second per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_CHUNK_BYTES_PER_SECOND_CLIENT"),
Value: 2 * 1024 * 1024,
}
// GetChunkBytesBurstinessClientFlag sets the burst size, in bytes, of the per-client GetChunk bandwidth
// rate limiter. NewConfig copies it into limiter.Config.GetChunkBytesBurstinessClient. Optional;
// defaults to 2 * 1024 * 1024 bytes (2 MiB), the same number as the default per-client bandwidth,
// following the pattern of every other burstiness flag in this file.
//
// The default must be non-zero: with the flag's zero value the limiter would be built with a burst
// of 0, and a token bucket with zero burst rejects every request that asks for at least one byte.
GetChunkBytesBurstinessClientFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-chunk-bytes-burstiness-client"),
Usage: "Burstiness of the GetChunk bandwidth rate limiter per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_CHUNK_BYTES_BURSTINESS_CLIENT"),
Value: 2 * 1024 * 1024,
}
// MaxConcurrentGetChunkOpsClientFlag sets how many GetChunk operations a single client may have in
// flight at once. NewConfig copies it into limiter.Config.MaxConcurrentGetChunkOpsClient.
// Optional; defaults to 1.
MaxConcurrentGetChunkOpsClientFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-concurrent-get-chunk-ops-client"),
Usage: "Max number of concurrent GetChunk operations per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_CONCURRENT_GET_CHUNK_OPS_CLIENT"),
Value: 1,
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -102,6 +206,21 @@ var optionalFlags = []cli.Flag{
BlobMaxConcurrencyFlag,
ChunkCacheSizeFlag,
ChunkMaxConcurrencyFlag,
MaxGetBlobOpsPerSecondFlag,
GetBlobOpsBurstinessFlag,
MaxGetBlobBytesPerSecondFlag,
GetBlobBytesBurstinessFlag,
MaxConcurrentGetBlobOpsFlag,
MaxGetChunkOpsPerSecondFlag,
GetChunkOpsBurstinessFlag,
MaxGetChunkBytesPerSecondFlag,
GetChunkBytesBurstinessFlag,
MaxConcurrentGetChunkOpsFlag,
MaxGetChunkOpsPerSecondClientFlag,
GetChunkOpsBurstinessClientFlag,
MaxGetChunkBytesPerSecondClientFlag,
GetChunkBytesBurstinessClientFlag,
MaxConcurrentGetChunkOpsClientFlag,
}

// Flags is the exported list of CLI flags for the relay.
// NOTE(review): it is declared empty here; presumably it is populated from requiredFlags and
// optionalFlags elsewhere in this file — confirm.
var Flags []cli.Flag
Expand Down
102 changes: 102 additions & 0 deletions relay/limiter/blob_rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package limiter

import (
"fmt"
"golang.org/x/time/rate"
"sync"
"time"
)

// BlobRateLimiter applies the relay-wide limits on GetBlob requests: a cap on how many operations
// may start per second, a cap on blob bytes served per second, and a cap on how many operations
// may be running at the same time.
type BlobRateLimiter struct {
	// config holds the limits this limiter was constructed with.
	config *Config

	// lock serializes BeginGetBlobOperation and FinishGetBlobOperation, protecting operationsInFlight.
	lock sync.Mutex

	// operationsInFlight counts GetBlob operations that have begun but not yet finished.
	operationsInFlight int

	// opLimiter is the token bucket bounding the rate at which GetBlob operations may start.
	opLimiter *rate.Limiter

	// bandwidthLimiter is the token bucket bounding the bandwidth consumed by GetBlob operations.
	// Only the size of the blob data is charged against it, not the size of the entire response.
	bandwidthLimiter *rate.Limiter
}

// NewBlobRateLimiter builds a BlobRateLimiter from config. The operation and bandwidth token
// buckets take their rates and burst sizes from the GetBlob fields of config; config itself is
// retained so the concurrency cap can be consulted when operations begin.
func NewBlobRateLimiter(config *Config) *BlobRateLimiter {
	limiter := &BlobRateLimiter{config: config}

	limiter.opLimiter = rate.NewLimiter(
		rate.Limit(config.MaxGetBlobOpsPerSecond),
		config.GetBlobOpsBurstiness,
	)
	limiter.bandwidthLimiter = rate.NewLimiter(
		rate.Limit(config.MaxGetBlobBytesPerSecond),
		config.GetBlobBytesBurstiness,
	)

	return limiter
}

// BeginGetBlobOperation should be called when a GetBlob operation is about to begin.
//
// It returns an error if the operation must not be performed, either because the maximum number
// of concurrent GetBlob operations is already in flight or because the operation rate limit has
// been exhausted at time now. If it returns nil, the operation has been counted as in flight and
// FinishGetBlobOperation must be called exactly once when it completes.
//
// A nil receiver disables rate limiting: the call always succeeds.
func (l *BlobRateLimiter) BeginGetBlobOperation(now time.Time) error {
	if l == nil {
		// If the rate limiter is nil, do not enforce rate limits.
		return nil
	}

	l.lock.Lock()
	defer l.lock.Unlock()

	// Check concurrency first so that a request rejected for concurrency does not spend a rate token.
	if l.operationsInFlight >= l.config.MaxConcurrentGetBlobOps {
		return fmt.Errorf("global concurrent request limit exceeded for getBlob operations, try again later")
	}

	// AllowN checks for a token and takes it in a single step. Acting on its result (rather than
	// peeking with TokensAt and then discarding what AllowN returns) guarantees an operation is
	// only counted when the limiter actually granted it, including for special limits such as rate.Inf.
	if !l.opLimiter.AllowN(now, 1) {
		return fmt.Errorf("global rate limit exceeded for getBlob operations, try again later")
	}

	l.operationsInFlight++

	return nil
}

// FinishGetBlobOperation releases the in-flight slot taken by a successful BeginGetBlobOperation.
// Call it exactly once for every BeginGetBlobOperation call that returned nil. On a nil receiver
// (rate limiting disabled) it does nothing.
func (l *BlobRateLimiter) FinishGetBlobOperation() {
	if l != nil {
		l.lock.Lock()
		l.operationsInFlight -= 1
		l.lock.Unlock()
	}
}

// RequestGetBlobBandwidth reserves bandwidth for a GetBlob operation that is about to download
// blob data from S3. It charges bytes against the bandwidth limiter at time now and returns an
// error if the limiter refuses; a nil return means the download should proceed. On a nil receiver
// (rate limiting disabled) it always returns nil.
func (l *BlobRateLimiter) RequestGetBlobBandwidth(now time.Time, bytes uint32) error {
	// A nil limiter means rate limits are not enforced.
	if l == nil {
		return nil
	}

	// The mutex is not taken here: bandwidthLimiter is the only state touched, and it is safe for
	// concurrent use on its own.
	if !l.bandwidthLimiter.AllowN(now, int(bytes)) {
		return fmt.Errorf("global rate limit exceeded for getBlob bandwidth, try again later")
	}

	return nil
}
Loading

0 comments on commit afd5894

Please sign in to comment.