From bb27dd244b38286106e36f3669c9a771f92dd876 Mon Sep 17 00:00:00 2001 From: anders-albert Date: Thu, 29 Aug 2024 10:00:11 +0200 Subject: [PATCH] refactor; MQTT data classes --- .../data_classes/hosted_extractors/sources.py | 181 +++++++++++++++++- 1 file changed, 178 insertions(+), 3 deletions(-) diff --git a/cognite/client/data_classes/hosted_extractors/sources.py b/cognite/client/data_classes/hosted_extractors/sources.py index 0b82be72c3..dd0ef7d1b4 100644 --- a/cognite/client/data_classes/hosted_extractors/sources.py +++ b/cognite/client/data_classes/hosted_extractors/sources.py @@ -1,11 +1,13 @@ from __future__ import annotations from abc import ABC -from typing import TYPE_CHECKING, ClassVar, Literal +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, ClassVar, Literal -from typing_extensions import TypeAlias +from typing_extensions import Self, TypeAlias from cognite.client.data_classes._base import ( + CogniteObject, CogniteResource, CogniteResourceList, ExternalIDTransformerMixin, @@ -15,7 +17,7 @@ ) if TYPE_CHECKING: - pass + from cognite.client import CogniteClient SourceType: TypeAlias = Literal["mqtt5", "mqtt3", "eventhub"] @@ -139,6 +141,179 @@ def as_write(self, key_value: str | None = None) -> EventHubSourceWrite: ) +@dataclass +class MQTTAuthenticationWrite(CogniteObject, ABC): + _type: ClassVar[str] + + +@dataclass +class BasicMQTTAuthenticationWrite(MQTTAuthenticationWrite): + username: str + password: str | None + + +@dataclass +class CACertificateWrite(CogniteObject): + type: Literal["der", "pem"] + certificate: str + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(type=resource["type"], certificate=resource["certificate"]) + + +@dataclass +class AuthCertificateWrite(CogniteObject): + type: Literal["der", "pem"] + certificate: str + key: str + key_password: str | None + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + type=resource["type"], + certificate=resource["certificate"], + key=resource["key"], + key_password=resource.get("keyPassword"), + ) + + +class MQTTSourceWrite(SourceWrite): + def __init__( + self, + external_id: str, + host: str, + port: int | None = None, + authentication: MQTTAuthenticationWrite | None = None, + useTls: bool = False, + ca_certificate: CACertificateWrite | None = None, + auth_certificate: AuthCertificateWrite | None = None, + ) -> None: + super().__init__(external_id) + self.host = host + self.port = port + self.authentication = authentication + self.useTls = useTls + self.ca_certificate = ca_certificate + self.auth_certificate = auth_certificate + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + external_id=resource["externalId"], + host=resource["host"], + port=resource.get("port"), + authentication=MQTTAuthenticationWrite._load(resource["authentication"]) + if "authentication" in resource + else None, + useTls=resource.get("useTls", False), + ca_certificate=CACertificateWrite._load(resource["caCertificate"]) if "caCertificate" in resource else None, + auth_certificate=AuthCertificateWrite._load(resource["authCertificate"]) + if "authCertificate" in resource + else None, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case) + if self.authentication: + output["authentication"] = self.authentication.dump(camel_case) + if self.ca_certificate: + output["caCertificate" if camel_case else "ca_certificate"] = self.ca_certificate.dump(camel_case) + if self.auth_certificate: + output["authCertificate" if camel_case else "auth_certificate"] = self.auth_certificate.dump(camel_case) + return output + + +@dataclass +class MQTTAuthentication(CogniteObject, ABC): + _type: ClassVar[str] + + +@dataclass +class BasicMQTTAuthentication(MQTTAuthentication): + username: str + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(username=resource["username"]) + + +@dataclass +class CACertificate(CogniteObject): + thumbprint: str + expires_at: str + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(thumbprint=resource["thumbprint"], expires_at=resource["expiresAt"]) + + +@dataclass +class AuthCertificate(CogniteObject): + thumbprint: str + expires_at: str + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(thumbprint=resource["thumbprint"], expires_at=resource["expiresAt"]) + + +class MQTTSource(Source): + def __init__( + self, + external_id: str, + host: str, + created_time: int, + last_updated_time: int, + port: int | None = None, + authentication: MQTTAuthentication | None = None, + useTls: bool = False, + ca_certificate: CACertificate | None = None, + auth_certificate: AuthCertificate | None = None, + ) -> None: + super().__init__(external_id) + self.host = host + self.port = port + self.authentication = authentication + self.useTls = useTls + self.ca_certificate = ca_certificate + self.auth_certificate = auth_certificate + self.created_time = created_time + self.last_updated_time = last_updated_time + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + external_id=resource["externalId"], + host=resource["host"], + port=resource.get("port"), + authentication=MQTTAuthentication._load(resource["authentication"]) + if "authentication" in resource + else None, + useTls=resource.get("useTls", False), + ca_certificate=CACertificate._load(resource["caCertificate"]) if "caCertificate" in resource else None, + auth_certificate=AuthCertificate._load(resource["authCertificate"]) + if "authCertificate" in resource + else None, + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + ) + + def as_write(self) -> MQTTSourceWrite: + raise TypeError(f"{type(self).__name__} cannot be converted to write as id does not contain the secrets") + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case) + if self.authentication: + output["authentication"] = self.authentication.dump(camel_case) + if self.ca_certificate: + output["caCertificate" if camel_case else "ca_certificate"] = self.ca_certificate.dump(camel_case) + if self.auth_certificate: + output["authCertificate" if camel_case else "auth_certificate"] = self.auth_certificate.dump(camel_case) + return output + + class SourceWriteList(CogniteResourceList[SourceWrite], ExternalIDTransformerMixin): _RESOURCE = SourceWrite