From 9c1697781a7ef12cee993251c459567749f17a64 Mon Sep 17 00:00:00 2001 From: Anders Albert <60234212+doctrino@users.noreply.github.com> Date: Tue, 3 Sep 2024 15:00:42 +0200 Subject: [PATCH] [CDF-22379] Hosted Extractors: Source (#1893) --- CHANGELOG.md | 4 + .../client/_api/hosted_extractors/__init__.py | 16 + .../client/_api/hosted_extractors/sources.py | 275 ++++++++ cognite/client/_api_client.py | 7 +- cognite/client/_cognite_client.py | 2 + cognite/client/_version.py | 2 +- cognite/client/data_classes/_base.py | 2 +- .../data_classes/datapoints_subscriptions.py | 2 +- .../hosted_extractors/__init__.py | 35 + .../data_classes/hosted_extractors/sources.py | 627 ++++++++++++++++++ cognite/client/testing.py | 5 + docs/source/hosted_extractors.rst | 24 + docs/source/index.rst | 1 + pyproject.toml | 2 +- scripts/add_capability.py | 22 +- .../test_hosted_extractors/__init__.py | 0 .../test_hosted_extractors/test_sources.py | 96 +++ tests/tests_unit/test_base.py | 16 +- tests/tests_unit/test_meta.py | 2 +- 19 files changed, 1120 insertions(+), 20 deletions(-) create mode 100644 cognite/client/_api/hosted_extractors/__init__.py create mode 100644 cognite/client/_api/hosted_extractors/sources.py create mode 100644 cognite/client/data_classes/hosted_extractors/__init__.py create mode 100644 cognite/client/data_classes/hosted_extractors/sources.py create mode 100644 docs/source/hosted_extractors.rst create mode 100644 tests/tests_integration/test_api/test_hosted_extractors/__init__.py create mode 100644 tests/tests_integration/test_api/test_hosted_extractors/test_sources.py diff --git a/CHANGELOG.md b/CHANGELOG.md index b49580bb9..2c99ca06e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## [7.58.2] - 2024-09-03 +### Added +- [Feature Preview - alpha] Support for `client.hosted_extractors.sources`. 
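A minimal sketch of the new (alpha) API announced above, mirroring the example added in this PR's `create()` docstring — the hub name, host, and key values are illustrative placeholders, and a configured `CogniteClient` is assumed:

    >>> from cognite.client import CogniteClient
    >>> from cognite.client.data_classes.hosted_extractors import EventHubSourceWrite
    >>> client = CogniteClient()
    >>> source = EventHubSourceWrite('my_event_hub', 'http://myeventhub.com', "My EventHub", 'my_key', 'my_value')
    >>> res = client.hosted_extractors.sources.create(source)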
+ ## [7.58.1] - 2024-09-03 ### Fixed - [Feature Preview - beta] data workflows: `workflowExecutionId` in `cognite.client.data_classes.workflows.WorkflowTriggerRun` diff --git a/cognite/client/_api/hosted_extractors/__init__.py b/cognite/client/_api/hosted_extractors/__init__.py new file mode 100644 index 000000000..6c6a23a93 --- /dev/null +++ b/cognite/client/_api/hosted_extractors/__init__.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from cognite.client._api.hosted_extractors.sources import SourcesAPI +from cognite.client._api_client import APIClient + +if TYPE_CHECKING: + from cognite.client import CogniteClient + from cognite.client.config import ClientConfig + + +class HostedExtractorsAPI(APIClient): + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + self.sources = SourcesAPI(config, api_version, cognite_client) diff --git a/cognite/client/_api/hosted_extractors/sources.py b/cognite/client/_api/hosted_extractors/sources.py new file mode 100644 index 000000000..066930127 --- /dev/null +++ b/cognite/client/_api/hosted_extractors/sources.py @@ -0,0 +1,275 @@ +from __future__ import annotations + +from collections.abc import Iterator +from typing import TYPE_CHECKING, Any, Literal, Sequence, overload + +from cognite.client._api_client import APIClient +from cognite.client._constants import DEFAULT_LIMIT_READ +from cognite.client.data_classes._base import CogniteResource, PropertySpec +from cognite.client.data_classes.hosted_extractors.sources import Source, SourceList, SourceUpdate, SourceWrite +from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._identifier import IdentifierSequence +from cognite.client.utils.useful_types import SequenceNotStr + +if TYPE_CHECKING: + from cognite.client import ClientConfig, CogniteClient + + +class SourcesAPI(APIClient): + _RESOURCE_PATH = "/hostedextractors/sources" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + self._warning = FeaturePreviewWarning( + api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors" + ) + self._CREATE_LIMIT = 100 + self._LIST_LIMIT = 100 + self._RETRIEVE_LIMIT = 100 + self._DELETE_LIMIT = 100 + self._UPDATE_LIMIT = 100 + + @overload + def __call__( + self, + chunk_size: None = None, + limit: int | None = None, + ) -> Iterator[Source]: ... + + @overload + def __call__( + self, + chunk_size: int, + limit: int | None = None, + ) -> Iterator[SourceList]: ... + + def __call__( + self, + chunk_size: int | None = None, + limit: int | None = None, + ) -> Iterator[Source] | Iterator[SourceList]: + """Iterate over sources + + Fetches sources as they are iterated over, so you keep a limited number of sources in memory. + + Args: + chunk_size (int | None): Number of sources to return in each chunk. Defaults to yielding one source a time. + limit (int | None): Maximum number of sources to return. Defaults to returning all items. + + Returns: + Iterator[Source] | Iterator[SourceList]: yields Source one by one if chunk_size is not specified, else SourceList objects. 
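Examples:

            Illustrative sketch (mirrors the iteration examples in ``list`` below; assumes a configured client):

                >>> from cognite.client import CogniteClient
                >>> client = CogniteClient()
                >>> for source in client.hosted_extractors.sources():
                ...     source  # do something with the source
                >>> for source_list in client.hosted_extractors.sources(chunk_size=25):
                ...     source_list  # do something with the chunk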
+ """ + self._warning.warn() + + return self._list_generator( + list_cls=SourceList, + resource_cls=Source, # type: ignore[type-abstract] + method="GET", + chunk_size=chunk_size, + limit=limit, + headers={"cdf-version": "beta"}, + ) + + def __iter__(self) -> Iterator[Source]: + """Iterate over sources + + Fetches sources as they are iterated over, so you keep a limited number of sources in memory. + + Returns: + Iterator[Source]: yields Source one by one. + """ + return self() + + @overload + def retrieve(self, external_ids: str, ignore_unknown_ids: bool = False) -> Source: ... + + @overload + def retrieve(self, external_ids: SequenceNotStr[str], ignore_unknown_ids: bool = False) -> SourceList: ... + + def retrieve( + self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bool = False + ) -> Source | SourceList: + """`Retrieve one or more sources. `_ + + Args: + external_ids (str | SequenceNotStr[str]): The external ID provided by the client. Must be unique for the resource type. + ignore_unknown_ids (bool): Ignore external IDs that are not found rather than throw an exception. + + Returns: + Source | SourceList: Requested sources + + Examples: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> res = client.hosted_extractors.sources.retrieve('myMQTTSource') + + Get multiple sources by id: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> res = client.hosted_extractors.sources.retrieve(["myMQTTSource", "MyEventHubSource"], ignore_unknown_ids=True) + + """ + self._warning.warn() + return self._retrieve_multiple( + list_cls=SourceList, + resource_cls=Source, # type: ignore[type-abstract] + identifiers=IdentifierSequence.load(external_ids=external_ids), + ignore_unknown_ids=ignore_unknown_ids, + headers={"cdf-version": "beta"}, + ) + + def delete( + self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bool = False, force: bool = False + ) -> None: + """`Delete one or more sources `_ + + Args: + external_ids (str | SequenceNotStr[str]): The external ID provided by the client. Must be unique for the resource type. + ignore_unknown_ids (bool): Ignore external IDs that are not found rather than throw an exception. + force (bool): Delete any jobs associated with each item. + Examples: + + Delete sources by id:: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> client.hosted_extractors.sources.delete(["myMQTTSource", "MyEventHubSource"]) + """ + self._warning.warn() + extra_body_fields: dict[str, Any] = {} + if ignore_unknown_ids: + extra_body_fields["ignoreUnknownIds"] = True + if force: + extra_body_fields["force"] = True + + self._delete_multiple( + identifiers=IdentifierSequence.load(external_ids=external_ids), + wrap_ids=True, + headers={"cdf-version": "beta"}, + extra_body_fields=extra_body_fields or None, + ) + + @overload + def create(self, items: SourceWrite) -> Source: ... + + @overload + def create(self, items: Sequence[SourceWrite]) -> SourceList: ... + + def create(self, items: SourceWrite | Sequence[SourceWrite]) -> Source | SourceList: + """`Create one or more sources. `_ + + Args: + items (SourceWrite | Sequence[SourceWrite]): Source(s) to create. 
+ + Returns: + Source | SourceList: Created source(s) + + Examples: + + Create new source: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.hosted_extractors import EventHubSourceWrite + >>> client = CogniteClient() + >>> source = EventHubSourceWrite('my_event_hub', 'http://myeventhub.com', "My EventHub", 'my_key', 'my_value') + >>> res = client.hosted_extractors.sources.create(source) + """ + self._warning.warn() + return self._create_multiple( + list_cls=SourceList, + resource_cls=Source, # type: ignore[type-abstract] + items=items, # type: ignore[arg-type] + input_resource_cls=SourceWrite, + headers={"cdf-version": "beta"}, + ) + + @overload + def update(self, items: SourceWrite | SourceUpdate) -> Source: ... + + @overload + def update(self, items: Sequence[SourceWrite | SourceUpdate]) -> SourceList: ... + + def update(self, items: SourceWrite | SourceUpdate | Sequence[SourceWrite | SourceUpdate]) -> Source | SourceList: + """`Update one or more sources. `_ + + Args: + items (SourceWrite | SourceUpdate | Sequence[SourceWrite | SourceUpdate]): Source(s) to update. + + Returns: + Source | SourceList: Updated source(s) + + Examples: + + Update source: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.hosted_extractors import EventHubSourceUpdate + >>> client = CogniteClient() + >>> source = EventHubSourceUpdate('my_event_hub').event_hub_name.set("My Updated EventHub") + >>> res = client.hosted_extractors.sources.update(source) + """ + self._warning.warn() + return self._update_multiple( + items=items, # type: ignore[arg-type] + list_cls=SourceList, + resource_cls=Source, # type: ignore[type-abstract] + update_cls=SourceUpdate, + headers={"cdf-version": "beta"}, + ) + + @classmethod + def _convert_resource_to_patch_object( + cls, + resource: CogniteResource, + update_attributes: list[PropertySpec], + mode: Literal["replace_ignore_null", "patch", "replace"] = "replace_ignore_null", + ) -> dict[str, dict[str, dict]]: + output = super()._convert_resource_to_patch_object(resource, update_attributes, mode) + if hasattr(resource, "_type"): + output["type"] = resource._type + return output + + def list( + self, + limit: int | None = DEFAULT_LIMIT_READ, + ) -> SourceList: + """`List sources `_ + + Args: + limit (int | None): Maximum number of sources to return. Defaults to 25. Set to -1, float("inf") or None to return all items. + + Returns: + SourceList: List of requested sources + + Examples: + + List sources: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> source_list = client.hosted_extractors.sources.list(limit=5) + + Iterate over sources:: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> for source in client.hosted_extractors.sources: + ... source # do something with the source + + Iterate over chunks of sources to reduce memory load:: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> for source_list in client.hosted_extractors.sources(chunk_size=25): + ... 
source_list # do something with the sources + """ + self._warning.warn() + return self._list( + list_cls=SourceList, + resource_cls=Source, # type: ignore[type-abstract] + method="GET", + limit=limit, + headers={"cdf-version": "beta"}, + ) diff --git a/cognite/client/_api_client.py b/cognite/client/_api_client.py index 74f9d4c8e..f58946a6c 100644 --- a/cognite/client/_api_client.py +++ b/cognite/client/_api_client.py @@ -106,6 +106,7 @@ class APIClient: "transformations/(filter|byids|jobs/byids|schedules/byids|query/run)", "extpipes/(list|byids|runs/list)", "workflows/.*", + "hostedextractors/.*", ) ) ] @@ -1038,7 +1039,11 @@ def _update_multiple( for index, item in enumerate(item_list): if isinstance(item, CogniteResource): patch_objects.append( - self._convert_resource_to_patch_object(item, update_cls._get_update_properties(item), mode) + self._convert_resource_to_patch_object( + item, + update_cls._get_update_properties(item), + mode, + ) ) elif isinstance(item, CogniteUpdate): patch_objects.append(item.dump(camel_case=True)) diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index 559143b09..12d83162f 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -16,6 +16,7 @@ from cognite.client._api.files import FilesAPI from cognite.client._api.functions import FunctionsAPI from cognite.client._api.geospatial import GeospatialAPI +from cognite.client._api.hosted_extractors import HostedExtractorsAPI from cognite.client._api.iam import IAMAPI from cognite.client._api.labels import LabelsAPI from cognite.client._api.raw import RawAPI @@ -71,6 +72,7 @@ def __init__(self, config: ClientConfig | None = None) -> None: self.templates = TemplatesAPI(self._config, self._API_VERSION, self) self.vision = VisionAPI(self._config, self._API_VERSION, self) self.extraction_pipelines = ExtractionPipelinesAPI(self._config, self._API_VERSION, self) + self.hosted_extractors = HostedExtractorsAPI(self._config, self._API_VERSION, self) self.transformations = TransformationsAPI(self._config, self._API_VERSION, self) self.diagrams = DiagramsAPI(self._config, self._API_VERSION, self) self.annotations = AnnotationsAPI(self._config, self._API_VERSION, self) diff --git a/cognite/client/_version.py b/cognite/client/_version.py index b4bf6d573..b1cad4426 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,4 +1,4 @@ from __future__ import annotations -__version__ = "7.58.1" +__version__ = "7.58.2" __api_subversion__ = "20230101" diff --git a/cognite/client/data_classes/_base.py b/cognite/client/data_classes/_base.py index 551c861b3..3856850bd 100644 --- a/cognite/client/data_classes/_base.py +++ b/cognite/client/data_classes/_base.py @@ -528,7 +528,7 @@ def __init__(self, update_object: T_CogniteUpdate, name: str) -> None: self._update_object = update_object self._name = name - def _set(self, value: None | str | int | bool) -> T_CogniteUpdate: + def _set(self, value: None | str | int | bool | dict) -> T_CogniteUpdate: if value is None: self._update_object._set_null(self._name) else: diff --git a/cognite/client/data_classes/datapoints_subscriptions.py b/cognite/client/data_classes/datapoints_subscriptions.py index 273cc1eed..806595d4d 100644 --- a/cognite/client/data_classes/datapoints_subscriptions.py +++ b/cognite/client/data_classes/datapoints_subscriptions.py @@ -185,7 +185,7 @@ def set(self, value: Any) -> DataPointSubscriptionUpdate: class _FilterDataPointSubscriptionUpdate(CognitePrimitiveUpdate): def set(self, value: 
Filter) -> DataPointSubscriptionUpdate: - return self._set(value.dump()) # type: ignore[arg-type] + return self._set(value.dump()) class _ListDataPointSubscriptionUpdate(CogniteListUpdate): def set(self, value: list) -> DataPointSubscriptionUpdate: diff --git a/cognite/client/data_classes/hosted_extractors/__init__.py b/cognite/client/data_classes/hosted_extractors/__init__.py new file mode 100644 index 000000000..abadfb1c4 --- /dev/null +++ b/cognite/client/data_classes/hosted_extractors/__init__.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from cognite.client.data_classes.hosted_extractors.sources import ( + EventHubSource, + EventHubSourceUpdate, + EventHubSourceWrite, + MQTT3Source, + MQTT3SourceUpdate, + MQTT3SourceWrite, + MQTT5Source, + MQTT5SourceUpdate, + MQTT5SourceWrite, + Source, + SourceList, + SourceUpdate, + SourceWrite, + SourceWriteList, +) + +__all__ = [ + "EventHubSource", + "EventHubSourceWrite", + "MQTT3Source", + "MQTT3SourceWrite", + "MQTT5Source", + "MQTT5SourceWrite", + "Source", + "SourceList", + "SourceWrite", + "SourceWriteList", + "SourceUpdate", + "MQTT3SourceUpdate", + "MQTT5SourceUpdate", + "EventHubSourceUpdate", +] diff --git a/cognite/client/data_classes/hosted_extractors/sources.py b/cognite/client/data_classes/hosted_extractors/sources.py new file mode 100644 index 000000000..640244db8 --- /dev/null +++ b/cognite/client/data_classes/hosted_extractors/sources.py @@ -0,0 +1,627 @@ +from __future__ import annotations + +import itertools +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, ClassVar, Literal, NoReturn, cast + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteObject, + CognitePrimitiveUpdate, + CogniteResource, + CogniteResourceList, + CogniteUpdate, + ExternalIDTransformerMixin, + PropertySpec, + T_WriteClass, + UnknownCogniteObject, + WriteableCogniteResource, + WriteableCogniteResourceList, +) + +if TYPE_CHECKING: + from cognite.client import CogniteClient + + +class SourceWrite(CogniteResource, ABC): + """A hosted extractor source represents an external source system on the internet. + The source resource in CDF contains all the information the extractor needs to + connect to the external source system. + + This is the write/request format of the source resource. + + Args: + external_id (str): The external ID provided by the client. Must be unique for the resource type. + """ + + _type: ClassVar[str] + + def __init__(self, external_id: str) -> None: + self.external_id = external_id + + @classmethod + @abstractmethod + def _load_source(cls, resource: dict[str, Any]) -> Self: + raise NotImplementedError() + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + type_ = resource.get("type") + if type_ is None and hasattr(cls, "_type"): + type_ = cls._type + elif type_ is None: + raise KeyError("type") + try: + return cast(Self, _SOURCE_WRITE_CLASS_BY_TYPE[type_]._load_source(resource)) + except KeyError: + raise TypeError(f"Unknown source type: {type_}") + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case) + output["type"] = self._type + return output + + +class Source(WriteableCogniteResource[T_WriteClass], ABC): + """A hosted extractor source represents an external source system on the internet. + The source resource in CDF contains all the information the extractor needs to + connect to the external source system. 
+ + This is the read/response format of the source resource. + + Args: + external_id (str): The external ID provided by the client. Must be unique for the resource type. + """ + + _type: ClassVar[str] + + def __init__(self, external_id: str) -> None: + self.external_id = external_id + + @classmethod + @abstractmethod + def _load_source(cls, resource: dict[str, Any]) -> Self: + raise NotImplementedError() + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + type_ = resource.get("type") + if type_ is None and hasattr(cls, "_type"): + type_ = cls._type + elif type_ is None: + raise KeyError("type") + source_class = _SOURCE_CLASS_BY_TYPE.get(type_) + if source_class is None: + return UnknownCogniteObject(resource) # type: ignore[return-value] + return cast(Self, source_class._load_source(resource)) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case) + output["type"] = self._type + return output + + +class SourceUpdate(CogniteUpdate, ABC): + _type: ClassVar[str] + + def __init__(self, external_id: str) -> None: + super().__init__(external_id=external_id) + + def dump(self, camel_case: Literal[True] = True) -> dict[str, Any]: + output = super().dump(camel_case) + output["type"] = self._type + return output + + @classmethod + def _get_update_properties(cls, item: CogniteResource | None = None) -> list[PropertySpec]: + if item is None or not isinstance(item, SourceWrite): + return [] + return _SOURCE_UPDATE_BY_TYPE[item._type]._get_update_properties(item) + + @classmethod + def _get_extra_identifying_properties(cls, item: CogniteResource | None = None) -> dict[str, Any]: + if not isinstance(item, SourceWrite): + return {} + return {"type": item._type} + + +class EventHubSourceWrite(SourceWrite): + """A hosted extractor source represents an external source system on the internet. + The source resource in CDF contains all the information the extractor needs to + connect to the external source system. + + This is the write/request format of the source resource. + + Args: + external_id (str): The external ID provided by the client. Must be unique for the resource type. + host (str): URL of the event hub consumer endpoint. + event_hub_name (str): Name of the event hub + key_name (str): The name of the Event Hub key to use. + key_value (str): Value of the Event Hub key to use for authentication. + consumer_group (str | None): The event hub consumer group to use. Microsoft recommends having a distinct consumer group for each application consuming data from event hub. If left out, this uses the default consumer group. + """ + + _type = "eventhub" + + def __init__( + self, + external_id: str, + host: str, + event_hub_name: str, + key_name: str, + key_value: str, + consumer_group: str | None = None, + ) -> None: + super().__init__(external_id) + self.host = host + self.event_hub_name = event_hub_name + self.key_name = key_name + self.key_value = key_value + self.consumer_group = consumer_group + + def as_write(self) -> SourceWrite: + return self + + @classmethod + def _load_source(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + host=resource["host"], + event_hub_name=resource["eventHubName"], + key_name=resource["keyName"], + key_value=resource["keyValue"], + consumer_group=resource.get("consumerGroup"), + ) + + +class EventHubSource(Source): + """A hosted extractor source represents an external source system on the internet. 
+ The source resource in CDF contains all the information the extractor needs to + connect to the external source system. + + This is the read/response format of the source resource. + + Args: + external_id (str): The external ID provided by the client. Must be unique for the resource type. + host (str): URL of the event hub consumer endpoint. + event_hub_name (str): Name of the event hub + key_name (str): The name of the Event Hub key to use. + created_time (int): No description. + last_updated_time (int): No description. + consumer_group (str | None): The event hub consumer group to use. Microsoft recommends having a distinct consumer group for each application consuming data from event hub. If left out, this uses the default consumer group. + """ + + _type = "eventhub" + + def __init__( + self, + external_id: str, + host: str, + event_hub_name: str, + key_name: str, + created_time: int, + last_updated_time: int, + consumer_group: str | None = None, + ) -> None: + super().__init__(external_id) + self.host = host + self.event_hub_name = event_hub_name + self.key_name = key_name + self.consumer_group = consumer_group + self.created_time = created_time + self.last_updated_time = last_updated_time + + def as_write(self) -> NoReturn: + raise TypeError(f"{type(self).__name__} cannot be converted to write as id does not contain the secrets") + + @classmethod + def _load_source(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + host=resource["host"], + event_hub_name=resource["eventHubName"], + key_name=resource["keyName"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + consumer_group=resource.get("consumerGroup"), + ) + + +class EventHubSourceUpdate(SourceUpdate): + _type = "eventhub" + + class _PrimitiveEventHubSourceUpdate(CognitePrimitiveUpdate): + def set(self, value: str) -> EventHubSourceUpdate: + return self._set(value) + + class _PrimitiveNullableEventHubSourceUpdate(CognitePrimitiveUpdate): + def set(self, value: str | None) -> EventHubSourceUpdate: + return self._set(value) + + @property + def host(self) -> _PrimitiveEventHubSourceUpdate: + return EventHubSourceUpdate._PrimitiveEventHubSourceUpdate(self, "host") + + @property + def event_hub_name(self) -> _PrimitiveEventHubSourceUpdate: + return EventHubSourceUpdate._PrimitiveEventHubSourceUpdate(self, "eventHubName") + + @property + def key_name(self) -> _PrimitiveEventHubSourceUpdate: + return EventHubSourceUpdate._PrimitiveEventHubSourceUpdate(self, "keyName") + + @property + def key_value(self) -> _PrimitiveEventHubSourceUpdate: + return EventHubSourceUpdate._PrimitiveEventHubSourceUpdate(self, "keyValue") + + @property + def consumer_group(self) -> _PrimitiveNullableEventHubSourceUpdate: + return EventHubSourceUpdate._PrimitiveNullableEventHubSourceUpdate(self, "consumerGroup") + + @classmethod + def _get_update_properties(cls, item: CogniteResource | None = None) -> list[PropertySpec]: + return [ + PropertySpec("host", is_nullable=False), + PropertySpec("event_hub_name", is_nullable=False), + PropertySpec("key_name", is_nullable=False), + PropertySpec("key_value", is_nullable=False), + PropertySpec("consumer_group", is_nullable=True), + ] + + +@dataclass +class MQTTAuthenticationWrite(CogniteObject, ABC): + _type: ClassVar[str] + + @classmethod + @abstractmethod + def _load_authentication(cls, resource: dict[str, Any]) -> Self: + raise NotImplementedError() + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: 
CogniteClient | None = None) -> Self: + type_ = resource.get("type") + if type_ is None and hasattr(cls, "_type"): + type_ = cls._type + elif type_ is None: + raise KeyError("type is required") + try: + return cast(Self, _MQTTAUTHENTICATION_WRITE_CLASS_BY_TYPE[type_]._load_authentication(resource)) + except KeyError: + raise TypeError(f"Unknown authentication type: {type_}") + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case) + output["type"] = self._type + return output + + +@dataclass +class BasicMQTTAuthenticationWrite(MQTTAuthenticationWrite): + _type = "basic" + username: str + password: str | None + + @classmethod + def _load_authentication(cls, resource: dict[str, Any]) -> Self: + return cls( + username=resource["username"], + password=resource.get("password"), + ) + + +@dataclass +class CACertificateWrite(CogniteObject): + type: Literal["der", "pem"] + certificate: str + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(type=resource["type"], certificate=resource["certificate"]) + + +@dataclass +class AuthCertificateWrite(CogniteObject): + type: Literal["der", "pem"] + certificate: str + key: str + key_password: str | None + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + type=resource["type"], + certificate=resource["certificate"], + key=resource["key"], + key_password=resource.get("keyPassword"), + ) + + +class _MQTTSourceWrite(SourceWrite, ABC): + def __init__( + self, + external_id: str, + host: str, + port: int | None = None, + authentication: MQTTAuthenticationWrite | None = None, + use_tls: bool = False, + ca_certificate: CACertificateWrite | None = None, + auth_certificate: AuthCertificateWrite | None = None, + ) -> None: + super().__init__(external_id) + self.host = host + self.port = port + self.authentication = authentication + self.use_tls = use_tls + self.ca_certificate = ca_certificate + self.auth_certificate = auth_certificate + + @classmethod + def _load_source(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + host=resource["host"], + port=resource.get("port"), + authentication=MQTTAuthenticationWrite._load(resource["authentication"]) + if "authentication" in resource + else None, + use_tls=resource.get("useTls", False), + ca_certificate=CACertificateWrite._load(resource["caCertificate"]) if "caCertificate" in resource else None, + auth_certificate=AuthCertificateWrite._load(resource["authCertificate"]) + if "authCertificate" in resource + else None, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case) + if isinstance(self.authentication, MQTTAuthenticationWrite): + output["authentication"] = self.authentication.dump(camel_case) + if isinstance(self.ca_certificate, CACertificateWrite): + output["caCertificate" if camel_case else "ca_certificate"] = self.ca_certificate.dump(camel_case) + if isinstance(self.auth_certificate, AuthCertificateWrite): + output["authCertificate" if camel_case else "auth_certificate"] = self.auth_certificate.dump(camel_case) + return output + + +@dataclass +class MQTTAuthentication(CogniteObject, ABC): + _type: ClassVar[str] + + @classmethod + @abstractmethod + def _load_authentication(cls, resource: dict[str, Any]) -> Self: + raise NotImplementedError() + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None 
= None) -> Self: + type_ = resource.get("type") + if type_ is None and hasattr(cls, "_type"): + type_ = cls._type + elif type_ is None: + raise KeyError("type") + + authentication_class = _MQTTAUTHENTICATION_CLASS_BY_TYPE.get(type_) + if authentication_class is None: + return UnknownCogniteObject(resource) # type: ignore[return-value] + return cast(Self, authentication_class._load_authentication(resource)) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case) + output["type"] = self._type + return output + + +@dataclass +class BasicMQTTAuthentication(MQTTAuthentication): + _type = "basic" + username: str + + @classmethod + def _load_authentication(cls, resource: dict[str, Any]) -> Self: + return cls(username=resource["username"]) + + +@dataclass +class CACertificate(CogniteObject): + thumbprint: str + expires_at: str + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(thumbprint=resource["thumbprint"], expires_at=resource["expiresAt"]) + + +@dataclass +class AuthCertificate(CogniteObject): + thumbprint: str + expires_at: str + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(thumbprint=resource["thumbprint"], expires_at=resource["expiresAt"]) + + +class _MQTTSource(Source, ABC): + def __init__( + self, + external_id: str, + host: str, + created_time: int, + last_updated_time: int, + port: int | None = None, + authentication: MQTTAuthentication | None = None, + use_tls: bool = False, + ca_certificate: CACertificate | None = None, + auth_certificate: AuthCertificate | None = None, + ) -> None: + super().__init__(external_id) + self.host = host + self.port = port + self.authentication = authentication + self.use_tls = use_tls + self.ca_certificate = ca_certificate + self.auth_certificate = auth_certificate + self.created_time = created_time + self.last_updated_time = last_updated_time + + @classmethod + def _load_source(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + host=resource["host"], + port=resource.get("port"), + authentication=MQTTAuthentication._load(resource["authentication"]) + if "authentication" in resource + else None, + use_tls=resource.get("useTls", False), + ca_certificate=CACertificate._load(resource["caCertificate"]) if "caCertificate" in resource else None, + auth_certificate=AuthCertificate._load(resource["authCertificate"]) + if "authCertificate" in resource + else None, + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + ) + + def as_write(self) -> NoReturn: + raise TypeError(f"{type(self).__name__} cannot be converted to write as id does not contain the secrets") + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case) + if isinstance(self.authentication, MQTTAuthentication): + output["authentication"] = self.authentication.dump(camel_case) + if isinstance(self.ca_certificate, CACertificate): + output["caCertificate" if camel_case else "ca_certificate"] = self.ca_certificate.dump(camel_case) + if isinstance(self.auth_certificate, AuthCertificate): + output["authCertificate" if camel_case else "auth_certificate"] = self.auth_certificate.dump(camel_case) + return output + + +class _MQTTUpdate(SourceUpdate, ABC): + class _HostUpdate(CognitePrimitiveUpdate): + def set(self, value: str) -> _MQTTUpdate: + return self._set(value) + + class 
_PortUpdate(CognitePrimitiveUpdate): + def set(self, value: int | None) -> _MQTTUpdate: + return self._set(value) + + class _AuthenticationUpdate(CognitePrimitiveUpdate): + def set(self, value: MQTTAuthentication | None) -> _MQTTUpdate: + return self._set(value.dump() if value else None) + + class _UseTlsUpdate(CognitePrimitiveUpdate): + def set(self, value: bool) -> _MQTTUpdate: + return self._set(value) + + class _CACertificateUpdate(CognitePrimitiveUpdate): + def set(self, value: CACertificate | None) -> _MQTTUpdate: + return self._set(value.dump() if value else None) + + class _AuthCertificateUpdate(CognitePrimitiveUpdate): + def set(self, value: AuthCertificate | None) -> _MQTTUpdate: + return self._set(value.dump() if value else None) + + @property + def host(self) -> _HostUpdate: + return _MQTTUpdate._HostUpdate(self, "host") + + @property + def port(self) -> _PortUpdate: + return _MQTTUpdate._PortUpdate(self, "port") + + @property + def authentication(self) -> _AuthenticationUpdate: + return _MQTTUpdate._AuthenticationUpdate(self, "authentication") + + @property + def useTls(self) -> _UseTlsUpdate: + return _MQTTUpdate._UseTlsUpdate(self, "useTls") + + @property + def ca_certificate(self) -> _CACertificateUpdate: + return _MQTTUpdate._CACertificateUpdate(self, "caCertificate") + + @property + def auth_certificate(self) -> _AuthCertificateUpdate: + return _MQTTUpdate._AuthCertificateUpdate(self, "authCertificate") + + @classmethod + def _get_update_properties(cls, item: CogniteResource | None = None) -> list[PropertySpec]: + return [ + PropertySpec("host", is_nullable=False), + PropertySpec("port", is_nullable=True), + PropertySpec("authentication", is_nullable=True, is_container=True), + PropertySpec("useTls", is_nullable=False), + PropertySpec("ca_certificate", is_nullable=True, is_container=True), + PropertySpec("auth_certificate", is_nullable=True, is_container=True), + ] + + +class MQTT3SourceWrite(_MQTTSourceWrite): + _type = "mqtt3" + + +class MQTT5SourceWrite(_MQTTSourceWrite): + _type = "mqtt5" + + +class MQTT3Source(_MQTTSource): + _type = "mqtt3" + + +class MQTT5Source(_MQTTSource): + _type = "mqtt5" + + +class MQTT3SourceUpdate(_MQTTUpdate): + _type = "mqtt3" + + +class MQTT5SourceUpdate(_MQTTUpdate): + _type = "mqtt5" + + +class SourceWriteList(CogniteResourceList[SourceWrite], ExternalIDTransformerMixin): + _RESOURCE = SourceWrite + + +class SourceList(WriteableCogniteResourceList[SourceWrite, Source], ExternalIDTransformerMixin): + _RESOURCE = Source + + def as_write( + self, + ) -> NoReturn: + raise TypeError(f"{type(self).__name__} cannot be converted to write") + + +_SOURCE_WRITE_CLASS_BY_TYPE: dict[str, type[SourceWrite]] = { + subclass._type: subclass # type: ignore[misc] + for subclass in itertools.chain(SourceWrite.__subclasses__(), _MQTTSourceWrite.__subclasses__()) + if hasattr(subclass, "_type") +} + +_SOURCE_CLASS_BY_TYPE: dict[str, type[Source]] = { + subclass._type: subclass # type: ignore[misc] + for subclass in itertools.chain(Source.__subclasses__(), _MQTTSource.__subclasses__()) + if hasattr(subclass, "_type") +} + +_SOURCE_UPDATE_BY_TYPE: dict[str, type[SourceUpdate]] = { + subclass._type: subclass + for subclass in itertools.chain(SourceUpdate.__subclasses__(), _MQTTUpdate.__subclasses__()) + if hasattr(subclass, "_type") +} + +_MQTTAUTHENTICATION_WRITE_CLASS_BY_TYPE: dict[str, type[MQTTAuthenticationWrite]] = { + subclass._type: subclass # type: ignore[type-abstract] + for subclass in MQTTAuthenticationWrite.__subclasses__() + if hasattr(subclass, 
"_type") +} + +_MQTTAUTHENTICATION_CLASS_BY_TYPE: dict[str, type[MQTTAuthentication]] = { + subclass._type: subclass # type: ignore[type-abstract] + for subclass in MQTTAuthentication.__subclasses__() + if hasattr(subclass, "_type") +} diff --git a/cognite/client/testing.py b/cognite/client/testing.py index bfc3f8593..a4bfcb6f3 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -29,6 +29,8 @@ from cognite.client._api.files import FilesAPI from cognite.client._api.functions import FunctionCallsAPI, FunctionsAPI, FunctionSchedulesAPI from cognite.client._api.geospatial import GeospatialAPI +from cognite.client._api.hosted_extractors import HostedExtractorsAPI +from cognite.client._api.hosted_extractors.sources import SourcesAPI from cognite.client._api.iam import IAMAPI, GroupsAPI, SecurityCategoriesAPI, SessionsAPI, TokenAPI from cognite.client._api.labels import LabelsAPI from cognite.client._api.raw import RawAPI, RawDatabasesAPI, RawRowsAPI, RawTablesAPI @@ -136,6 +138,9 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.sequences = MagicMock(spec=SequencesAPI) self.sequences.data = MagicMock(spec_set=SequencesDataAPI) + self.hosted_extractors = MagicMock(spec=HostedExtractorsAPI) + self.hosted_extractors.sources = MagicMock(spec_set=SourcesAPI) + self.templates = MagicMock(spec=TemplatesAPI) self.templates.groups = MagicMock(spec_set=TemplateGroupsAPI) self.templates.instances = MagicMock(spec_set=TemplateInstancesAPI) diff --git a/docs/source/hosted_extractors.rst b/docs/source/hosted_extractors.rst new file mode 100644 index 000000000..afc45101d --- /dev/null +++ b/docs/source/hosted_extractors.rst @@ -0,0 +1,24 @@ +Hosted Extractors +================= + +Sources +------- +Create new source +^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.hosted_extractors.SourcesAPI.create + +Delete source +^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.hosted_extractors.SourcesAPI.delete + +List sources +^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.hosted_extractors.SourcesAPI.list + +Retrieve sources +^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.hosted_extractors.SourcesAPI.retrieve + +Update sources +^^^^^^^^^^^^^^^^^^ +.. 
automethod:: cognite.client._api.hosted_extractors.SourcesAPI.update diff --git a/docs/source/index.rst b/docs/source/index.rst index f8d2044fc..1881c108b 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -50,6 +50,7 @@ Contents contextualization documents data_ingestion + hosted_extractors data_organization transformations functions diff --git a/pyproject.toml b/pyproject.toml index 70327d822..777549750 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "cognite-sdk" -version = "7.58.1" +version = "7.58.2" description = "Cognite Python SDK" readme = "README.md" documentation = "https://cognite-sdk-python.readthedocs-hosted.com" diff --git a/scripts/add_capability.py b/scripts/add_capability.py index b47c8ae21..1240d5190 100644 --- a/scripts/add_capability.py +++ b/scripts/add_capability.py @@ -2,6 +2,7 @@ from pathlib import Path from cognite.client import CogniteClient +from cognite.client.data_classes.capabilities import Capability TMP_DIR = Path(__file__).resolve().parent / "tmp" @@ -9,9 +10,9 @@ def main(client: CogniteClient): new_capabilities = [ { - "experimentAcl": { - "actions": ["USE"], - "scope": {"experimentscope": {"experiments": ["workflowOrchestrator"]}}, + "hostedExtractorsAcl": { + "actions": ["READ", "WRITE"], + "scope": {"all": {}}, } }, ] @@ -33,28 +34,27 @@ def main(client: CogniteClient): (TMP_DIR / f"{selected_group.name}.json").write_text(json.dumps(selected_group.dump(camel_case=True), indent=4)) existing_capability_by_name = { - next(iter(capability.keys())): capability for capability in selected_group.capabilities + capability._capability_name: capability for capability in selected_group.capabilities } added = [] for new_capability in new_capabilities: (capability_name,) = new_capability.keys() if capability_name not in existing_capability_by_name: - selected_group.capabilities.append(new_capability) + selected_group.capabilities.append(Capability.load(new_capability)) added.append(capability_name) elif new_capability[capability_name] != existing_capability_by_name[capability_name]: # Capability exists, but with different scope or actions - selected_group.capabilities.append(new_capability) - added.append(capability_name) + raise NotImplementedError() + # selected_group.capabilities.append(new_capability) + # added.append(capability_name) else: print(f"Capability {capability_name} already exists") if not added: print("All capabilities already exists") return delete_id = selected_group.id - selected_group.id = None - selected_group.is_deleted = None - selected_group.deleted_time = None - client.iam.groups.create(selected_group) + selected_group_write = selected_group.as_write() + client.iam.groups.create(selected_group_write) client.iam.groups.delete(delete_id) diff --git a/tests/tests_integration/test_api/test_hosted_extractors/__init__.py b/tests/tests_integration/test_api/test_hosted_extractors/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/tests_integration/test_api/test_hosted_extractors/test_sources.py b/tests/tests_integration/test_api/test_hosted_extractors/test_sources.py new file mode 100644 index 000000000..08cba2afb --- /dev/null +++ b/tests/tests_integration/test_api/test_hosted_extractors/test_sources.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import platform + +import pytest + +from cognite.client import CogniteClient +from cognite.client.data_classes.hosted_extractors import ( + EventHubSource, + EventHubSourceUpdate, + EventHubSourceWrite, + 
SourceList, +) +from cognite.client.exceptions import CogniteAPIError +from cognite.client.utils._text import random_string + + +@pytest.fixture(scope="session") +def one_event_hub_source(cognite_client: CogniteClient) -> SourceList: + my_hub = EventHubSourceWrite( + external_id=f"myNewHub-{platform.system()}-{platform.python_version()}", + host="myHost", + key_name="myKeyName", + key_value="myKey", + event_hub_name="myEventHub", + ) + retrieved = cognite_client.hosted_extractors.sources.retrieve([my_hub.external_id], ignore_unknown_ids=True) + if retrieved: + return retrieved + return cognite_client.hosted_extractors.sources.create([my_hub]) + + +class TestSources: + def test_create_update_retrieve_delete(self, cognite_client: CogniteClient) -> None: + my_hub = EventHubSourceWrite( + external_id=f"myNewHub-{random_string(10)}", + host="myHost", + key_name="myKeyName", + key_value="myKey", + event_hub_name="myEventHub", + ) + created: EventHubSource | None = None + try: + created = cognite_client.hosted_extractors.sources.create(my_hub) + assert isinstance(created, EventHubSource) + update = EventHubSourceUpdate(external_id=my_hub.external_id).event_hub_name.set("myNewEventHub") + updated = cognite_client.hosted_extractors.sources.update(update) + assert updated.event_hub_name == "myNewEventHub" + retrieved = cognite_client.hosted_extractors.sources.retrieve(created.external_id) + assert retrieved is not None + assert retrieved.external_id == created.external_id + assert retrieved.event_hub_name == "myNewEventHub" + + cognite_client.hosted_extractors.sources.delete(created.external_id) + + with pytest.raises(CogniteAPIError): + cognite_client.hosted_extractors.sources.retrieve(created.external_id) + + cognite_client.hosted_extractors.sources.retrieve(created.external_id, ignore_unknown_ids=True) + + finally: + if created: + cognite_client.hosted_extractors.sources.delete(created.external_id, ignore_unknown_ids=True) + + @pytest.mark.usefixtures("one_event_hub_source") + def test_list(self, cognite_client: CogniteClient) -> None: + res = cognite_client.hosted_extractors.sources.list(limit=1) + assert len(res) == 1 + assert isinstance(res, SourceList) + + def test_update_using_write_object(self, cognite_client: CogniteClient) -> None: + my_hub = EventHubSourceWrite( + external_id=f"to-update-{random_string(10)}", + host="myHost", + key_name="myKeyName", + key_value="myKey", + event_hub_name="myEventHub", + ) + created: EventHubSource | None = None + try: + created = cognite_client.hosted_extractors.sources.create(my_hub) + + my_new_hub = EventHubSourceWrite( + external_id=created.external_id, + host="updatedHost", + key_name="updatedKeyName", + key_value="updatedKey", + event_hub_name="updatedEventHub", + ) + + updated = cognite_client.hosted_extractors.sources.update(my_new_hub) + + assert updated.host == my_new_hub.host + finally: + if created: + cognite_client.hosted_extractors.sources.delete(created.external_id, ignore_unknown_ids=True) diff --git a/tests/tests_unit/test_base.py b/tests/tests_unit/test_base.py index 1d7232564..317a82945 100644 --- a/tests/tests_unit/test_base.py +++ b/tests/tests_unit/test_base.py @@ -36,10 +36,11 @@ ) from cognite.client.data_classes.datapoints import DatapointsArray from cognite.client.data_classes.events import Event, EventList +from cognite.client.data_classes.hosted_extractors import Source, SourceList from cognite.client.exceptions import CogniteMissingClientError from cognite.client.testing import CogniteClientMock from cognite.client.utils import 
_json -from tests.utils import FakeCogniteResourceGenerator, all_concrete_subclasses, all_subclasses +from tests.utils import FakeCogniteResourceGenerator, all_concrete_subclasses class MyResource(CogniteResource): @@ -192,7 +193,9 @@ def test_dump_load_only_required( "cognite_writable_cls", [ pytest.param(cls, id=f"{cls.__name__} in {cls.__module__}") + # Hosted extractors does not support the as_write method for cls in all_concrete_subclasses(WriteableCogniteResource) + if not issubclass(cls, Source) ], ) def test_writable_as_write( @@ -210,7 +213,7 @@ def test_writable_as_write( [ pytest.param(cls, id=f"{cls.__name__} in {cls.__module__}") for cls in all_concrete_subclasses(WriteableCogniteResourceList) - if cls not in [EdgeListWithCursor, NodeListWithCursor] + if cls not in [EdgeListWithCursor, NodeListWithCursor, SourceList] ], ) def test_writable_list_as_write( @@ -762,13 +765,20 @@ def test_get_update_properties(self): assert hasattr(MyUpdate, "columns") and "columns" not in props assert {"string", "list", "object", "labels"} == set(props) - @pytest.mark.parametrize("cognite_update_subclass", all_subclasses(CogniteUpdate)) + @pytest.mark.parametrize("cognite_update_subclass", all_concrete_subclasses(CogniteUpdate)) def test_correct_implementation_get_update_properties(self, cognite_update_subclass: CogniteUpdate): expected = sorted( key for key in cognite_update_subclass.__dict__ if not key.startswith("_") and key not in {"columns", "dump"} ) + if not expected: + # Check parent class if there are no attributes in the subclass + expected = sorted( + key + for key in cognite_update_subclass.__bases__[0].__dict__ + if not key.startswith("_") and key != "dump" + ) actual = sorted(prop.name for prop in cognite_update_subclass._get_update_properties()) assert expected == actual diff --git a/tests/tests_unit/test_meta.py b/tests/tests_unit/test_meta.py index 30d38c12a..df4199468 100644 --- a/tests/tests_unit/test_meta.py +++ b/tests/tests_unit/test_meta.py @@ -79,7 +79,7 @@ def apis_that_should_not_have_post_retry_rule(): ) def test_all_base_api_paths_have_retry_or_specifically_no_set( api, apis_with_post_method_retry_set, apis_that_should_not_have_post_retry_rule -): +) -> None: # So you've added a new API to the SDK, but suddenly this test is failing - what's the deal?! # Answer the following: # Does this new API have POST methods that should be retried automatically?
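The new data classes in cognite/client/data_classes/hosted_extractors/sources.py dispatch `_load` on the payload's "type" field through registries keyed by each subclass's `_type` ClassVar. The following is a simplified, standalone sketch of that pattern, not the SDK code itself; the `*Sketch` names are hypothetical stand-ins:

    from __future__ import annotations

    from abc import ABC, abstractmethod
    from typing import Any, ClassVar


    class SourceSketch(ABC):
        # Hypothetical stand-in for SourceWrite/Source; not part of the SDK.
        _type: ClassVar[str]

        @classmethod
        @abstractmethod
        def _load_source(cls, resource: dict[str, Any]) -> SourceSketch:
            raise NotImplementedError()

        @classmethod
        def load(cls, resource: dict[str, Any]) -> SourceSketch:
            # Build the registry the same way sources.py does: every subclass
            # that declares a `_type` ClassVar becomes loadable by type name.
            registry = {sub._type: sub for sub in cls.__subclasses__() if hasattr(sub, "_type")}
            type_ = resource.get("type")
            source_class = registry.get(type_)
            if source_class is None:
                raise TypeError(f"Unknown source type: {type_}")
            return source_class._load_source(resource)


    class EventHubSketch(SourceSketch):
        _type = "eventhub"

        def __init__(self, external_id: str, host: str) -> None:
            self.external_id = external_id
            self.host = host

        @classmethod
        def _load_source(cls, resource: dict[str, Any]) -> EventHubSketch:
            return cls(external_id=resource["externalId"], host=resource["host"])


    loaded = SourceSketch.load({"type": "eventhub", "externalId": "my_hub", "host": "myHost"})
    assert isinstance(loaded, EventHubSketch)

Note that the real module builds its registries with `itertools.chain(SourceWrite.__subclasses__(), _MQTTSourceWrite.__subclasses__())` (and likewise for the read and update classes), because `__subclasses__()` is not recursive and the MQTT variants sit one level deeper in the hierarchy.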