Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fallback MGetCache to MGet #574

Merged
merged 11 commits into from
Jun 25, 2024
15 changes: 8 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
)

type singleClient struct {
conn conn
stop uint32
cmd Builder
retry bool
conn conn
stop uint32
cmd Builder
retry bool
DisableCache bool
}

func newSingleClient(opt *ClientOption, prev conn, connFn connFn) (*singleClient, error) {
Expand All @@ -30,11 +31,11 @@ func newSingleClient(opt *ClientOption, prev conn, connFn connFn) (*singleClient
if err := conn.Dial(); err != nil {
return nil, err
}
return newSingleClientWithConn(conn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry), nil
return newSingleClientWithConn(conn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry, opt.DisableCache), nil
}

func newSingleClientWithConn(conn conn, builder Builder, retry bool) *singleClient {
return &singleClient{cmd: builder, conn: conn, retry: retry}
func newSingleClientWithConn(conn conn, builder Builder, retry, disableCache bool) *singleClient {
return &singleClient{cmd: builder, conn: conn, retry: retry, DisableCache: disableCache}
}

func (c *singleClient) B() Builder {
Expand Down
3 changes: 2 additions & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,8 +1053,9 @@ func (c *clusterClient) Dedicate() (DedicatedClient, func()) {
func (c *clusterClient) Nodes() map[string]Client {
c.mu.RLock()
nodes := make(map[string]Client, len(c.conns))
disableCache := c.opt != nil && c.opt.DisableCache
for addr, cc := range c.conns {
nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry)
nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache)
}
c.mu.RUnlock()
return nodes
Expand Down
15 changes: 15 additions & 0 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func MGetCache(client Client, ctx context.Context, ttl time.Duration, keys []str
if len(keys) == 0 {
return make(map[string]RedisMessage), nil
}
if isCacheDisabled(client) {
return MGet(client, ctx, keys)
SoulPancake marked this conversation as resolved.
Show resolved Hide resolved
}
cmds := mgetcachecmdsp.Get(len(keys), len(keys))
defer mgetcachecmdsp.Put(cmds)
for i := range cmds.s {
Expand All @@ -54,6 +57,18 @@ func MGetCache(client Client, ctx context.Context, ttl time.Duration, keys []str
return doMultiCache(client, ctx, cmds.s, keys)
}

func isCacheDisabled(client Client) bool {
switch c := client.(type) {
case *singleClient:
return c.DisableCache
case *sentinelClient:
return c.mOpt != nil && c.mOpt.DisableCache
case *clusterClient:
return c.opt != nil && c.opt.DisableCache
}
SoulPancake marked this conversation as resolved.
Show resolved Hide resolved
return false
}

// MGet is a helper that consults the redis directly with multiple keys by grouping keys within same slot into MGET or multiple GETs
func MGet(client Client, ctx context.Context, keys []string) (ret map[string]RedisMessage, err error) {
if len(keys) == 0 {
Expand Down
46 changes: 46 additions & 0 deletions helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@ func TestMGetCache(t *testing.T) {
if err != nil {
t.Fatalf("unexpected err %v", err)
}
disabledCacheClient, err := newSingleClient(&ClientOption{InitAddress: []string{""}, DisableCache: true}, m, func(dst string, opt *ClientOption) conn {
SoulPancake marked this conversation as resolved.
Show resolved Hide resolved
return m
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
t.Run("Delegate DisabledCache MGetCache", func(t *testing.T) {
m.DoFn = func(cmd Completed) RedisResult {
if !reflect.DeepEqual(cmd.Commands(), []string{"MGET", "1", "2"}) {
t.Fatalf("unexpected command %v", cmd)
}
return newResult(RedisMessage{typ: '*', values: []RedisMessage{{typ: '+', string: "1"}, {typ: '+', string: "2"}}}, nil)
}
if v, err := MGetCache(disabledCacheClient, context.Background(), 100, []string{"1", "2"}); err != nil || v == nil {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoCache", func(t *testing.T) {
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
if reflect.DeepEqual(multi[0].Cmd.Commands(), []string{"GET", "1"}) && multi[0].TTL == 100 &&
Expand Down Expand Up @@ -108,6 +125,35 @@ func TestMGetCache(t *testing.T) {
}
})
})
// TODO: @SoulPancake I have some doubts about this @rueian
SoulPancake marked this conversation as resolved.
Show resolved Hide resolved
//t.Run("sentinel client", func(t *testing.T) {
// m := &mockConn{
// DoFn: func(cmd Completed) RedisResult {
// return slotsResp
// },
// }
// sentinelClient, err := newSentinelClient(&ClientOption{
// InitAddress: []string{":0"},
// Sentinel: SentinelOption{MasterSet: "masters"},
// DisableCache: true,
// }, func(dst string, opt *ClientOption) conn {
// return m
// })
// if err != nil {
// t.Fatalf("unexpected err %v", err)
// }
// t.Run("Delegate DisabledCache MGetCache", func(t *testing.T) {
// m.DoFn = func(cmd Completed) RedisResult {
// if !reflect.DeepEqual(cmd.Commands(), []string{"MGET", "1", "2"}) {
// t.Fatalf("unexpected command %v", cmd)
// }
// return newResult(RedisMessage{typ: '*', values: []RedisMessage{{typ: '+', string: "1"}, {typ: '+', string: "2"}}}, nil)
// }
// if v, err := MGetCache(sentinelClient, context.Background(), 100, []string{"1", "2"}); err != nil || v == nil {
// t.Fatalf("unexpected response %v %v", v, err)
// }
// })
//})
}

//gocyclo:ignore
Expand Down
3 changes: 2 additions & 1 deletion sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ func (c *sentinelClient) Dedicate() (DedicatedClient, func()) {

func (c *sentinelClient) Nodes() map[string]Client {
conn := c.mConn.Load().(conn)
return map[string]Client{conn.Addr(): newSingleClientWithConn(conn, c.cmd, c.retry)}
disableCache := c.mOpt != nil && c.mOpt.DisableCache
return map[string]Client{conn.Addr(): newSingleClientWithConn(conn, c.cmd, c.retry, disableCache)}
}

func (c *sentinelClient) Close() {
Expand Down