Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDF-22379] Hosted Extractors: Source #1893

Merged
merged 34 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3f6ca7b
refactor: added source data classes
doctrino Aug 29, 2024
f03e006
refactor: setup shell for API
doctrino Aug 29, 2024
c663cd2
refactor: fix Source classes
doctrino Aug 29, 2024
1979756
refactor; MQTT data classes
doctrino Aug 29, 2024
19f6d9d
refactor: robust loading
doctrino Aug 29, 2024
1ebb1fc
feat: Implemented CRD + list method
doctrino Aug 29, 2024
c6a9b97
feat: Added update classes
doctrino Aug 29, 2024
1800bd2
fix: bugs in data classes
doctrino Aug 29, 2024
0431129
tests: updated tests
doctrino Aug 29, 2024
1757dc8
refactor: created ingegration tests
doctrino Aug 29, 2024
18c0e3d
refactor: added warnings
doctrino Aug 29, 2024
3699dde
refactor: updated script for adding capability
doctrino Aug 29, 2024
ec5de1e
fix: small bugs
doctrino Aug 29, 2024
1712d8d
refactor: Only unknowns on read
doctrino Aug 29, 2024
9a63e58
fix: update with write object
doctrino Aug 29, 2024
1cec0fa
docs; updated docs
doctrino Aug 29, 2024
9169272
build: changelog
doctrino Aug 29, 2024
bd3e18a
fix: introduced bug
doctrino Aug 29, 2024
e407b82
style: happier mypy
doctrino Aug 29, 2024
ce959cb
style: mypy
doctrino Aug 29, 2024
3c7f447
styl: mypy
doctrino Aug 29, 2024
09980a5
refactor: adjust
doctrino Aug 29, 2024
aa39c78
refactor: added missing
doctrino Aug 29, 2024
1f2e914
refactor: review feedback
doctrino Sep 2, 2024
958b41c
build: docs
doctrino Sep 2, 2024
b65d68f
refactor: review feedback
doctrino Sep 2, 2024
f11d1b7
refactor: better solution to Update object
doctrino Sep 3, 2024
c9f699f
Merge remote-tracking branch 'origin/master' into hosted-extractors
doctrino Sep 3, 2024
020313a
fix: moved to correct location
doctrino Sep 3, 2024
5652bfc
fix: introducde bug
doctrino Sep 3, 2024
448054e
fix: stuff
doctrino Sep 3, 2024
9ce9540
tests: fix
doctrino Sep 3, 2024
6e16c0b
Merge remote-tracking branch 'origin/master' into hosted-extractors
doctrino Sep 3, 2024
2d2c72c
build: bump
doctrino Sep 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.58.2] - 2024-09-03
### Added
- [Feature Preview - alpha] Support for `client.hosted_extractors.sources`.

## [7.58.1] - 2024-09-03
### Fixed
- [Feature Preview - beta] data workflows: `workflowExecutionId` in `cognite.client.data_classes.workflows.WorkflowTriggerRun`
Expand Down
16 changes: 16 additions & 0 deletions cognite/client/_api/hosted_extractors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from cognite.client._api.hosted_extractors.sources import SourcesAPI
from cognite.client._api_client import APIClient

if TYPE_CHECKING:
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig


class HostedExtractorsAPI(APIClient):
def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self.sources = SourcesAPI(config, api_version, cognite_client)
275 changes: 275 additions & 0 deletions cognite/client/_api/hosted_extractors/sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
from __future__ import annotations

from collections.abc import Iterator
from typing import TYPE_CHECKING, Any, Literal, Sequence, overload

from cognite.client._api_client import APIClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client.data_classes._base import CogniteResource, PropertySpec
from cognite.client.data_classes.hosted_extractors.sources import Source, SourceList, SourceUpdate, SourceWrite
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._identifier import IdentifierSequence
from cognite.client.utils.useful_types import SequenceNotStr

if TYPE_CHECKING:
from cognite.client import ClientConfig, CogniteClient


class SourcesAPI(APIClient):
_RESOURCE_PATH = "/hostedextractors/sources"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self._warning = FeaturePreviewWarning(
api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add rest and kafka which are alpha, the eventhub and mqtt are beta.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this PR? Or a later one?

Copy link
Contributor Author

@doctrino doctrino Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Later on, this is 1 /5 while I added kafka and rest in 5/5

)
Comment on lines +23 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to not move this to HostedExtractorsAPI.__init__?

Suggested change
self._warning = FeaturePreviewWarning(
api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors"
)
self._warning = FeaturePreviewWarning(
api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors"
)

Copy link
Contributor Author

@doctrino doctrino Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just convenience. Does not matter much as it will be removed in the future.

self._CREATE_LIMIT = 100
self._LIST_LIMIT = 100
self._RETRIEVE_LIMIT = 100
self._DELETE_LIMIT = 100
self._UPDATE_LIMIT = 100

@overload
def __call__(
self,
chunk_size: None = None,
limit: int | None = None,
) -> Iterator[Source]: ...

@overload
def __call__(
self,
chunk_size: int,
limit: int | None = None,
) -> Iterator[SourceList]: ...

def __call__(
self,
chunk_size: int | None = None,
limit: int | None = None,
) -> Iterator[Source] | Iterator[SourceList]:
"""Iterate over sources

Fetches sources as they are iterated over, so you keep a limited number of sources in memory.

Args:
chunk_size (int | None): Number of sources to return in each chunk. Defaults to yielding one source a time.
limit (int | None): Maximum number of sources to return. Defaults to returning all items.

Returns:
Iterator[Source] | Iterator[SourceList]: yields Source one by one if chunk_size is not specified, else SourceList objects.
"""
self._warning.warn()

return self._list_generator(
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is Source abstract?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many types of sources. The Source object itself is not a concrete object.

method="GET",
chunk_size=chunk_size,
limit=limit,
headers={"cdf-version": "beta"},
)

def __iter__(self) -> Iterator[Source]:
"""Iterate over sources

Fetches sources as they are iterated over, so you keep a limited number of sources in memory.

Returns:
Iterator[Source]: yields Source one by one.
"""
return self()

@overload
def retrieve(self, external_ids: str, ignore_unknown_ids: bool = False) -> Source: ...

@overload
def retrieve(self, external_ids: SequenceNotStr[str], ignore_unknown_ids: bool = False) -> SourceList: ...

def retrieve(
self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bool = False
) -> Source | SourceList:
"""`Retrieve one or more sources. <https://developer.cognite.com/api#tag/Sources/operation/retrieve_sources>`_

Args:
external_ids (str | SequenceNotStr[str]): The external ID provided by the client. Must be unique for the resource type.
ignore_unknown_ids (bool): Ignore external IDs that are not found rather than throw an exception.

Returns:
Source | SourceList: Requested sources

Examples:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.hosted_extractors.sources.retrieve('myMQTTSource')

Get multiple sources by id:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.hosted_extractors.sources.retrieve(["myMQTTSource", "MyEventHubSource"], ignore_unknown_ids=True)

"""
self._warning.warn()
return self._retrieve_multiple(
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
identifiers=IdentifierSequence.load(external_ids=external_ids),
ignore_unknown_ids=ignore_unknown_ids,
headers={"cdf-version": "beta"},
doctrino marked this conversation as resolved.
Show resolved Hide resolved
)

def delete(
self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bool = False, force: bool = False
) -> None:
"""`Delete one or more sources <https://developer.cognite.com/api#tag/Sources/operation/delete_sources>`_

Args:
external_ids (str | SequenceNotStr[str]): The external ID provided by the client. Must be unique for the resource type.
ignore_unknown_ids (bool): Ignore external IDs that are not found rather than throw an exception.
force (bool): Delete any jobs associated with each item.
Examples:

Delete sources by id::

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> client.hosted_extractors.sources.delete(["myMQTTSource", "MyEventHubSource"])
"""
self._warning.warn()
extra_body_fields: dict[str, Any] = {}
if ignore_unknown_ids:
extra_body_fields["ignoreUnknownIds"] = True
if force:
extra_body_fields["force"] = True

self._delete_multiple(
identifiers=IdentifierSequence.load(external_ids=external_ids),
wrap_ids=True,
headers={"cdf-version": "beta"},
extra_body_fields=extra_body_fields or None,
doctrino marked this conversation as resolved.
Show resolved Hide resolved
)

@overload
def create(self, items: SourceWrite) -> Source: ...

@overload
def create(self, items: Sequence[SourceWrite]) -> SourceList: ...

def create(self, items: SourceWrite | Sequence[SourceWrite]) -> Source | SourceList:
"""`Create one or more sources. <https://developer.cognite.com/api#tag/Sources/operation/create_sources>`_

Args:
items (SourceWrite | Sequence[SourceWrite]): Source(s) to create.

Returns:
Source | SourceList: Created source(s)

Examples:

Create new source:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.hosted_extractors import EventHubSourceWrite
>>> client = CogniteClient()
>>> source = EventHubSourceWrite('my_event_hub', 'http://myeventhub.com', "My EventHub", 'my_key', 'my_value')
>>> res = client.hosted_extractors.sources.create(source)
"""
self._warning.warn()
return self._create_multiple(
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
items=items, # type: ignore[arg-type]
input_resource_cls=SourceWrite,
headers={"cdf-version": "beta"},
)

@overload
def update(self, items: SourceWrite | SourceUpdate) -> Source: ...

@overload
def update(self, items: Sequence[SourceWrite | SourceUpdate]) -> SourceList: ...

def update(self, items: SourceWrite | SourceUpdate | Sequence[SourceWrite | SourceUpdate]) -> Source | SourceList:
"""`Update one or more sources. <https://developer.cognite.com/api#tag/Sources/operation/update_sources>`_

Args:
items (SourceWrite | SourceUpdate | Sequence[SourceWrite | SourceUpdate]): Source(s) to update.

Returns:
Source | SourceList: Updated source(s)

Examples:

Update source:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.hosted_extractors import EventHubSourceUpdate
>>> client = CogniteClient()
>>> source = EventHubSourceUpdate('my_event_hub').event_hub_name.set("My Updated EventHub")
>>> res = client.hosted_extractors.sources.update(source)
"""
self._warning.warn()
return self._update_multiple(
items=items, # type: ignore[arg-type]
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
update_cls=SourceUpdate,
headers={"cdf-version": "beta"},
)

@classmethod
def _convert_resource_to_patch_object(
cls,
resource: CogniteResource,
update_attributes: list[PropertySpec],
mode: Literal["replace_ignore_null", "patch", "replace"] = "replace_ignore_null",
) -> dict[str, dict[str, dict]]:
output = super()._convert_resource_to_patch_object(resource, update_attributes, mode)
if hasattr(resource, "_type"):
output["type"] = resource._type
return output
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This solution LGTM to me!


def list(
self,
limit: int | None = DEFAULT_LIMIT_READ,
) -> SourceList:
"""`List sources <https://developer.cognite.com/api#tag/Sources/operation/list_sources>`_

Args:
limit (int | None): Maximum number of sources to return. Defaults to 25. Set to -1, float("inf") or None to return all items.

Returns:
SourceList: List of requested sources

Examples:

List sources:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> source_list = client.hosted_extractors.sources.list(limit=5)

Iterate over sources::

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> for source in client.hosted_extractors.sources:
... source # do something with the source

Iterate over chunks of sources to reduce memory load::

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> for source_list in client.hosted_extractors.sources(chunk_size=25):
... source_list # do something with the sources
"""
self._warning.warn()
return self._list(
list_cls=SourceList,
resource_cls=Source, # type: ignore[type-abstract]
method="GET",
limit=limit,
headers={"cdf-version": "beta"},
)
7 changes: 6 additions & 1 deletion cognite/client/_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class APIClient:
"transformations/(filter|byids|jobs/byids|schedules/byids|query/run)",
"extpipes/(list|byids|runs/list)",
"workflows/.*",
"hostedextractors/.*",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now also retries delete and update of sources, is that intended?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

)
)
]
Expand Down Expand Up @@ -1038,7 +1039,11 @@ def _update_multiple(
for index, item in enumerate(item_list):
if isinstance(item, CogniteResource):
patch_objects.append(
self._convert_resource_to_patch_object(item, update_cls._get_update_properties(item), mode)
self._convert_resource_to_patch_object(
item,
update_cls._get_update_properties(item),
mode,
)
)
elif isinstance(item, CogniteUpdate):
patch_objects.append(item.dump(camel_case=True))
Expand Down
2 changes: 2 additions & 0 deletions cognite/client/_cognite_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from cognite.client._api.files import FilesAPI
from cognite.client._api.functions import FunctionsAPI
from cognite.client._api.geospatial import GeospatialAPI
from cognite.client._api.hosted_extractors import HostedExtractorsAPI
from cognite.client._api.iam import IAMAPI
from cognite.client._api.labels import LabelsAPI
from cognite.client._api.raw import RawAPI
Expand Down Expand Up @@ -71,6 +72,7 @@ def __init__(self, config: ClientConfig | None = None) -> None:
self.templates = TemplatesAPI(self._config, self._API_VERSION, self)
self.vision = VisionAPI(self._config, self._API_VERSION, self)
self.extraction_pipelines = ExtractionPipelinesAPI(self._config, self._API_VERSION, self)
self.hosted_extractors = HostedExtractorsAPI(self._config, self._API_VERSION, self)
self.transformations = TransformationsAPI(self._config, self._API_VERSION, self)
self.diagrams = DiagramsAPI(self._config, self._API_VERSION, self)
self.annotations = AnnotationsAPI(self._config, self._API_VERSION, self)
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.58.1"
__version__ = "7.58.2"
__api_subversion__ = "20230101"
2 changes: 1 addition & 1 deletion cognite/client/data_classes/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ def __init__(self, update_object: T_CogniteUpdate, name: str) -> None:
self._update_object = update_object
self._name = name

def _set(self, value: None | str | int | bool) -> T_CogniteUpdate:
def _set(self, value: None | str | int | bool | dict) -> T_CogniteUpdate:
if value is None:
self._update_object._set_null(self._name)
else:
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/data_classes/datapoints_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def set(self, value: Any) -> DataPointSubscriptionUpdate:

class _FilterDataPointSubscriptionUpdate(CognitePrimitiveUpdate):
def set(self, value: Filter) -> DataPointSubscriptionUpdate:
return self._set(value.dump()) # type: ignore[arg-type]
return self._set(value.dump())

class _ListDataPointSubscriptionUpdate(CogniteListUpdate):
def set(self, value: list) -> DataPointSubscriptionUpdate:
Expand Down
Loading