
Commit

Merge branch 'master' into DOG-4448-file-extractor-simplify-extraction-failure-analysis
nithinb authored Dec 3, 2024
2 parents 73d8304 + 5820486 commit c90f42f
Showing 16 changed files with 457 additions and 92 deletions.
11 changes: 11 additions & 0 deletions cognite/extractorutils/_inner_util.py
@@ -25,6 +25,17 @@ def _resolve_log_level(level: str) -> int:
return {"NOTSET": 0, "DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50}[level.upper()]


def resolve_log_level_for_httpx(level: str) -> str:
return {
None: "WARNING",
"INFO": "WARNING",
"WARNING": "WARNING",
"ERROR": "ERROR",
"CRITICAL": "CRITICAL",
"DEBUG": "DEBUG",
}.get(level, "WARNING")


class _DecimalEncoder(json.JSONEncoder):
def default(self, obj: Any) -> Dict[str, str]:
if isinstance(obj, Decimal):
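The new resolve_log_level_for_httpx helper maps the extractor's configured log level to a quieter level for the httpx and httpcore loggers, so per-request HTTP logging only appears when the extractor itself runs at DEBUG. A minimal usage sketch (standalone wiring shown for illustration; in this commit the equivalent calls live in setup_logging in configtools/elements.py):

import logging

from cognite.extractorutils._inner_util import resolve_log_level_for_httpx

# With the extractor at INFO, httpx/httpcore are held back to WARNING so that
# per-request INFO messages from the HTTP stack do not flood the extractor log.
root_level = "INFO"
httpx_level = resolve_log_level_for_httpx(root_level)  # "WARNING"

logging.getLogger("httpx").setLevel(httpx_level)
logging.getLogger("httpcore").setLevel(httpx_level)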
9 changes: 9 additions & 0 deletions cognite/extractorutils/configtools/elements.py
@@ -32,6 +32,7 @@
OAuthClientCredentials,
)
from cognite.client.data_classes import Asset, DataSet, ExtractionPipeline
from cognite.extractorutils._inner_util import resolve_log_level_for_httpx
from cognite.extractorutils.configtools._util import _load_certificate_data
from cognite.extractorutils.exceptions import InvalidConfigError
from cognite.extractorutils.metrics import (
@@ -491,6 +492,14 @@ def setup_logging(self, suppress_console: bool = False) -> None:
if root.getEffectiveLevel() > file_handler.level:
root.setLevel(file_handler.level)

log_level = logging.getLevelName(root.getEffectiveLevel())
httpx_log_level = resolve_log_level_for_httpx(log_level)
httpx_logger = logging.getLogger("httpx")
httpx_logger.setLevel(httpx_log_level)

http_core_logger = logging.getLogger("httpcore")
http_core_logger.setLevel(httpx_log_level)


@dataclass
class _PushGatewayConfig:
3 changes: 3 additions & 0 deletions cognite/extractorutils/configtools/loaders.py
@@ -163,6 +163,9 @@ def ignore_unknown(self, node: yaml.Node) -> None:
SafeLoaderIgnoreUnknown.add_constructor(None, SafeLoaderIgnoreUnknown.ignore_unknown) # type: ignore
initial_load = yaml.load(source, Loader=SafeLoaderIgnoreUnknown) # noqa: S506

if not isinstance(initial_load, dict):
raise InvalidConfigError("The root node of the YAML document must be an object")

if not isinstance(source, str):
source.seek(0)

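The added guard rejects YAML documents whose root node is not a mapping before any further processing. A minimal sketch of the behaviour, assuming load_yaml_dict is a public entry point that reaches this code path:

from io import StringIO

from cognite.extractorutils.configtools.loaders import load_yaml_dict
from cognite.extractorutils.exceptions import InvalidConfigError

# A YAML document whose root node is a list rather than an object
source = StringIO("- not\n- a\n- mapping\n")

try:
    load_yaml_dict(source)
except InvalidConfigError as err:
    # Expected message: "The root node of the YAML document must be an object"
    print(err.message)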
24 changes: 24 additions & 0 deletions cognite/extractorutils/unstable/configuration/exceptions.py
@@ -0,0 +1,24 @@
from typing import List, Optional


class InvalidConfigError(Exception):
"""
    Exception thrown from ``load_yaml`` and ``load_yaml_dict`` if the config file is invalid. This can be due to:
* Missing fields
* Incompatible types
    * Unknown fields
"""

def __init__(self, message: str, details: Optional[List[str]] = None):
super(InvalidConfigError, self).__init__()
self.message = message
self.details = details

self.attempted_revision: int | None = None

def __str__(self) -> str:
return f"Invalid config: {self.message}"

def __repr__(self) -> str:
return self.__str__()
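Unlike the InvalidConfigError in cognite.extractorutils.exceptions, this variant also records the configuration revision that failed to load (attempted_revision), so callers can report which revision was rejected. A small sketch of how that reads at a call site (fetch_and_parse is a stand-in invented for illustration; in the commit the attribute is set by load_from_cdf):

from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError


def fetch_and_parse() -> None:
    # Stand-in for a call into load_from_cdf(...); invented for illustration only
    err = InvalidConfigError("Field required: base-url")
    err.attempted_revision = 7
    raise err


try:
    fetch_and_parse()
except InvalidConfigError as err:
    # The revision that was attempted is carried on the exception
    print(f"{err} (attempted revision: {err.attempted_revision})")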
22 changes: 15 additions & 7 deletions cognite/extractorutils/unstable/configuration/loaders.py
@@ -8,7 +8,8 @@

from cognite.client import CogniteClient
from cognite.extractorutils.configtools.loaders import _load_yaml_dict_raw
from cognite.extractorutils.exceptions import InvalidConfigError
from cognite.extractorutils.exceptions import InvalidConfigError as OldInvalidConfigError
from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError
from cognite.extractorutils.unstable.configuration.models import ConfigModel

_T = TypeVar("_T", bound=ConfigModel)
@@ -44,7 +45,17 @@ def load_from_cdf(
)
response.raise_for_status()
data = response.json()
return load_io(StringIO(data["config"]), ConfigFormat.YAML, schema), data["revision"]

try:
return load_io(StringIO(data["config"]), ConfigFormat.YAML, schema), data["revision"]

except InvalidConfigError as e:
e.attempted_revision = data["revision"]
raise e
except OldInvalidConfigError as e:
new_e = InvalidConfigError(e.message)
new_e.attempted_revision = data["revision"]
raise new_e from e


def load_io(stream: TextIO, format: ConfigFormat, schema: Type[_T]) -> _T:
@@ -100,12 +111,9 @@ def load_dict(data: dict, schema: Type[_T]) -> _T:
if "ctx" in err and "error" in err["ctx"]:
exc = err["ctx"]["error"]
if isinstance(exc, ValueError) or isinstance(exc, AssertionError):
messages.append(f"{loc_str}: {str(exc)}")
messages.append(f"{str(exc)}: {loc_str}")
continue

if err.get("type") == "json_invalid":
messages.append(f"{err.get('msg')}: {loc_str}")
else:
messages.append(f"{loc_str}: {err.get('msg')}")
messages.append(f"{err.get('msg')}: {loc_str}")

raise InvalidConfigError(", ".join(messages), details=messages) from e
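With the reordering above, each validation message now leads with the error and ends with the offending field's location. A rough sketch of what a caller sees, assuming ConnectionConfig requires at least project, base-url, integration and authentication (the exact wording and location format of pydantic's messages may differ):

from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError
from cognite.extractorutils.unstable.configuration.loaders import load_dict
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig

try:
    load_dict({"project": "my-project"}, ConnectionConfig)
except InvalidConfigError as err:
    # Each detail is now formatted as "<message>: <location>", e.g. "Field required: base-url"
    for detail in err.details or []:
        print(detail)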
10 changes: 6 additions & 4 deletions cognite/extractorutils/unstable/configuration/models.py
@@ -139,7 +139,7 @@ class ConnectionConfig(ConfigModel):
project: str
base_url: str

extraction_pipeline: str
integration: str

authentication: AuthenticationConfig

@@ -223,21 +223,23 @@ class LogLevel(Enum):


class LogFileHandlerConfig(ConfigModel):
type: Literal["file"]
path: Path
level: LogLevel
retention: int = 7


class LogConsoleHandlerConfig(ConfigModel):
type: Literal["console"]
level: LogLevel


LogHandlerConfig = Union[LogFileHandlerConfig, LogConsoleHandlerConfig]
LogHandlerConfig = Annotated[LogFileHandlerConfig | LogConsoleHandlerConfig, Field(discriminator="type")]


# Mypy BS
def _log_handler_default() -> List[Union[LogFileHandlerConfig, LogConsoleHandlerConfig]]:
return [LogConsoleHandlerConfig(level=LogLevel.INFO)]
def _log_handler_default() -> List[LogHandlerConfig]:
return [LogConsoleHandlerConfig(type="console", level=LogLevel.INFO)]


class ExtractorConfig(ConfigModel):
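Because the union is now discriminated on type, handler configurations must state their kind explicitly, both in YAML (a type key) and when constructed in code. A minimal sketch:

from pathlib import Path

from cognite.extractorutils.unstable.configuration.models import (
    LogConsoleHandlerConfig,
    LogFileHandlerConfig,
    LogLevel,
)

# The `type` literal is required; it is what the discriminated union dispatches on
handlers = [
    LogConsoleHandlerConfig(type="console", level=LogLevel.INFO),
    LogFileHandlerConfig(type="file", path=Path("extractor.log"), level=LogLevel.INFO, retention=7),
]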
124 changes: 101 additions & 23 deletions cognite/extractorutils/unstable/core/base.py
@@ -1,6 +1,9 @@
import logging
import logging.config
import time
from concurrent.futures import ThreadPoolExecutor
from contextvars import ContextVar, Token
from logging.handlers import TimedRotatingFileHandler
from multiprocessing import Queue
from threading import RLock, Thread
from traceback import format_exception
@@ -10,12 +13,19 @@
from humps import pascalize
from typing_extensions import Self, assert_never

from cognite.extractorutils._inner_util import _resolve_log_level
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, ExtractorConfig
from cognite.extractorutils.unstable.configuration.models import (
ConnectionConfig,
ExtractorConfig,
LogConsoleHandlerConfig,
LogFileHandlerConfig,
)
from cognite.extractorutils.unstable.core._dto import Error as DtoError
from cognite.extractorutils.unstable.core._dto import TaskUpdate
from cognite.extractorutils.unstable.core._messaging import RuntimeMessage
from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel
from cognite.extractorutils.unstable.core.restart_policy import WHEN_CONTINUOUS_TASKS_CRASHES, RestartPolicy
from cognite.extractorutils.unstable.core.tasks import ContinuousTask, ScheduledTask, StartupTask, Task
from cognite.extractorutils.unstable.scheduling import TaskScheduler
from cognite.extractorutils.util import now
@@ -24,6 +34,23 @@
ConfigRevision = Union[Literal["local"], int]


_T = TypeVar("_T", bound=ExtractorConfig)


class FullConfig(Generic[_T]):
def __init__(
self,
connection_config: ConnectionConfig,
application_config: _T,
current_config_revision: ConfigRevision,
newest_config_revision: ConfigRevision,
) -> None:
self.connection_config = connection_config
self.application_config = application_config
self.current_config_revision = current_config_revision
self.newest_config_revision = newest_config_revision


class Extractor(Generic[ConfigType]):
NAME: str
EXTERNAL_ID: str
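The constructor now takes a single FullConfig bundle instead of three separate arguments, and the bundle additionally carries the newest known config revision used by the check-in logic further down. A sketch of wiring it up (MyConfig, MyExtractor and the revision numbers are invented for illustration):

from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, ExtractorConfig
from cognite.extractorutils.unstable.core.base import Extractor, FullConfig


class MyConfig(ExtractorConfig):
    # Application-specific settings would go here
    pass


class MyExtractor(Extractor[MyConfig]):
    NAME = "My extractor"
    EXTERNAL_ID = "my-extractor"
    VERSION = "1.0.0"
    CONFIG_TYPE = MyConfig


def build(connection: ConnectionConfig, app_config: MyConfig) -> MyExtractor:
    full = FullConfig(
        connection_config=connection,
        application_config=app_config,
        current_config_revision=3,  # revision currently running
        newest_config_revision=3,  # newest revision known to exist in CDF
    )
    return MyExtractor(full)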
@@ -32,18 +59,16 @@ class Extractor(Generic[ConfigType]):

CONFIG_TYPE: Type[ConfigType]

def __init__(
self,
connection_config: ConnectionConfig,
application_config: ConfigType,
current_config_revision: ConfigRevision,
) -> None:
RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES

def __init__(self, config: FullConfig[ConfigType]) -> None:
self.cancellation_token = CancellationToken()
self.cancellation_token.cancel_on_interrupt()

self.connection_config = connection_config
self.application_config = application_config
self.current_config_revision = current_config_revision
self.connection_config = config.connection_config
self.application_config = config.application_config
self.current_config_revision = config.current_config_revision
self.newest_config_revision = config.newest_config_revision

self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}")

@@ -60,6 +85,54 @@ def __init__(

self._current_task: ContextVar[str | None] = ContextVar("current_task", default=None)

self.__init_tasks__()

def _setup_logging(self) -> None:
min_level = min([_resolve_log_level(h.level.value) for h in self.application_config.log_handlers])
max_level = max([_resolve_log_level(h.level.value) for h in self.application_config.log_handlers])

root = logging.getLogger()
root.setLevel(min_level)

        # The oauthlib library logs too much at debug level, including secrets
logging.getLogger("requests_oauthlib.oauth2_session").setLevel(max(max_level, logging.INFO))

fmt = logging.Formatter(
"%(asctime)s.%(msecs)03d UTC [%(levelname)-8s] %(process)d %(threadName)s - %(message)s",
"%Y-%m-%d %H:%M:%S",
)
# Set logging to UTC
fmt.converter = time.gmtime

# Remove any previous logging handlers
for handler in root.handlers:
root.removeHandler(handler)

# Define new handlers
for handler_config in self.application_config.log_handlers:
match handler_config:
case LogConsoleHandlerConfig() as console_handler:
sh = logging.StreamHandler()
sh.setFormatter(fmt)
sh.setLevel(_resolve_log_level(console_handler.level.value))

root.addHandler(sh)

case LogFileHandlerConfig() as file_handler:
fh = TimedRotatingFileHandler(
filename=file_handler.path,
when="midnight",
utc=True,
backupCount=file_handler.retention,
)
fh.setLevel(_resolve_log_level(file_handler.level.value))
fh.setFormatter(fmt)

root.addHandler(fh)

def __init_tasks__(self) -> None:
pass

def _set_runtime_message_queue(self, queue: Queue) -> None:
self._runtime_messages = queue

@@ -85,15 +158,19 @@ def _checkin(self) -> None:
res = self.cognite_client.post(
f"/api/v1/projects/{self.cognite_client.config.project}/odin/checkin",
json={
"externalId": self.connection_config.extraction_pipeline,
"externalId": self.connection_config.integration,
"taskEvents": task_updates,
"errors": error_updates,
},
headers={"cdf-version": "alpha"},
)
new_config_revision = res.json().get("lastConfigRevision")

if new_config_revision and new_config_revision != self.current_config_revision:
if (
new_config_revision
and self.current_config_revision != "local"
and new_config_revision > self.newest_config_revision
):
self.restart()

def _run_checkin(self) -> None:
Expand Down Expand Up @@ -128,24 +205,20 @@ def error(
)

def restart(self) -> None:
self.logger.info("Restarting extractor")
if self._runtime_messages:
self._runtime_messages.put(RuntimeMessage.RESTART)
self.cancellation_token.cancel()

@classmethod
def _init_from_runtime(
cls,
connection_config: ConnectionConfig,
application_config: ConfigType,
current_config_revision: ConfigRevision,
) -> Self:
return cls(connection_config, application_config, current_config_revision)
def _init_from_runtime(cls, config: FullConfig[ConfigType]) -> Self:
return cls(config)

def add_task(self, task: Task) -> None:
# Store this for later, since we'll override it with the wrapped version
target = task.target

def wrapped() -> None:
def run_task() -> None:
"""
A wrapped version of the task's target, with tracking and error handling
"""
@@ -165,14 +238,17 @@ def wrapped() -> None:
target()

except Exception as e:
self.logger.exception(f"Unexpected error in {task.name}")

# Task crashed, record it as a fatal error
self.error(
ErrorLevel.fatal,
description="Task crashed unexpectedly",
details="".join(format_exception(e)),
).instant()

raise e
if self.__class__.RESTART_POLICY(task, e):
self.restart()

finally:
# Unset the current task
@@ -185,7 +261,7 @@ def wrapped() -> None:
TaskUpdate(type="ended", name=task.name, timestamp=now()),
)

task.target = wrapped
task.target = run_task
self._tasks.append(task)

match task:
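The crash handling in run_task no longer re-raises unconditionally: the class-level RESTART_POLICY decides whether a crashed task should restart the extractor. Judging from the call site (RESTART_POLICY(task, e)), a policy appears to be a callable taking the crashed task and the exception and returning a bool; a subclass override might then look like the sketch below (an assumption based on that call site, not a documented API):

from cognite.extractorutils.unstable.core.base import Extractor
from cognite.extractorutils.unstable.core.tasks import Task


def never_restart(task: Task, exception: Exception) -> bool:
    # The crash is still recorded as a fatal error by run_task; just never restart
    return False


class CrashTolerantExtractor(Extractor):
    RESTART_POLICY = never_restart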
@@ -196,7 +272,7 @@ def _report_extractor_info(self) -> None:
self.cognite_client.post(
f"/api/v1/projects/{self.cognite_client.config.project}/odin/extractorinfo",
json={
"externalId": self.connection_config.extraction_pipeline,
"externalId": self.connection_config.integration,
"activeConfigRevision": self.current_config_revision,
"extractor": {
"version": self.VERSION,
@@ -214,6 +290,7 @@ def _report_extractor_info(self) -> None:
)

def start(self) -> None:
self._setup_logging()
self._report_extractor_info()
Thread(target=self._run_checkin, name="ExtractorCheckin", daemon=True).start()

@@ -234,6 +311,7 @@ def __exit__(
with self._checkin_lock:
self._checkin()

self.logger.info("Shutting down extractor")
return exc_val is None

def run(self) -> None: