Skip to content

Commit

Permalink
Implement Lockable KVStore
Browse files Browse the repository at this point in the history
  • Loading branch information
Siddharth More authored and Siddharth More committed Nov 28, 2023
1 parent 29a376b commit cac367f
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 82 deletions.
17 changes: 6 additions & 11 deletions common/aws/elasticcache/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package elasticcache

import (
"context"
"errors"
"time"

"github.com/Layr-Labs/eigenda/common"
Expand Down Expand Up @@ -42,18 +41,12 @@ func (c *RedisClient) Get(ctx context.Context, key string) *redis.StringCmd {
}

// Set sets a value in Redis
func (c *RedisClient) Set(ctx context.Context, key string, value interface{}, lockKey string, lockValue string, expiration time.Duration) (*redis.StatusCmd, error) {

// TODO: Make RedisLock Expiration a Configurable parameter
if !c.acquireLock(lockKey, lockValue, time.Second*30) {
return nil, errors.New("unable to acquire lock")
}
defer c.releaseLock(lockKey, lockValue)
func (c *RedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) (*redis.StatusCmd, error) {

return c.redisClient.Set(ctx, key, value, expiration), nil
}

func (c *RedisClient) acquireLock(lockKey string, lockValue string, expiration time.Duration) bool {
func (c *RedisClient) AcquireLock(lockKey string, lockValue string, expiration time.Duration) bool {
result, err := c.redisClient.SetNX(context.Background(), lockKey, lockValue, expiration).Result()
if err != nil {
// Handle error
Expand All @@ -62,7 +55,7 @@ func (c *RedisClient) acquireLock(lockKey string, lockValue string, expiration t
return result
}

func (c *RedisClient) releaseLock(lockKey string, lockValue string) {
func (c *RedisClient) ReleaseLock(lockKey string, lockValue string) error {
script := `
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
Expand All @@ -72,6 +65,8 @@ func (c *RedisClient) releaseLock(lockKey string, lockValue string) {
`
_, err := c.redisClient.Eval(context.Background(), script, []string{lockKey}, lockValue).Result()
if err != nil {
// Handle error
return err
}

return nil
}
48 changes: 14 additions & 34 deletions common/aws/elasticcache/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestRedisClientUpdateAndGet(t *testing.T) {
// Set up the Redis client
cfg := elasticCache.RedisClientConfig{
EndpointURL: "localhost",
Port: "6379", // Assuming Redis is running on the default port
}

client, err := elasticCache.NewClient(cfg, nil)
if err != nil {
t.Fatalf("Failed to create Redis client: %v", err)
}

// Test setting a value
key := "testKey"
value := "testValue"
_, err = client.Set(context.Background(), key, value, "testKey", "testValue", 10*time.Second)
assert.NoError(t, err, "Set should not return an error")

// Test getting the value
stringCmd := client.Get(context.Background(), key)
result, err := stringCmd.Result()
assert.NoError(t, err, "Get should not return an error")
assert.Equal(t, value, result, "Get should return the value that was set")
}

func TestRedisClient(t *testing.T) {
// Set up the Redis client
cfg := elasticCache.RedisClientConfig{
Expand All @@ -45,19 +20,24 @@ func TestRedisClient(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create Redis client: %v", err)
}
assert.NoError(t, err, "NewClient should not return an error")
assert.NotNil(t, client, "RedisClient should not be nil")

// Test setting a value
// Test Set method
key := "testKey"
value := "testValue"
_, err = client.Set(context.Background(), key, value, "testKey", "testValue", 10*time.Second)
_, err = client.Set(context.Background(), key, value, 0) // 0 expiration means no expiration
assert.NoError(t, err, "Set should not return an error")

_, err = client.Set(context.Background(), key, value, "testKey", "testValue", 10*time.Second)
assert.Error(t, err, "Set should fail when the lock is already held")

// Test getting the value
stringCmd := client.Get(context.Background(), key)
result, err := stringCmd.Result()
// Test Get method
getCmd := client.Get(context.Background(), key)
getResult, err := getCmd.Result()
assert.NoError(t, err, "Get should not return an error")
assert.Equal(t, value, result, "Get should return the value that was set")
assert.Equal(t, value, getResult, "Get should return the value that was set")

// Test AcquireLock and ReleaseLock methods
lockKey := "testLockKey"
lockValue := "uniqueLockValue"
assert.True(t, client.AcquireLock(lockKey, lockValue, time.Second*10), "AcquireLock should return true")
assert.NoError(t, client.ReleaseLock(lockKey, lockValue), "ReleaseLock should not return an error")
}
15 changes: 14 additions & 1 deletion common/param_store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package common

import "context"
import (
"context"
"time"
)

// KVStore is a simple key value store interface.
type KVStore[T any] interface {
Expand All @@ -9,3 +12,13 @@ type KVStore[T any] interface {
// UpdateItem updates the value for the given key.
UpdateItem(ctx context.Context, key string, value *T) error
}

// LockableKVStore extends KVStore with lock and unlock capabilities.
type LockableKVStore[T any] interface {
KVStore[T] // Embedding KVStore

// AcquireLock tries to acquire a lock and returns true if successful.
AcquireLock(lockKey string, expiration time.Duration) bool
// ReleaseLock releases the acquired lock.
ReleaseLock(lockKey string) error
}
16 changes: 14 additions & 2 deletions common/ratelimit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package ratelimit

import (
"context"
"errors"
"log"
"time"

"github.com/Layr-Labs/eigenda/common"
)

type BucketStore = common.KVStore[common.RateBucketParams]
type BucketStore = common.LockableKVStore[common.RateBucketParams]

type rateLimiter struct {
globalRateParams common.GlobalRateParams
Expand All @@ -29,8 +31,18 @@ func NewRateLimiter(rateParams common.GlobalRateParams, bucketStore BucketStore,
func (d *rateLimiter) AllowRequest(ctx context.Context, requesterID common.RequesterID, blobSize uint, rate common.RateParam) (bool, error) {

// Retrieve bucket params for the requester ID
// This will be from dynamo for Disperser and from local storage for DA node
if !d.bucketStore.AcquireLock(requesterID, time.Second) {
return false, errors.New("unable to acquire lock")
}

defer func() {
if err := d.bucketStore.ReleaseLock(requesterID); err != nil {
// Handle the error, e.g., log it
log.Printf("Failed to release lock: %v", err)
}
}()

// This will be from Redis for Disperser and from local storage for DA node
bucketParams, err := d.bucketStore.GetItem(ctx, requesterID)
if err != nil {

Expand Down
2 changes: 1 addition & 1 deletion common/ratelimit/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func makeTestRatelimiter() (common.RateLimiter, error) {
}
bucketStoreSize := 1000

bucketStore, err := store.NewLocalParamStore[common.RateBucketParams](bucketStoreSize)
bucketStore, err := store.NewLocalParamStore[common.RateBucketParams](bucketStoreSize, "test_lock")
if err != nil {
return nil, err
}
Expand Down
53 changes: 50 additions & 3 deletions common/store/local_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,35 @@ package store
import (
"context"
"errors"
"sync"
"time"

"github.com/Layr-Labs/eigenda/common"
lru "github.com/hashicorp/golang-lru/v2"
)

type lockInfo struct {
mu sync.Mutex
locked bool
}

// Global map for locks
var globalLocks sync.Map

type localParamStore[T any] struct {
cache *lru.Cache[string, T]
cache *lru.Cache[string, T]
lockValue string // global lock value
}

func NewLocalParamStore[T any](size int) (common.KVStore[T], error) {
func NewLocalParamStore[T any](size int, lockValue string) (common.LockableKVStore[T], error) {
cache, err := lru.New[string, T](size)
if err != nil {
return nil, err
}

return &localParamStore[T]{
cache: cache,
cache: cache,
lockValue: lockValue,
}, nil
}

Expand All @@ -40,3 +52,38 @@ func (s *localParamStore[T]) UpdateItem(ctx context.Context, key string, params

return nil
}

// AcquireLock implementation for RedisStore
func (s *localParamStore[T]) AcquireLock(key string, expiration time.Duration) bool {

val, _ := globalLocks.LoadOrStore(key, &lockInfo{})
lock := val.(*lockInfo)

lock.mu.Lock()
defer lock.mu.Unlock()

if !lock.locked {
lock.locked = true
return true
}
return false
}

// ReleaseLock implementation for RedisStore
func (s *localParamStore[T]) ReleaseLock(key string) error {
val, ok := globalLocks.Load(key)
if !ok {
return errors.New("no lock found for the given key")
}

lock := val.(*lockInfo)

lock.mu.Lock()
defer lock.mu.Unlock()

if lock.locked {
lock.locked = false
return nil
}
return errors.New("lock not acquired or already released")
}
14 changes: 13 additions & 1 deletion common/store/local_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var (

func TestLocalStore(t *testing.T) {

localStore, err := store.NewLocalParamStore[common.RateBucketParams](inmemBucketStoreSize)
localStore, err := store.NewLocalParamStore[common.RateBucketParams](inmemBucketStoreSize, "disperser_lock")
assert.NoError(t, err)

ctx := context.Background()
Expand All @@ -39,3 +39,15 @@ func TestLocalStore(t *testing.T) {
assert.Equal(t, p, p2)

}

func TestLockingMechanism(t *testing.T) {
localStore, err := store.NewLocalParamStore[common.RateBucketParams](inmemBucketStoreSize, "disperser_lock")
assert.NoError(t, err)
key := "lockKey"
if localStore.AcquireLock(key, time.Second*5) {
err := localStore.ReleaseLock(key)
assert.NoError(t, err)
} else {
t.Error("Failed to acquire lock")
}
}
23 changes: 15 additions & 8 deletions common/store/redis_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,19 @@ package store
import (
"context"
"encoding/json"
"time"

"github.com/Layr-Labs/eigenda/common"
elasticCache "github.com/Layr-Labs/eigenda/common/aws/elasticcache"
)

type RedisStore[T any] struct {
client *elasticCache.RedisClient
lockKey string
lockValue string // Unique value for the lock, e.g., UUID or server identifier
lockValue string // global lock value
}

func NewRedisStore[T any](client *elasticCache.RedisClient, lockKey string, lockValue string) common.KVStore[T] {
return &RedisStore[T]{client: client,
lockKey: lockKey,
lockValue: lockValue,
}
func NewRedisStore[T any](client *elasticCache.RedisClient, lockValue string) common.LockableKVStore[T] {
return &RedisStore[T]{client: client, lockValue: lockValue}
}

func (s *RedisStore[T]) GetItem(ctx context.Context, key string) (*T, error) {
Expand All @@ -42,6 +39,16 @@ func (s *RedisStore[T]) UpdateItem(ctx context.Context, key string, value *T) er
return err
}

_, err = s.client.Set(ctx, key, jsonData, s.lockKey, s.lockValue, 0) // 0 means no expiration
_, err = s.client.Set(ctx, key, jsonData, 0) // 0 means no expiration
return err
}

// AcquireLock implementation for RedisStore
func (s *RedisStore[T]) AcquireLock(key string, expiration time.Duration) bool {
return s.client.AcquireLock(key, s.lockValue, expiration)
}

// ReleaseLock implementation for RedisStore
func (s *RedisStore[T]) ReleaseLock(key string) error {
return s.client.ReleaseLock(key, s.lockValue)
}
39 changes: 38 additions & 1 deletion common/store/redis_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestRedisStore(t *testing.T) {
t.Fatalf("Failed to create Redis client: %v", err)
}

redisStore := store.NewRedisStore[common.RateBucketParams](redisClient, "testKey", "testValue")
redisStore := store.NewRedisStore[common.RateBucketParams](redisClient, "testKey")

// Run your tests here
// Example: Test Set and Get
Expand All @@ -57,3 +57,40 @@ func TestRedisStore(t *testing.T) {
assert.NoError(t, err, "GetItem should not return an error")
assert.Equal(t, testValue, *result, "GetItem should return the value that was set")
}

func TestRedisStoreAcquireAndReleaseLock(t *testing.T) {

deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false")
if !deployLocalStack {
localStackPort = os.Getenv("LOCALSTACK_PORT")
}

if deployLocalStack {
var err error
dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort)
if err != nil {
teardown()
panic("failed to start localstack container")
}
}

// Set up the Redis client to point to your local Redis server
clientConfig := elasticcache.RedisClientConfig{
EndpointURL: "localhost",
Port: "6379",
}
redisClient, err := elasticcache.NewClient(clientConfig, nil) // Assuming logger can be nil
if err != nil {
t.Fatalf("Failed to create Redis client: %v", err)
}

redisStore := store.NewRedisStore[common.RateBucketParams](redisClient, "testKey")

// Acquire and Release Lock
testKey := "testKey"
locked := redisStore.AcquireLock(testKey, 0)
assert.True(t, locked)

err = redisStore.ReleaseLock(testKey)
assert.NoError(t, err, "Release should not return an error")
}
4 changes: 0 additions & 4 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,6 @@ func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlob

func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *core.Blob, origin string) error {

// TODO(robert): Remove these locks once we have resolved ratelimiting approach
s.mu.Lock()
defer s.mu.Unlock()

for _, param := range blob.RequestHeader.SecurityParams {

rates, ok := s.rateConfig.QuorumRateInfos[param.QuorumID]
Expand Down
Loading

0 comments on commit cac367f

Please sign in to comment.