From 5ebc1f5c1cce3522d8909dbfdea8be1f7cc49b01 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 25 Nov 2024 07:47:22 +0000 Subject: [PATCH 1/2] chore(deps): update dependency ruff to ^0.8.0 (#395) * chore(deps): update dependency ruff to ^0.8.0 * MYPY BS --------- Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> Co-authored-by: Einar Omang --- cognite/extractorutils/unstable/configuration/models.py | 7 ++++++- pyproject.toml | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/cognite/extractorutils/unstable/configuration/models.py b/cognite/extractorutils/unstable/configuration/models.py index dc1ffe2d..e97ab3a2 100644 --- a/cognite/extractorutils/unstable/configuration/models.py +++ b/cognite/extractorutils/unstable/configuration/models.py @@ -235,5 +235,10 @@ class LogConsoleHandlerConfig(ConfigModel): LogHandlerConfig = Union[LogFileHandlerConfig, LogConsoleHandlerConfig] +# Mypy BS +def _log_handler_default() -> List[Union[LogFileHandlerConfig, LogConsoleHandlerConfig]]: + return [LogConsoleHandlerConfig(level=LogLevel.INFO)] + + class ExtractorConfig(ConfigModel): - log_handlers: List[LogHandlerConfig] = Field(default_factory=lambda: [LogConsoleHandlerConfig(level=LogLevel.INFO)]) + log_handlers: List[LogHandlerConfig] = Field(default_factory=_log_handler_default) diff --git a/pyproject.toml b/pyproject.toml index c6cf6f4c..544be394 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,7 +81,7 @@ experimental = ["cognite-sdk-experimental"] [tool.poetry.group.dev.dependencies] mypy = "1.13.0" -ruff = "^0.7.0" +ruff = "^0.8.0" pytest = "^8.0.0" pytest-cov = "^6.0.0" sphinx = "^7.0.0" From 88f370e190ecad3a8bdc3bc769f6a79443d14bbe Mon Sep 17 00:00:00 2001 From: Mathias Lohne Date: Tue, 26 Nov 2024 11:01:47 +0100 Subject: [PATCH 2/2] Add error reporting to integrations POC (#396) Add an `error` method to the `Extractor` class that starts tracking of an error. This can be used in one of two ways, either by manually starting and stopping the error state: ``` python e = extractor.error(...) # handle error e.finish() ``` or by using it as a context ``` python with extractor.error(...): # Handle error ``` You can create an instant error (with no duration) by using the `instant()` method: ``` python extractor.error(...).instant() ``` Tracking of start/end times, generating and keeping track of external IDs, reporting in checkins, and so on is all handled automatically. --- cognite/extractorutils/unstable/core/_dto.py | 10 + cognite/extractorutils/unstable/core/base.py | 74 ++++++- .../extractorutils/unstable/core/errors.py | 70 +++++++ .../extractorutils/unstable/core/runtime.py | 2 +- tests/test_unstable/test_errors.py | 192 ++++++++++++++++++ 5 files changed, 345 insertions(+), 3 deletions(-) create mode 100644 cognite/extractorutils/unstable/core/errors.py create mode 100644 tests/test_unstable/test_errors.py diff --git a/cognite/extractorutils/unstable/core/_dto.py b/cognite/extractorutils/unstable/core/_dto.py index 8d605043..8ea66919 100644 --- a/cognite/extractorutils/unstable/core/_dto.py +++ b/cognite/extractorutils/unstable/core/_dto.py @@ -32,3 +32,13 @@ class TaskUpdate(CogniteModel): type: Literal["started"] | Literal["ended"] name: str timestamp: int + + +class Error(CogniteModel): + external_id: str + level: str + description: str + details: str | None + start_time: int + end_time: int | None + task: str | None diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 323ea62e..2c06b64f 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -1,7 +1,9 @@ import logging from concurrent.futures import ThreadPoolExecutor +from contextvars import ContextVar, Token from multiprocessing import Queue from threading import RLock, Thread +from traceback import format_exception from types import TracebackType from typing import Generic, Literal, Optional, Type, TypeVar, Union @@ -10,8 +12,10 @@ from cognite.extractorutils.threading import CancellationToken from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, ExtractorConfig +from cognite.extractorutils.unstable.core._dto import Error as DtoError from cognite.extractorutils.unstable.core._dto import TaskUpdate from cognite.extractorutils.unstable.core._messaging import RuntimeMessage +from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel from cognite.extractorutils.unstable.core.tasks import ContinuousTask, ScheduledTask, StartupTask, Task from cognite.extractorutils.unstable.scheduling import TaskScheduler from cognite.extractorutils.util import now @@ -50,9 +54,12 @@ def __init__( self._tasks: list[Task] = [] self._task_updates: list[TaskUpdate] = [] + self._errors: dict[str, Error] = {} self.logger = logging.getLogger(f"{self.EXTERNAL_ID}.main") + self._current_task: ContextVar[str | None] = ContextVar("current_task", default=None) + def _set_runtime_message_queue(self, queue: Queue) -> None: self._runtime_messages = queue @@ -61,11 +68,26 @@ def _checkin(self) -> None: task_updates = [t.model_dump() for t in self._task_updates] self._task_updates.clear() + error_updates = [ + DtoError( + external_id=e.external_id, + level=e.level.value, + description=e.description, + details=e.details, + start_time=e.start_time, + end_time=e.end_time, + task=e._task_name if e._task_name is not None else None, + ).model_dump() + for e in self._errors.values() + ] + self._errors.clear() + res = self.cognite_client.post( f"/api/v1/projects/{self.cognite_client.config.project}/odin/checkin", json={ "externalId": self.connection_config.extraction_pipeline, "taskEvents": task_updates, + "errors": error_updates, }, headers={"cdf-version": "alpha"}, ) @@ -83,13 +105,35 @@ def _run_checkin(self) -> None: self.logger.exception("Error during checkin") self.cancellation_token.wait(10) + def _report_error(self, error: Error) -> None: + with self._checkin_lock: + self._errors[error.external_id] = error + + def error( + self, + level: ErrorLevel, + description: str, + details: str | None = None, + *, + force_global: bool = False, + ) -> Error: + task_name = self._current_task.get() + + return Error( + level=level, + description=description, + details=details, + extractor=self, + task_name=None if force_global else task_name, + ) + def restart(self) -> None: if self._runtime_messages: self._runtime_messages.put(RuntimeMessage.RESTART) self.cancellation_token.cancel() @classmethod - def init_from_runtime( + def _init_from_runtime( cls, connection_config: ConnectionConfig, application_config: ConfigType, @@ -98,18 +142,44 @@ def init_from_runtime( return cls(connection_config, application_config, current_config_revision) def add_task(self, task: Task) -> None: + # Store this for later, since we'll override it with the wrapped version target = task.target def wrapped() -> None: + """ + A wrapped version of the task's target, with tracking and error handling + """ + # Record a task start with self._checkin_lock: self._task_updates.append( TaskUpdate(type="started", name=task.name, timestamp=now()), ) + context_token: Token[str | None] | None = None + try: + # Set the current task context var, used to track that we're in a task for error reporting + context_token = self._current_task.set(task.name) + + # Run task target() + except Exception as e: + # Task crashed, record it as a fatal error + self.error( + ErrorLevel.fatal, + description="Task crashed unexpectedly", + details="".join(format_exception(e)), + ).instant() + + raise e + finally: + # Unset the current task + if context_token is not None: + self._current_task.reset(context_token) + + # Record task end with self._checkin_lock: self._task_updates.append( TaskUpdate(type="ended", name=task.name, timestamp=now()), @@ -186,7 +256,7 @@ def run(self) -> None: case _: assert_never(task) - self.logger.info("Starting up extractor") + self.logger.info("Starting extractor") if startup: with ThreadPoolExecutor() as pool: for task in startup: diff --git a/cognite/extractorutils/unstable/core/errors.py b/cognite/extractorutils/unstable/core/errors.py new file mode 100644 index 00000000..2df0172a --- /dev/null +++ b/cognite/extractorutils/unstable/core/errors.py @@ -0,0 +1,70 @@ +import typing +from enum import Enum +from types import TracebackType +from uuid import uuid4 + +from cognite.extractorutils.util import now + +if typing.TYPE_CHECKING: + from .base import Extractor + + +class ErrorLevel(Enum): + warning = "warning" + error = "error" + fatal = "fatal" + + +class Error: + def __init__( + self, + level: ErrorLevel, + description: str, + details: str | None, + task_name: str | None, + extractor: "Extractor", + ) -> None: + self.level = level + self.description = description + self.details = details + + self.external_id = str(uuid4()) + self.start_time = now() + self.end_time: int | None = None + + self._extractor = extractor + self._task_name = task_name + + self._extractor._report_error(self) + + def instant(self) -> None: + # Only end the error once + if self.end_time is not None: + return + + self.end_time = self.start_time + + # Re-add in case the error has already been reported and dict cleared + self._extractor._report_error(self) + + def finish(self) -> None: + # Only end the error once + if self.end_time is not None: + return + + self.end_time = now() + + # Re-add in case the error has already been reported and dict cleared + self._extractor._report_error(self) + + def __enter__(self) -> "Error": + return self + + def __exit__( + self, + exc_type: typing.Type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> bool: + self.finish() + return exc_val is None diff --git a/cognite/extractorutils/unstable/core/runtime.py b/cognite/extractorutils/unstable/core/runtime.py index c6621a86..67395335 100644 --- a/cognite/extractorutils/unstable/core/runtime.py +++ b/cognite/extractorutils/unstable/core/runtime.py @@ -87,7 +87,7 @@ def _inner_run( current_config_revision: ConfigRevision, ) -> None: # This code is run inside the new extractor process - extractor = self._extractor_class.init_from_runtime( + extractor = self._extractor_class._init_from_runtime( connection_config, application_config, current_config_revision, diff --git a/tests/test_unstable/test_errors.py b/tests/test_unstable/test_errors.py new file mode 100644 index 00000000..23bc5f25 --- /dev/null +++ b/tests/test_unstable/test_errors.py @@ -0,0 +1,192 @@ +from time import sleep + +import pytest + +from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, IntervalConfig, TimeIntervalConfig +from cognite.extractorutils.unstable.core.errors import ErrorLevel +from cognite.extractorutils.unstable.core.tasks import ScheduledTask +from tests.test_unstable.conftest import TestConfig, TestExtractor + + +def test_global_error( + connection_config: ConnectionConfig, + application_config: TestConfig, +) -> None: + extractor = TestExtractor( + connection_config=connection_config, + application_config=application_config, + current_config_revision=1, + ) + + err = extractor.error(level=ErrorLevel.error, description="Oh no!", details="There was an error") + + assert len(extractor._errors) == 1 + assert err.external_id in extractor._errors + + wait_time = 50 + sleep(wait_time / 1000) + + err.finish() + + slack = 5 + + assert len(extractor._errors) == 1 + assert err.start_time + wait_time - slack < err.end_time < err.start_time + wait_time + slack + assert extractor._errors[err.external_id].end_time == err.end_time + assert err._task_name is None + + +def test_instant_error( + connection_config: ConnectionConfig, + application_config: TestConfig, +) -> None: + extractor = TestExtractor( + connection_config=connection_config, + application_config=application_config, + current_config_revision=1, + ) + + err = extractor.error(level=ErrorLevel.error, description="Oh no!", details="There was an error") + + assert len(extractor._errors) == 1 + assert err.external_id in extractor._errors + + sleep(0.05) + + err.instant() + + assert len(extractor._errors) == 1 + assert err.end_time == err.start_time + assert extractor._errors[err.external_id].end_time == err.end_time + assert err._task_name is None + + +def test_task_error( + connection_config: ConnectionConfig, + application_config: TestConfig, +) -> None: + extractor = TestExtractor( + connection_config=connection_config, + application_config=application_config, + current_config_revision=1, + ) + + def task() -> None: + sleep(0.05) + extractor.error(level=ErrorLevel.warning, description="Hey now").instant() + sleep(0.05) + + extractor.add_task( + ScheduledTask( + "TestTask", + target=task, + schedule=IntervalConfig( + type="interval", + expression=TimeIntervalConfig("15m"), + ), + ) + ) + + extractor._report_extractor_info() + extractor._scheduler.trigger("TestTask") + + sleep(0.3) + + assert len(extractor._task_updates) == 2 + assert len(extractor._errors) == 1 + + error = list(extractor._errors.values())[0] + assert error.description == "Hey now" + assert error.level == ErrorLevel.warning + + # Make sure error was recorded as a task error + assert error._task_name == "TestTask" + + +def test_crashing_task( + connection_config: ConnectionConfig, + application_config: TestConfig, +) -> None: + extractor = TestExtractor( + connection_config=connection_config, + application_config=application_config, + current_config_revision=1, + ) + + def task() -> None: + sleep(0.05) + raise ValueError("Try catching this!") + + extractor.add_task( + ScheduledTask( + "TestTask", + target=task, + schedule=IntervalConfig( + type="interval", + expression=TimeIntervalConfig("15m"), + ), + ) + ) + + extractor._report_extractor_info() + extractor._scheduler.trigger("TestTask") + + sleep(0.3) + + assert len(extractor._task_updates) == 2 + assert len(extractor._errors) == 1 + + error = list(extractor._errors.values())[0] + assert error.description == "Task crashed unexpectedly" + assert error.level == ErrorLevel.fatal + + # Make sure error was recorded as a task error + assert error._task_name == "TestTask" + + +@pytest.mark.parametrize("checkin_between", [True, False]) +def test_reporting_errors( + connection_config: ConnectionConfig, + application_config: TestConfig, + checkin_between: bool, +) -> None: + extractor = TestExtractor( + connection_config=connection_config, + application_config=application_config, + current_config_revision=1, + ) + + err = extractor.error(level=ErrorLevel.error, description="Oh no!", details="There was an error") + + assert len(extractor._errors) == 1 + assert err.external_id in extractor._errors + + if checkin_between: + extractor._checkin() + + res = extractor.cognite_client.get( + f"/api/v1/projects/{extractor.cognite_client.config.project}/odin/errors?extpipe={connection_config.extraction_pipeline}", + headers={"cdf-version": "alpha"}, + ).json()["items"] + assert len(res) == 1 + + assert res[0]["externalId"] == err.external_id + assert res[0]["startTime"] == err.start_time + assert res[0]["description"] == err.description + assert "endTime" not in res[0] + + sleep(0.05) + + err.finish() + + extractor._checkin() + + res = extractor.cognite_client.get( + f"/api/v1/projects/{extractor.cognite_client.config.project}/odin/errors?extpipe={connection_config.extraction_pipeline}", + headers={"cdf-version": "alpha"}, + ).json()["items"] + assert len(res) == 1 + assert res[0]["externalId"] == err.external_id + assert res[0]["startTime"] == err.start_time + assert res[0]["endTime"] == err.end_time + assert res[0]["description"] == err.description