Skip to content

Commit

Permalink
[CDF-22379] Hosted Extractors: Source (#1893)
Browse files Browse the repository at this point in the history
  • Loading branch information
doctrino authored Sep 3, 2024
1 parent 02e4305 commit 9c16977
Show file tree
Hide file tree
Showing 19 changed files with 1,120 additions and 20 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.58.2] - 2024-09-03
### Added
- [Feature Preview - alpha] Support for `client.hosted_extractors.sources`.

## [7.58.1] - 2024-09-03
### Fixed
- [Feature Preview - beta] data workflows: `workflowExecutionId` in `cognite.client.data_classes.workflows.WorkflowTriggerRun`
Expand Down
16 changes: 16 additions & 0 deletions cognite/client/_api/hosted_extractors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from cognite.client._api.hosted_extractors.sources import SourcesAPI
from cognite.client._api_client import APIClient

if TYPE_CHECKING:
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig


class HostedExtractorsAPI(APIClient):
def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self.sources = SourcesAPI(config, api_version, cognite_client)
275 changes: 275 additions & 0 deletions cognite/client/_api/hosted_extractors/sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
from __future__ import annotations

from collections.abc import Iterator
from typing import TYPE_CHECKING, Any, Literal, Sequence, overload

from cognite.client._api_client import APIClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client.data_classes._base import CogniteResource, PropertySpec
from cognite.client.data_classes.hosted_extractors.sources import Source, SourceList, SourceUpdate, SourceWrite
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._identifier import IdentifierSequence
from cognite.client.utils.useful_types import SequenceNotStr

if TYPE_CHECKING:
from cognite.client import ClientConfig, CogniteClient


class SourcesAPI(APIClient):
_RESOURCE_PATH = "/hostedextractors/sources"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self._warning = FeaturePreviewWarning(
api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors"
)
self._CREATE_LIMIT = 100
self._LIST_LIMIT = 100
self._RETRIEVE_LIMIT = 100
self._DELETE_LIMIT = 100
self._UPDATE_LIMIT = 100

@overload
def __call__(
self,
chunk_size: None = None,
limit: int | None = None,
) -> Iterator[Source]: ...

@overload
def __call__(
self,
chunk_size: int,
limit: int | None = None,
) -> Iterator[SourceList]: ...

def __call__(
self,
chunk_size: int | None = None,
limit: int | None = None,
) -> Iterator[Source] | Iterator[SourceList]:
"""Iterate over sources
Fetches sources as they are iterated over, so you keep a limited number of sources in memory.
Args:
chunk_size (int | None): Number of sources to return in each chunk. Defaults to yielding one source a time.
limit (int | None): Maximum number of sources to return. Defaults to returning all items.
Returns:
Iterator[Source] | Iterator[SourceList]: yields Source one by one if chunk_size is not specified, else SourceList objects.
"""
self._warning.warn()

return self._list_generator(
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
method="GET",
chunk_size=chunk_size,
limit=limit,
headers={"cdf-version": "beta"},
)

def __iter__(self) -> Iterator[Source]:
"""Iterate over sources
Fetches sources as they are iterated over, so you keep a limited number of sources in memory.
Returns:
Iterator[Source]: yields Source one by one.
"""
return self()

@overload
def retrieve(self, external_ids: str, ignore_unknown_ids: bool = False) -> Source: ...

@overload
def retrieve(self, external_ids: SequenceNotStr[str], ignore_unknown_ids: bool = False) -> SourceList: ...

def retrieve(
self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bool = False
) -> Source | SourceList:
"""`Retrieve one or more sources. <https://developer.cognite.com/api#tag/Sources/operation/retrieve_sources>`_
Args:
external_ids (str | SequenceNotStr[str]): The external ID provided by the client. Must be unique for the resource type.
ignore_unknown_ids (bool): Ignore external IDs that are not found rather than throw an exception.
Returns:
Source | SourceList: Requested sources
Examples:
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.hosted_extractors.sources.retrieve('myMQTTSource')
Get multiple sources by id:
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.hosted_extractors.sources.retrieve(["myMQTTSource", "MyEventHubSource"], ignore_unknown_ids=True)
"""
self._warning.warn()
return self._retrieve_multiple(
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
identifiers=IdentifierSequence.load(external_ids=external_ids),
ignore_unknown_ids=ignore_unknown_ids,
headers={"cdf-version": "beta"},
)

def delete(
self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bool = False, force: bool = False
) -> None:
"""`Delete one or more sources <https://developer.cognite.com/api#tag/Sources/operation/delete_sources>`_
Args:
external_ids (str | SequenceNotStr[str]): The external ID provided by the client. Must be unique for the resource type.
ignore_unknown_ids (bool): Ignore external IDs that are not found rather than throw an exception.
force (bool): Delete any jobs associated with each item.
Examples:
Delete sources by id::
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> client.hosted_extractors.sources.delete(["myMQTTSource", "MyEventHubSource"])
"""
self._warning.warn()
extra_body_fields: dict[str, Any] = {}
if ignore_unknown_ids:
extra_body_fields["ignoreUnknownIds"] = True
if force:
extra_body_fields["force"] = True

self._delete_multiple(
identifiers=IdentifierSequence.load(external_ids=external_ids),
wrap_ids=True,
headers={"cdf-version": "beta"},
extra_body_fields=extra_body_fields or None,
)

@overload
def create(self, items: SourceWrite) -> Source: ...

@overload
def create(self, items: Sequence[SourceWrite]) -> SourceList: ...

def create(self, items: SourceWrite | Sequence[SourceWrite]) -> Source | SourceList:
"""`Create one or more sources. <https://developer.cognite.com/api#tag/Sources/operation/create_sources>`_
Args:
items (SourceWrite | Sequence[SourceWrite]): Source(s) to create.
Returns:
Source | SourceList: Created source(s)
Examples:
Create new source:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.hosted_extractors import EventHubSourceWrite
>>> client = CogniteClient()
>>> source = EventHubSourceWrite('my_event_hub', 'http://myeventhub.com', "My EventHub", 'my_key', 'my_value')
>>> res = client.hosted_extractors.sources.create(source)
"""
self._warning.warn()
return self._create_multiple(
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
items=items, # type: ignore[arg-type]
input_resource_cls=SourceWrite,
headers={"cdf-version": "beta"},
)

@overload
def update(self, items: SourceWrite | SourceUpdate) -> Source: ...

@overload
def update(self, items: Sequence[SourceWrite | SourceUpdate]) -> SourceList: ...

def update(self, items: SourceWrite | SourceUpdate | Sequence[SourceWrite | SourceUpdate]) -> Source | SourceList:
"""`Update one or more sources. <https://developer.cognite.com/api#tag/Sources/operation/update_sources>`_
Args:
items (SourceWrite | SourceUpdate | Sequence[SourceWrite | SourceUpdate]): Source(s) to update.
Returns:
Source | SourceList: Updated source(s)
Examples:
Update source:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.hosted_extractors import EventHubSourceUpdate
>>> client = CogniteClient()
>>> source = EventHubSourceUpdate('my_event_hub').event_hub_name.set("My Updated EventHub")
>>> res = client.hosted_extractors.sources.update(source)
"""
self._warning.warn()
return self._update_multiple(
items=items, # type: ignore[arg-type]
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
update_cls=SourceUpdate,
headers={"cdf-version": "beta"},
)

@classmethod
def _convert_resource_to_patch_object(
cls,
resource: CogniteResource,
update_attributes: list[PropertySpec],
mode: Literal["replace_ignore_null", "patch", "replace"] = "replace_ignore_null",
) -> dict[str, dict[str, dict]]:
output = super()._convert_resource_to_patch_object(resource, update_attributes, mode)
if hasattr(resource, "_type"):
output["type"] = resource._type
return output

def list(
self,
limit: int | None = DEFAULT_LIMIT_READ,
) -> SourceList:
"""`List sources <https://developer.cognite.com/api#tag/Sources/operation/list_sources>`_
Args:
limit (int | None): Maximum number of sources to return. Defaults to 25. Set to -1, float("inf") or None to return all items.
Returns:
SourceList: List of requested sources
Examples:
List sources:
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> source_list = client.hosted_extractors.sources.list(limit=5)
Iterate over sources::
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> for source in client.hosted_extractors.sources:
... source # do something with the source
Iterate over chunks of sources to reduce memory load::
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> for source_list in client.hosted_extractors.sources(chunk_size=25):
... source_list # do something with the sources
"""
self._warning.warn()
return self._list(
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
method="GET",
limit=limit,
headers={"cdf-version": "beta"},
)
7 changes: 6 additions & 1 deletion cognite/client/_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class APIClient:
"transformations/(filter|byids|jobs/byids|schedules/byids|query/run)",
"extpipes/(list|byids|runs/list)",
"workflows/.*",
"hostedextractors/.*",
)
)
]
Expand Down Expand Up @@ -1038,7 +1039,11 @@ def _update_multiple(
for index, item in enumerate(item_list):
if isinstance(item, CogniteResource):
patch_objects.append(
self._convert_resource_to_patch_object(item, update_cls._get_update_properties(item), mode)
self._convert_resource_to_patch_object(
item,
update_cls._get_update_properties(item),
mode,
)
)
elif isinstance(item, CogniteUpdate):
patch_objects.append(item.dump(camel_case=True))
Expand Down
2 changes: 2 additions & 0 deletions cognite/client/_cognite_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from cognite.client._api.files import FilesAPI
from cognite.client._api.functions import FunctionsAPI
from cognite.client._api.geospatial import GeospatialAPI
from cognite.client._api.hosted_extractors import HostedExtractorsAPI
from cognite.client._api.iam import IAMAPI
from cognite.client._api.labels import LabelsAPI
from cognite.client._api.raw import RawAPI
Expand Down Expand Up @@ -71,6 +72,7 @@ def __init__(self, config: ClientConfig | None = None) -> None:
self.templates = TemplatesAPI(self._config, self._API_VERSION, self)
self.vision = VisionAPI(self._config, self._API_VERSION, self)
self.extraction_pipelines = ExtractionPipelinesAPI(self._config, self._API_VERSION, self)
self.hosted_extractors = HostedExtractorsAPI(self._config, self._API_VERSION, self)
self.transformations = TransformationsAPI(self._config, self._API_VERSION, self)
self.diagrams = DiagramsAPI(self._config, self._API_VERSION, self)
self.annotations = AnnotationsAPI(self._config, self._API_VERSION, self)
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.58.1"
__version__ = "7.58.2"
__api_subversion__ = "20230101"
2 changes: 1 addition & 1 deletion cognite/client/data_classes/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ def __init__(self, update_object: T_CogniteUpdate, name: str) -> None:
self._update_object = update_object
self._name = name

def _set(self, value: None | str | int | bool) -> T_CogniteUpdate:
def _set(self, value: None | str | int | bool | dict) -> T_CogniteUpdate:
if value is None:
self._update_object._set_null(self._name)
else:
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/data_classes/datapoints_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def set(self, value: Any) -> DataPointSubscriptionUpdate:

class _FilterDataPointSubscriptionUpdate(CognitePrimitiveUpdate):
def set(self, value: Filter) -> DataPointSubscriptionUpdate:
return self._set(value.dump()) # type: ignore[arg-type]
return self._set(value.dump())

class _ListDataPointSubscriptionUpdate(CogniteListUpdate):
def set(self, value: list) -> DataPointSubscriptionUpdate:
Expand Down
Loading

0 comments on commit 9c16977

Please sign in to comment.