From 7c7cf2f3e12154b56c5a0c2b989cc41dfba00f9d Mon Sep 17 00:00:00 2001 From: Maxime VISONNEAU Date: Wed, 24 Mar 2021 17:26:40 +0000 Subject: [PATCH] fixed panics on msg.Name != "" & redis is disabled --- queue.go | 6 ++++- storage.go | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ taskq.go | 58 ------------------------------------------------- 3 files changed, 69 insertions(+), 59 deletions(-) create mode 100644 storage.go diff --git a/queue.go b/queue.go index d659860..73ab3f3 100644 --- a/queue.go +++ b/queue.go @@ -108,7 +108,11 @@ func (opt *QueueOptions) Init() { } if opt.Storage == nil { - opt.Storage = newRedisStorage(opt.Redis) + if opt.Redis != nil { + opt.Storage = newRedisStorage(opt.Redis) + } else { + opt.Storage = localStorage{} + } } if !opt.RateLimit.IsZero() && opt.RateLimiter == nil && opt.Redis != nil { diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..fd438c2 --- /dev/null +++ b/storage.go @@ -0,0 +1,64 @@ +package taskq + +import ( + "context" + "sync" + "time" + + "github.com/hashicorp/golang-lru/simplelru" +) + +type Storage interface { + Exists(ctx context.Context, key string) bool +} + +var _ Storage = (*localStorage)(nil) +var _ Storage = (*redisStorage)(nil) + +// LOCAL + +type localStorage struct { + mu sync.Mutex + cache *simplelru.LRU +} + +func (s localStorage) Exists(_ context.Context, key string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if s.cache == nil { + var err error + s.cache, err = simplelru.NewLRU(128000, nil) + if err != nil { + panic(err) + } + } + + _, ok := s.cache.Get(key) + if ok { + return true + } + + s.cache.Add(key, nil) + return false +} + +// REDIS + +type redisStorage struct { + redis Redis +} + +func newRedisStorage(redis Redis) redisStorage { + return redisStorage{ + redis: redis, + } +} + +func (s redisStorage) Exists(ctx context.Context, key string) bool { + val, err := s.redis.SetNX(ctx, key, "", 24*time.Hour).Result() + if err != nil { + return true + } + return !val +} diff --git a/taskq.go b/taskq.go index 1f21366..026825e 100644 --- a/taskq.go +++ b/taskq.go @@ -4,11 +4,9 @@ import ( "context" "log" "os" - "sync" "time" "github.com/go-redis/redis/v8" - "github.com/hashicorp/golang-lru/simplelru" "github.com/vmihailenco/taskq/v3/internal" ) @@ -41,59 +39,3 @@ type Redis interface { ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd ScriptLoad(ctx context.Context, script string) *redis.StringCmd } - -type Storage interface { - Exists(ctx context.Context, key string) bool -} - -type redisStorage struct { - redis Redis -} - -var _ Storage = (*redisStorage)(nil) - -func newRedisStorage(redis Redis) redisStorage { - return redisStorage{ - redis: redis, - } -} - -func (s redisStorage) Exists(ctx context.Context, key string) bool { - if localCacheExists(key) { - return true - } - - val, err := s.redis.SetNX(ctx, key, "", 24*time.Hour).Result() - if err != nil { - return true - } - return !val -} - -//------------------------------------------------------------------------------ - -var ( - mu sync.Mutex - cache *simplelru.LRU -) - -func localCacheExists(key string) bool { - mu.Lock() - defer mu.Unlock() - - if cache == nil { - var err error - cache, err = simplelru.NewLRU(128000, nil) - if err != nil { - panic(err) - } - } - - _, ok := cache.Get(key) - if ok { - return true - } - - cache.Add(key, nil) - return false -}