diff --git a/cmd/root.go b/cmd/root.go index 82d3e88..72b82a6 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -297,10 +297,11 @@ func mustInitializeQueueBackend() { queueBackend, err = backendfactory.NewBackend(logger, backendconfig.Config{ BackendType: cmdOpts.Backend, Redis: &backendconfig.RedisConfig{ - KeyPrefix: cmdOpts.Redis.KeyPrefix, - Client: cmdOpts.Redis.NewClient(), - Backoff: cmdOpts.Redis.Backoff, - ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet, + KeyPrefix: cmdOpts.Redis.KeyPrefix, + Client: cmdOpts.Redis.NewClient(), + Backoff: cmdOpts.Redis.Backoff, + ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet, + ChunkSizeInDelete: cmdOpts.Redis.ChunkSizeInDelete, }, }) diff --git a/pkg/backend/config/config.go b/pkg/backend/config/config.go index 1d148ff..61906e4 100644 --- a/pkg/backend/config/config.go +++ b/pkg/backend/config/config.go @@ -30,10 +30,11 @@ type Config struct { } type RedisConfig struct { - KeyPrefix string - Client *redis.Client - Backoff BackoffConfig - ChunkSizeInGet int + KeyPrefix string + Client *redis.Client + Backoff BackoffConfig + ChunkSizeInGet int + ChunkSizeInDelete int } // TODO: support UniversalOptions @@ -52,7 +53,8 @@ type RedisClientConfig struct { IdleTimeout time.Duration `json:"idleTimeout" yaml:"idleTimeout" default:"5m"` IdleCheckFrequency time.Duration `json:"idleCheckFrequency" yaml:"idleCheckFrequency" default:"1m"` - ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` + ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` + ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInDelete" default:"1000"` } func (c RedisClientConfig) NewClient() *redis.Client { diff --git a/pkg/backend/redis/queue.go b/pkg/backend/redis/queue.go index 44ea134..9b085d9 100644 --- a/pkg/backend/redis/queue.go +++ b/pkg/backend/redis/queue.go @@ -218,7 +218,7 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { // .. task_keys = collect task keys // WATCh task_keys // MULTI - // DEL {queue_key} worker_keys task_keys + // UNLINK {queue_key} worker_keys task_keys (chunked) // HDEL {all_queues_key} {queueName} // EXEC txf := func(tx *redis.Tx) error { @@ -240,8 +240,16 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { tx.Watch(taskKeysToDelete...) keysToDelete = append(keysToDelete, taskKeysToDelete...) + chunkSize := b.ChunkSizeInDelete + numOfKeysToDelete := len(keysToDelete) _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error { - pipe.Del(keysToDelete...) + for begin := 0; begin < numOfKeysToDelete; begin += chunkSize { + end := begin + chunkSize + if end > numOfKeysToDelete { + end = numOfKeysToDelete + } + pipe.Unlink(keysToDelete[begin:end]...) + } pipe.HDel(b.allQueuesKey(), queue.Spec.Name) return nil }) diff --git a/pkg/backend/redis/redis_test.go b/pkg/backend/redis/redis_test.go index 36ce2c5..1ca3485 100644 --- a/pkg/backend/redis/redis_test.go +++ b/pkg/backend/redis/redis_test.go @@ -107,10 +107,11 @@ var _ = Describe("Backend", func() { ibackend, err := NewBackend(logger, backendconfig.Config{ BackendType: "redis", Redis: &backendconfig.RedisConfig{ - KeyPrefix: "test", - Client: client, - Backoff: backoffConfig, - ChunkSizeInGet: 1000, + KeyPrefix: "test", + Client: client, + Backoff: backoffConfig, + ChunkSizeInGet: 1000, + ChunkSizeInDelete: 1000, }, }) Expect(err).NotTo(HaveOccurred()) @@ -277,6 +278,26 @@ var _ = Describe("Backend", func() { Expect(err).To(Equal(iface.TaskQueueNotFound)) }) }) + When("the large queue exists", func() { + It("can delete the queue", func() { + queue := testutil.MustCreateQueue(backend, SampleQueueSpec) + // numOfTasks % chunkSize != 0 && numOfTasks > chunkSize + numOfTasks := 12345 + for i := 0; i < numOfTasks; i++ { + _, err := backend.AddTask(context.Background(), QueueName, SampleTaskSpec) + Expect(err).NotTo(HaveOccurred()) + } + + Expect(backend.DeleteQueue(context.Background(), SampleQueueSpec.Name)).NotTo(HaveOccurred()) + + queuesHash, err := client.HGetAll(backend.allQueuesKey()).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(queuesHash)).To(Equal(0)) + keys, err := client.Keys(backend.queueKey(queue.UID.String()) + "*").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(keys)).To(Equal(0)) + }) + }) }) }) diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go index dec1c6d..e90a6a2 100644 --- a/pkg/backend/redis/task.go +++ b/pkg/backend/redis/task.go @@ -162,10 +162,7 @@ func (b *Backend) getTasksByUIDs(queueUID string, taskUIDs []string, filter func } func (b *Backend) getTasks(queueUID string, filter func(*task.Task) bool, lggr zerolog.Logger) ([]*task.Task, error) { - taskUIDs, err := b.Client.SMembers(b.tasksKey(queueUID)).Result() - if err == redis.Nil { - return []*task.Task{}, nil - } + taskUIDs, err := b.allTaskUIDsByQueueUID(b.Client, queueUID) if err != nil { return nil, err } @@ -938,10 +935,7 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string) b.deadletterQueueKey(queueUID), b.pendingTaskQueueKey(queueUID), } - taskUIDs, err := rds.SMembers(b.tasksKey(queueUID)).Result() - if err == redis.Nil { - return []string{}, nil - } + taskUIDs, err := b.allTaskUIDsByQueueUID(rds, queueUID) if err != nil { return []string{}, err } @@ -950,3 +944,24 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string) } return keysToDelete, nil } + +func (b *Backend) allTaskUIDsByQueueUID(rds redis.Cmdable, queueUID string) ([]string, error) { + var chunkSize = int64(b.ChunkSizeInGet) + var cursor uint64 + var taskUIDs []string + for { + keys, nextCursor, err := rds.SScan(b.tasksKey(queueUID), cursor, "", chunkSize).Result() + if err == redis.Nil { + return []string{}, nil + } + if err != nil { + return []string{}, err + } + taskUIDs = append(taskUIDs, keys...) + cursor = nextCursor + if cursor == 0 { + break + } + } + return taskUIDs, nil +} diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go index e3b67fc..5f32855 100644 --- a/pkg/worker/worker_test.go +++ b/pkg/worker/worker_test.go @@ -74,9 +74,10 @@ var _ = Describe("Worker", func() { bcknd, err = backendfactory.NewBackend(logger, backendconfig.Config{ BackendType: "redis", Redis: &backendconfig.RedisConfig{ - Client: client, - Backoff: backendConfig, - ChunkSizeInGet: 1000, + Client: client, + Backoff: backendConfig, + ChunkSizeInGet: 1000, + ChunkSizeInDelete: 1000, }, }) Expect(err).NotTo(HaveOccurred())