From 3507f6a7a9e336662670b411f224bcde80303cec Mon Sep 17 00:00:00 2001 From: Francisco Date: Wed, 30 Nov 2022 20:31:09 +0700 Subject: [PATCH 1/7] feature: get multiple with redis pipeline --- keeper.go | 22 ++++++++++++++++++++++ keeper_test.go | 39 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/keeper.go b/keeper.go index 7c67f8e..b5d3e3e 100644 --- a/keeper.go +++ b/keeper.go @@ -32,6 +32,7 @@ type ( Get(key string) (any, error) GetOrLock(key string) (any, *redsync.Mutex, error) GetOrSet(key string, fn GetterFn, opts ...func(Item)) ([]byte, error) + GetMultiple(keys []string) ([]any, error) Store(*redsync.Mutex, Item) error StoreWithoutBlocking(Item) error StoreMultiWithoutBlocking([]Item) error @@ -155,6 +156,27 @@ func (k *keeper) Get(key string) (cachedItem any, err error) { return nil, nil } +// GetMultiple :nodoc: +func (k *keeper) GetMultiple(keys []string) (cachedItems []any, err error) { + if k.disableCaching { + return + } + c := k.connPool.Get() + c.Send("MULTI") + for _, key := range keys { + err = c.Send("GET", key) + if err != nil { + return + } + } + r, err := c.Do("EXEC") + if err != nil { + return nil, err + } + + return redigo.Values(r, err) +} + // GetOrLock :nodoc: func (k *keeper) GetOrLock(key string) (cachedItem any, mutex *redsync.Mutex, err error) { if k.disableCaching { diff --git a/keeper_test.go b/keeper_test.go index d8a4c6b..3654f91 100644 --- a/keeper_test.go +++ b/keeper_test.go @@ -7,10 +7,9 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - redigo "github.com/gomodule/redigo/redis" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/alicebob/miniredis/v2" ) @@ -1166,3 +1165,39 @@ func TestHashScan_Empty(t *testing.T) { assert.Empty(t, result) assert.EqualValues(t, 0, cursor) } + +func TestGetMultiple(t *testing.T) { + k := NewKeeper() + + m, err := miniredis.Run() + assert.NoError(t, err) + + r := newRedisConn(m.Addr()) + k.SetConnectionPool(r) + k.SetLockConnectionPool(r) + k.SetWaitTime(1 * time.Second) // override wait time to 1 second + + keys := []string{"a", "b", "c"} + items := map[string]string{"a": "A", "b": "B", "c": "C"} + for key, val := range items { + k.StoreWithoutBlocking(NewItem(key, val)) + } + + t.Run("success", func(t *testing.T) { + res, err := k.GetMultiple(keys) + assert.NoError(t, err) + for i, key := range keys { + assert.EqualValues(t, items[key], res[i]) + } + }) + + t.Run("success with missing cache", func(t *testing.T) { + keys2 := append(keys, "d") + res, err := k.GetMultiple(keys2) + assert.NoError(t, err) + for i, key := range keys { + assert.EqualValues(t, items[key], res[i]) + } + assert.EqualValues(t, res[len(res)-1], nil) + }) +} From 8e974ac1e4e9ce7109c36ccd028c9c304e2419c0 Mon Sep 17 00:00:00 2001 From: Francisco Date: Thu, 1 Dec 2022 14:15:04 +0700 Subject: [PATCH 2/7] feature-fix: check error --- keeper.go | 8 +++++++- keeper_test.go | 25 ++++++++++++++++--------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/keeper.go b/keeper.go index b5d3e3e..7b6626b 100644 --- a/keeper.go +++ b/keeper.go @@ -162,13 +162,19 @@ func (k *keeper) GetMultiple(keys []string) (cachedItems []any, err error) { return } c := k.connPool.Get() - c.Send("MULTI") + + err = c.Send("MULTI") + if err != nil { + return nil, err + } + for _, key := range keys { err = c.Send("GET", key) if err != nil { return } } + r, err := c.Do("EXEC") if err != nil { return nil, err diff --git a/keeper_test.go b/keeper_test.go index 3654f91..1d67e24 100644 --- a/keeper_test.go +++ b/keeper_test.go @@ -1177,13 +1177,12 @@ func TestGetMultiple(t *testing.T) { k.SetLockConnectionPool(r) k.SetWaitTime(1 * time.Second) // override wait time to 1 second - keys := []string{"a", "b", "c"} - items := map[string]string{"a": "A", "b": "B", "c": "C"} - for key, val := range items { - k.StoreWithoutBlocking(NewItem(key, val)) - } - t.Run("success", func(t *testing.T) { + keys := []string{"a", "b", "c"} + items := map[string]string{"a": "A", "b": "B", "c": "C"} + for key, val := range items { + k.StoreWithoutBlocking(NewItem(key, val)) + } res, err := k.GetMultiple(keys) assert.NoError(t, err) for i, key := range keys { @@ -1192,12 +1191,20 @@ func TestGetMultiple(t *testing.T) { }) t.Run("success with missing cache", func(t *testing.T) { - keys2 := append(keys, "d") - res, err := k.GetMultiple(keys2) + keys := []string{"d", "b", "a", "o", "c"} + items := map[string]string{"b": "B", "o": "O"} + for key, val := range items { + k.StoreWithoutBlocking(NewItem(key, val)) + } + + res, err := k.GetMultiple(keys) assert.NoError(t, err) for i, key := range keys { + if _, ok := items[key]; !ok { + assert.EqualValues(t, nil, res[i]) + continue + } assert.EqualValues(t, items[key], res[i]) } - assert.EqualValues(t, res[len(res)-1], nil) }) } From c7207e73769789018d2ce8046eccef7a29ae596e Mon Sep 17 00:00:00 2001 From: Francisco Date: Wed, 21 Dec 2022 16:44:56 +0700 Subject: [PATCH 3/7] feature: get multiple without transaction --- keeper.go | 35 +++++++++++++++++++++++++++++++++-- keeper_test.go | 28 +++++++++++++++++----------- 2 files changed, 50 insertions(+), 13 deletions(-) diff --git a/keeper.go b/keeper.go index 7b6626b..4ae3a82 100644 --- a/keeper.go +++ b/keeper.go @@ -33,6 +33,7 @@ type ( GetOrLock(key string) (any, *redsync.Mutex, error) GetOrSet(key string, fn GetterFn, opts ...func(Item)) ([]byte, error) GetMultiple(keys []string) ([]any, error) + GetMultipleTX(keys []string) ([]any, error) Store(*redsync.Mutex, Item) error StoreWithoutBlocking(Item) error StoreMultiWithoutBlocking([]Item) error @@ -156,8 +157,8 @@ func (k *keeper) Get(key string) (cachedItem any, err error) { return nil, nil } -// GetMultiple :nodoc: -func (k *keeper) GetMultiple(keys []string) (cachedItems []any, err error) { +// GetMultipleTX :nodoc: +func (k *keeper) GetMultipleTX(keys []string) (cachedItems []any, err error) { if k.disableCaching { return } @@ -183,6 +184,36 @@ func (k *keeper) GetMultiple(keys []string) (cachedItems []any, err error) { return redigo.Values(r, err) } +// GetMultiple :nodoc: +func (k *keeper) GetMultiple(keys []string) (cachedItems []any, err error) { + if k.disableCaching { + return + } + c := k.connPool.Get() + + for _, key := range keys { + err = c.Send("GET", key) + if err != nil { + return + } + } + + err = c.Flush() + if err != nil { + return + } + + for _ = range keys { + rep, err := redigo.Bytes(c.Receive()) + if err != nil && err != redigo.ErrNil { + return nil, err + } + cachedItems = append(cachedItems, rep) + } + + return +} + // GetOrLock :nodoc: func (k *keeper) GetOrLock(key string) (cachedItem any, mutex *redsync.Mutex, err error) { if k.disableCaching { diff --git a/keeper_test.go b/keeper_test.go index 1d67e24..926b2a6 100644 --- a/keeper_test.go +++ b/keeper_test.go @@ -1167,17 +1167,15 @@ func TestHashScan_Empty(t *testing.T) { } func TestGetMultiple(t *testing.T) { - k := NewKeeper() - - m, err := miniredis.Run() - assert.NoError(t, err) - - r := newRedisConn(m.Addr()) - k.SetConnectionPool(r) - k.SetLockConnectionPool(r) - k.SetWaitTime(1 * time.Second) // override wait time to 1 second - t.Run("success", func(t *testing.T) { + k := NewKeeper() + m, err := miniredis.Run() + assert.NoError(t, err) + r := newRedisConn(m.Addr()) + k.SetConnectionPool(r) + k.SetLockConnectionPool(r) + k.SetWaitTime(1 * time.Second) // override wait time to 1 second + keys := []string{"a", "b", "c"} items := map[string]string{"a": "A", "b": "B", "c": "C"} for key, val := range items { @@ -1191,6 +1189,14 @@ func TestGetMultiple(t *testing.T) { }) t.Run("success with missing cache", func(t *testing.T) { + k := NewKeeper() + m, err := miniredis.Run() + assert.NoError(t, err) + r := newRedisConn(m.Addr()) + k.SetConnectionPool(r) + k.SetLockConnectionPool(r) + k.SetWaitTime(1 * time.Second) // override wait time to 1 second + keys := []string{"d", "b", "a", "o", "c"} items := map[string]string{"b": "B", "o": "O"} for key, val := range items { @@ -1201,7 +1207,7 @@ func TestGetMultiple(t *testing.T) { assert.NoError(t, err) for i, key := range keys { if _, ok := items[key]; !ok { - assert.EqualValues(t, nil, res[i]) + assert.Nil(t, res[i]) continue } assert.EqualValues(t, items[key], res[i]) From b97496bd20b63f52f82487aff89010ec6c34680f Mon Sep 17 00:00:00 2001 From: Francisco Date: Wed, 21 Dec 2022 17:39:15 +0700 Subject: [PATCH 4/7] feature-fix: close connection --- keeper.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/keeper.go b/keeper.go index 4ae3a82..891c664 100644 --- a/keeper.go +++ b/keeper.go @@ -163,6 +163,7 @@ func (k *keeper) GetMultipleTX(keys []string) (cachedItems []any, err error) { return } c := k.connPool.Get() + defer c.Close() err = c.Send("MULTI") if err != nil { @@ -190,6 +191,7 @@ func (k *keeper) GetMultiple(keys []string) (cachedItems []any, err error) { return } c := k.connPool.Get() + defer c.Close() for _, key := range keys { err = c.Send("GET", key) From 6312358fc740531c865750e8025e54bba3847ab2 Mon Sep 17 00:00:00 2001 From: Francisco Date: Wed, 21 Dec 2022 17:46:12 +0700 Subject: [PATCH 5/7] feature-fix: fix --- keeper.go | 8 ++++++-- keeper_test.go | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/keeper.go b/keeper.go index 891c664..2aac9cc 100644 --- a/keeper.go +++ b/keeper.go @@ -163,7 +163,9 @@ func (k *keeper) GetMultipleTX(keys []string) (cachedItems []any, err error) { return } c := k.connPool.Get() - defer c.Close() + defer func() { + _ = c.Close() + }() err = c.Send("MULTI") if err != nil { @@ -191,7 +193,9 @@ func (k *keeper) GetMultiple(keys []string) (cachedItems []any, err error) { return } c := k.connPool.Get() - defer c.Close() + defer func() { + _ = c.Close() + }() for _, key := range keys { err = c.Send("GET", key) diff --git a/keeper_test.go b/keeper_test.go index 926b2a6..fe399c3 100644 --- a/keeper_test.go +++ b/keeper_test.go @@ -1179,7 +1179,7 @@ func TestGetMultiple(t *testing.T) { keys := []string{"a", "b", "c"} items := map[string]string{"a": "A", "b": "B", "c": "C"} for key, val := range items { - k.StoreWithoutBlocking(NewItem(key, val)) + _ = k.StoreWithoutBlocking(NewItem(key, val)) } res, err := k.GetMultiple(keys) assert.NoError(t, err) @@ -1200,7 +1200,7 @@ func TestGetMultiple(t *testing.T) { keys := []string{"d", "b", "a", "o", "c"} items := map[string]string{"b": "B", "o": "O"} for key, val := range items { - k.StoreWithoutBlocking(NewItem(key, val)) + _ = k.StoreWithoutBlocking(NewItem(key, val)) } res, err := k.GetMultiple(keys) From 886ceeb5fa34a7f47c5eae293596857fba887833 Mon Sep 17 00:00:00 2001 From: Francisco Date: Wed, 21 Dec 2022 17:47:56 +0700 Subject: [PATCH 6/7] feature-fix: fix --- keeper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keeper.go b/keeper.go index 2aac9cc..494f494 100644 --- a/keeper.go +++ b/keeper.go @@ -209,7 +209,7 @@ func (k *keeper) GetMultiple(keys []string) (cachedItems []any, err error) { return } - for _ = range keys { + for range keys { rep, err := redigo.Bytes(c.Receive()) if err != nil && err != redigo.ErrNil { return nil, err From 8cbb6a149f3102ddb072ae7ed1d38a333dc7760d Mon Sep 17 00:00:00 2001 From: Francisco Date: Mon, 16 Jan 2023 20:35:04 +0700 Subject: [PATCH 7/7] feature: get multiple or lock --- common.go | 8 +- keeper.go | 144 +++++++++++++++++++++++++++++++++++ keeper_test.go | 104 +++++++++++++++++++++++++ keeper_with_failover_test.go | 2 +- 4 files changed, 254 insertions(+), 4 deletions(-) diff --git a/common.go b/common.go index bbd2704..7eea3bc 100644 --- a/common.go +++ b/common.go @@ -8,9 +8,11 @@ import ( ) // SafeUnlock safely unlock mutex -func SafeUnlock(mutex *redsync.Mutex) { - if mutex != nil { - _, _ = mutex.Unlock() +func SafeUnlock(mutex ...*redsync.Mutex) { + for _, m := range mutex { + if m != nil { + _, _ = m.Unlock() + } } } diff --git a/keeper.go b/keeper.go index 494f494..0af7285 100644 --- a/keeper.go +++ b/keeper.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" redigo "github.com/gomodule/redigo/redis" @@ -34,6 +35,7 @@ type ( GetOrSet(key string, fn GetterFn, opts ...func(Item)) ([]byte, error) GetMultiple(keys []string) ([]any, error) GetMultipleTX(keys []string) ([]any, error) + GetMultipleOrLock(keys []string) ([]any, []*redsync.Mutex, error) Store(*redsync.Mutex, Item) error StoreWithoutBlocking(Item) error StoreMultiWithoutBlocking([]Item) error @@ -220,6 +222,148 @@ func (k *keeper) GetMultiple(keys []string) (cachedItems []any, err error) { return } +// GetMultipleOrLock get multiple and apply locks for non-existing keys on redis. +// Returned cached items will be in order based on keys provided, if the value for some key is not exist then it will be marked as nil on +// returned cached items slice. +func (k *keeper) GetMultipleOrLock(keys []string) (cachedItems []any, mutexes []*redsync.Mutex, err error) { + if k.disableCaching { + return + } + + c := k.connPool.Get() + defer func() { + _ = c.Close() + }() + + for _, key := range keys { + err = c.Send("GET", key) + if err != nil { + return + } + } + + err = c.Flush() + if err != nil { + return + } + + var ( + keysToLock []string + cachedItemsBuf = make(map[string]any) + mutexesBuf = make(map[string]*redsync.Mutex) + ) + for _, k := range keys { + rep, err := redigo.Bytes(c.Receive()) + if err != nil && err != redigo.ErrNil { + return nil, nil, err + } + if rep == nil { + keysToLock = append(keysToLock, k) + continue + } + cachedItemsBuf[k] = rep + } + + type itemWithKey struct { + Key string + Item any + } + + type mutexWithKey struct { + Key string + Mutex *redsync.Mutex + } + + var ( + itemCh = make(chan *itemWithKey) + errCh = make(chan error) + mutexCh = make(chan *mutexWithKey) + ) + + for _, key := range keysToLock { + go func(key string) { + mutex, err := k.AcquireLock(key) + if err == nil { + mutexCh <- &mutexWithKey{Mutex: mutex, Key: key} + return + } + start := time.Now() + for { + b := &backoff.Backoff{ + Jitter: true, + Min: 20 * time.Millisecond, + Max: 200 * time.Millisecond, + } + + if !k.isLocked(key) { + cachedItem, err := get(k.connPool.Get(), key) + if err != nil { + if err == ErrKeyNotExist { + mutex, err = k.AcquireLock(key) + if err == nil { + mutexCh <- &mutexWithKey{Mutex: mutex, Key: key} + return + } + goto Wait + } + errCh <- err + return + } + itemCh <- &itemWithKey{Item: cachedItem, Key: key} + return + } + + Wait: + elapsed := time.Since(start) + if elapsed >= k.waitTime { + errCh <- ErrWaitTooLong + return + } + time.Sleep(b.Duration()) + } + }(key) + } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + counter := 0 + for { + select { + case i := <-itemCh: + cachedItemsBuf[i.Key] = i.Item + counter++ + case err = <-errCh: + return + case m := <-mutexCh: + mutexesBuf[m.Key] = m.Mutex + counter++ + default: + if counter == len(keysToLock) { + return + } + } + } + }() + + wg.Wait() + if err != nil { + return + } + + for _, k := range keys { + if v, ok := cachedItemsBuf[k]; ok { + cachedItems = append(cachedItems, v) + } else if m, ok := mutexesBuf[k]; ok { + mutexes = append(mutexes, m) + cachedItems = append(cachedItems, nil) + } + } + + return +} + // GetOrLock :nodoc: func (k *keeper) GetOrLock(key string) (cachedItem any, mutex *redsync.Mutex, err error) { if k.disableCaching { diff --git a/keeper_test.go b/keeper_test.go index fe399c3..b613046 100644 --- a/keeper_test.go +++ b/keeper_test.go @@ -1214,3 +1214,107 @@ func TestGetMultiple(t *testing.T) { } }) } + +func TestGetMultipleOrLock(t *testing.T) { + t.Run("success get all locks", func(t *testing.T) { + k := NewKeeper() + m, err := miniredis.Run() + assert.NoError(t, err) + r := newRedisConn(m.Addr()) + k.SetConnectionPool(r) + k.SetLockConnectionPool(r) + + keys := []string{"key1", "key2", "key3"} + items, mutexes, err := k.GetMultipleOrLock(keys) + + assert.NotNil(t, mutexes) + assert.Equal(t, len(keys), len(mutexes)) + assert.Equal(t, len(keys), len(items)) + }) + + t.Run("success get locks for non existing items", func(t *testing.T) { + k := NewKeeper() + m, err := miniredis.Run() + assert.NoError(t, err) + r := newRedisConn(m.Addr()) + k.SetConnectionPool(r) + k.SetLockConnectionPool(r) + k.SetDefaultTTL(time.Minute) + + keys := []string{"key1", "key2", "key3"} + + err = k.StoreMultiWithoutBlocking([]Item{NewItem("key2", "key2")}) + assert.NoError(t, err) + + items, mutexes, err := k.GetMultipleOrLock(keys) + assert.NoError(t, err) + assert.Equal(t, 2, len(mutexes)) + assert.Equal(t, len(keys), len(items)) + }) + + t.Run("success get all cached items", func(t *testing.T) { + k := NewKeeper() + m, err := miniredis.Run() + assert.NoError(t, err) + r := newRedisConn(m.Addr()) + k.SetConnectionPool(r) + k.SetLockConnectionPool(r) + k.SetDefaultTTL(time.Minute) + + keys := []string{"key1", "key2", "key3"} + items := []Item{ + NewItem("key1", "key1"), + NewItem("key2", "key2"), + NewItem("key3", "key3"), + } + + err = k.StoreMultiWithoutBlocking(items) + assert.NoError(t, err) + + resp, mutexes, err := k.GetMultipleOrLock(keys) + assert.NoError(t, err) + assert.Nil(t, mutexes) + assert.Equal(t, len(keys), len(resp)) + }) + + t.Run("success with wait for cache key to be exists", func(t *testing.T) { + k := NewKeeper() + m, err := miniredis.Run() + assert.NoError(t, err) + r := newRedisConn(m.Addr()) + k.SetConnectionPool(r) + k.SetLockConnectionPool(r) + k.SetDefaultTTL(time.Minute) + + keys := []string{"key1", "key2", "key3"} + + _, mutexes, err := k.GetMultipleOrLock(keys) + assert.NoError(t, err) + assert.Equal(t, len(keys), len(mutexes)) + + items := map[string]string{ + "key1": "val1", + "key2": "val2", + "key3": "val3", + } + + // store item asynchronously so next call to GetMultipleOrLock will get the result + go func() { + defer SafeUnlock(mutexes...) + time.Sleep(1 * time.Second) + var cacheItems []Item + for k, v := range items { + cacheItems = append(cacheItems, NewItem(k, v)) + } + err := k.StoreMultiWithoutBlocking(cacheItems) + assert.NoError(t, err) + }() + + resp2, mutexes2, err := k.GetMultipleOrLock(keys) + assert.NoError(t, err) + assert.Nil(t, mutexes2) + for i, k := range keys { + assert.EqualValues(t, items[k], resp2[i]) + } + }) +} diff --git a/keeper_with_failover_test.go b/keeper_with_failover_test.go index 5175a28..6ce06e7 100644 --- a/keeper_with_failover_test.go +++ b/keeper_with_failover_test.go @@ -353,5 +353,5 @@ func Test_keeperWithFailover_DeleteHashMember(t *testing.T) { assert.True(t, m.Exists(identifier) && mFO.Exists(identifier)) err = k.DeleteHashMember(identifier, "key") assert.NoError(t, err) - assert.False(t, m.Exists(identifier) || m.Exists(identifier)) + assert.False(t, m.Exists(identifier) || mFO.Exists(identifier)) }