-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CDF-22379] Hosted Extractors: Source #1893
Changes from 29 commits
3f6ca7b
f03e006
c663cd2
1979756
19f6d9d
1ebb1fc
c6a9b97
1800bd2
0431129
1757dc8
18c0e3d
3699dde
ec5de1e
1712d8d
9a63e58
1cec0fa
9169272
bd3e18a
e407b82
ce959cb
3c7f447
09980a5
aa39c78
1f2e914
958b41c
b65d68f
f11d1b7
c9f699f
020313a
5652bfc
448054e
9ce9540
6e16c0b
2d2c72c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
from __future__ import annotations | ||
|
||
from typing import TYPE_CHECKING | ||
|
||
from cognite.client._api.hosted_extractors.sources import SourcesAPI | ||
from cognite.client._api_client import APIClient | ||
|
||
if TYPE_CHECKING: | ||
from cognite.client import CogniteClient | ||
from cognite.client.config import ClientConfig | ||
|
||
|
||
class HostedExtractorsAPI(APIClient): | ||
def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: | ||
super().__init__(config, api_version, cognite_client) | ||
self.sources = SourcesAPI(config, api_version, cognite_client) |
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,275 @@ | ||||||||||||||
from __future__ import annotations | ||||||||||||||
|
||||||||||||||
from collections.abc import Iterator | ||||||||||||||
from typing import TYPE_CHECKING, Any, Literal, Sequence, overload | ||||||||||||||
|
||||||||||||||
from cognite.client._api_client import APIClient | ||||||||||||||
from cognite.client._constants import DEFAULT_LIMIT_READ | ||||||||||||||
from cognite.client.data_classes._base import CogniteResource, PropertySpec | ||||||||||||||
from cognite.client.data_classes.hosted_extractors.sources import Source, SourceList, SourceUpdate, SourceWrite | ||||||||||||||
from cognite.client.utils._experimental import FeaturePreviewWarning | ||||||||||||||
from cognite.client.utils._identifier import IdentifierSequence | ||||||||||||||
from cognite.client.utils.useful_types import SequenceNotStr | ||||||||||||||
|
||||||||||||||
if TYPE_CHECKING: | ||||||||||||||
from cognite.client import ClientConfig, CogniteClient | ||||||||||||||
|
||||||||||||||
|
||||||||||||||
class SourcesAPI(APIClient): | ||||||||||||||
_RESOURCE_PATH = "/hostedextractors/sources" | ||||||||||||||
|
||||||||||||||
def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: | ||||||||||||||
super().__init__(config, api_version, cognite_client) | ||||||||||||||
self._warning = FeaturePreviewWarning( | ||||||||||||||
api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors" | ||||||||||||||
) | ||||||||||||||
Comment on lines
+23
to
+25
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason to not move this to
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just convenience. Does not matter much as it will be removed in the future. |
||||||||||||||
self._CREATE_LIMIT = 100 | ||||||||||||||
self._LIST_LIMIT = 100 | ||||||||||||||
self._RETRIEVE_LIMIT = 100 | ||||||||||||||
self._DELETE_LIMIT = 100 | ||||||||||||||
self._UPDATE_LIMIT = 100 | ||||||||||||||
|
||||||||||||||
@overload | ||||||||||||||
def __call__( | ||||||||||||||
self, | ||||||||||||||
chunk_size: None = None, | ||||||||||||||
limit: int | None = None, | ||||||||||||||
) -> Iterator[Source]: ... | ||||||||||||||
|
||||||||||||||
@overload | ||||||||||||||
def __call__( | ||||||||||||||
self, | ||||||||||||||
chunk_size: int, | ||||||||||||||
limit: int | None = None, | ||||||||||||||
) -> Iterator[SourceList]: ... | ||||||||||||||
|
||||||||||||||
def __call__( | ||||||||||||||
self, | ||||||||||||||
chunk_size: int | None = None, | ||||||||||||||
limit: int | None = None, | ||||||||||||||
) -> Iterator[Source] | Iterator[SourceList]: | ||||||||||||||
"""Iterate over sources | ||||||||||||||
|
||||||||||||||
Fetches sources as they are iterated over, so you keep a limited number of sources in memory. | ||||||||||||||
|
||||||||||||||
Args: | ||||||||||||||
chunk_size (int | None): Number of sources to return in each chunk. Defaults to yielding one source a time. | ||||||||||||||
limit (int | None): Maximum number of sources to return. Defaults to returning all items. | ||||||||||||||
|
||||||||||||||
Returns: | ||||||||||||||
Iterator[Source] | Iterator[SourceList]: yields Source one by one if chunk_size is not specified, else SourceList objects. | ||||||||||||||
""" | ||||||||||||||
self._warning.warn() | ||||||||||||||
|
||||||||||||||
return self._list_generator( | ||||||||||||||
list_cls=SourceList, | ||||||||||||||
resource_cls=Source, # type: ignore[type-abstract] | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Many types of sources. The Source object itself is not a concrete object. |
||||||||||||||
method="GET", | ||||||||||||||
chunk_size=chunk_size, | ||||||||||||||
limit=limit, | ||||||||||||||
headers={"cdf-version": "beta"}, | ||||||||||||||
) | ||||||||||||||
|
||||||||||||||
def __iter__(self) -> Iterator[Source]: | ||||||||||||||
"""Iterate over sources | ||||||||||||||
|
||||||||||||||
Fetches sources as they are iterated over, so you keep a limited number of sources in memory. | ||||||||||||||
|
||||||||||||||
Returns: | ||||||||||||||
Iterator[Source]: yields Source one by one. | ||||||||||||||
""" | ||||||||||||||
return self() | ||||||||||||||
|
||||||||||||||
@overload | ||||||||||||||
def retrieve(self, external_ids: str, ignore_unknown_ids: bool = False) -> Source: ... | ||||||||||||||
|
||||||||||||||
@overload | ||||||||||||||
def retrieve(self, external_ids: SequenceNotStr[str], ignore_unknown_ids: bool = False) -> SourceList: ... | ||||||||||||||
|
||||||||||||||
def retrieve( | ||||||||||||||
self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bool = False | ||||||||||||||
) -> Source | SourceList: | ||||||||||||||
"""`Retrieve one or more sources. <https://developer.cognite.com/api#tag/Sources/operation/retrieve_sources>`_ | ||||||||||||||
|
||||||||||||||
Args: | ||||||||||||||
external_ids (str | SequenceNotStr[str]): The external ID provided by the client. Must be unique for the resource type. | ||||||||||||||
ignore_unknown_ids (bool): Ignore external IDs that are not found rather than throw an exception. | ||||||||||||||
|
||||||||||||||
Returns: | ||||||||||||||
Source | SourceList: Requested sources | ||||||||||||||
|
||||||||||||||
Examples: | ||||||||||||||
|
||||||||||||||
>>> from cognite.client import CogniteClient | ||||||||||||||
>>> client = CogniteClient() | ||||||||||||||
>>> res = client.hosted_extractors.sources.retrieve('myMQTTSource') | ||||||||||||||
|
||||||||||||||
Get multiple sources by id: | ||||||||||||||
|
||||||||||||||
>>> from cognite.client import CogniteClient | ||||||||||||||
>>> client = CogniteClient() | ||||||||||||||
>>> res = client.hosted_extractors.sources.retrieve(["myMQTTSource", "MyEventHubSource"], ignore_unknown_ids=True) | ||||||||||||||
|
||||||||||||||
""" | ||||||||||||||
self._warning.warn() | ||||||||||||||
return self._retrieve_multiple( | ||||||||||||||
list_cls=SourceList, | ||||||||||||||
resource_cls=Source, # type: ignore[type-abstract] | ||||||||||||||
identifiers=IdentifierSequence.load(external_ids=external_ids), | ||||||||||||||
ignore_unknown_ids=ignore_unknown_ids, | ||||||||||||||
headers={"cdf-version": "beta"}, | ||||||||||||||
doctrino marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
) | ||||||||||||||
|
||||||||||||||
def delete( | ||||||||||||||
self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bool = False, force: bool = False | ||||||||||||||
) -> None: | ||||||||||||||
"""`Delete one or more sources <https://developer.cognite.com/api#tag/Sources/operation/delete_sources>`_ | ||||||||||||||
|
||||||||||||||
Args: | ||||||||||||||
external_ids (str | SequenceNotStr[str]): The external ID provided by the client. Must be unique for the resource type. | ||||||||||||||
ignore_unknown_ids (bool): Ignore external IDs that are not found rather than throw an exception. | ||||||||||||||
force (bool): Delete any jobs associated with each item. | ||||||||||||||
Examples: | ||||||||||||||
|
||||||||||||||
Delete sources by id:: | ||||||||||||||
|
||||||||||||||
>>> from cognite.client import CogniteClient | ||||||||||||||
>>> client = CogniteClient() | ||||||||||||||
>>> client.hosted_extractors.sources.delete(["myMQTTSource", "MyEventHubSource"]) | ||||||||||||||
""" | ||||||||||||||
self._warning.warn() | ||||||||||||||
extra_body_fields: dict[str, Any] = {} | ||||||||||||||
if ignore_unknown_ids: | ||||||||||||||
extra_body_fields["ignoreUnknownIds"] = True | ||||||||||||||
if force: | ||||||||||||||
extra_body_fields["force"] = True | ||||||||||||||
|
||||||||||||||
self._delete_multiple( | ||||||||||||||
identifiers=IdentifierSequence.load(external_ids=external_ids), | ||||||||||||||
wrap_ids=True, | ||||||||||||||
headers={"cdf-version": "beta"}, | ||||||||||||||
extra_body_fields=extra_body_fields or None, | ||||||||||||||
doctrino marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
) | ||||||||||||||
|
||||||||||||||
@overload | ||||||||||||||
def create(self, items: SourceWrite) -> Source: ... | ||||||||||||||
|
||||||||||||||
@overload | ||||||||||||||
def create(self, items: Sequence[SourceWrite]) -> SourceList: ... | ||||||||||||||
|
||||||||||||||
def create(self, items: SourceWrite | Sequence[SourceWrite]) -> Source | SourceList: | ||||||||||||||
"""`Create one or more sources. <https://developer.cognite.com/api#tag/Sources/operation/create_sources>`_ | ||||||||||||||
|
||||||||||||||
Args: | ||||||||||||||
items (SourceWrite | Sequence[SourceWrite]): Source(s) to create. | ||||||||||||||
|
||||||||||||||
Returns: | ||||||||||||||
Source | SourceList: Created source(s) | ||||||||||||||
|
||||||||||||||
Examples: | ||||||||||||||
|
||||||||||||||
Create new source: | ||||||||||||||
|
||||||||||||||
>>> from cognite.client import CogniteClient | ||||||||||||||
>>> from cognite.client.data_classes.hosted_extractors import EventHubSourceWrite | ||||||||||||||
>>> client = CogniteClient() | ||||||||||||||
>>> source = EventHubSourceWrite('my_event_hub', 'http://myeventhub.com', "My EventHub", 'my_key', 'my_value') | ||||||||||||||
>>> res = client.hosted_extractors.sources.create(source) | ||||||||||||||
""" | ||||||||||||||
self._warning.warn() | ||||||||||||||
return self._create_multiple( | ||||||||||||||
list_cls=SourceList, | ||||||||||||||
resource_cls=Source, # type: ignore[type-abstract] | ||||||||||||||
items=items, # type: ignore[arg-type] | ||||||||||||||
input_resource_cls=SourceWrite, | ||||||||||||||
headers={"cdf-version": "beta"}, | ||||||||||||||
) | ||||||||||||||
|
||||||||||||||
@overload | ||||||||||||||
def update(self, items: SourceWrite | SourceUpdate) -> Source: ... | ||||||||||||||
|
||||||||||||||
@overload | ||||||||||||||
def update(self, items: Sequence[SourceWrite | SourceUpdate]) -> SourceList: ... | ||||||||||||||
|
||||||||||||||
def update(self, items: SourceWrite | SourceUpdate | Sequence[SourceWrite | SourceUpdate]) -> Source | SourceList: | ||||||||||||||
"""`Update one or more sources. <https://developer.cognite.com/api#tag/Sources/operation/update_sources>`_ | ||||||||||||||
|
||||||||||||||
Args: | ||||||||||||||
items (SourceWrite | SourceUpdate | Sequence[SourceWrite | SourceUpdate]): Source(s) to update. | ||||||||||||||
|
||||||||||||||
Returns: | ||||||||||||||
Source | SourceList: Updated source(s) | ||||||||||||||
|
||||||||||||||
Examples: | ||||||||||||||
|
||||||||||||||
Update source: | ||||||||||||||
|
||||||||||||||
>>> from cognite.client import CogniteClient | ||||||||||||||
>>> from cognite.client.data_classes.hosted_extractors import EventHubSourceUpdate | ||||||||||||||
>>> client = CogniteClient() | ||||||||||||||
>>> source = EventHubSourceUpdate('my_event_hub').event_hub_name.set("My Updated EventHub") | ||||||||||||||
>>> res = client.hosted_extractors.sources.update(source) | ||||||||||||||
""" | ||||||||||||||
self._warning.warn() | ||||||||||||||
return self._update_multiple( | ||||||||||||||
items=items, # type: ignore[arg-type] | ||||||||||||||
list_cls=SourceList, | ||||||||||||||
resource_cls=Source, # type: ignore[type-abstract] | ||||||||||||||
update_cls=SourceUpdate, | ||||||||||||||
headers={"cdf-version": "beta"}, | ||||||||||||||
) | ||||||||||||||
|
||||||||||||||
@classmethod | ||||||||||||||
def _convert_resource_to_patch_object( | ||||||||||||||
cls, | ||||||||||||||
resource: CogniteResource, | ||||||||||||||
update_attributes: list[PropertySpec], | ||||||||||||||
mode: Literal["replace_ignore_null", "patch", "replace"] = "replace_ignore_null", | ||||||||||||||
) -> dict[str, dict[str, dict]]: | ||||||||||||||
output = super()._convert_resource_to_patch_object(resource, update_attributes, mode) | ||||||||||||||
if hasattr(resource, "type"): | ||||||||||||||
output["type"] = resource.type | ||||||||||||||
return output | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This solution LGTM to me! |
||||||||||||||
|
||||||||||||||
def list( | ||||||||||||||
self, | ||||||||||||||
limit: int | None = DEFAULT_LIMIT_READ, | ||||||||||||||
) -> SourceList: | ||||||||||||||
"""`List sources <https://developer.cognite.com/api#tag/Sources/operation/list_sources>`_ | ||||||||||||||
|
||||||||||||||
Args: | ||||||||||||||
limit (int | None): Maximum number of sources to return. Defaults to 25. Set to -1, float("inf") or None to return all items. | ||||||||||||||
|
||||||||||||||
Returns: | ||||||||||||||
SourceList: List of requested sources | ||||||||||||||
|
||||||||||||||
Examples: | ||||||||||||||
|
||||||||||||||
List sources: | ||||||||||||||
|
||||||||||||||
>>> from cognite.client import CogniteClient | ||||||||||||||
>>> client = CogniteClient() | ||||||||||||||
>>> source_list = client.hosted_extractors.sources.list(limit=5) | ||||||||||||||
|
||||||||||||||
Iterate over sources:: | ||||||||||||||
|
||||||||||||||
>>> from cognite.client import CogniteClient | ||||||||||||||
>>> client = CogniteClient() | ||||||||||||||
>>> for source in client.hosted_extractors.sources: | ||||||||||||||
... source # do something with the source | ||||||||||||||
|
||||||||||||||
Iterate over chunks of sources to reduce memory load:: | ||||||||||||||
|
||||||||||||||
>>> from cognite.client import CogniteClient | ||||||||||||||
>>> client = CogniteClient() | ||||||||||||||
>>> for source_list in client.hosted_extractors.sources(chunk_size=25): | ||||||||||||||
... source_list # do something with the sources | ||||||||||||||
""" | ||||||||||||||
self._warning.warn() | ||||||||||||||
return self._list( | ||||||||||||||
list_cls=SourceList, | ||||||||||||||
resource_cls=Source, # type: ignore[type-abstract] | ||||||||||||||
method="GET", | ||||||||||||||
limit=limit, | ||||||||||||||
headers={"cdf-version": "beta"}, | ||||||||||||||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -106,6 +106,7 @@ class APIClient: | |
"transformations/(filter|byids|jobs/byids|schedules/byids|query/run)", | ||
"extpipes/(list|byids|runs/list)", | ||
"workflows/.*", | ||
"hostedextractors/.*", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This now also retries delete and update of sources, is that intended? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes |
||
) | ||
) | ||
] | ||
|
@@ -1038,7 +1039,11 @@ def _update_multiple( | |
for index, item in enumerate(item_list): | ||
if isinstance(item, CogniteResource): | ||
patch_objects.append( | ||
self._convert_resource_to_patch_object(item, update_cls._get_update_properties(item), mode) | ||
self._convert_resource_to_patch_object( | ||
item, | ||
update_cls._get_update_properties(item), | ||
mode, | ||
) | ||
) | ||
elif isinstance(item, CogniteUpdate): | ||
patch_objects.append(item.dump(camel_case=True)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
from __future__ import annotations | ||
|
||
__version__ = "7.58.0" | ||
__version__ = "7.58.1" | ||
__api_subversion__ = "20230101" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will add
rest
andkafka
which arealpha
, theeventhub
andmqtt
are beta.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this PR? Or a later one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Later on, this is 1 /5 while I added kafka and rest in 5/5