
Commit

Merge branch 'master' into all
mathialo authored Dec 2, 2024
2 parents b74bb4c + 444bff5 commit 6a42d50
Showing 10 changed files with 340 additions and 79 deletions.
3 changes: 3 additions & 0 deletions cognite/extractorutils/configtools/loaders.py
@@ -163,6 +163,9 @@ def ignore_unknown(self, node: yaml.Node) -> None:
      SafeLoaderIgnoreUnknown.add_constructor(None, SafeLoaderIgnoreUnknown.ignore_unknown)  # type: ignore
      initial_load = yaml.load(source, Loader=SafeLoaderIgnoreUnknown)  # noqa: S506

+     if not isinstance(initial_load, dict):
+         raise InvalidConfigError("The root node of the YAML document must be an object")
+
      if not isinstance(source, str):
          source.seek(0)
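A minimal sketch of what this new guard means for callers, assuming the public ``load_yaml_dict`` routes through this pre-pass (the module layout suggests it does, and the ``isinstance(source, str)`` check above implies plain strings are accepted):

    from cognite.extractorutils.configtools.loaders import load_yaml_dict
    from cognite.extractorutils.exceptions import InvalidConfigError

    # The root node of this document is a sequence, not a mapping, so loading now fails early
    source = "- first\n- second\n"

    try:
        load_yaml_dict(source)
    except InvalidConfigError as e:
        print(e)  # e.g. "Invalid config: The root node of the YAML document must be an object"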
24 changes: 24 additions & 0 deletions cognite/extractorutils/unstable/configuration/exceptions.py
@@ -0,0 +1,24 @@
from typing import List, Optional


class InvalidConfigError(Exception):
    """
    Exception thrown from ``load_yaml`` and ``load_yaml_dict`` if the config file is invalid. This can be due to

    * Missing fields
    * Incompatible types
    * Unknown fields
    """

    def __init__(self, message: str, details: Optional[List[str]] = None):
        super(InvalidConfigError, self).__init__()
        self.message = message
        self.details = details

        self.attempted_revision: int | None = None

    def __str__(self) -> str:
        return f"Invalid config: {self.message}"

    def __repr__(self) -> str:
        return self.__str__()
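A hedged sketch of how this richer exception is meant to be consumed; ``attempted_revision`` is filled in by ``load_from_cdf`` below, and the message and detail strings here are invented for illustration:

    from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError

    def report_bad_config(e: InvalidConfigError) -> None:
        # attempted_revision is None for local files and an int for CDF-hosted revisions
        where = f"revision {e.attempted_revision}" if e.attempted_revision is not None else "local file"
        print(f"{e} (from {where})")
        for detail in e.details or []:
            print(f"  - {detail}")

    err = InvalidConfigError("Some fields were invalid", details=["value error: interval", "missing field: name"])
    report_bad_config(err)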
22 changes: 15 additions & 7 deletions cognite/extractorutils/unstable/configuration/loaders.py
@@ -8,7 +8,8 @@

  from cognite.client import CogniteClient
  from cognite.extractorutils.configtools.loaders import _load_yaml_dict_raw
- from cognite.extractorutils.exceptions import InvalidConfigError
+ from cognite.extractorutils.exceptions import InvalidConfigError as OldInvalidConfigError
+ from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError
  from cognite.extractorutils.unstable.configuration.models import ConfigModel

  __all__ = ["ConfigFormat", "load_file", "load_from_cdf", "load_io", "load_dict"]
@@ -47,7 +48,17 @@ def load_from_cdf(
      )
      response.raise_for_status()
      data = response.json()
-     return load_io(StringIO(data["config"]), ConfigFormat.YAML, schema), data["revision"]

+     try:
+         return load_io(StringIO(data["config"]), ConfigFormat.YAML, schema), data["revision"]
+
+     except InvalidConfigError as e:
+         e.attempted_revision = data["revision"]
+         raise e
+     except OldInvalidConfigError as e:
+         new_e = InvalidConfigError(e.message)
+         new_e.attempted_revision = data["revision"]
+         raise new_e from e


def load_io(stream: TextIO, format: ConfigFormat, schema: Type[_T]) -> _T:
@@ -103,12 +114,9 @@ def load_dict(data: dict, schema: Type[_T]) -> _T:
              if "ctx" in err and "error" in err["ctx"]:
                  exc = err["ctx"]["error"]
                  if isinstance(exc, ValueError) or isinstance(exc, AssertionError):
-                     messages.append(f"{loc_str}: {str(exc)}")
+                     messages.append(f"{str(exc)}: {loc_str}")
                      continue

-             if err.get("type") == "json_invalid":
-                 messages.append(f"{err.get('msg')}: {loc_str}")
-             else:
-                 messages.append(f"{loc_str}: {err.get('msg')}")
+             messages.append(f"{err.get('msg')}: {loc_str}")

          raise InvalidConfigError(", ".join(messages), details=messages) from e
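A hedged sketch of what the reordered messages look like in practice. The schema and field names are made up, and ``ConfigModel`` is assumed to be a pydantic v2 model, which the ``err["ctx"]["error"]`` handling above implies:

    from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError
    from cognite.extractorutils.unstable.configuration.loaders import load_dict
    from cognite.extractorutils.unstable.configuration.models import ConfigModel

    class MyConfig(ConfigModel):  # hypothetical schema, for illustration only
        name: str
        interval: int

    try:
        load_dict({"interval": "often"}, MyConfig)  # missing "name", wrong type for "interval"
    except InvalidConfigError as e:
        # Messages now read "<what went wrong>: <where>", roughly
        # "Field required: name, Input should be a valid integer, ...: interval"
        print(e.message)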
10 changes: 6 additions & 4 deletions cognite/extractorutils/unstable/configuration/models.py
@@ -154,7 +154,7 @@ class ConnectionConfig(ConfigModel):
      project: str
      base_url: str

-     extraction_pipeline: str
+     integration: str

      authentication: AuthenticationConfig

@@ -238,21 +238,23 @@ class LogLevel(Enum):


  class LogFileHandlerConfig(ConfigModel):
+     type: Literal["file"]
      path: Path
      level: LogLevel
      retention: int = 7


  class LogConsoleHandlerConfig(ConfigModel):
+     type: Literal["console"]
      level: LogLevel


- LogHandlerConfig = Union[LogFileHandlerConfig, LogConsoleHandlerConfig]
+ LogHandlerConfig = Annotated[LogFileHandlerConfig | LogConsoleHandlerConfig, Field(discriminator="type")]


  # Mypy BS
- def _log_handler_default() -> List[Union[LogFileHandlerConfig, LogConsoleHandlerConfig]]:
-     return [LogConsoleHandlerConfig(level=LogLevel.INFO)]
+ def _log_handler_default() -> List[LogHandlerConfig]:
+     return [LogConsoleHandlerConfig(type="console", level=LogLevel.INFO)]


  class ExtractorConfig(ConfigModel):
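A hedged sketch of what the new discriminated union buys when parsing handler configs, assuming pydantic v2 (which ``Annotated`` with ``Field(discriminator=...)`` requires); the raw dicts are illustrative:

    from pydantic import TypeAdapter
    from cognite.extractorutils.unstable.configuration.models import (
        LogConsoleHandlerConfig,
        LogFileHandlerConfig,
        LogHandlerConfig,
        LogLevel,
    )

    adapter = TypeAdapter(LogHandlerConfig)

    # The "type" field now selects the concrete model directly, instead of pydantic
    # trying each union member in turn and reporting errors for both on failure.
    console = adapter.validate_python({"type": "console", "level": LogLevel.INFO})
    logfile = adapter.validate_python({"type": "file", "path": "extractor.log", "level": LogLevel.INFO})

    assert isinstance(console, LogConsoleHandlerConfig)
    assert isinstance(logfile, LogFileHandlerConfig)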
102 changes: 82 additions & 20 deletions cognite/extractorutils/unstable/core/base.py
@@ -1,6 +1,9 @@
import logging
import logging.config
import time
from concurrent.futures import ThreadPoolExecutor
from contextvars import ContextVar, Token
from logging.handlers import TimedRotatingFileHandler
from multiprocessing import Queue
from threading import RLock, Thread
from traceback import format_exception
@@ -10,8 +13,14 @@

  from humps import pascalize
  from typing_extensions import Self, assert_never

+ from cognite.extractorutils._inner_util import _resolve_log_level
  from cognite.extractorutils.threading import CancellationToken
- from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, ExtractorConfig
+ from cognite.extractorutils.unstable.configuration.models import (
+     ConnectionConfig,
+     ExtractorConfig,
+     LogConsoleHandlerConfig,
+     LogFileHandlerConfig,
+ )
  from cognite.extractorutils.unstable.core._dto import Error as DtoError
  from cognite.extractorutils.unstable.core._dto import TaskUpdate
  from cognite.extractorutils.unstable.core._messaging import RuntimeMessage
@@ -27,6 +36,23 @@
  ConfigRevision = Union[Literal["local"], int]


+ _T = TypeVar("_T", bound=ExtractorConfig)
+
+
+ class FullConfig(Generic[_T]):
+     def __init__(
+         self,
+         connection_config: ConnectionConfig,
+         application_config: _T,
+         current_config_revision: ConfigRevision,
+         newest_config_revision: ConfigRevision,
+     ) -> None:
+         self.connection_config = connection_config
+         self.application_config = application_config
+         self.current_config_revision = current_config_revision
+         self.newest_config_revision = newest_config_revision


  class Extractor(Generic[ConfigType]):
      NAME: str
      EXTERNAL_ID: str
@@ -37,18 +63,14 @@ class Extractor(Generic[ConfigType]):

      RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES

-     def __init__(
-         self,
-         connection_config: ConnectionConfig,
-         application_config: ConfigType,
-         current_config_revision: ConfigRevision,
-     ) -> None:
+     def __init__(self, config: FullConfig[ConfigType]) -> None:
          self.cancellation_token = CancellationToken()
          self.cancellation_token.cancel_on_interrupt()

-         self.connection_config = connection_config
-         self.application_config = application_config
-         self.current_config_revision = current_config_revision
+         self.connection_config = config.connection_config
+         self.application_config = config.application_config
+         self.current_config_revision = config.current_config_revision
+         self.newest_config_revision = config.newest_config_revision

          self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}")
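For downstream extractors this changes the constructor contract. A hedged sketch of what a subclass now looks like; ``MyConfig``, the class attributes, and the ``interval`` field are made-up values:

    from cognite.extractorutils.unstable.configuration.models import ExtractorConfig
    from cognite.extractorutils.unstable.core.base import Extractor, FullConfig

    class MyConfig(ExtractorConfig):  # hypothetical application config
        interval: int = 60

    class MyExtractor(Extractor[MyConfig]):
        NAME = "My extractor"
        EXTERNAL_ID = "my-extractor"
        VERSION = "1.0.0"

        def __init__(self, config: FullConfig[MyConfig]) -> None:
            # One bundle instead of three positional arguments
            super().__init__(config)
            self.interval = self.application_config.interval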

@@ -67,6 +89,49 @@ def __init__(

          self.__init_tasks__()

+     def _setup_logging(self) -> None:
+         min_level = min([_resolve_log_level(h.level.value) for h in self.application_config.log_handlers])
+         max_level = max([_resolve_log_level(h.level.value) for h in self.application_config.log_handlers])
+
+         root = logging.getLogger()
+         root.setLevel(min_level)
+
+         # oauthlib logs too much on debug level, including secrets
+         logging.getLogger("requests_oauthlib.oauth2_session").setLevel(max(max_level, logging.INFO))
+
+         fmt = logging.Formatter(
+             "%(asctime)s.%(msecs)03d UTC [%(levelname)-8s] %(process)d %(threadName)s - %(message)s",
+             "%Y-%m-%d %H:%M:%S",
+         )
+         # Set logging to UTC
+         fmt.converter = time.gmtime
+
+         # Remove any previous logging handlers
+         for handler in root.handlers:
+             root.removeHandler(handler)
+
+         # Define new handlers
+         for handler_config in self.application_config.log_handlers:
+             match handler_config:
+                 case LogConsoleHandlerConfig() as console_handler:
+                     sh = logging.StreamHandler()
+                     sh.setFormatter(fmt)
+                     sh.setLevel(_resolve_log_level(console_handler.level.value))
+
+                     root.addHandler(sh)
+
+                 case LogFileHandlerConfig() as file_handler:
+                     fh = TimedRotatingFileHandler(
+                         filename=file_handler.path,
+                         when="midnight",
+                         utc=True,
+                         backupCount=file_handler.retention,
+                     )
+                     fh.setLevel(_resolve_log_level(file_handler.level.value))
+                     fh.setFormatter(fmt)
+
+                     root.addHandler(fh)

      def __init_tasks__(self) -> None:
          pass
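To make the handler wiring above concrete, a small hedged sketch; the field values are invented, and ``LogLevel.DEBUG`` is assumed to exist alongside ``LogLevel.INFO``:

    from pathlib import Path

    from cognite.extractorutils.unstable.configuration.models import (
        LogConsoleHandlerConfig,
        LogFileHandlerConfig,
        LogLevel,
    )

    # With these two handlers, _setup_logging() sets the root logger to the minimum level (DEBUG),
    # the console handler emits INFO and above, and the file handler emits DEBUG and above to a
    # log file that rotates at midnight UTC and keeps 14 old files.
    log_handlers = [
        LogConsoleHandlerConfig(type="console", level=LogLevel.INFO),
        LogFileHandlerConfig(type="file", path=Path("extractor.log"), level=LogLevel.DEBUG, retention=14),
    ]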

@@ -95,7 +160,7 @@ def _checkin(self) -> None:
          res = self.cognite_client.post(
              f"/api/v1/projects/{self.cognite_client.config.project}/odin/checkin",
              json={
-                 "externalId": self.connection_config.extraction_pipeline,
+                 "externalId": self.connection_config.integration,
                  "taskEvents": task_updates,
                  "errors": error_updates,
              },

@@ -106,7 +171,7 @@ def _checkin(self) -> None:
          if (
              new_config_revision
              and self.current_config_revision != "local"
-             and new_config_revision != self.current_config_revision
+             and new_config_revision > self.newest_config_revision
          ):
              self.restart()

@@ -148,13 +213,8 @@ def restart(self) -> None:
          self.cancellation_token.cancel()

      @classmethod
-     def _init_from_runtime(
-         cls,
-         connection_config: ConnectionConfig,
-         application_config: ConfigType,
-         current_config_revision: ConfigRevision,
-     ) -> Self:
-         return cls(connection_config, application_config, current_config_revision)
+     def _init_from_runtime(cls, config: FullConfig[ConfigType]) -> Self:
+         return cls(config)

      def add_task(self, task: Task) -> None:
          # Store this for later, since we'll override it with the wrapped version
@@ -214,7 +274,7 @@ def _report_extractor_info(self) -> None:
          self.cognite_client.post(
              f"/api/v1/projects/{self.cognite_client.config.project}/odin/extractorinfo",
              json={
-                 "externalId": self.connection_config.extraction_pipeline,
+                 "externalId": self.connection_config.integration,
                  "activeConfigRevision": self.current_config_revision,
                  "extractor": {
                      "version": self.VERSION,

@@ -232,6 +292,7 @@ def _report_extractor_info(self) -> None:
          )

      def start(self) -> None:
+         self._setup_logging()
          self._report_extractor_info()
          Thread(target=self._run_checkin, name="ExtractorCheckin", daemon=True).start()

@@ -252,6 +313,7 @@ def __exit__(
          with self._checkin_lock:
              self._checkin()

+         self.logger.info("Shutting down extractor")
          return exc_val is None

      def run(self) -> None: