diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 7c2f54ea..95e65050 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -16,6 +16,7 @@ from cognite.extractorutils.unstable.core._dto import TaskUpdate from cognite.extractorutils.unstable.core._messaging import RuntimeMessage from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel +from cognite.extractorutils.unstable.core.restart_policy import WHEN_CONTINUOUS_TASKS_CRASHES, RestartPolicy from cognite.extractorutils.unstable.core.tasks import ContinuousTask, ScheduledTask, StartupTask, Task from cognite.extractorutils.unstable.scheduling import TaskScheduler from cognite.extractorutils.util import now @@ -32,6 +33,8 @@ class Extractor(Generic[ConfigType]): CONFIG_TYPE: Type[ConfigType] + RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES + def __init__( self, connection_config: ConnectionConfig, @@ -133,6 +136,7 @@ def error( ) def restart(self) -> None: + self.logger.info("Restarting extractor") if self._runtime_messages: self._runtime_messages.put(RuntimeMessage.RESTART) self.cancellation_token.cancel() @@ -150,7 +154,7 @@ def add_task(self, task: Task) -> None: # Store this for later, since we'll override it with the wrapped version target = task.target - def wrapped() -> None: + def run_task() -> None: """ A wrapped version of the task's target, with tracking and error handling """ @@ -170,6 +174,8 @@ def wrapped() -> None: target() except Exception as e: + self.logger.exception(f"Unexpected error in {task.name}") + # Task crashed, record it as a fatal error self.error( ErrorLevel.fatal, @@ -177,7 +183,8 @@ def wrapped() -> None: details="".join(format_exception(e)), ).instant() - raise e + if self.__class__.RESTART_POLICY(task, e): + self.restart() finally: # Unset the current task @@ -190,7 +197,7 @@ def wrapped() -> None: TaskUpdate(type="ended", name=task.name, timestamp=now()), ) - task.target = wrapped + task.target = run_task self._tasks.append(task) match task: diff --git a/cognite/extractorutils/unstable/core/restart_policy.py b/cognite/extractorutils/unstable/core/restart_policy.py new file mode 100644 index 00000000..a3f1942c --- /dev/null +++ b/cognite/extractorutils/unstable/core/restart_policy.py @@ -0,0 +1,29 @@ +from typing import Callable + +from cognite.extractorutils.unstable.core.tasks import ContinuousTask, Task + +RestartPolicy = Callable[[Task, Exception], bool] + + +def _false(_task: Task, _exception: Exception) -> bool: + return False + + +def _true(_task: Task, _exception: Exception) -> bool: + return True + + +def _is_continuous(task: Task, _exception: Exception) -> bool: + return isinstance(task, ContinuousTask) + + +NEVER = _false +WHEN_CONTINUOUS_TASKS_CRASHES = _is_continuous +WHEN_ANY_TASK_CRASHES = _true + +__all__ = [ + "RestartPolicy", + "NEVER", + "WHEN_CONTINUOUS_TASKS_CRASHES", + "WHEN_ANY_TASK_CRASHES", +] diff --git a/cognite/extractorutils/unstable/core/runtime.py b/cognite/extractorutils/unstable/core/runtime.py index 67395335..bd9779a1 100644 --- a/cognite/extractorutils/unstable/core/runtime.py +++ b/cognite/extractorutils/unstable/core/runtime.py @@ -65,7 +65,7 @@ def _create_argparser(self) -> ArgumentParser: def _setup_logging(self) -> None: # TODO: Figure out file logging for runtime fmt = logging.Formatter( - "%(asctime)s.%(msecs)03d UTC [%(levelname)-8s] %(threadName)s - %(message)s", + "%(asctime)s.%(msecs)03d UTC [%(levelname)-8s] %(process)d %(threadName)s - %(message)s", "%Y-%m-%d %H:%M:%S", ) # Set logging to UTC diff --git a/cognite/extractorutils/unstable/scheduling/_scheduler.py b/cognite/extractorutils/unstable/scheduling/_scheduler.py index 4c64d016..be675b4e 100644 --- a/cognite/extractorutils/unstable/scheduling/_scheduler.py +++ b/cognite/extractorutils/unstable/scheduling/_scheduler.py @@ -71,7 +71,7 @@ def wrap() -> None: with self._running_lock: self._running.remove(job) - Thread(target=wrap, name=f"Run{pascalize(job.name)}").start() + Thread(target=wrap, name=f"{pascalize(job.name)}").start() return True def trigger(self, name: str) -> bool: