Skip to content

Commit

Permalink
Option to exit TaskTiger after a certain amount of time (#324)
Browse files Browse the repository at this point in the history
This can be useful to avoid memory leaks when using a non-forking executor.
  • Loading branch information
thomasst authored Feb 28, 2024
1 parent 8bffe61 commit eb7ed31
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 5 deletions.
17 changes: 16 additions & 1 deletion tasktiger/tasktiger.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ def run_worker(
max_workers_per_queue: Optional[int] = None,
store_tracebacks: Optional[bool] = None,
executor_class: Optional[Type[Executor]] = None,
exit_after: Optional[datetime.timedelta] = None,
) -> None:
"""
Main worker entry point method.
Expand All @@ -423,7 +424,7 @@ def run_worker(
store_tracebacks=store_tracebacks,
executor_class=executor_class,
)
worker.run()
worker.run(exit_after=exit_after)
except Exception:
self.log.exception("Unhandled exception")
raise
Expand Down Expand Up @@ -698,6 +699,11 @@ def would_process_configured_queue(self, queue_name: str) -> bool:
"--executor",
help="Task executor. Possible values are sync or fork (default).",
)
@click.option(
"--exit-after",
type=click.INT,
help="Exit TaskTiger after the time in minutes has elapsed.",
)
@click.pass_context
def run_worker(
context: Any,
Expand All @@ -711,23 +717,32 @@ def run_worker(
max_workers_per_queue: Optional[int] = None,
store_tracebacks: Optional[bool] = None,
executor: Optional[str] = "fork",
exit_after: Optional[int] = None,
) -> None:
conn = redis.Redis(
host, int(port or 6379), int(db or 0), password, decode_responses=True
)
tiger = context.obj or TaskTiger(setup_structlog=True, connection=conn)

executor_class: Type[Executor]
if not executor or executor == "fork":
executor_class = ForkExecutor
elif executor == "sync":
executor_class = SyncExecutor
else:
raise click.ClickException("Invalid executor.")

if exit_after:
exit_after_td = datetime.timedelta(minutes=exit_after)
else:
exit_after_td = None

tiger.run_worker(
queues=queues,
module=module,
exclude_queues=exclude_queues,
max_workers_per_queue=max_workers_per_queue,
store_tracebacks=store_tracebacks,
executor_class=executor_class,
exit_after=exit_after_td,
)
31 changes: 27 additions & 4 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import hashlib
import json
import os
Expand Down Expand Up @@ -971,13 +972,21 @@ def store_task_execution(self, tasks: List[Task], execution: Dict) -> None:

pipeline.execute()

def run(self, once: bool = False, force_once: bool = False) -> None:
def run(
self,
once: bool = False,
force_once: bool = False,
exit_after: Optional[datetime.timedelta] = None,
) -> None:
"""
Main loop of the worker.
Use once=True to execute any queued tasks and then exit.
Use force_once=True with once=True to always exit after one processing
loop even if tasks remain queued.
Args:
once: If True, execute any queued tasks and then exit.
force_once: If set to True together with once, always exit after
one processing loop even if tasks remain queued.
exit_after: If set, exit the worker after the given duration
elapses.
"""

self.log.info(
Expand All @@ -988,8 +997,16 @@ def run(self, once: bool = False, force_once: bool = False) -> None:
single_worker_queues=sorted(self.single_worker_queues),
max_workers=self.max_workers_per_queue,
executor=self.executor.__class__.__name__,
exit_after=str(exit_after) if exit_after else None,
)

if exit_after:
exit_after_dt = (
datetime.datetime.now(datetime.timezone.utc) + exit_after
)
else:
exit_after_dt = None

if not self.scripts.can_replicate_commands:
# Older Redis versions may create additional overhead when
# executing pipelines.
Expand Down Expand Up @@ -1034,6 +1051,12 @@ def run(self, once: bool = False, force_once: bool = False) -> None:
self._uninstall_signal_handlers()
if once and (not self._queue_set or force_once):
break
if (
exit_after_dt
and datetime.datetime.now(datetime.timezone.utc)
> exit_after_dt
):
break
if self._stop_requested:
raise KeyboardInterrupt()
except KeyboardInterrupt:
Expand Down

0 comments on commit eb7ed31

Please sign in to comment.