Skip to content

Commit

Permalink
feat: fallback MGetCache to MGet (#574)
Browse files Browse the repository at this point in the history
* feat: fallback MGetCache to MGet

* feat: update struct

* feat: address comments

* feat: address comments

* fix : apply suggestions from code review

Co-authored-by: Rueian <[email protected]>

* feat: add test for disabled cache

* feat: add placeholder for sentinel client test

* feat: covering sentinel client disabledCache scenario

* feat: add cluster client test

* fix: test case

* feat: remove mockConn DoMultiFn

---------

Co-authored-by: Anuragkillswitch <[email protected]>
Co-authored-by: Rueian <[email protected]>
  • Loading branch information
3 people authored Jun 25, 2024
1 parent 37631b3 commit a5b0dc5
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 9 deletions.
15 changes: 8 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
)

type singleClient struct {
conn conn
stop uint32
cmd Builder
retry bool
conn conn
stop uint32
cmd Builder
retry bool
DisableCache bool
}

func newSingleClient(opt *ClientOption, prev conn, connFn connFn) (*singleClient, error) {
Expand All @@ -30,11 +31,11 @@ func newSingleClient(opt *ClientOption, prev conn, connFn connFn) (*singleClient
if err := conn.Dial(); err != nil {
return nil, err
}
return newSingleClientWithConn(conn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry), nil
return newSingleClientWithConn(conn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry, opt.DisableCache), nil
}

func newSingleClientWithConn(conn conn, builder Builder, retry bool) *singleClient {
return &singleClient{cmd: builder, conn: conn, retry: retry}
func newSingleClientWithConn(conn conn, builder Builder, retry, disableCache bool) *singleClient {
return &singleClient{cmd: builder, conn: conn, retry: retry, DisableCache: disableCache}
}

func (c *singleClient) B() Builder {
Expand Down
3 changes: 2 additions & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,8 +1053,9 @@ func (c *clusterClient) Dedicate() (DedicatedClient, func()) {
func (c *clusterClient) Nodes() map[string]Client {
c.mu.RLock()
nodes := make(map[string]Client, len(c.conns))
disableCache := c.opt != nil && c.opt.DisableCache
for addr, cc := range c.conns {
nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry)
nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache)
}
c.mu.RUnlock()
return nodes
Expand Down
15 changes: 15 additions & 0 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func MGetCache(client Client, ctx context.Context, ttl time.Duration, keys []str
if len(keys) == 0 {
return make(map[string]RedisMessage), nil
}
if isCacheDisabled(client) {
return MGet(client, ctx, keys)
}
cmds := mgetcachecmdsp.Get(len(keys), len(keys))
defer mgetcachecmdsp.Put(cmds)
for i := range cmds.s {
Expand All @@ -54,6 +57,18 @@ func MGetCache(client Client, ctx context.Context, ttl time.Duration, keys []str
return doMultiCache(client, ctx, cmds.s, keys)
}

func isCacheDisabled(client Client) bool {
switch c := client.(type) {
case *singleClient:
return c.DisableCache
case *sentinelClient:
return c.mOpt != nil && c.mOpt.DisableCache
case *clusterClient:
return c.opt != nil && c.opt.DisableCache
}
return false
}

// MGet is a helper that consults the redis directly with multiple keys by grouping keys within same slot into MGET or multiple GETs
func MGet(client Client, ctx context.Context, keys []string) (ret map[string]RedisMessage, err error) {
if len(keys) == 0 {
Expand Down
50 changes: 50 additions & 0 deletions helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@ func TestMGetCache(t *testing.T) {
if err != nil {
t.Fatalf("unexpected err %v", err)
}
disabledCacheClient, err := newSingleClient(&ClientOption{InitAddress: []string{""}, DisableCache: true}, m, func(dst string, opt *ClientOption) conn {
return m
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
t.Run("Delegate DisabledCache MGetCache", func(t *testing.T) {
m.DoFn = func(cmd Completed) RedisResult {
if !reflect.DeepEqual(cmd.Commands(), []string{"MGET", "1", "2"}) {
t.Fatalf("unexpected command %v", cmd)
}
return newResult(RedisMessage{typ: '*', values: []RedisMessage{{typ: '+', string: "1"}, {typ: '+', string: "2"}}}, nil)
}
if v, err := MGetCache(disabledCacheClient, context.Background(), 100, []string{"1", "2"}); err != nil || v == nil {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoCache", func(t *testing.T) {
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
if reflect.DeepEqual(multi[0].Cmd.Commands(), []string{"GET", "1"}) && multi[0].TTL == 100 &&
Expand Down Expand Up @@ -62,6 +79,39 @@ func TestMGetCache(t *testing.T) {
if err != nil {
t.Fatalf("unexpected err %v", err)
}
disabledCacheClient, err := newClusterClient(&ClientOption{InitAddress: []string{":0"}, DisableCache: true}, func(dst string, opt *ClientOption) conn {
return m
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
t.Run("Delegate DisabledCache DoCache", func(t *testing.T) {
keys := make([]string, 100)
for i := range keys {
keys[i] = strconv.Itoa(i)
}
m.DoMultiFn = func(cmd ...Completed) *redisresults {
result := make([]RedisResult, len(cmd))
for i, key := range keys {
if !reflect.DeepEqual(cmd[i].Commands(), []string{"GET", key}) {
t.Fatalf("unexpected command %v", cmd)
return nil
}
result[i] = newResult(RedisMessage{typ: '+', string: key}, nil)
}
return &redisresults{s: result}
}
v, err := MGetCache(disabledCacheClient, context.Background(), 100, keys)
if err != nil {
t.Fatalf("unexpected response %v %v", v, err)
}
for _, key := range keys {
if v[key].string != key {
t.Fatalf("unexpected response %v", v)
}
}
})

t.Run("Delegate DoCache", func(t *testing.T) {
keys := make([]string, 100)
for i := range keys {
Expand Down
3 changes: 2 additions & 1 deletion sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ func (c *sentinelClient) Dedicate() (DedicatedClient, func()) {

func (c *sentinelClient) Nodes() map[string]Client {
conn := c.mConn.Load().(conn)
return map[string]Client{conn.Addr(): newSingleClientWithConn(conn, c.cmd, c.retry)}
disableCache := c.mOpt != nil && c.mOpt.DisableCache
return map[string]Client{conn.Addr(): newSingleClientWithConn(conn, c.cmd, c.retry, disableCache)}
}

func (c *sentinelClient) Close() {
Expand Down
40 changes: 40 additions & 0 deletions sentinel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,46 @@ func TestSentinelClientDelegate(t *testing.T) {
}
defer client.Close()

disabledCacheClient, err := newSentinelClient(&ClientOption{InitAddress: []string{":0"}, DisableCache: true}, func(dst string, opt *ClientOption) conn {
if dst == ":0" {
return s0
}
if dst == ":1" {
return m
}
return nil
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
defer disabledCacheClient.Close()

t.Run("Delegate MGetCache", func(t *testing.T) {
keys := []string{"key1", "key2"}
expectedCommand := []string{"MGET", "key1", "key2"}

m.DoMultiFn = func(cmd ...Completed) *redisresults {
if !reflect.DeepEqual(cmd[0].Commands(), expectedCommand) {
t.Fatalf("unexpected command %v", cmd)
}
return &redisresults{s: []RedisResult{
newResult(RedisMessage{typ: '+', string: "master"}, nil),
}}
}

ret, err := MGetCache(disabledCacheClient, context.Background(), time.Second, keys)
if err != nil {
t.Fatalf("unexpected error %v", err)
}

expected := map[string]RedisMessage{
"key1": {typ: '+', string: "master"},
}
if !reflect.DeepEqual(ret, expected) {
t.Fatalf("unexpected result %v, expected %v", ret, expected)
}
})

t.Run("Nodes", func(t *testing.T) {
if nodes := client.Nodes(); len(nodes) != 1 || nodes[":1"] == nil {
t.Fatalf("unexpected nodes")
Expand Down

0 comments on commit a5b0dc5

Please sign in to comment.