Skip to content

Commit

Permalink
[CDF-22879] Update extraction pipeline with NotificationConfig (#1978)
Browse files Browse the repository at this point in the history
  • Loading branch information
doctrino authored Oct 15, 2024
1 parent 3c97242 commit 88b0cf2
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.63.5] - 2024-10-15
### Fixed
- Added missing parameter `notification_config` to `ExtractionPipeline`.

## [7.63.4] - 2024-10-14
### Fixed
- Using `OAuthDeviceCode.load` now includes the missing parameters `oauth_discovery_url`, `clear_cache`, and `mem_cache_only`.
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.63.4"
__version__ = "7.63.5"
__api_subversion__ = "20230101"
41 changes: 38 additions & 3 deletions cognite/client/data_classes/extractionpipelines.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from abc import ABC
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Literal, cast

from cognite.client.data_classes._base import (
Expand Down Expand Up @@ -51,6 +52,18 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> E
)


@dataclass
class ExtractionPipelineNotificationConfiguration(CogniteObject):
"""Extraction pipeline notification configuration
Args:
allowed_not_seen_range_in_minutes (int | None): Time in minutes to pass without any Run. Null if extraction pipeline is not checked.
"""

allowed_not_seen_range_in_minutes: int | None = None


class ExtractionPipelineCore(WriteableCogniteResource["ExtractionPipelineWrite"], ABC):
"""An extraction pipeline is a representation of a process writing data to CDF, such as an extractor or an ETL tool.
Expand All @@ -65,6 +78,7 @@ class ExtractionPipelineCore(WriteableCogniteResource["ExtractionPipelineWrite"]
metadata (dict[str, str] | None): Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 128 bytes, value 10240 bytes, up to 256 key-value pairs, of total size at most 10240.
source (str | None): Source text value for extraction pipeline.
documentation (str | None): Documentation text value for extraction pipeline.
notification_config (ExtractionPipelineNotificationConfiguration | None): Notification configuration for the extraction pipeline.
created_by (str | None): Extraction pipeline creator, usually an email.
"""
Expand All @@ -81,6 +95,7 @@ def __init__(
metadata: dict[str, str] | None = None,
source: str | None = None,
documentation: str | None = None,
notification_config: ExtractionPipelineNotificationConfiguration | None = None,
created_by: str | None = None,
) -> None:
self.external_id = external_id
Expand All @@ -93,12 +108,15 @@ def __init__(
self.metadata = metadata
self.source = source
self.documentation = documentation
self.notification_config = notification_config
self.created_by = created_by

def dump(self, camel_case: bool = True) -> dict[str, Any]:
result = super().dump(camel_case)
if self.contacts:
result["contacts"] = [contact.dump(camel_case) for contact in self.contacts]
if self.notification_config:
result["notificationConfig"] = self.notification_config.dump(camel_case)
return result


Expand All @@ -122,6 +140,7 @@ class ExtractionPipeline(ExtractionPipelineCore):
metadata (dict[str, str] | None): Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 128 bytes, value 10240 bytes, up to 256 key-value pairs, of total size at most 10240.
source (str | None): Source text value for extraction pipeline.
documentation (str | None): Documentation text value for extraction pipeline.
notification_config (ExtractionPipelineNotificationConfiguration | None): Notification configuration for the extraction pipeline.
created_time (int | None): The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
last_updated_time (int | None): The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
created_by (str | None): Extraction pipeline creator, usually an email.
Expand All @@ -145,6 +164,7 @@ def __init__(
metadata: dict[str, str] | None = None,
source: str | None = None,
documentation: str | None = None,
notification_config: ExtractionPipelineNotificationConfiguration | None = None,
created_time: int | None = None,
last_updated_time: int | None = None,
created_by: str | None = None,
Expand All @@ -161,6 +181,7 @@ def __init__(
metadata=metadata,
source=source,
documentation=documentation,
notification_config=notification_config,
created_by=created_by,
)
# id/created_time/last_updated_time are required when using the class to read,
Expand Down Expand Up @@ -192,6 +213,7 @@ def as_write(self) -> ExtractionPipelineWrite:
metadata=self.metadata,
source=self.source,
documentation=self.documentation,
notification_config=self.notification_config,
created_by=self.created_by,
)

Expand All @@ -203,6 +225,10 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> E
ExtractionPipelineContact._load(contact) if isinstance(contact, dict) else contact
for contact in instance.contacts
]
if instance.notification_config and isinstance(instance.notification_config, dict):
instance.notification_config = ExtractionPipelineNotificationConfiguration._load(
instance.notification_config
)
return instance

def __hash__(self) -> int:
Expand All @@ -224,6 +250,7 @@ class ExtractionPipelineWrite(ExtractionPipelineCore):
metadata (dict[str, str] | None): Custom, application specific metadata. String key -> String value. Limits: Maximum length of key is 128 bytes, value 10240 bytes, up to 256 key-value pairs, of total size at most 10240.
source (str | None): Source text value for extraction pipeline.
documentation (str | None): Documentation text value for extraction pipeline.
notification_config (ExtractionPipelineNotificationConfiguration | None): Notification configuration for the extraction pipeline.
created_by (str | None): Extraction pipeline creator, usually an email.
"""

Expand All @@ -239,6 +266,7 @@ def __init__(
metadata: dict[str, str] | None = None,
source: str | None = None,
documentation: str | None = None,
notification_config: ExtractionPipelineNotificationConfiguration | None = None,
created_by: str | None = None,
) -> None:
super().__init__(
Expand All @@ -252,6 +280,7 @@ def __init__(
metadata=metadata,
source=source,
documentation=documentation,
notification_config=notification_config,
created_by=created_by,
)

Expand All @@ -264,10 +293,13 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> E
data_set_id=resource["dataSetId"],
raw_tables=resource.get("rawTables"),
schedule=resource.get("schedule"),
contacts=[ExtractionPipelineContact.load(contact) for contact in resource.get("contacts") or []] or None,
contacts=[ExtractionPipelineContact._load(contact) for contact in resource.get("contacts") or []] or None,
metadata=resource.get("metadata"),
source=resource.get("source"),
documentation=resource.get("documentation"),
notification_config=ExtractionPipelineNotificationConfiguration._load(resource["notificationConfig"])
if "notificationConfig" in resource
else None,
created_by=resource.get("createdBy"),
)

Expand Down Expand Up @@ -340,6 +372,10 @@ def source(self) -> _PrimitiveExtractionPipelineUpdate:
def documentation(self) -> _PrimitiveExtractionPipelineUpdate:
return ExtractionPipelineUpdate._PrimitiveExtractionPipelineUpdate(self, "documentation")

@property
def notification_config(self) -> _PrimitiveExtractionPipelineUpdate:
return ExtractionPipelineUpdate._PrimitiveExtractionPipelineUpdate(self, "notificationConfig")

@property
def schedule(self) -> _PrimitiveExtractionPipelineUpdate:
return ExtractionPipelineUpdate._PrimitiveExtractionPipelineUpdate(self, "schedule")
Expand All @@ -361,8 +397,7 @@ def _get_update_properties(cls, item: CogniteResource | None = None) -> list[Pro
PropertySpec("metadata", is_object=True),
PropertySpec("source", is_nullable=False),
PropertySpec("documentation", is_nullable=False),
# Not supported yet
# PropertySpec("notification_config", is_nullable=False),
PropertySpec("notification_config", is_nullable=False),
]


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.63.4"
version = "7.63.5"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
18 changes: 14 additions & 4 deletions tests/tests_integration/test_api/test_extraction_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,29 @@
import pytest

from cognite.client import CogniteClient
from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineRun, ExtractionPipelineUpdate
from cognite.client.data_classes.extractionpipelines import ExtractionPipelineContact, ExtractionPipelineRunList
from cognite.client.data_classes import (
ExtractionPipeline,
ExtractionPipelineRun,
ExtractionPipelineUpdate,
ExtractionPipelineWrite,
)
from cognite.client.data_classes.extractionpipelines import (
ExtractionPipelineContact,
ExtractionPipelineNotificationConfiguration,
ExtractionPipelineRunList,
)
from cognite.client.exceptions import CogniteNotFoundError
from cognite.client.utils import datetime_to_ms
from cognite.client.utils._text import random_string
from cognite.client.utils._time import DayAligner


@pytest.fixture(scope="function")
def new_extpipe(cognite_client: CogniteClient):
def new_extpipe(cognite_client: CogniteClient) -> ExtractionPipeline:
testid = random_string(50)
dataset = cognite_client.data_sets.list()[0]
extpipe = cognite_client.extraction_pipelines.create(
ExtractionPipeline(
ExtractionPipelineWrite(
external_id=f"testid-{testid}",
name=f"Test extpipe {testid}",
data_set_id=dataset.id,
Expand All @@ -26,6 +35,7 @@ def new_extpipe(cognite_client: CogniteClient):
name="John Doe", email="[email protected]", role="owner", send_notification=False
)
],
notification_config=ExtractionPipelineNotificationConfiguration(allowed_not_seen_range_in_minutes=10),
schedule="Continuous",
)
)
Expand Down

0 comments on commit 88b0cf2

Please sign in to comment.