Skip to content

Commit

Permalink
Improve exception handling in tasks (#398)
Browse files Browse the repository at this point in the history
Log the exception with its stack trace, and optionally trigger a restart when
a task crashes.
  • Loading branch information
mathialo authored Nov 27, 2024
1 parent 1ce72a2 commit 1162d24
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 5 deletions.
13 changes: 10 additions & 3 deletions cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from cognite.extractorutils.unstable.core._dto import TaskUpdate
from cognite.extractorutils.unstable.core._messaging import RuntimeMessage
from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel
from cognite.extractorutils.unstable.core.restart_policy import WHEN_CONTINUOUS_TASKS_CRASHES, RestartPolicy
from cognite.extractorutils.unstable.core.tasks import ContinuousTask, ScheduledTask, StartupTask, Task
from cognite.extractorutils.unstable.scheduling import TaskScheduler
from cognite.extractorutils.util import now
Expand All @@ -32,6 +33,8 @@ class Extractor(Generic[ConfigType]):

CONFIG_TYPE: Type[ConfigType]

RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES

def __init__(
self,
connection_config: ConnectionConfig,
Expand Down Expand Up @@ -133,6 +136,7 @@ def error(
)

def restart(self) -> None:
self.logger.info("Restarting extractor")
if self._runtime_messages:
self._runtime_messages.put(RuntimeMessage.RESTART)
self.cancellation_token.cancel()
Expand All @@ -150,7 +154,7 @@ def add_task(self, task: Task) -> None:
# Store this for later, since we'll override it with the wrapped version
target = task.target

def wrapped() -> None:
def run_task() -> None:
"""
A wrapped version of the task's target, with tracking and error handling
"""
Expand All @@ -170,14 +174,17 @@ def wrapped() -> None:
target()

except Exception as e:
self.logger.exception(f"Unexpected error in {task.name}")

# Task crashed, record it as a fatal error
self.error(
ErrorLevel.fatal,
description="Task crashed unexpectedly",
details="".join(format_exception(e)),
).instant()

raise e
if self.__class__.RESTART_POLICY(task, e):
self.restart()

finally:
# Unset the current task
Expand All @@ -190,7 +197,7 @@ def wrapped() -> None:
TaskUpdate(type="ended", name=task.name, timestamp=now()),
)

task.target = wrapped
task.target = run_task
self._tasks.append(task)

match task:
Expand Down
29 changes: 29 additions & 0 deletions cognite/extractorutils/unstable/core/restart_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import Callable

from cognite.extractorutils.unstable.core.tasks import ContinuousTask, Task

# Signature of a restart policy: it is called with the task that crashed and the
# exception that crashed it, and returns True when the extractor should restart.
RestartPolicy = Callable[[Task, Exception], bool]


def _false(_task: Task, _exception: Exception) -> bool:
    """
    Restart policy that never asks for a restart.

    Args:
        _task: The task that crashed (ignored).
        _exception: The exception raised by the task (ignored).

    Returns:
        Always ``False``.
    """
    return False


def _true(_task: Task, _exception: Exception) -> bool:
    """
    Restart policy that always asks for a restart.

    Args:
        _task: The task that crashed (ignored).
        _exception: The exception raised by the task (ignored).

    Returns:
        Always ``True``.
    """
    return True


def _is_continuous(task: Task, _exception: Exception) -> bool:
    """
    Restart policy that asks for a restart only when a continuous task crashed.

    Args:
        task: The task that crashed.
        _exception: The exception raised by the task (ignored).

    Returns:
        ``True`` if ``task`` is an instance of ``ContinuousTask``, otherwise ``False``.
    """
    return isinstance(task, ContinuousTask)


# Ready-made restart policies; each one is a callable matching RestartPolicy.
# NEVER: returns False for every crash, i.e. no restart is ever requested.
NEVER = _false
# WHEN_CONTINUOUS_TASKS_CRASHES: returns True only when the crashed task is a ContinuousTask.
WHEN_CONTINUOUS_TASKS_CRASHES = _is_continuous
# WHEN_ANY_TASK_CRASHES: returns True for every crash, regardless of task type.
WHEN_ANY_TASK_CRASHES = _true

# Names exported by `from ... import *`; the underscore-prefixed helpers stay private.
__all__ = [
"RestartPolicy",
"NEVER",
"WHEN_CONTINUOUS_TASKS_CRASHES",
"WHEN_ANY_TASK_CRASHES",
]
2 changes: 1 addition & 1 deletion cognite/extractorutils/unstable/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _create_argparser(self) -> ArgumentParser:
def _setup_logging(self) -> None:
# TODO: Figure out file logging for runtime
fmt = logging.Formatter(
"%(asctime)s.%(msecs)03d UTC [%(levelname)-8s] %(threadName)s - %(message)s",
"%(asctime)s.%(msecs)03d UTC [%(levelname)-8s] %(process)d %(threadName)s - %(message)s",
"%Y-%m-%d %H:%M:%S",
)
# Set logging to UTC
Expand Down
2 changes: 1 addition & 1 deletion cognite/extractorutils/unstable/scheduling/_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def wrap() -> None:
with self._running_lock:
self._running.remove(job)

Thread(target=wrap, name=f"Run{pascalize(job.name)}").start()
Thread(target=wrap, name=f"{pascalize(job.name)}").start()
return True

def trigger(self, name: str) -> bool:
Expand Down

0 comments on commit 1162d24

Please sign in to comment.