Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option to exit TaskTiger after a certain amount of time #324

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion tasktiger/tasktiger.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ def run_worker(
max_workers_per_queue: Optional[int] = None,
store_tracebacks: Optional[bool] = None,
executor_class: Optional[Type[Executor]] = None,
exit_after: Optional[datetime.timedelta] = None,
) -> None:
"""
Main worker entry point method.
Expand All @@ -423,7 +424,7 @@ def run_worker(
store_tracebacks=store_tracebacks,
executor_class=executor_class,
)
worker.run()
worker.run(exit_after=exit_after)
except Exception:
self.log.exception("Unhandled exception")
raise
Expand Down Expand Up @@ -698,6 +699,11 @@ def would_process_configured_queue(self, queue_name: str) -> bool:
"--executor",
help="Task executor. Possible values are sync or fork (default).",
)
@click.option(
"--exit-after",
type=click.INT,
help="Exit TaskTiger after the time in minutes has elapsed.",
)
@click.pass_context
def run_worker(
context: Any,
Expand All @@ -711,23 +717,32 @@ def run_worker(
max_workers_per_queue: Optional[int] = None,
store_tracebacks: Optional[bool] = None,
executor: Optional[str] = "fork",
exit_after: Optional[int] = None,
) -> None:
conn = redis.Redis(
host, int(port or 6379), int(db or 0), password, decode_responses=True
)
tiger = context.obj or TaskTiger(setup_structlog=True, connection=conn)

executor_class: Type[Executor]
if not executor or executor == "fork":
executor_class = ForkExecutor
elif executor == "sync":
executor_class = SyncExecutor
else:
raise click.ClickException("Invalid executor.")

if exit_after:
exit_after_td = datetime.timedelta(minutes=exit_after)
else:
exit_after_td = None

tiger.run_worker(
queues=queues,
module=module,
exclude_queues=exclude_queues,
max_workers_per_queue=max_workers_per_queue,
store_tracebacks=store_tracebacks,
executor_class=executor_class,
exit_after=exit_after_td,
)
31 changes: 27 additions & 4 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import hashlib
import json
import os
Expand Down Expand Up @@ -959,13 +960,21 @@ def store_task_execution(self, tasks: List[Task], execution: Dict) -> None:

pipeline.execute()

def run(self, once: bool = False, force_once: bool = False) -> None:
def run(
self,
once: bool = False,
force_once: bool = False,
exit_after: Optional[datetime.timedelta] = None,
) -> None:
"""
Main loop of the worker.

Use once=True to execute any queued tasks and then exit.
Use force_once=True with once=True to always exit after one processing
loop even if tasks remain queued.
Args:
once: If True, execute any queued tasks and then exit.
force_once: If set to True together with once, always exit after
one processing loop even if tasks remain queued.
exit_after: If set, exit the worker after the given duration
elapses.
"""

self.log.info(
Expand All @@ -976,8 +985,16 @@ def run(self, once: bool = False, force_once: bool = False) -> None:
single_worker_queues=sorted(self.single_worker_queues),
max_workers=self.max_workers_per_queue,
executor=self.executor.__class__.__name__,
exit_after=str(exit_after) if exit_after else None,
)

if exit_after:
exit_after_dt = (
datetime.datetime.now(datetime.timezone.utc) + exit_after
)
else:
exit_after_dt = None

if not self.scripts.can_replicate_commands:
# Older Redis versions may create additional overhead when
# executing pipelines.
Expand Down Expand Up @@ -1022,6 +1039,12 @@ def run(self, once: bool = False, force_once: bool = False) -> None:
self._uninstall_signal_handlers()
if once and (not self._queue_set or force_once):
break
if (
exit_after_dt
and datetime.datetime.now(datetime.timezone.utc)
> exit_after_dt
):
break
if self._stop_requested:
raise KeyboardInterrupt()
except KeyboardInterrupt:
Expand Down
Loading