Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDF-22379] Hosted Extractors: Source #1893

Merged
merged 34 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3f6ca7b
refactor: added source data classes
doctrino Aug 29, 2024
f03e006
refactor: setup shell for API
doctrino Aug 29, 2024
c663cd2
refactor: fix Source classes
doctrino Aug 29, 2024
1979756
refactor; MQTT data classes
doctrino Aug 29, 2024
19f6d9d
refactor: robust loading
doctrino Aug 29, 2024
1ebb1fc
feat: Implemented CRD + list method
doctrino Aug 29, 2024
c6a9b97
feat: Added update classes
doctrino Aug 29, 2024
1800bd2
fix: bugs in data classes
doctrino Aug 29, 2024
0431129
tests: updated tests
doctrino Aug 29, 2024
1757dc8
refactor: created ingegration tests
doctrino Aug 29, 2024
18c0e3d
refactor: added warnings
doctrino Aug 29, 2024
3699dde
refactor: updated script for adding capability
doctrino Aug 29, 2024
ec5de1e
fix: small bugs
doctrino Aug 29, 2024
1712d8d
refactor: Only unknowns on read
doctrino Aug 29, 2024
9a63e58
fix: update with write object
doctrino Aug 29, 2024
1cec0fa
docs; updated docs
doctrino Aug 29, 2024
9169272
build: changelog
doctrino Aug 29, 2024
bd3e18a
fix: introduced bug
doctrino Aug 29, 2024
e407b82
style: happier mypy
doctrino Aug 29, 2024
ce959cb
style: mypy
doctrino Aug 29, 2024
3c7f447
styl: mypy
doctrino Aug 29, 2024
09980a5
refactor: adjust
doctrino Aug 29, 2024
aa39c78
refactor: added missing
doctrino Aug 29, 2024
1f2e914
refactor: review feedback
doctrino Sep 2, 2024
958b41c
build: docs
doctrino Sep 2, 2024
b65d68f
refactor: review feedback
doctrino Sep 2, 2024
f11d1b7
refactor: better solution to Update object
doctrino Sep 3, 2024
c9f699f
Merge remote-tracking branch 'origin/master' into hosted-extractors
doctrino Sep 3, 2024
020313a
fix: moved to correct location
doctrino Sep 3, 2024
5652bfc
fix: introducde bug
doctrino Sep 3, 2024
448054e
fix: stuff
doctrino Sep 3, 2024
9ce9540
tests: fix
doctrino Sep 3, 2024
6e16c0b
Merge remote-tracking branch 'origin/master' into hosted-extractors
doctrino Sep 3, 2024
2d2c72c
build: bump
doctrino Sep 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.55.2] - 2024-08-29
### Added
- [Feature Preview - beta] Support for `client.hosted_extractors.sources`.
doctrino marked this conversation as resolved.
Show resolved Hide resolved

## [7.55.1] - 2024-08-29
### Fixed
- Missing exports for workflow triggers
Expand Down
1 change: 1 addition & 0 deletions cognite/client/_api/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def _convert_resource_to_patch_object(
resource: CogniteResource,
update_attributes: list[PropertySpec],
mode: Literal["replace_ignore_null", "patch", "replace"] = "replace_ignore_null",
identifying_properties: dict[str, Any] | None = None,
) -> dict[str, dict[str, dict]]:
if not isinstance(resource, Annotation):
return APIClient._convert_resource_to_patch_object(resource, update_attributes)
Expand Down
16 changes: 16 additions & 0 deletions cognite/client/_api/hosted_extractors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from cognite.client._api.hosted_extractors.sources import SourcesAPI
from cognite.client._api_client import APIClient

if TYPE_CHECKING:
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig


class HostedExtractorsAPI(APIClient):
def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self.sources = SourcesAPI(config, api_version, cognite_client)
263 changes: 263 additions & 0 deletions cognite/client/_api/hosted_extractors/sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
from __future__ import annotations

from collections.abc import Iterator
from typing import TYPE_CHECKING, Any, Sequence, overload

from cognite.client._api_client import APIClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client.data_classes.hosted_extractors.sources import Source, SourceList, SourceUpdate, SourceWrite
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._identifier import IdentifierSequence
from cognite.client.utils.useful_types import SequenceNotStr

if TYPE_CHECKING:
from cognite.client import ClientConfig, CogniteClient


class SourcesAPI(APIClient):
_RESOURCE_PATH = "/hostedextractors/sources"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self._warning = FeaturePreviewWarning(
api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add rest and kafka which are alpha, the eventhub and mqtt are beta.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this PR? Or a later one?

Copy link
Contributor Author

@doctrino doctrino Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Later on, this is 1 /5 while I added kafka and rest in 5/5

)
Comment on lines +23 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to not move this to HostedExtractorsAPI.__init__?

Suggested change
self._warning = FeaturePreviewWarning(
api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors"
)
self._warning = FeaturePreviewWarning(
api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors"
)

Copy link
Contributor Author

@doctrino doctrino Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just convenience. Does not matter much as it will be removed in the future.

self._CREATE_LIMIT = 100
self._LIST_LIMIT = 100
self._RETRIEVE_LIMIT = 100
self._DELETE_LIMIT = 100
self._UPDATE_LIMIT = 100

@overload
def __call__(
self,
chunk_size: None = None,
limit: int | None = None,
) -> Iterator[Source]: ...

@overload
def __call__(
self,
chunk_size: int,
limit: int | None = None,
) -> Iterator[SourceList]: ...

def __call__(
self,
chunk_size: int | None = None,
limit: int | None = None,
) -> Iterator[Source] | Iterator[SourceList]:
"""Iterate over sources

Fetches sources as they are iterated over, so you keep a limited number of spaces in memory.
doctrino marked this conversation as resolved.
Show resolved Hide resolved

Args:
chunk_size (int | None): Number of sources to return in each chunk. Defaults to yielding one source a time.
limit (int | None): Maximum number of sources to return. Defaults to returning all items.

Returns:
Iterator[Source] | Iterator[SourceList]: yields Source one by one if chunk_size is not specified, else SourceList objects.
"""
self._warning.warn()

return self._list_generator(
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is Source abstract?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many types of sources. The Source object itself is not a concrete object.

method="GET",
chunk_size=chunk_size,
limit=limit,
headers={"cdf-version": "beta"},
)

def __iter__(self) -> Iterator[Source]:
"""Iterate over sources

Fetches sources as they are iterated over, so you keep a limited number of spaces in memory.
doctrino marked this conversation as resolved.
Show resolved Hide resolved

Returns:
Iterator[Source]: yields Source one by one.
"""
return self()

@overload
def retrieve(self, external_ids: str, ignore_unknown_ids: bool = False) -> Source: ...

@overload
def retrieve(self, external_ids: SequenceNotStr[str], ignore_unknown_ids: bool = False) -> SourceList: ...

def retrieve(
self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bool = False
) -> Source | SourceList:
"""`Retrieve one or more sources. <https://developer.cognite.com/api#tag/Sources/operation/retrieve_sources>`_

Args:
external_ids (str | SequenceNotStr[str]): The external ID provided by the client. Must be unique for the resource type.
ignore_unknown_ids (bool): No description.
doctrino marked this conversation as resolved.
Show resolved Hide resolved

Returns:
Source | SourceList: Requested sources

Examples:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.hosted_extractors.sources.retrieve('myMQTTSource')

Get multiple spaces by id:
doctrino marked this conversation as resolved.
Show resolved Hide resolved

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.hosted_extractors.sources.retrieve(["myMQTTSource", "MyEvenHubSource"], ignore_unknown_ids=True)
doctrino marked this conversation as resolved.
Show resolved Hide resolved

"""
self._warning.warn()
return self._retrieve_multiple(
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
identifiers=IdentifierSequence.load(external_ids=external_ids),
ignore_unknown_ids=ignore_unknown_ids,
headers={"cdf-version": "beta"},
doctrino marked this conversation as resolved.
Show resolved Hide resolved
)

def delete(
self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bool = False, force: bool = False
) -> None:
"""`Delete one or more sources <https://developer.cognite.com/api#tag/Sources/operation/delete_sources>`_

Args:
external_ids (str | SequenceNotStr[str]): The external ID provided by the client. Must be unique for the resource type.
ignore_unknown_ids (bool): No description.
force (bool): No description.
doctrino marked this conversation as resolved.
Show resolved Hide resolved
Examples:

Delete sources by id::

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> client.hosted_extractors.sources.delete(spaces=["myMQTTSource", "MyEvenHubSource"])
doctrino marked this conversation as resolved.
Show resolved Hide resolved
"""
self._warning.warn()
extra_body_fields: dict[str, Any] = {}
if ignore_unknown_ids:
extra_body_fields["ignoreUnknownIds"] = True
if force:
extra_body_fields["force"] = True

self._delete_multiple(
identifiers=IdentifierSequence.load(external_ids=external_ids),
wrap_ids=True,
returns_items=False,
doctrino marked this conversation as resolved.
Show resolved Hide resolved
headers={"cdf-version": "beta"},
extra_body_fields=extra_body_fields or None,
doctrino marked this conversation as resolved.
Show resolved Hide resolved
)

@overload
def create(self, items: SourceWrite) -> Source: ...

@overload
def create(self, items: Sequence[SourceWrite]) -> SourceList: ...

def create(self, items: SourceWrite | Sequence[SourceWrite]) -> Source | SourceList:
"""`Create one or more sources. <https://developer.cognite.com/api#tag/Sources/operation/create_sources>`_

Args:
items (SourceWrite | Sequence[SourceWrite]): Space | Sequence[Space]): Source(s) to create.

Returns:
Source | SourceList: Created source(s)

Examples:

Create new source:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.hosted_extractors import EventHubSourceWrite
>>> client = CogniteClient()
>>> source = EventHubSourceWrite('my_event_hub', 'http://myeventhub.com', "My EventHub", 'my_key', 'my_value')
>>> res = client.hosted_extractors.sources.create(source)
"""
self._warning.warn()
return self._create_multiple(
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
items=items, # type: ignore[arg-type]
input_resource_cls=SourceWrite,
headers={"cdf-version": "beta"},
)

@overload
def update(self, items: SourceWrite | SourceUpdate) -> Source: ...

@overload
def update(self, items: Sequence[SourceWrite | SourceUpdate]) -> SourceList: ...

def update(self, items: SourceWrite | SourceUpdate | Sequence[SourceWrite | SourceUpdate]) -> Source | SourceList:
"""`Update one or more sources. <https://developer.cognite.com/api#tag/Sources/operation/update_sources>`_

Args:
items (SourceWrite | SourceUpdate | Sequence[SourceWrite | SourceUpdate]): Space | Sequence[Space]): Source(s) to update.
doctrino marked this conversation as resolved.
Show resolved Hide resolved

Returns:
Source | SourceList: Updated source(s)

Examples:

Update source:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.hosted_extractors import EventHubSourceUpdate
>>> client = CogniteClient()
>>> source = EventHubSourceUpdate('my_event_hub').event_hub_name.set("My Updated EventHub")
>>> res = client.hosted_extractors.sources.update(source)
"""
self._warning.warn()
return self._update_multiple(
items=items, # type: ignore[arg-type]
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
update_cls=SourceUpdate,
headers={"cdf-version": "beta"},
)

def list(
self,
limit: int | None = DEFAULT_LIMIT_READ,
) -> SourceList:
"""`List sources <https://developer.cognite.com/api#tag/Sources/operation/list_sources>`_

Args:
limit (int | None): Maximum number of sources to return. Defaults to 25. Set to -1, float("inf") or None to return all items.

Returns:
SourceList: List of requested sources

Examples:

List sources:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> space_list = client.hosted_extractors.sources.list(limit=5)

Iterate over sources::

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> for source in client.hosted_extractors.sources:
... source # do something with the source

Iterate over chunks of sources to reduce memory load::

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> for source_list in client.hosted_extractors.sources(chunk_size=25):
... source_list # do something with the spaces
doctrino marked this conversation as resolved.
Show resolved Hide resolved
"""
self._warning.warn()
return self._list(
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
method="GET",
limit=limit,
headers={"cdf-version": "beta"},
)
12 changes: 11 additions & 1 deletion cognite/client/_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class APIClient:
"transformations/(filter|byids|jobs/byids|schedules/byids|query/run)",
"extpipes/(list|byids|runs/list)",
"workflows/.*",
"hostedextractors/.*",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now also retries delete and update of sources, is that intended?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

)
)
]
Expand Down Expand Up @@ -1038,7 +1039,12 @@ def _update_multiple(
for index, item in enumerate(item_list):
if isinstance(item, CogniteResource):
patch_objects.append(
self._convert_resource_to_patch_object(item, update_cls._get_update_properties(item), mode)
self._convert_resource_to_patch_object(
item,
update_cls._get_update_properties(item),
mode,
update_cls._get_extra_identifying_properties(item),
doctrino marked this conversation as resolved.
Show resolved Hide resolved
)
)
elif isinstance(item, CogniteUpdate):
patch_objects.append(item.dump(camel_case=True))
Expand Down Expand Up @@ -1218,6 +1224,7 @@ def _convert_resource_to_patch_object(
resource: CogniteResource,
update_attributes: list[PropertySpec],
mode: Literal["replace_ignore_null", "patch", "replace"] = "replace_ignore_null",
identifying_properties: dict[str, Any] | None = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does the word "identifying" come from? Is there a more general term we could use in case we need to add "extra-but-not-identifying-properties" in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried another solution. Wdyt?

) -> dict[str, dict[str, dict]]:
dumped_resource = resource.dump(camel_case=True)
has_id = "id" in dumped_resource
Expand All @@ -1233,6 +1240,9 @@ def _convert_resource_to_patch_object(
elif has_external_id:
patch_object["externalId"] = dumped_resource.pop("externalId")

if identifying_properties:
patch_object.update(identifying_properties)

update: dict[str, dict] = cls._clear_all_attributes(update_attributes) if mode == "replace" else {}

update_attribute_by_name = {prop.name: prop for prop in update_attributes}
Expand Down
2 changes: 2 additions & 0 deletions cognite/client/_cognite_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from cognite.client._api.files import FilesAPI
from cognite.client._api.functions import FunctionsAPI
from cognite.client._api.geospatial import GeospatialAPI
from cognite.client._api.hosted_extractors import HostedExtractorsAPI
from cognite.client._api.iam import IAMAPI
from cognite.client._api.labels import LabelsAPI
from cognite.client._api.raw import RawAPI
Expand Down Expand Up @@ -71,6 +72,7 @@ def __init__(self, config: ClientConfig | None = None) -> None:
self.templates = TemplatesAPI(self._config, self._API_VERSION, self)
self.vision = VisionAPI(self._config, self._API_VERSION, self)
self.extraction_pipelines = ExtractionPipelinesAPI(self._config, self._API_VERSION, self)
self.hosted_extractors = HostedExtractorsAPI(self._config, self._API_VERSION, self)
self.transformations = TransformationsAPI(self._config, self._API_VERSION, self)
self.diagrams = DiagramsAPI(self._config, self._API_VERSION, self)
self.annotations = AnnotationsAPI(self._config, self._API_VERSION, self)
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.55.1"
__version__ = "7.55.2"
__api_subversion__ = "20230101"
8 changes: 7 additions & 1 deletion cognite/client/data_classes/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,12 @@ def dump(self, camel_case: Literal[True] = True) -> dict[str, Any]:
def _get_update_properties(cls, item: CogniteResource | None = None) -> list[PropertySpec]:
raise NotImplementedError

@classmethod
def _get_extra_identifying_properties(cls, item: CogniteResource | None = None) -> dict[str, Any]:
# This method is used to provide additional identifying properties for the update object.
# It is intended to be overridden by subclasses that need to provide additional identifying properties.
doctrino marked this conversation as resolved.
Show resolved Hide resolved
return {}


T_CogniteUpdate = TypeVar("T_CogniteUpdate", bound=CogniteUpdate)

Expand All @@ -534,7 +540,7 @@ def __init__(self, update_object: T_CogniteUpdate, name: str) -> None:
self._update_object = update_object
self._name = name

def _set(self, value: None | str | int | bool) -> T_CogniteUpdate:
def _set(self, value: None | str | int | bool | dict) -> T_CogniteUpdate:
if value is None:
self._update_object._set_null(self._name)
else:
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/data_classes/datapoints_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def set(self, value: Any) -> DataPointSubscriptionUpdate:

class _FilterDataPointSubscriptionUpdate(CognitePrimitiveUpdate):
def set(self, value: Filter) -> DataPointSubscriptionUpdate:
return self._set(value.dump()) # type: ignore[arg-type]
return self._set(value.dump())

class _ListDataPointSubscriptionUpdate(CogniteListUpdate):
def set(self, value: list) -> DataPointSubscriptionUpdate:
Expand Down
Loading