Skip to content

Commit

Permalink
Merge branch 'master' into DOG-4448-file-extractor-simplify-extractio…
Browse files Browse the repository at this point in the history
…n-failure-analysis
  • Loading branch information
nithinb authored Nov 26, 2024
2 parents 353f99c + 88f370e commit 73d8304
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 5 deletions.
7 changes: 6 additions & 1 deletion cognite/extractorutils/unstable/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,10 @@ class LogConsoleHandlerConfig(ConfigModel):
LogHandlerConfig = Union[LogFileHandlerConfig, LogConsoleHandlerConfig]


# Mypy BS
def _log_handler_default() -> List[Union[LogFileHandlerConfig, LogConsoleHandlerConfig]]:
return [LogConsoleHandlerConfig(level=LogLevel.INFO)]


class ExtractorConfig(ConfigModel):
log_handlers: List[LogHandlerConfig] = Field(default_factory=lambda: [LogConsoleHandlerConfig(level=LogLevel.INFO)])
log_handlers: List[LogHandlerConfig] = Field(default_factory=_log_handler_default)
10 changes: 10 additions & 0 deletions cognite/extractorutils/unstable/core/_dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,13 @@ class TaskUpdate(CogniteModel):
type: Literal["started"] | Literal["ended"]
name: str
timestamp: int


class Error(CogniteModel):
external_id: str
level: str
description: str
details: str | None
start_time: int
end_time: int | None
task: str | None
74 changes: 72 additions & 2 deletions cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
from concurrent.futures import ThreadPoolExecutor
from contextvars import ContextVar, Token
from multiprocessing import Queue
from threading import RLock, Thread
from traceback import format_exception
from types import TracebackType
from typing import Generic, Literal, Optional, Type, TypeVar, Union

Expand All @@ -10,8 +12,10 @@

from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, ExtractorConfig
from cognite.extractorutils.unstable.core._dto import Error as DtoError
from cognite.extractorutils.unstable.core._dto import TaskUpdate
from cognite.extractorutils.unstable.core._messaging import RuntimeMessage
from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel
from cognite.extractorutils.unstable.core.tasks import ContinuousTask, ScheduledTask, StartupTask, Task
from cognite.extractorutils.unstable.scheduling import TaskScheduler
from cognite.extractorutils.util import now
Expand Down Expand Up @@ -50,9 +54,12 @@ def __init__(

self._tasks: list[Task] = []
self._task_updates: list[TaskUpdate] = []
self._errors: dict[str, Error] = {}

self.logger = logging.getLogger(f"{self.EXTERNAL_ID}.main")

self._current_task: ContextVar[str | None] = ContextVar("current_task", default=None)

def _set_runtime_message_queue(self, queue: Queue) -> None:
self._runtime_messages = queue

Expand All @@ -61,11 +68,26 @@ def _checkin(self) -> None:
task_updates = [t.model_dump() for t in self._task_updates]
self._task_updates.clear()

error_updates = [
DtoError(
external_id=e.external_id,
level=e.level.value,
description=e.description,
details=e.details,
start_time=e.start_time,
end_time=e.end_time,
task=e._task_name if e._task_name is not None else None,
).model_dump()
for e in self._errors.values()
]
self._errors.clear()

res = self.cognite_client.post(
f"/api/v1/projects/{self.cognite_client.config.project}/odin/checkin",
json={
"externalId": self.connection_config.extraction_pipeline,
"taskEvents": task_updates,
"errors": error_updates,
},
headers={"cdf-version": "alpha"},
)
Expand All @@ -83,13 +105,35 @@ def _run_checkin(self) -> None:
self.logger.exception("Error during checkin")
self.cancellation_token.wait(10)

def _report_error(self, error: Error) -> None:
with self._checkin_lock:
self._errors[error.external_id] = error

def error(
self,
level: ErrorLevel,
description: str,
details: str | None = None,
*,
force_global: bool = False,
) -> Error:
task_name = self._current_task.get()

return Error(
level=level,
description=description,
details=details,
extractor=self,
task_name=None if force_global else task_name,
)

def restart(self) -> None:
if self._runtime_messages:
self._runtime_messages.put(RuntimeMessage.RESTART)
self.cancellation_token.cancel()

@classmethod
def init_from_runtime(
def _init_from_runtime(
cls,
connection_config: ConnectionConfig,
application_config: ConfigType,
Expand All @@ -98,18 +142,44 @@ def init_from_runtime(
return cls(connection_config, application_config, current_config_revision)

def add_task(self, task: Task) -> None:
# Store this for later, since we'll override it with the wrapped version
target = task.target

def wrapped() -> None:
"""
A wrapped version of the task's target, with tracking and error handling
"""
# Record a task start
with self._checkin_lock:
self._task_updates.append(
TaskUpdate(type="started", name=task.name, timestamp=now()),
)

context_token: Token[str | None] | None = None

try:
# Set the current task context var, used to track that we're in a task for error reporting
context_token = self._current_task.set(task.name)

# Run task
target()

except Exception as e:
# Task crashed, record it as a fatal error
self.error(
ErrorLevel.fatal,
description="Task crashed unexpectedly",
details="".join(format_exception(e)),
).instant()

raise e

finally:
# Unset the current task
if context_token is not None:
self._current_task.reset(context_token)

# Record task end
with self._checkin_lock:
self._task_updates.append(
TaskUpdate(type="ended", name=task.name, timestamp=now()),
Expand Down Expand Up @@ -186,7 +256,7 @@ def run(self) -> None:
case _:
assert_never(task)

self.logger.info("Starting up extractor")
self.logger.info("Starting extractor")
if startup:
with ThreadPoolExecutor() as pool:
for task in startup:
Expand Down
70 changes: 70 additions & 0 deletions cognite/extractorutils/unstable/core/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import typing
from enum import Enum
from types import TracebackType
from uuid import uuid4

from cognite.extractorutils.util import now

if typing.TYPE_CHECKING:
from .base import Extractor


class ErrorLevel(Enum):
warning = "warning"
error = "error"
fatal = "fatal"


class Error:
def __init__(
self,
level: ErrorLevel,
description: str,
details: str | None,
task_name: str | None,
extractor: "Extractor",
) -> None:
self.level = level
self.description = description
self.details = details

self.external_id = str(uuid4())
self.start_time = now()
self.end_time: int | None = None

self._extractor = extractor
self._task_name = task_name

self._extractor._report_error(self)

def instant(self) -> None:
# Only end the error once
if self.end_time is not None:
return

self.end_time = self.start_time

# Re-add in case the error has already been reported and dict cleared
self._extractor._report_error(self)

def finish(self) -> None:
# Only end the error once
if self.end_time is not None:
return

self.end_time = now()

# Re-add in case the error has already been reported and dict cleared
self._extractor._report_error(self)

def __enter__(self) -> "Error":
return self

def __exit__(
self,
exc_type: typing.Type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
self.finish()
return exc_val is None
2 changes: 1 addition & 1 deletion cognite/extractorutils/unstable/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def _inner_run(
current_config_revision: ConfigRevision,
) -> None:
# This code is run inside the new extractor process
extractor = self._extractor_class.init_from_runtime(
extractor = self._extractor_class._init_from_runtime(
connection_config,
application_config,
current_config_revision,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ experimental = ["cognite-sdk-experimental"]

[tool.poetry.group.dev.dependencies]
mypy = "1.13.0"
ruff = "^0.7.0"
ruff = "^0.8.0"
pytest = "^8.0.0"
pytest-cov = "^6.0.0"
sphinx = "^7.0.0"
Expand Down
Loading

0 comments on commit 73d8304

Please sign in to comment.