Skip to content

Commit

Permalink
refactor; MQTT data classes
Browse files Browse the repository at this point in the history
  • Loading branch information
doctrino committed Aug 29, 2024
1 parent 28e432b commit bb27dd2
Showing 1 changed file with 178 additions and 3 deletions.
181 changes: 178 additions & 3 deletions cognite/client/data_classes/hosted_extractors/sources.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

from abc import ABC
from typing import TYPE_CHECKING, ClassVar, Literal
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, ClassVar, Literal

from typing_extensions import TypeAlias
from typing_extensions import Self, TypeAlias

from cognite.client.data_classes._base import (
CogniteObject,
CogniteResource,
CogniteResourceList,
ExternalIDTransformerMixin,
Expand All @@ -15,7 +17,7 @@
)

if TYPE_CHECKING:
pass
from cognite.client import CogniteClient


SourceType: TypeAlias = Literal["mqtt5", "mqtt3", "eventhub"]
Expand Down Expand Up @@ -139,6 +141,179 @@ def as_write(self, key_value: str | None = None) -> EventHubSourceWrite:
)


@dataclass
class MQTTAuthenticationWrite(CogniteObject, ABC):
_type: ClassVar[str]


@dataclass
class BasicMQTTAuthenticationWrite(MQTTAuthenticationWrite):
username: str
password: str | None


@dataclass
class CACertificateWrite(CogniteObject):
type: Literal["der", "pem"]
certificate: str

@classmethod
def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
return cls(type=resource["type"], certificate=resource["certificate"])


@dataclass
class AuthCertificateWrite(CogniteObject):
type: Literal["der", "pem"]
certificate: str
key: str
key_password: str | None

@classmethod
def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
return cls(
type=resource["type"],
certificate=resource["certificate"],
key=resource["key"],
key_password=resource.get("keyPassword"),
)


class MQTTSourceWrite(SourceWrite):
def __init__(
self,
external_id: str,
host: str,
port: int | None = None,
authentication: MQTTAuthenticationWrite | None = None,
useTls: bool = False,
ca_certificate: CACertificateWrite | None = None,
auth_certificate: AuthCertificateWrite | None = None,
) -> None:
super().__init__(external_id)
self.host = host
self.port = port
self.authentication = authentication
self.useTls = useTls
self.ca_certificate = ca_certificate
self.auth_certificate = auth_certificate

@classmethod
def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
return cls(
external_id=resource["externalId"],
host=resource["host"],
port=resource.get("port"),
authentication=MQTTAuthenticationWrite._load(resource["authentication"])
if "authentication" in resource
else None,
useTls=resource.get("useTls", False),
ca_certificate=CACertificateWrite._load(resource["caCertificate"]) if "caCertificate" in resource else None,
auth_certificate=AuthCertificateWrite._load(resource["authCertificate"])
if "authCertificate" in resource
else None,
)

def dump(self, camel_case: bool = True) -> dict[str, Any]:
output = super().dump(camel_case)
if self.authentication:
output["authentication"] = self.authentication.dump(camel_case)
if self.ca_certificate:
output["caCertificate" if camel_case else "ca_certificate"] = self.ca_certificate.dump(camel_case)
if self.auth_certificate:
output["authCertificate" if camel_case else "auth_certificate"] = self.auth_certificate.dump(camel_case)
return output


@dataclass
class MQTTAuthentication(CogniteObject, ABC):
_type: ClassVar[str]


@dataclass
class BasicMQTTAuthentication(MQTTAuthentication):
username: str

@classmethod
def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
return cls(username=resource["username"])


@dataclass
class CACertificate(CogniteObject):
thumbprint: str
expires_at: str

@classmethod
def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
return cls(thumbprint=resource["thumbprint"], expires_at=resource["expiresAt"])


@dataclass
class AuthCertificate(CogniteObject):
thumbprint: str
expires_at: str

@classmethod
def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
return cls(thumbprint=resource["thumbprint"], expires_at=resource["expiresAt"])


class MQTTSource(Source):
def __init__(
self,
external_id: str,
host: str,
created_time: int,
last_updated_time: int,
port: int | None = None,
authentication: MQTTAuthentication | None = None,
useTls: bool = False,
ca_certificate: CACertificate | None = None,
auth_certificate: AuthCertificate | None = None,
) -> None:
super().__init__(external_id)
self.host = host
self.port = port
self.authentication = authentication
self.useTls = useTls
self.ca_certificate = ca_certificate
self.auth_certificate = auth_certificate
self.created_time = created_time
self.last_updated_time = last_updated_time

@classmethod
def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self:
return cls(
external_id=resource["externalId"],
host=resource["host"],
port=resource.get("port"),
authentication=MQTTAuthentication._load(resource["authentication"])
if "authentication" in resource
else None,
useTls=resource.get("useTls", False),
ca_certificate=CACertificate._load(resource["caCertificate"]) if "caCertificate" in resource else None,
auth_certificate=AuthCertificate._load(resource["authCertificate"])
if "authCertificate" in resource
else None,
created_time=resource["createdTime"],
last_updated_time=resource["lastUpdatedTime"],
)

def as_write(self) -> MQTTSourceWrite:
raise TypeError(f"{type(self).__name__} cannot be converted to write as id does not contain the secrets")

def dump(self, camel_case: bool = True) -> dict[str, Any]:
output = super().dump(camel_case)
if self.authentication:
output["authentication"] = self.authentication.dump(camel_case)
if self.ca_certificate:
output["caCertificate" if camel_case else "ca_certificate"] = self.ca_certificate.dump(camel_case)
if self.auth_certificate:
output["authCertificate" if camel_case else "auth_certificate"] = self.auth_certificate.dump(camel_case)
return output


class SourceWriteList(CogniteResourceList[SourceWrite], ExternalIDTransformerMixin):
_RESOURCE = SourceWrite

Expand Down

0 comments on commit bb27dd2

Please sign in to comment.