Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task lock should be non-local #333

Merged
merged 5 commits on Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions tasktiger/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,11 @@ def _periodic_heartbeat(
queue_lock: Optional[Semaphore],
stop_event: threading.Event,
) -> None:
while not stop_event.is_set():
stop_event.wait(self.config["ACTIVE_TASK_UPDATE_TIMER"])
self.heartbeat(queue, task_ids, log, locks, queue_lock)
while not stop_event.wait(self.config["ACTIVE_TASK_UPDATE_TIMER"]):
try:
self.heartbeat(queue, task_ids, log, locks, queue_lock)
except Exception:
log.exception("task heartbeat failed")

def execute(
self,
Expand Down
2 changes: 2 additions & 0 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,8 @@ def _execute_task_group(
lock = self.connection.lock(
self._key("lockv2", lock_id),
timeout=self.config["ACTIVE_TASK_UPDATE_TIMEOUT"],
# Sync worker uses a thread to renew the lock.
thread_local=False,
)
if not lock.acquire(blocking=False):
log.info("could not acquire lock", task_id=task.id)
Expand Down
39 changes: 33 additions & 6 deletions tests/test_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from tasktiger import Task, Worker
from tasktiger._internal import ACTIVE
from tasktiger.executor import SyncExecutor
from tasktiger.worker import LOCK_REDIS_KEY

from .config import DELAY
from .tasks import (
Expand Down Expand Up @@ -188,26 +189,52 @@ def test_handles_timeout(self, tiger, ensure_queues):
ensure_queues(error={"default": 1})

def test_heartbeat(self, tiger):
task = Task(tiger, sleep_task)
# Test both task heartbeat and lock renewal.
# We set unique=True so the task ID matches the lock key.
task = Task(tiger, sleep_task, lock=True, unique=True)
task.delay()

# Start a worker and wait until it starts processing.
worker = Process(
target=external_worker,
kwargs={
"patch_config": {"ACTIVE_TASK_UPDATE_TIMER": DELAY / 2},
"worker_kwargs": {"executor_class": SyncExecutor},
"worker_kwargs": {
# Test queue lock.
"max_workers_per_queue": 1,
"executor_class": SyncExecutor,
},
},
)
worker.start()

time.sleep(DELAY / 2)
time.sleep(DELAY)

queue_key = tiger._key(ACTIVE, "default")
queue_lock_key = tiger._key(LOCK_REDIS_KEY, "default")
task_lock_key = tiger._key("lockv2", task.id)

key = tiger._key(ACTIVE, "default")
conn = tiger.connection
heartbeat_1 = conn.zscore(key, task.id)

heartbeat_1 = conn.zscore(queue_key, task.id)
queue_lock_1 = conn.zrange(queue_lock_key, 0, -1, withscores=True)[0][
1
]
task_lock_1 = conn.pttl(task_lock_key)

time.sleep(DELAY / 2)
heartbeat_2 = conn.zscore(key, task.id)

heartbeat_2 = conn.zscore(queue_key, task.id)
queue_lock_2 = conn.zrange(queue_lock_key, 0, -1, withscores=True)[0][
1
]
task_lock_2 = conn.pttl(task_lock_key)

assert heartbeat_2 > heartbeat_1 > 0
assert queue_lock_2 > queue_lock_1 > 0

# Active task update timeout is 2 * DELAY and we renew every DELAY / 2.
assert task_lock_1 > DELAY
assert task_lock_2 > DELAY

worker.kill()
Loading