Celery Once allows you to prevent multiple execution and queuing of celery tasks.
Installing celery_once is simple with pip, just run:
pip install -U celery_once
- Celery. Built to run with Celery 3.1. Older versions may work, but are not officially supported.
- Redis is used as a distributed locking mechanism.
To use celery_once, your tasks need to inherit from an abstract base task called QueueOnce.
You may need to tune the following Celery configuration options:
- ONCE_REDIS_URL should point towards a running Redis instance (defaults to redis://localhost:6379/0).
- ONCE_DEFAULT_TIMEOUT sets how many seconds after a lock has been set before it automatically times out (defaults to 3600 seconds, or 1 hour).
from celery import Celery
from celery_once import QueueOnce
from time import sleep
celery = Celery('tasks', broker='amqp://guest@localhost//')
celery.conf.ONCE_REDIS_URL = 'redis://localhost:6379/0'
celery.conf.ONCE_DEFAULT_TIMEOUT = 60 * 60
@celery.task(base=QueueOnce)
def slow_task():
    sleep(30)
    return "Done!"
Behind the scenes, this overrides apply_async and delay. It does not affect calling the tasks directly.
When running the task, celery_once checks that no lock is in place (against a Redis key).
If no lock is found, the task will run as normal. Once the task completes (or ends due to an exception) the lock will clear.
If an attempt is made to run the task again before it completes, an AlreadyQueued exception will be raised.
example.delay(10)
example.delay(10)
Traceback (most recent call last):
..
AlreadyQueued()
result = example.apply_async(args=(10,))
result = example.apply_async(args=(10,))
Traceback (most recent call last):
..
AlreadyQueued()
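The check-then-lock behaviour described above can be illustrated without Celery or Redis. The following is only a minimal sketch: `InMemoryLocks`, `AlreadyQueuedSketch`, and the key string `"example:10"` are hypothetical names invented for this illustration, and a Python set stands in for Redis. It is not how celery_once is implemented.

```python
class AlreadyQueuedSketch(Exception):
    """Hypothetical stand-in for celery_once's AlreadyQueued exception."""


class InMemoryLocks:
    """Toy lock store: a Python set stands in for Redis keys."""

    def __init__(self):
        self._held = set()

    def acquire(self, key):
        # Refuse if the key is already held, otherwise take it.
        if key in self._held:
            raise AlreadyQueuedSketch(key)
        self._held.add(key)

    def release(self, key):
        # Releasing a key that is not held is a no-op.
        self._held.discard(key)


locks = InMemoryLocks()
locks.acquire("example:10")        # first call: lock taken, task is queued

duplicate_rejected = False
try:
    locks.acquire("example:10")    # second call before the first completes
except AlreadyQueuedSketch:
    duplicate_rejected = True

locks.release("example:10")        # task completed or raised: lock cleared
locks.acquire("example:10")        # the same call can now be queued again
```

In a real deployment the check and the set would need to be a single atomic operation on the shared store; the in-process set here ignores that concern.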
Optionally, instead of raising an AlreadyQueued exception, the task can return None if once={'graceful': True} is set in the task's options or when run through apply_async.
from celery_once import AlreadyQueued
# Either catch the exception,
try:
    example.delay(10)
except AlreadyQueued:
    pass
# Or, handle it gracefully at run time.
result = example.apply_async(args=(10,), once={'graceful': True})
# or by default.
@celery.task(base=QueueOnce, once={'graceful': True})
def slow_task():
    sleep(30)
    return "Done!"
By default, celery_once creates a lock based on the task's name and its arguments and values.
Take, for example, the following task:
@celery.task(base=QueueOnce)
def slow_add(a, b):
    sleep(30)
    return a + b
Running the task with different arguments will default to checking against different locks.
slow_add.delay(1, 1)
slow_add.delay(1, 2)
If you want to specify locking based on a subset of the arguments, or on no arguments at all, you can adjust the keys celery_once looks at in the task's options with once={'keys': [..]}:
@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b
slow_add.delay(1, 1)
# Checks if any tasks are running with `a=1`.
slow_add.delay(1, 2)
Traceback (most recent call last):
..
AlreadyQueued()
# A different value of `a` is checked against a different lock.
slow_add.delay(2, 2)
@celery.task(base=QueueOnce, once={'keys': []})
def slow_add(a, b):
    sleep(30)
    return a + b
# Will enforce only one task can run, no matter what arguments.
slow_add.delay(1, 1)
slow_add.delay(2, 2)
Traceback (most recent call last):
..
AlreadyQueued()
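To make the effect of once={'keys': [..]} more concrete, here is a rough sketch of how a lock key could be derived from a task's name and a chosen subset of its arguments. The function `lock_key_sketch` and the key format it produces are assumptions made for illustration only; the actual key format used by celery_once may differ.

```python
import inspect


def lock_key_sketch(func, args, kwargs, keys=None):
    """Build an illustrative lock key from a function name and chosen arguments."""
    # Map positional and keyword arguments onto parameter names.
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    call_args = dict(bound.arguments)
    # keys=None means "use every argument"; an empty list means "use none".
    if keys is not None:
        call_args = {name: call_args[name] for name in keys}
    parts = ["{}-{}".format(name, call_args[name]) for name in sorted(call_args)]
    return "_".join(["lock", func.__name__] + parts)


def slow_add(a, b):
    return a + b


# All arguments: different calls map to different keys.
key_all_1 = lock_key_sketch(slow_add, (1, 1), {})
key_all_2 = lock_key_sketch(slow_add, (1, 2), {})

# Only `a`: calls that share a=1 collide on the same key.
key_a_1 = lock_key_sketch(slow_add, (1, 1), {}, keys=["a"])
key_a_2 = lock_key_sketch(slow_add, (1, 2), {}, keys=["a"])

# No arguments: every call to the task shares one key.
key_none = lock_key_sketch(slow_add, (2, 2), {}, keys=[])
```

Two calls that produce the same key would contend for the same lock, which is why narrowing the keys makes the restriction broader.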
As a fallback, celery_once will clear a lock after 60 minutes.
This is set globally in Celery's configuration with ONCE_DEFAULT_TIMEOUT but can be set for individual tasks using:
@celery.task(base=QueueOnce, once={'timeout': 60 * 60 * 10})
def long_running_task():
    sleep(60 * 60 * 3)
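The role of the timeout as a safety net can be sketched with a lock that records an expiry time. This is an illustrative toy, not celery_once's code: `ExpiringLocks` is a hypothetical class, and the clock is injected so the example can advance time without actually sleeping.

```python
import time


class ExpiringLocks:
    """Toy lock store in which every lock carries an expiry time."""

    def __init__(self, clock=time.monotonic):
        self._expires_at = {}
        self._clock = clock

    def acquire(self, key, timeout):
        """Return True if the lock was taken, False if a live lock exists."""
        now = self._clock()
        expires = self._expires_at.get(key)
        if expires is not None and expires > now:
            return False
        # No lock, or the old one has timed out: take it with a fresh expiry.
        self._expires_at[key] = now + timeout
        return True


# A fake clock (seconds) that the example moves forward by hand.
fake_now = [0.0]
locks = ExpiringLocks(clock=lambda: fake_now[0])

first = locks.acquire("long_running_task", timeout=60)   # lock taken
second = locks.acquire("long_running_task", timeout=60)  # still locked
fake_now[0] = 61.0                                       # timeout has passed
third = locks.acquire("long_running_task", timeout=60)   # stale lock replaced
```

The sketch also shows why the timeout should be longer than the task's expected run time: once it elapses, a second copy of the task can be queued even if the first is still running.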
By default, the lock is removed after the task has executed (using Celery's after_return). This behaviour can be changed by setting the task's option unlock_before_run. When set to True, the lock will be removed just before executing the task.
Caveat: any retry of the task won't re-enable the lock!
@celery.task(base=QueueOnce, once={'unlock_before_run': True})
def slow_task():
    sleep(30)
    return "Done!"
Chaining off of an AlreadyQueued task will result in the subsequent tasks never being executed. To solve this problem, use CeleryOnce instead of Celery when creating your app:
from celery_once.app import CeleryOnce
celery = CeleryOnce('tasks', broker='amqp://guest@localhost//')
This will create an additional task which is called with the same callback as the task which threw AlreadyQueued. If the already queued task is not yet ready, the additional task retries; once it is ready, the additional task re-raises any error or returns the same value.
- Tests are run against Python 2.7 and 3.3. Other versions may work, but are not officially supported.
Contributions are welcome, and they are greatly appreciated! See contributing guide for more details.