diff --git a/cognite/extractorutils/configtools/loaders.py b/cognite/extractorutils/configtools/loaders.py index 5bf5d2b..8e0cc1e 100644 --- a/cognite/extractorutils/configtools/loaders.py +++ b/cognite/extractorutils/configtools/loaders.py @@ -163,6 +163,9 @@ def ignore_unknown(self, node: yaml.Node) -> None: SafeLoaderIgnoreUnknown.add_constructor(None, SafeLoaderIgnoreUnknown.ignore_unknown) # type: ignore initial_load = yaml.load(source, Loader=SafeLoaderIgnoreUnknown) # noqa: S506 + if not isinstance(initial_load, dict): + raise InvalidConfigError("The root node of the YAML document must be an object") + if not isinstance(source, str): source.seek(0) diff --git a/cognite/extractorutils/unstable/configuration/exceptions.py b/cognite/extractorutils/unstable/configuration/exceptions.py new file mode 100644 index 0000000..7d05ab0 --- /dev/null +++ b/cognite/extractorutils/unstable/configuration/exceptions.py @@ -0,0 +1,24 @@ +from typing import List, Optional + + +class InvalidConfigError(Exception): + """ + Exception thrown from ``load_yaml`` and ``load_yaml_dict`` if config file is invalid. This can be due to + + * Missing fields + * Incompatible types + * Unkown fields + """ + + def __init__(self, message: str, details: Optional[List[str]] = None): + super(InvalidConfigError, self).__init__() + self.message = message + self.details = details + + self.attempted_revision: int | None = None + + def __str__(self) -> str: + return f"Invalid config: {self.message}" + + def __repr__(self) -> str: + return self.__str__() diff --git a/cognite/extractorutils/unstable/configuration/loaders.py b/cognite/extractorutils/unstable/configuration/loaders.py index 0b15899..e7c3f0a 100644 --- a/cognite/extractorutils/unstable/configuration/loaders.py +++ b/cognite/extractorutils/unstable/configuration/loaders.py @@ -8,7 +8,8 @@ from cognite.client import CogniteClient from cognite.extractorutils.configtools.loaders import _load_yaml_dict_raw -from cognite.extractorutils.exceptions import InvalidConfigError +from cognite.extractorutils.exceptions import InvalidConfigError as OldInvalidConfigError +from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError from cognite.extractorutils.unstable.configuration.models import ConfigModel _T = TypeVar("_T", bound=ConfigModel) @@ -44,7 +45,17 @@ def load_from_cdf( ) response.raise_for_status() data = response.json() - return load_io(StringIO(data["config"]), ConfigFormat.YAML, schema), data["revision"] + + try: + return load_io(StringIO(data["config"]), ConfigFormat.YAML, schema), data["revision"] + + except InvalidConfigError as e: + e.attempted_revision = data["revision"] + raise e + except OldInvalidConfigError as e: + new_e = InvalidConfigError(e.message) + new_e.attempted_revision = data["revision"] + raise new_e from e def load_io(stream: TextIO, format: ConfigFormat, schema: Type[_T]) -> _T: @@ -100,12 +111,9 @@ def load_dict(data: dict, schema: Type[_T]) -> _T: if "ctx" in err and "error" in err["ctx"]: exc = err["ctx"]["error"] if isinstance(exc, ValueError) or isinstance(exc, AssertionError): - messages.append(f"{loc_str}: {str(exc)}") + messages.append(f"{str(exc)}: {loc_str}") continue - if err.get("type") == "json_invalid": - messages.append(f"{err.get('msg')}: {loc_str}") - else: - messages.append(f"{loc_str}: {err.get('msg')}") + messages.append(f"{err.get('msg')}: {loc_str}") raise InvalidConfigError(", ".join(messages), details=messages) from e diff --git a/cognite/extractorutils/unstable/configuration/models.py b/cognite/extractorutils/unstable/configuration/models.py index 33f9fa0..8791ebd 100644 --- a/cognite/extractorutils/unstable/configuration/models.py +++ b/cognite/extractorutils/unstable/configuration/models.py @@ -139,7 +139,7 @@ class ConnectionConfig(ConfigModel): project: str base_url: str - extraction_pipeline: str + integration: str authentication: AuthenticationConfig diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 2f94316..1d71d29 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -34,6 +34,23 @@ ConfigRevision = Union[Literal["local"], int] +_T = TypeVar("_T", bound=ExtractorConfig) + + +class FullConfig(Generic[_T]): + def __init__( + self, + connection_config: ConnectionConfig, + application_config: _T, + current_config_revision: ConfigRevision, + newest_config_revision: ConfigRevision, + ) -> None: + self.connection_config = connection_config + self.application_config = application_config + self.current_config_revision = current_config_revision + self.newest_config_revision = newest_config_revision + + class Extractor(Generic[ConfigType]): NAME: str EXTERNAL_ID: str @@ -44,18 +61,14 @@ class Extractor(Generic[ConfigType]): RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES - def __init__( - self, - connection_config: ConnectionConfig, - application_config: ConfigType, - current_config_revision: ConfigRevision, - ) -> None: + def __init__(self, config: FullConfig[ConfigType]) -> None: self.cancellation_token = CancellationToken() self.cancellation_token.cancel_on_interrupt() - self.connection_config = connection_config - self.application_config = application_config - self.current_config_revision = current_config_revision + self.connection_config = config.connection_config + self.application_config = config.application_config + self.current_config_revision = config.current_config_revision + self.newest_config_revision = config.newest_config_revision self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}") @@ -145,7 +158,7 @@ def _checkin(self) -> None: res = self.cognite_client.post( f"/api/v1/projects/{self.cognite_client.config.project}/odin/checkin", json={ - "externalId": self.connection_config.extraction_pipeline, + "externalId": self.connection_config.integration, "taskEvents": task_updates, "errors": error_updates, }, @@ -156,7 +169,7 @@ def _checkin(self) -> None: if ( new_config_revision and self.current_config_revision != "local" - and new_config_revision != self.current_config_revision + and new_config_revision > self.newest_config_revision ): self.restart() @@ -198,13 +211,8 @@ def restart(self) -> None: self.cancellation_token.cancel() @classmethod - def _init_from_runtime( - cls, - connection_config: ConnectionConfig, - application_config: ConfigType, - current_config_revision: ConfigRevision, - ) -> Self: - return cls(connection_config, application_config, current_config_revision) + def _init_from_runtime(cls, config: FullConfig[ConfigType]) -> Self: + return cls(config) def add_task(self, task: Task) -> None: # Store this for later, since we'll override it with the wrapped version @@ -264,7 +272,7 @@ def _report_extractor_info(self) -> None: self.cognite_client.post( f"/api/v1/projects/{self.cognite_client.config.project}/odin/extractorinfo", json={ - "externalId": self.connection_config.extraction_pipeline, + "externalId": self.connection_config.integration, "activeConfigRevision": self.current_config_revision, "extractor": { "version": self.VERSION, diff --git a/cognite/extractorutils/unstable/core/runtime.py b/cognite/extractorutils/unstable/core/runtime.py index bd9779a..732c8f6 100644 --- a/cognite/extractorutils/unstable/core/runtime.py +++ b/cognite/extractorutils/unstable/core/runtime.py @@ -6,15 +6,21 @@ from multiprocessing import Process, Queue from pathlib import Path from typing import Any, Generic, Type, TypeVar +from uuid import uuid4 +from requests.exceptions import ConnectionError from typing_extensions import assert_never +from cognite.client.exceptions import CogniteAPIError, CogniteAuthError, CogniteConnectionError from cognite.extractorutils.threading import CancellationToken +from cognite.extractorutils.unstable.configuration.exceptions import InvalidConfigError from cognite.extractorutils.unstable.configuration.loaders import load_file, load_from_cdf from cognite.extractorutils.unstable.configuration.models import ConnectionConfig +from cognite.extractorutils.unstable.core._dto import Error +from cognite.extractorutils.util import now from ._messaging import RuntimeMessage -from .base import ConfigRevision, ConfigType, Extractor +from .base import ConfigRevision, ConfigType, Extractor, FullConfig ExtractorType = TypeVar("ExtractorType", bound=Extractor) @@ -59,6 +65,11 @@ def _create_argparser(self) -> ArgumentParser: default=None, help="Include to use a local application configuration instead of fetching it from CDF", ) + argparser.add_argument( + "--skip-init-checks", + action="store_true", + help="Skip any checks during startup. Useful for debugging, not recommended for production deployments.", + ) return argparser @@ -82,16 +93,10 @@ def _setup_logging(self) -> None: def _inner_run( self, message_queue: Queue, - connection_config: ConnectionConfig, - application_config: ConfigType, - current_config_revision: ConfigRevision, + config: FullConfig, ) -> None: # This code is run inside the new extractor process - extractor = self._extractor_class._init_from_runtime( - connection_config, - application_config, - current_config_revision, - ) + extractor = self._extractor_class._init_from_runtime(config) extractor._set_runtime_message_queue(message_queue) try: @@ -104,56 +109,193 @@ def _inner_run( def _spawn_extractor( self, - connection_config: ConnectionConfig, - application_config: ConfigType, - current_config_revision: ConfigRevision, + config: FullConfig, ) -> Process: self._message_queue = Queue() process = Process( target=self._inner_run, - args=(self._message_queue, connection_config, application_config, current_config_revision), + args=(self._message_queue, config), ) process.start() - self.logger.info(f"Started extractor as {process.pid}") + self.logger.info(f"Started extractor with PID {process.pid}") return process def _get_application_config( self, args: Namespace, connection_config: ConnectionConfig, - ) -> tuple[ConfigType, ConfigRevision]: + ) -> tuple[ConfigType, ConfigRevision, ConfigRevision]: current_config_revision: ConfigRevision + newest_config_revision: ConfigRevision + if args.local_override: + self.logger.info("Loading local application config") + current_config_revision = "local" - application_config = load_file(args.local_override[0], self._extractor_class.CONFIG_TYPE) + newest_config_revision = "local" + try: + application_config = load_file(args.local_override[0], self._extractor_class.CONFIG_TYPE) + except InvalidConfigError as e: + self.logger.critical(str(e)) + raise e + except FileNotFoundError as e: + self.logger.critical(str(e)) + raise InvalidConfigError(str(e)) from e + else: + self.logger.info("Loading application config from CDF") client = connection_config.get_cognite_client( f"{self._extractor_class.EXTERNAL_ID}-{self._extractor_class.VERSION}" ) - application_config, current_config_revision = load_from_cdf( - client, - connection_config.extraction_pipeline, - self._extractor_class.CONFIG_TYPE, + + errors: list[Error] = [] + + revision: int | None = None + try: + while True: + try: + application_config, current_config_revision = load_from_cdf( + client, + connection_config.integration, + self._extractor_class.CONFIG_TYPE, + revision=revision, + ) + break + + except InvalidConfigError as e: + if e.attempted_revision is None: + # Should never happen, attempted_revision is set in every handler in load_from_cdf, but it's + # needed for type checks to pass + raise e + + self.logger.error(f"Revision {e.attempted_revision} is invalid: {e.message}") + + t = now() + errors.append( + Error( + external_id=str(uuid4()), + level="error", + description=f"Revision {e.attempted_revision} is invalid", + details=e.message, + start_time=t, + end_time=t, + task=None, + ) + ) + + if revision is None: + revision = e.attempted_revision - 1 + newest_config_revision = e.attempted_revision + else: + revision -= 1 + + if revision > 0: + self.logger.info(f"Falling back to revision {revision}") + else: + self.logger.critical("No more revisions to fall back to") + raise e + + finally: + if errors: + client.post( + f"/api/v1/projects/{client.config.project}/odin/checkin", + json={ + "externalId": connection_config.integration, + "errors": [e.model_dump() for e in errors], + }, + headers={"cdf-version": "alpha"}, + ) + + return application_config, current_config_revision, newest_config_revision + + def _verify_connection_config(self, connection_config: ConnectionConfig) -> bool: + client = connection_config.get_cognite_client( + f"{self._extractor_class.EXTERNAL_ID}-{self._extractor_class.VERSION}" + ) + try: + client.post( + f"/api/v1/projects/{client.config.project}/odin/checkin", + json={ + "externalId": connection_config.integration, + }, + headers={"cdf-version": "alpha"}, ) - return application_config, current_config_revision + except CogniteConnectionError as e: + if e.__cause__ is not None: + self.logger.error(str(e.__cause__)) + self.logger.critical("Could not connect to CDF. Please check your configuration.") + return False + + except CogniteAuthError as e: + # Error while fetching auth token + self.logger.error(str(e)) + self.logger.critical("Could not get an access token. Please check your configuration.") + return False + + except CogniteAPIError as e: + # Error response from the CDF API + if e.code == 401: + self.logger.critical( + "Got a 401 error from CDF. Please check your configuration. " + "Make sure the credentials and project is correct." + ) + + elif e.message: + self.logger.critical(str(e.message)) + + else: + self.logger.critical(f"Error while connecting to CDF {str(e)}") + + return False + + except ConnectionError as e: + # This is sometime thrown, I've seen it when trying to get an auth token but it might happen elsewhere too + self.logger.error(str(e)) + self.logger.critical("Could not initiate connection. Please check your configuration.") + return False + + return True def run(self) -> None: argparser = self._create_argparser() args = argparser.parse_args() - self.logger.info(f"Started runtime as {os.getpid()}") + self.logger.info(f"Started runtime with PID {os.getpid()}") + + try: + connection_config = load_file(args.connection_config[0], ConnectionConfig) + except InvalidConfigError as e: + self.logger.error(str(e)) + self.logger.critical("Could not load connection config") + sys.exit(1) - connection_config = load_file(args.connection_config[0], ConnectionConfig) + if not args.skip_init_checks and not self._verify_connection_config(connection_config): + sys.exit(1) # This has to be Any. We don't know the type of the extractors' config at type checking since the sel doesn't # exist yet, and I have not found a way to represent it in a generic way that isn't just an Any in disguise. application_config: Any while not self._cancellation_token.is_cancelled: - application_config, current_config_revision = self._get_application_config(args, connection_config) + try: + application_config, current_config_revision, newest_config_revision = self._get_application_config( + args, connection_config + ) + + except InvalidConfigError: + self.logger.critical("Could not get a valid application config file. Shutting down") + sys.exit(1) + # Start extractor in separate process, and wait for it to end - process = self._spawn_extractor(connection_config, application_config, current_config_revision) + process = self._spawn_extractor( + FullConfig( + connection_config=connection_config, + application_config=application_config, + current_config_revision=current_config_revision, + newest_config_revision=newest_config_revision, + ) + ) process.join() # Check if we are asked to restart the extractor, shut down otherwise diff --git a/tests/test_unstable/conftest.py b/tests/test_unstable/conftest.py index aa48acd..7c752db 100644 --- a/tests/test_unstable/conftest.py +++ b/tests/test_unstable/conftest.py @@ -73,7 +73,7 @@ def connection_config(extraction_pipeline: str) -> ConnectionConfig: return ConnectionConfig( project=os.environ["COGNITE_DEV_PROJECT"], base_url=os.environ["COGNITE_DEV_BASE_URL"], - extraction_pipeline=extraction_pipeline, + integration=extraction_pipeline, authentication=_ClientCredentialsConfig( type="client-credentials", client_id=os.environ.get("COGNITE_DEV_CLIENT_ID", os.environ["COGNITE_CLIENT_ID"]), diff --git a/tests/test_unstable/test_base.py b/tests/test_unstable/test_base.py index bf14250..9af0aac 100644 --- a/tests/test_unstable/test_base.py +++ b/tests/test_unstable/test_base.py @@ -3,6 +3,7 @@ import pytest from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, IntervalConfig, TimeIntervalConfig +from cognite.extractorutils.unstable.core.base import FullConfig from cognite.extractorutils.unstable.core.tasks import ScheduledTask from cognite.extractorutils.util import now @@ -19,9 +20,12 @@ def test_simple_task_report( # Create a simple test extractor extractor = TestExtractor( - connection_config=connection_config, - application_config=application_config, - current_config_revision=1, + FullConfig( + connection_config=connection_config, + application_config=application_config, + current_config_revision=1, + newest_config_revision=1, + ) ) extractor.add_task( @@ -76,7 +80,7 @@ def test_simple_task_report( # Test that the task run is entered into the history for that task res = extractor.cognite_client.get( - f"/api/v1/projects/{extractor.cognite_client.config.project}/odin/history?extpipe={connection_config.extraction_pipeline}&taskName=TestTask", + f"/api/v1/projects/{extractor.cognite_client.config.project}/odin/history?integration={connection_config.integration}&taskName=TestTask", headers={"cdf-version": "alpha"}, ).json() diff --git a/tests/test_unstable/test_configuration.py b/tests/test_unstable/test_configuration.py index f532f34..2d5910d 100644 --- a/tests/test_unstable/test_configuration.py +++ b/tests/test_unstable/test_configuration.py @@ -7,7 +7,7 @@ project: test-project base-url: https://baseurl.com -extraction-pipeline: test-pipeline +integration: test-pipeline authentication: type: client-credentials diff --git a/tests/test_unstable/test_errors.py b/tests/test_unstable/test_errors.py index 23bc5f2..3a705ff 100644 --- a/tests/test_unstable/test_errors.py +++ b/tests/test_unstable/test_errors.py @@ -3,6 +3,7 @@ import pytest from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, IntervalConfig, TimeIntervalConfig +from cognite.extractorutils.unstable.core.base import FullConfig from cognite.extractorutils.unstable.core.errors import ErrorLevel from cognite.extractorutils.unstable.core.tasks import ScheduledTask from tests.test_unstable.conftest import TestConfig, TestExtractor @@ -13,9 +14,12 @@ def test_global_error( application_config: TestConfig, ) -> None: extractor = TestExtractor( - connection_config=connection_config, - application_config=application_config, - current_config_revision=1, + FullConfig( + connection_config=connection_config, + application_config=application_config, + current_config_revision=1, + newest_config_revision=1, + ) ) err = extractor.error(level=ErrorLevel.error, description="Oh no!", details="There was an error") @@ -41,9 +45,12 @@ def test_instant_error( application_config: TestConfig, ) -> None: extractor = TestExtractor( - connection_config=connection_config, - application_config=application_config, - current_config_revision=1, + FullConfig( + connection_config=connection_config, + application_config=application_config, + current_config_revision=1, + newest_config_revision=1, + ) ) err = extractor.error(level=ErrorLevel.error, description="Oh no!", details="There was an error") @@ -66,9 +73,12 @@ def test_task_error( application_config: TestConfig, ) -> None: extractor = TestExtractor( - connection_config=connection_config, - application_config=application_config, - current_config_revision=1, + FullConfig( + connection_config=connection_config, + application_config=application_config, + current_config_revision=1, + newest_config_revision=1, + ) ) def task() -> None: @@ -108,9 +118,12 @@ def test_crashing_task( application_config: TestConfig, ) -> None: extractor = TestExtractor( - connection_config=connection_config, - application_config=application_config, - current_config_revision=1, + FullConfig( + connection_config=connection_config, + application_config=application_config, + current_config_revision=1, + newest_config_revision=1, + ) ) def task() -> None: @@ -151,9 +164,12 @@ def test_reporting_errors( checkin_between: bool, ) -> None: extractor = TestExtractor( - connection_config=connection_config, - application_config=application_config, - current_config_revision=1, + FullConfig( + connection_config=connection_config, + application_config=application_config, + current_config_revision=1, + newest_config_revision=1, + ) ) err = extractor.error(level=ErrorLevel.error, description="Oh no!", details="There was an error") @@ -165,7 +181,7 @@ def test_reporting_errors( extractor._checkin() res = extractor.cognite_client.get( - f"/api/v1/projects/{extractor.cognite_client.config.project}/odin/errors?extpipe={connection_config.extraction_pipeline}", + f"/api/v1/projects/{extractor.cognite_client.config.project}/odin/errors?integration={connection_config.integration}", headers={"cdf-version": "alpha"}, ).json()["items"] assert len(res) == 1 @@ -182,7 +198,7 @@ def test_reporting_errors( extractor._checkin() res = extractor.cognite_client.get( - f"/api/v1/projects/{extractor.cognite_client.config.project}/odin/errors?extpipe={connection_config.extraction_pipeline}", + f"/api/v1/projects/{extractor.cognite_client.config.project}/odin/errors?integration={connection_config.integration}", headers={"cdf-version": "alpha"}, ).json()["items"] assert len(res) == 1