Skip to content

Commit

Permalink
Merge branch 'master' into setup-logging
Browse files Browse the repository at this point in the history
  • Loading branch information
mathialo authored Dec 2, 2024
2 parents 2bd6b25 + 658fa18 commit 65fcadd
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 75 deletions.
3 changes: 3 additions & 0 deletions cognite/extractorutils/configtools/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ def ignore_unknown(self, node: yaml.Node) -> None:
SafeLoaderIgnoreUnknown.add_constructor(None, SafeLoaderIgnoreUnknown.ignore_unknown) # type: ignore
initial_load = yaml.load(source, Loader=SafeLoaderIgnoreUnknown) # noqa: S506

if not isinstance(initial_load, dict):
raise InvalidConfigError("The root node of the YAML document must be an object")

if not isinstance(source, str):
source.seek(0)

Expand Down
24 changes: 24 additions & 0 deletions cognite/extractorutils/unstable/configuration/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import List, Optional


class InvalidConfigError(Exception):
"""
Exception thrown from ``load_yaml`` and ``load_yaml_dict`` if config file is invalid. This can be due to
* Missing fields
* Incompatible types
* Unkown fields
"""

def __init__(self, message: str, details: Optional[List[str]] = None):
super(InvalidConfigError, self).__init__()
self.message = message
self.details = details

self.attempted_revision: int | None = None

def __str__(self) -> str:
return f"Invalid config: {self.message}"

def __repr__(self) -> str:
return self.__str__()
22 changes: 15 additions & 7 deletions cognite/extractorutils/unstable/configuration/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

from cognite.client import CogniteClient
from cognite.extractorutils.configtools.loaders import _load_yaml_dict_raw
from cognite.extractorutils.exceptions import InvalidConfigError
from cognite.extractorutils.exceptions import InvalidConfigError as OldInvalidConfigError
from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError
from cognite.extractorutils.unstable.configuration.models import ConfigModel

_T = TypeVar("_T", bound=ConfigModel)
Expand Down Expand Up @@ -44,7 +45,17 @@ def load_from_cdf(
)
response.raise_for_status()
data = response.json()
return load_io(StringIO(data["config"]), ConfigFormat.YAML, schema), data["revision"]

try:
return load_io(StringIO(data["config"]), ConfigFormat.YAML, schema), data["revision"]

except InvalidConfigError as e:
e.attempted_revision = data["revision"]
raise e
except OldInvalidConfigError as e:
new_e = InvalidConfigError(e.message)
new_e.attempted_revision = data["revision"]
raise new_e from e


def load_io(stream: TextIO, format: ConfigFormat, schema: Type[_T]) -> _T:
Expand Down Expand Up @@ -100,12 +111,9 @@ def load_dict(data: dict, schema: Type[_T]) -> _T:
if "ctx" in err and "error" in err["ctx"]:
exc = err["ctx"]["error"]
if isinstance(exc, ValueError) or isinstance(exc, AssertionError):
messages.append(f"{loc_str}: {str(exc)}")
messages.append(f"{str(exc)}: {loc_str}")
continue

if err.get("type") == "json_invalid":
messages.append(f"{err.get('msg')}: {loc_str}")
else:
messages.append(f"{loc_str}: {err.get('msg')}")
messages.append(f"{err.get('msg')}: {loc_str}")

raise InvalidConfigError(", ".join(messages), details=messages) from e
2 changes: 1 addition & 1 deletion cognite/extractorutils/unstable/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class ConnectionConfig(ConfigModel):
project: str
base_url: str

extraction_pipeline: str
integration: str

authentication: AuthenticationConfig

Expand Down
46 changes: 27 additions & 19 deletions cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@
ConfigRevision = Union[Literal["local"], int]


_T = TypeVar("_T", bound=ExtractorConfig)


class FullConfig(Generic[_T]):
def __init__(
self,
connection_config: ConnectionConfig,
application_config: _T,
current_config_revision: ConfigRevision,
newest_config_revision: ConfigRevision,
) -> None:
self.connection_config = connection_config
self.application_config = application_config
self.current_config_revision = current_config_revision
self.newest_config_revision = newest_config_revision


class Extractor(Generic[ConfigType]):
NAME: str
EXTERNAL_ID: str
Expand All @@ -44,18 +61,14 @@ class Extractor(Generic[ConfigType]):

RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES

def __init__(
self,
connection_config: ConnectionConfig,
application_config: ConfigType,
current_config_revision: ConfigRevision,
) -> None:
def __init__(self, config: FullConfig[ConfigType]) -> None:
self.cancellation_token = CancellationToken()
self.cancellation_token.cancel_on_interrupt()

self.connection_config = connection_config
self.application_config = application_config
self.current_config_revision = current_config_revision
self.connection_config = config.connection_config
self.application_config = config.application_config
self.current_config_revision = config.current_config_revision
self.newest_config_revision = config.newest_config_revision

self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}")

Expand Down Expand Up @@ -145,7 +158,7 @@ def _checkin(self) -> None:
res = self.cognite_client.post(
f"/api/v1/projects/{self.cognite_client.config.project}/odin/checkin",
json={
"externalId": self.connection_config.extraction_pipeline,
"externalId": self.connection_config.integration,
"taskEvents": task_updates,
"errors": error_updates,
},
Expand All @@ -156,7 +169,7 @@ def _checkin(self) -> None:
if (
new_config_revision
and self.current_config_revision != "local"
and new_config_revision != self.current_config_revision
and new_config_revision > self.newest_config_revision
):
self.restart()

Expand Down Expand Up @@ -198,13 +211,8 @@ def restart(self) -> None:
self.cancellation_token.cancel()

@classmethod
def _init_from_runtime(
cls,
connection_config: ConnectionConfig,
application_config: ConfigType,
current_config_revision: ConfigRevision,
) -> Self:
return cls(connection_config, application_config, current_config_revision)
def _init_from_runtime(cls, config: FullConfig[ConfigType]) -> Self:
return cls(config)

def add_task(self, task: Task) -> None:
# Store this for later, since we'll override it with the wrapped version
Expand Down Expand Up @@ -264,7 +272,7 @@ def _report_extractor_info(self) -> None:
self.cognite_client.post(
f"/api/v1/projects/{self.cognite_client.config.project}/odin/extractorinfo",
json={
"externalId": self.connection_config.extraction_pipeline,
"externalId": self.connection_config.integration,
"activeConfigRevision": self.current_config_revision,
"extractor": {
"version": self.VERSION,
Expand Down
Loading

0 comments on commit 65fcadd

Please sign in to comment.