diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py index 38a0734..ceb261a 100644 --- a/tasktiger/tasktiger.py +++ b/tasktiger/tasktiger.py @@ -399,6 +399,7 @@ def run_worker( max_workers_per_queue: Optional[int] = None, store_tracebacks: Optional[bool] = None, executor_class: Optional[Type[Executor]] = None, + exit_after: Optional[datetime.timedelta] = None, ) -> None: """ Main worker entry point method. @@ -423,7 +424,7 @@ def run_worker( store_tracebacks=store_tracebacks, executor_class=executor_class, ) - worker.run() + worker.run(exit_after=exit_after) except Exception: self.log.exception("Unhandled exception") raise @@ -698,6 +699,11 @@ def would_process_configured_queue(self, queue_name: str) -> bool: "--executor", help="Task executor. Possible values are sync or fork (default).", ) +@click.option( + "--exit-after", + type=click.INT, + help="Exit TaskTiger after the time in minutes has elapsed.", +) @click.pass_context def run_worker( context: Any, @@ -711,11 +717,13 @@ def run_worker( max_workers_per_queue: Optional[int] = None, store_tracebacks: Optional[bool] = None, executor: Optional[str] = "fork", + exit_after: Optional[int] = None, ) -> None: conn = redis.Redis( host, int(port or 6379), int(db or 0), password, decode_responses=True ) tiger = context.obj or TaskTiger(setup_structlog=True, connection=conn) + executor_class: Type[Executor] if not executor or executor == "fork": executor_class = ForkExecutor @@ -723,6 +731,12 @@ def run_worker( executor_class = SyncExecutor else: raise click.ClickException("Invalid executor.") + + if exit_after: + exit_after_td = datetime.timedelta(minutes=exit_after) + else: + exit_after_td = None + tiger.run_worker( queues=queues, module=module, @@ -730,4 +744,5 @@ def run_worker( max_workers_per_queue=max_workers_per_queue, store_tracebacks=store_tracebacks, executor_class=executor_class, + exit_after=exit_after_td, ) diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 45d6f32..6c1c49c 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -1,3 +1,4 @@ +import datetime import hashlib import json import os @@ -971,13 +972,21 @@ def store_task_execution(self, tasks: List[Task], execution: Dict) -> None: pipeline.execute() - def run(self, once: bool = False, force_once: bool = False) -> None: + def run( + self, + once: bool = False, + force_once: bool = False, + exit_after: Optional[datetime.timedelta] = None, + ) -> None: """ Main loop of the worker. - Use once=True to execute any queued tasks and then exit. - Use force_once=True with once=True to always exit after one processing - loop even if tasks remain queued. + Args: + once: If True, execute any queued tasks and then exit. + force_once: If set to True together with once, always exit after + one processing loop even if tasks remain queued. + exit_after: If set, exit the worker after the given duration + elapses. """ self.log.info( @@ -988,8 +997,16 @@ def run(self, once: bool = False, force_once: bool = False) -> None: single_worker_queues=sorted(self.single_worker_queues), max_workers=self.max_workers_per_queue, executor=self.executor.__class__.__name__, + exit_after=str(exit_after) if exit_after else None, ) + if exit_after: + exit_after_dt = ( + datetime.datetime.now(datetime.timezone.utc) + exit_after + ) + else: + exit_after_dt = None + if not self.scripts.can_replicate_commands: # Older Redis versions may create additional overhead when # executing pipelines. @@ -1034,6 +1051,12 @@ def run(self, once: bool = False, force_once: bool = False) -> None: self._uninstall_signal_handlers() if once and (not self._queue_set or force_once): break + if ( + exit_after_dt + and datetime.datetime.now(datetime.timezone.utc) + > exit_after_dt + ): + break if self._stop_requested: raise KeyboardInterrupt() except KeyboardInterrupt: