Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RedisCache] For RateLimiting #77

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions common/aws/elasticcache/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package elasticcache

import (
"context"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/go-redis/redis/v8"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)

type RedisClientConfig struct {
EndpointURL string
Port string
}

type RedisClient struct {
redisClient *redis.Client
logger common.Logger // Ensure common.Logger is imported correctly
}

func NewClient(cfg RedisClientConfig, logger common.Logger) (*RedisClient, error) {
redisClient := redis.NewClient(&redis.Options{
Addr: cfg.EndpointURL + ":" + cfg.Port,
Password: "", // no password set
DB: 0, // use default DB
})

// Test the Redis connection
_, err := redisClient.Ping(context.Background()).Result()
if err != nil {
return nil, err // Return the error instead of logging and exiting
}
logger.Info("Redis connection successful")

return &RedisClient{redisClient: redisClient, logger: logger}, nil
}

// Get retrieves a value from Redis
func (c *RedisClient) Get(ctx context.Context, key string) *redis.StringCmd {
return c.redisClient.Get(ctx, key)
}

// Set sets a value in Redis
func (c *RedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd {
return c.redisClient.Set(ctx, key, value, expiration)
}
73 changes: 73 additions & 0 deletions common/aws/elasticcache/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package elasticcache_test

import (
"context"
"log"
"testing"
"time"

elasticCache "github.com/Layr-Labs/eigenda/common/aws/elasticcache"
cmock "github.com/Layr-Labs/eigenda/common/mock"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/assert"
)

func TestRedisClient(t *testing.T) {
// Start Docker pool
pool, err := dockertest.NewPool("")
if err != nil {
t.Fatalf("Could not connect to Docker: %v", err)
}

// Start Redis container
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "redis",
Tag: "latest",
PortBindings: map[docker.Port][]docker.PortBinding{
"6379/tcp": {{HostIP: "", HostPort: "6379"}},
},
})
if err != nil {
t.Fatalf("Could not start Redis container: %v", err)
}

// Delay cleanup until after all tests have run
t.Cleanup(func() {
if err := pool.Purge(resource); err != nil {
t.Fatalf("Could not purge Redis container: %v", err)
}
})

// Wait for Redis to be ready
if err := pool.Retry(func() error {
// Perform a health check...
return nil // return nil if healthy
}); err != nil {
log.Fatalf("Could not connect to Redis: %v", err)
}

// Set up Redis client
cfg := elasticCache.RedisClientConfig{
EndpointURL: "localhost",
Port: "6379",
}

logger := &cmock.Logger{}
client, err := elasticCache.NewClient(cfg, logger)
if err != nil {
t.Fatalf("Failed to create Redis client: %v", err)
}

// Test setting a value
key := "testKey"
value := "testValue"
_, err = client.Set(context.Background(), key, value, 10*time.Second).Result()
assert.NoError(t, err, "Set should not return an error")

// Test getting the value
stringCmd := client.Get(context.Background(), key)
result, err := stringCmd.Result()
assert.NoError(t, err, "Get should not return an error")
assert.Equal(t, value, result, "Get should return the value that was set")
}
41 changes: 41 additions & 0 deletions common/store/redis_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package store

import (
"context"
"encoding/json"

"github.com/Layr-Labs/eigenda/common"
commoncache "github.com/Layr-Labs/eigenda/common/aws/elasticcache"
)

type RedisStore[T any] struct {
client *commoncache.RedisClient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be an interface and not implementation

}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this interface static checker.
var _ common.KVStore[any] = (*RedisStore[any])(nil)

func NewRedisStore[T any](client *commoncache.RedisClient) common.KVStore[T] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client Should be an interface and not implementation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preferable to return pointer struct instead of interface by convention
*RedisStore[T]

return &RedisStore[T]{client: client}
}

func (s *RedisStore[T]) GetItem(ctx context.Context, key string) (*T, error) {
val, err := s.client.Get(ctx, key).Result()
if err != nil {
return nil, err
}

var item T
err = json.Unmarshal([]byte(val), &item)
if err != nil {
return nil, err
}

return &item, nil
}

func (s *RedisStore[T]) UpdateItem(ctx context.Context, key string, value *T) error {
jsonData, err := json.Marshal(value)
if err != nil {
return err
}

return s.client.Set(ctx, key, jsonData, 0).Err() // 0 means no expiration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we want do add expiration? What do you think of adding it as params?

}
79 changes: 79 additions & 0 deletions common/store/redis_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package store_test

import (
"context"
"log"
"testing"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws/elasticcache"
cmock "github.com/Layr-Labs/eigenda/common/mock"
"github.com/Layr-Labs/eigenda/common/store"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/assert"
)

func TestRedisStore(t *testing.T) {
// Start Docker pool
pool, err := dockertest.NewPool("")
if err != nil {
t.Fatalf("Could not connect to Docker: %v", err)
}

// Start Redis container
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "redis",
Tag: "latest",
PortBindings: map[docker.Port][]docker.PortBinding{
"6379/tcp": {{HostIP: "", HostPort: "6379"}},
},
})
if err != nil {
t.Fatalf("Could not start Redis container: %v", err)
}

// Delay cleanup until after all tests have run
t.Cleanup(func() {
if err := pool.Purge(resource); err != nil {
t.Fatalf("Could not purge Redis container: %v", err)
}
})

// Wait for Redis to be ready
if err := pool.Retry(func() error {
// Perform a health check...
return nil // return nil if healthy
}); err != nil {
log.Fatalf("Could not connect to Redis: %v", err)
}

// Set up the Redis client to point to your local Redis server
clientConfig := elasticcache.RedisClientConfig{
EndpointURL: "localhost",
Port: "6379",
}

redisClient, err := elasticcache.NewClient(clientConfig, &cmock.Logger{}) // Assuming logger can be nil
if err != nil {
t.Fatalf("Failed to create Redis client: %v", err)
}

redisStore := store.NewRedisStore[common.RateBucketParams](redisClient)

// Test Update and Get Item
ctx := context.Background()
testKey := "testKey"
testValue := common.RateBucketParams{
BucketLevels: []time.Duration{time.Second, time.Minute},
LastRequestTime: time.Now().UTC(),
}

err = redisStore.UpdateItem(ctx, testKey, &testValue)
assert.NoError(t, err, "UpdateItem should not return an error")

result, err := redisStore.GetItem(ctx, testKey)
assert.NoError(t, err, "GetItem should not return an error")
assert.Equal(t, testValue, *result, "GetItem should return the value that was set")
}
9 changes: 9 additions & 0 deletions disperser/cmd/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/aws/elasticcache"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/common/logging"
"github.com/Layr-Labs/eigenda/common/ratelimit"
Expand All @@ -27,6 +28,8 @@ type Config struct {

BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string
RateLimiterRedisClient bool
RedisClientConfig elasticcache.RedisClientConfig
}

func NewConfig(ctx *cli.Context) (Config, error) {
Expand Down Expand Up @@ -59,6 +62,12 @@ func NewConfig(ctx *cli.Context) (Config, error) {

BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),

// TODO Define a new config for RedisClientConfig
RedisClientConfig: elasticcache.RedisClientConfig{
EndpointURL: ctx.GlobalString("redis-endpoint-url"),
Port: ctx.GlobalString("redis-port"),
},
}
return config, nil
}
27 changes: 21 additions & 6 deletions disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"

"github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/common/aws/elasticcache"
"github.com/Layr-Labs/eigenda/common/aws/s3"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/common/logging"
Expand Down Expand Up @@ -97,16 +98,30 @@ func RunDisperserServer(ctx *cli.Context) error {
globalParams := config.RatelimiterConfig.GlobalRateParams

var bucketStore common.KVStore[common.RateBucketParams]
if config.BucketTableName != "" {
dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger)

// Can be defined as a Factory of Stores used by RateLimiter
if config.RateLimiterRedisClient {
redisClient, err := elasticcache.NewClient(config.RedisClientConfig, logger)
if err != nil {
return err
}
bucketStore = store.NewDynamoParamStore[common.RateBucketParams](dynamoClient, config.BucketTableName)

bucketStore = store.NewRedisStore[common.RateBucketParams](redisClient)

} else {
bucketStore, err = store.NewLocalParamStore[common.RateBucketParams](config.BucketStoreSize)
if err != nil {
return err
if config.BucketTableName != "" {
dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger)
if err != nil {
return err
}
bucketStore = store.NewDynamoParamStore[common.RateBucketParams](dynamoClient, config.BucketTableName)
}

if bucketStore == nil {
bucketStore, err = store.NewLocalParamStore[common.RateBucketParams](config.BucketStoreSize)
if err != nil {
return err
}
}
}
ratelimiter = ratelimit.NewRateLimiter(globalParams, bucketStore, logger)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/fxamacker/cbor/v2 v2.5.0
github.com/gin-contrib/logger v0.2.6
github.com/gin-gonic/gin v1.9.1
github.com/go-redis/redis/v8 v8.11.5
github.com/hashicorp/go-multierror v1.1.1
github.com/joho/godotenv v1.5.1
github.com/onsi/ginkgo/v2 v2.11.0
Expand Down Expand Up @@ -69,6 +70,7 @@ require (
github.com/containerd/continuity v0.3.0 // indirect
github.com/crate-crypto/go-kzg-4844 v0.3.0 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/cli v20.10.17+incompatible // indirect
github.com/docker/docker v24.0.6+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
Expand Down
Loading
Loading