From ed8543186dbd017249eccccef7867840b8190b22 Mon Sep 17 00:00:00 2001 From: Anders Albert <60234212+doctrino@users.noreply.github.com> Date: Tue, 24 Sep 2024 18:03:40 +0200 Subject: [PATCH] [CDF-22379] Hosted Extractors Source: Kafka and Rest (#1899) --- CHANGELOG.md | 6 +- cognite/client/_version.py | 2 +- cognite/client/data_classes/_base.py | 2 +- .../hosted_extractors/__init__.py | 12 + .../data_classes/hosted_extractors/sources.py | 394 +++++++++++++++++- pyproject.toml | 2 +- 6 files changed, 398 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d34fa5253..35e7e4f7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,11 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. -## [7.62.2] - 2024-09-23 +## [7.62.3] - 2024-09-24 +### Added +- [Feature Preview - alpha] Support for `Kafka` and `Rest` sources in `client.hosted_extractors.sources`. + +## [7.62.2] - 2024-09-24 ### Added - [Feature Preview - alpha] Support for `client.hosted_extractors.mappings`. diff --git a/cognite/client/_version.py b/cognite/client/_version.py index f0c148a8b..937fe21d7 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,4 +1,4 @@ from __future__ import annotations -__version__ = "7.62.2" +__version__ = "7.62.3" __api_subversion__ = "20230101" diff --git a/cognite/client/data_classes/_base.py b/cognite/client/data_classes/_base.py index f4fdaed0e..0e2c7027e 100644 --- a/cognite/client/data_classes/_base.py +++ b/cognite/client/data_classes/_base.py @@ -533,7 +533,7 @@ def __init__(self, update_object: T_CogniteUpdate, name: str) -> None: self._update_object = update_object self._name = name - def _set(self, value: None | str | int | bool | dict) -> T_CogniteUpdate: + def _set(self, value: None | str | int | bool | dict | list) -> T_CogniteUpdate: if value is None: self._update_object._set_null(self._name) else: diff --git a/cognite/client/data_classes/hosted_extractors/__init__.py b/cognite/client/data_classes/hosted_extractors/__init__.py index 7da32acc6..ad4462399 100644 --- a/cognite/client/data_classes/hosted_extractors/__init__.py +++ b/cognite/client/data_classes/hosted_extractors/__init__.py @@ -53,12 +53,18 @@ EventHubSource, EventHubSourceUpdate, EventHubSourceWrite, + KafkaSource, + KafkaSourceUpdate, + KafkaSourceWrite, MQTT3Source, MQTT3SourceUpdate, MQTT3SourceWrite, MQTT5Source, MQTT5SourceUpdate, MQTT5SourceWrite, + RestSource, + RestSourceUpdate, + RestSourceWrite, Source, SourceList, SourceUpdate, @@ -124,4 +130,10 @@ "ProtoBufInput", "JSONInput", "XMLInput", + "RestSource", + "RestSourceWrite", + "RestSourceUpdate", + "KafkaSource", + "KafkaSourceWrite", + "KafkaSourceUpdate", ] diff --git a/cognite/client/data_classes/hosted_extractors/sources.py b/cognite/client/data_classes/hosted_extractors/sources.py index 598557572..078c60c72 100644 --- a/cognite/client/data_classes/hosted_extractors/sources.py +++ b/cognite/client/data_classes/hosted_extractors/sources.py @@ -2,6 +2,7 @@ import itertools from abc import ABC, abstractmethod +from collections.abc import Sequence from dataclasses import dataclass from typing import TYPE_CHECKING, Any, ClassVar, Literal, NoReturn, cast @@ -266,7 +267,7 @@ def _get_update_properties(cls, item: CogniteResource | None = None) -> list[Pro @dataclass -class MQTTAuthenticationWrite(CogniteObject, ABC): +class AuthenticationWrite(CogniteObject, ABC): _type: ClassVar[str] @classmethod @@ -293,7 +294,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: @dataclass -class BasicMQTTAuthenticationWrite(MQTTAuthenticationWrite): +class BasicMQTTAuthenticationWrite(AuthenticationWrite): _type = "basic" username: str password: str | None @@ -334,12 +335,14 @@ def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = class _MQTTSourceWrite(SourceWrite, ABC): + _type = "mqtt" + def __init__( self, external_id: str, host: str, port: int | None = None, - authentication: MQTTAuthenticationWrite | None = None, + authentication: AuthenticationWrite | None = None, use_tls: bool = False, ca_certificate: CACertificateWrite | None = None, auth_certificate: AuthCertificateWrite | None = None, @@ -358,7 +361,7 @@ def _load_source(cls, resource: dict[str, Any]) -> Self: external_id=resource["externalId"], host=resource["host"], port=resource.get("port"), - authentication=MQTTAuthenticationWrite._load(resource["authentication"]) + authentication=AuthenticationWrite._load(resource["authentication"]) if "authentication" in resource else None, use_tls=resource.get("useTls", False), @@ -370,7 +373,7 @@ def _load_source(cls, resource: dict[str, Any]) -> Self: def dump(self, camel_case: bool = True) -> dict[str, Any]: output = super().dump(camel_case) - if isinstance(self.authentication, MQTTAuthenticationWrite): + if isinstance(self.authentication, AuthenticationWrite): output["authentication"] = self.authentication.dump(camel_case) if isinstance(self.ca_certificate, CACertificateWrite): output["caCertificate" if camel_case else "ca_certificate"] = self.ca_certificate.dump(camel_case) @@ -438,6 +441,8 @@ def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = class _MQTTSource(Source, ABC): + _type = "mqtt" + def __init__( self, external_id: str, @@ -493,6 +498,8 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class _MQTTUpdate(SourceUpdate, ABC): + _type = "mqtt" + class _HostUpdate(CognitePrimitiveUpdate): def set(self, value: str) -> _MQTTUpdate: return self._set(value) @@ -530,7 +537,7 @@ def authentication(self) -> _AuthenticationUpdate: return _MQTTUpdate._AuthenticationUpdate(self, "authentication") @property - def useTls(self) -> _UseTlsUpdate: + def use_tls(self) -> _UseTlsUpdate: return _MQTTUpdate._UseTlsUpdate(self, "useTls") @property @@ -547,7 +554,7 @@ def _get_update_properties(cls, item: CogniteResource | None = None) -> list[Pro PropertySpec("host", is_nullable=False), PropertySpec("port", is_nullable=True), PropertySpec("authentication", is_nullable=True, is_object=True), - PropertySpec("useTls", is_nullable=False), + PropertySpec("use_tls", is_nullable=False), PropertySpec("ca_certificate", is_nullable=True, is_object=True), PropertySpec("auth_certificate", is_nullable=True, is_object=True), ] @@ -577,6 +584,366 @@ class MQTT5SourceUpdate(_MQTTUpdate): _type = "mqtt5" +@dataclass +class KafkaBroker(CogniteObject): + host: str + port: int + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(host=resource["host"], port=resource["port"]) + + +class KafkaSourceWrite(SourceWrite): + """A hosted extractor source represents an external source system on the internet. + The source resource in CDF contains all the information the extractor needs to + connect to the external source system. + + This is the write/request format of the kafka resource. + + Args: + external_id (str): The external ID provided by the client. Must be unique for the resource type. + bootstrap_brokers (Sequence[KafkaBroker]): List of redundant kafka brokers to connect to. + authentication (AuthenticationWrite | None): Authentication information for the kafka source. + use_tls (bool): If true, use TLS when connecting to the broker. + ca_certificate (CACertificateWrite | None): Custom certificate authority certificate to let the source use a self signed certificate. + auth_certificate (AuthCertificateWrite | None): Authentication certificate (if configured) used to authenticate to source. + """ + + _type = "kafka" + + def __init__( + self, + external_id: str, + bootstrap_brokers: Sequence[KafkaBroker], + authentication: AuthenticationWrite | None = None, + use_tls: bool = False, + ca_certificate: CACertificateWrite | None = None, + auth_certificate: AuthCertificateWrite | None = None, + ) -> None: + super().__init__(external_id) + self.bootstrap_brokers = bootstrap_brokers + self.authentication = authentication + self.use_tls = use_tls + self.ca_certificate = ca_certificate + self.auth_certificate = auth_certificate + + @classmethod + def _load_source(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + bootstrap_brokers=[KafkaBroker._load(broker) for broker in resource["bootstrapBrokers"]], + authentication=AuthenticationWrite._load(resource["authentication"]) + if "authentication" in resource + else None, + use_tls=resource.get("useTls", False), + ca_certificate=CACertificateWrite._load(resource["caCertificate"]) if "caCertificate" in resource else None, + auth_certificate=AuthCertificateWrite._load(resource["authCertificate"]) + if "authCertificate" in resource + else None, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case) + output["bootstrapBrokers" if camel_case else "bootstrap_brokers"] = [ + broker.dump(camel_case) for broker in self.bootstrap_brokers + ] + if isinstance(self.authentication, AuthenticationWrite): + output["authentication"] = self.authentication.dump(camel_case) + if isinstance(self.ca_certificate, CACertificateWrite): + output["caCertificate" if camel_case else "ca_certificate"] = self.ca_certificate.dump(camel_case) + if isinstance(self.auth_certificate, AuthCertificateWrite): + output["authCertificate" if camel_case else "auth_certificate"] = self.auth_certificate.dump(camel_case) + return output + + +class KafkaSource(Source): + """A hosted extractor source represents an external source system on the internet. + The source resource in CDF contains all the information the extractor needs to + connect to the external source system. + + This is the read/response format of the kafka resource. + + Args: + external_id (str): The external ID provided by the client. Must be unique for the resource type. + bootstrap_brokers (Sequence[KafkaBroker]): List of redundant kafka brokers to connect to. + created_time (int): The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds. + last_updated_time (int): The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds. + authentication (MQTTAuthentication | None): Authentication information for the kafka source. + use_tls (bool): If true, use TLS when connecting to the broker. + ca_certificate (CACertificate | None): Custom certificate authority certificate to let the source use a self signed certificate. + auth_certificate (AuthCertificate | None): Authentication certificate (if configured) used to authenticate to source. + """ + + _type = "kafka" + + def __init__( + self, + external_id: str, + bootstrap_brokers: Sequence[KafkaBroker], + created_time: int, + last_updated_time: int, + authentication: MQTTAuthentication | None = None, + use_tls: bool = False, + ca_certificate: CACertificate | None = None, + auth_certificate: AuthCertificate | None = None, + ) -> None: + super().__init__(external_id) + self.bootstrap_brokers = bootstrap_brokers + self.authentication = authentication + self.use_tls = use_tls + self.ca_certificate = ca_certificate + self.auth_certificate = auth_certificate + self.created_time = created_time + self.last_updated_time = last_updated_time + + @classmethod + def _load_source(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + bootstrap_brokers=[KafkaBroker._load(broker) for broker in resource["bootstrapBrokers"]], + authentication=MQTTAuthentication._load(resource["authentication"]) + if "authentication" in resource + else None, + use_tls=resource.get("useTls", False), + ca_certificate=CACertificate._load(resource["caCertificate"]) if "caCertificate" in resource else None, + auth_certificate=AuthCertificate._load(resource["authCertificate"]) + if "authCertificate" in resource + else None, + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + ) + + def as_write(self) -> KafkaSourceWrite: + raise TypeError(f"{type(self).__name__} cannot be converted to write as id does not contain the secrets") + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case) + output["bootstrapBrokers" if camel_case else "bootstrap_brokers"] = [ + broker.dump(camel_case) for broker in self.bootstrap_brokers + ] + if isinstance(self.authentication, MQTTAuthentication): + output["authentication"] = self.authentication.dump(camel_case) + if isinstance(self.ca_certificate, CACertificate): + output["caCertificate" if camel_case else "ca_certificate"] = self.ca_certificate.dump(camel_case) + if isinstance(self.auth_certificate, AuthCertificate): + output["authCertificate" if camel_case else "auth_certificate"] = self.auth_certificate.dump(camel_case) + return output + + +class KafkaSourceUpdate(SourceUpdate): + _type = "kafka" + + class _BootstrapBrokersUpdate(CognitePrimitiveUpdate): + def set(self, value: Sequence[KafkaBroker]) -> KafkaSourceUpdate: + return self._set([broker.dump() for broker in value]) + + class _AuthenticationUpdate(CognitePrimitiveUpdate): + def set(self, value: MQTTAuthentication | None) -> KafkaSourceUpdate: + return self._set(value.dump() if value else None) + + class _UseTlsUpdate(CognitePrimitiveUpdate): + def set(self, value: bool) -> KafkaSourceUpdate: + return self._set(value) + + class _CACertificateUpdate(CognitePrimitiveUpdate): + def set(self, value: CACertificate | None) -> KafkaSourceUpdate: + return self._set(value.dump() if value else None) + + class _AuthCertificateUpdate(CognitePrimitiveUpdate): + def set(self, value: AuthCertificate | None) -> KafkaSourceUpdate: + return self._set(value.dump() if value else None) + + @property + def bootstrap_brokers(self) -> _BootstrapBrokersUpdate: + return KafkaSourceUpdate._BootstrapBrokersUpdate(self, "bootstrapBrokers") + + @property + def authentication(self) -> _AuthenticationUpdate: + return KafkaSourceUpdate._AuthenticationUpdate(self, "authentication") + + @property + def use_tls(self) -> _UseTlsUpdate: + return KafkaSourceUpdate._UseTlsUpdate(self, "useTls") + + @property + def ca_certificate(self) -> _CACertificateUpdate: + return KafkaSourceUpdate._CACertificateUpdate(self, "caCertificate") + + @property + def auth_certificate(self) -> _AuthCertificateUpdate: + return KafkaSourceUpdate._AuthCertificateUpdate(self, "authCertificate") + + @classmethod + def _get_update_properties(cls, item: CogniteResource | None = None) -> list[PropertySpec]: + return [ + PropertySpec("bootstrap_brokers", is_nullable=False), + PropertySpec("authentication", is_nullable=True, is_object=True), + PropertySpec("use_tls", is_nullable=False), + PropertySpec("ca_certificate", is_nullable=True, is_object=True), + PropertySpec("auth_certificate", is_nullable=True, is_object=True), + ] + + +class RestSourceWrite(SourceWrite): + """A hosted extractor source represents an external source system on the internet. + The source resource in CDF contains all the information the extractor needs to + connect to the external source system. + + This is the write/request format of the rest resource. + + Args: + external_id (str): The external ID provided by the client. Must be unique for the resource type. + host (str): Host or IP address to connect to. + scheme (Literal["http", "https"]): Type of connection to establish. + port (int | None): Port on server to connect to. Uses default ports based on the scheme if omitted. + ca_certificate (CACertificateWrite | None): Custom certificate authority certificate to let the source use a self signed certificate. + auth_certificate (AuthCertificateWrite | None): Authentication certificate (if configured) used to authenticate to source. + """ + + _type = "rest" + + def __init__( + self, + external_id: str, + host: str, + scheme: Literal["http", "https"], + port: int | None = None, + ca_certificate: CACertificateWrite | None = None, + auth_certificate: AuthCertificateWrite | None = None, + ) -> None: + super().__init__(external_id) + self.host = host + self.scheme = scheme + self.port = port + self.ca_certificate = ca_certificate + self.auth_certificate = auth_certificate + + @classmethod + def _load_source(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + host=resource["host"], + scheme=resource["scheme"], + port=resource.get("port"), + ca_certificate=CACertificateWrite._load(resource["caCertificate"]) if "caCertificate" in resource else None, + auth_certificate=AuthCertificateWrite._load(resource["authCertificate"]) + if "authCertificate" in resource + else None, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case) + if isinstance(self.ca_certificate, CACertificateWrite): + output["caCertificate" if camel_case else "ca_certificate"] = self.ca_certificate.dump(camel_case) + if isinstance(self.auth_certificate, AuthCertificateWrite): + output["authCertificate" if camel_case else "auth_certificate"] = self.auth_certificate.dump(camel_case) + return output + + +class RestSource(Source): + """A hosted extractor source represents an external source system on the internet. + The source resource in CDF contains all the information the extractor needs to + connect to the external source system. + + This is the read/response format of the rest resource. + + Args: + external_id (str): The external ID provided by the client. Must be unique for the resource type. + host (str): Host or IP address to connect to. + scheme (Literal["http", "https"]): Type of connection to establish. + created_time (int): The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds. + last_updated_time (int): The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds. + port (int | None): Port on server to connect to. Uses default ports based on the scheme if omitted. + ca_certificate (CACertificate | None): Custom certificate authority certificate to let the source use a self signed certificate. + auth_certificate (AuthCertificate | None): Authentication certificate (if configured) used to authenticate to source. + """ + + _type = "rest" + + def __init__( + self, + external_id: str, + host: str, + scheme: Literal["http", "https"], + created_time: int, + last_updated_time: int, + port: int | None = None, + ca_certificate: CACertificate | None = None, + auth_certificate: AuthCertificate | None = None, + ) -> None: + super().__init__(external_id) + self.host = host + self.scheme = scheme + self.port = port + self.ca_certificate = ca_certificate + self.auth_certificate = auth_certificate + self.created_time = created_time + self.last_updated_time = last_updated_time + + @classmethod + def _load_source(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + host=resource["host"], + scheme=resource["scheme"], + port=resource.get("port"), + ca_certificate=CACertificate._load(resource["caCertificate"]) if "caCertificate" in resource else None, + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + auth_certificate=AuthCertificate._load(resource["authCertificate"]) + if "authCertificate" in resource + else None, + ) + + def as_write(self) -> RestSourceWrite: + raise TypeError(f"{type(self).__name__} cannot be converted to write as id does not contain the secrets") + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case) + if isinstance(self.ca_certificate, CACertificate): + output["caCertificate" if camel_case else "ca_certificate"] = self.ca_certificate.dump(camel_case) + if isinstance(self.auth_certificate, AuthCertificate): + output["authCertificate" if camel_case else "auth_certificate"] = self.auth_certificate.dump(camel_case) + return output + + +class RestSourceUpdate(SourceUpdate): + _type = "rest" + + class _HostUpdate(CognitePrimitiveUpdate): + def set(self, value: str) -> RestSourceUpdate: + return self._set(value) + + class _SchemeUpdate(CognitePrimitiveUpdate): + def set(self, value: Literal["http", "https"]) -> RestSourceUpdate: + return self._set(value) + + class _PortUpdate(CognitePrimitiveUpdate): + def set(self, value: int) -> RestSourceUpdate: + return self._set(value) + + @property + def host(self) -> _HostUpdate: + return RestSourceUpdate._HostUpdate(self, "host") + + @property + def scheme(self) -> _SchemeUpdate: + return RestSourceUpdate._SchemeUpdate(self, "scheme") + + @property + def port(self) -> _PortUpdate: + return RestSourceUpdate._PortUpdate(self, "port") + + @classmethod + def _get_update_properties(cls, item: CogniteResource | None = None) -> list[PropertySpec]: + return [ + PropertySpec("host", is_nullable=False), + PropertySpec("scheme", is_nullable=False), + PropertySpec("port", is_nullable=False), + ] + + class SourceWriteList(CogniteResourceList[SourceWrite], ExternalIDTransformerMixin): _RESOURCE = SourceWrite @@ -591,31 +958,26 @@ def as_write( _SOURCE_WRITE_CLASS_BY_TYPE: dict[str, type[SourceWrite]] = { - subclass._type: subclass # type: ignore[misc] + subclass._type: subclass # type: ignore[misc, attr-defined] for subclass in itertools.chain(SourceWrite.__subclasses__(), _MQTTSourceWrite.__subclasses__()) - if hasattr(subclass, "_type") } _SOURCE_CLASS_BY_TYPE: dict[str, type[Source]] = { - subclass._type: subclass # type: ignore[misc] + subclass._type: subclass # type: ignore[misc, attr-defined] for subclass in itertools.chain(Source.__subclasses__(), _MQTTSource.__subclasses__()) - if hasattr(subclass, "_type") } _SOURCE_UPDATE_BY_TYPE: dict[str, type[SourceUpdate]] = { subclass._type: subclass for subclass in itertools.chain(SourceUpdate.__subclasses__(), _MQTTUpdate.__subclasses__()) - if hasattr(subclass, "_type") } -_MQTTAUTHENTICATION_WRITE_CLASS_BY_TYPE: dict[str, type[MQTTAuthenticationWrite]] = { +_MQTTAUTHENTICATION_WRITE_CLASS_BY_TYPE: dict[str, type[AuthenticationWrite]] = { subclass._type: subclass # type: ignore[type-abstract] - for subclass in MQTTAuthenticationWrite.__subclasses__() - if hasattr(subclass, "_type") + for subclass in AuthenticationWrite.__subclasses__() } _MQTTAUTHENTICATION_CLASS_BY_TYPE: dict[str, type[MQTTAuthentication]] = { subclass._type: subclass # type: ignore[type-abstract] for subclass in MQTTAuthentication.__subclasses__() - if hasattr(subclass, "_type") } diff --git a/pyproject.toml b/pyproject.toml index f1d1b01e9..a3f76379c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "cognite-sdk" -version = "7.62.2" +version = "7.62.3" description = "Cognite Python SDK" readme = "README.md" documentation = "https://cognite-sdk-python.readthedocs-hosted.com"