Skip to content

Commit

Permalink
Quality of Life on client.extraction_pipelines.runs.list (#1322)
Browse files Browse the repository at this point in the history
  • Loading branch information
doctrino authored Nov 29, 2023
1 parent 6354682 commit b0bdb05
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 10 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.4.2] - 2023-11-28
### Improved
- Quality of life improvement to `client.extraction_pipelines.runs.list` method. The `statuses` parameter now accepts
a single value and the annotation is improved. The parameter `created_time` can now be given on the format `12d-ago`.

## [7.4.1] - 2023-11-28
### Fixed
- Error in logic when creating a `Transformation`. This is causing when calling `client.transformations.update`.
Expand Down
31 changes: 26 additions & 5 deletions cognite/client/_api/extractionpipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from typing import TYPE_CHECKING, Any, Literal, Sequence, overload

from typing_extensions import TypeAlias

from cognite.client._api_client import APIClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client.data_classes import (
Expand All @@ -16,6 +18,7 @@
TimestampRange,
)
from cognite.client.data_classes.extractionpipelines import StringFilter
from cognite.client.utils import timestamp_to_ms
from cognite.client.utils._identifier import IdentifierSequence
from cognite.client.utils._validation import assert_type

Expand All @@ -24,6 +27,9 @@
from cognite.client.config import ClientConfig


RunStatus: TypeAlias = Literal["success", "failure", "seen"]


class ExtractionPipelinesAPI(APIClient):
_RESOURCE_PATH = "/extpipes"

Expand Down Expand Up @@ -217,23 +223,29 @@ class ExtractionPipelineRunsAPI(APIClient):
def list(
self,
external_id: str,
statuses: Sequence[str] | None = None,
statuses: RunStatus | Sequence[RunStatus] | Sequence[str] | None = None,
message_substring: str | None = None,
created_time: dict[str, Any] | TimestampRange | None = None,
created_time: dict[str, Any] | TimestampRange | str | None = None,
limit: int | None = DEFAULT_LIMIT_READ,
) -> ExtractionPipelineRunList:
"""`List runs for an extraction pipeline with given external_id <https://developer.cognite.com/api#tag/Extraction-Pipelines-Runs/operation/filterRuns>`_
Args:
external_id (str): Extraction pipeline external Id.
statuses (Sequence[str] | None): One or more among "success" / "failure" / "seen".
statuses (RunStatus | Sequence[RunStatus] | Sequence[str] | None): One or more among "success" / "failure" / "seen".
message_substring (str | None): Failure message part.
created_time (dict[str, Any] | TimestampRange | None): The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
created_time (dict[str, Any] | TimestampRange | str | None): Range between two timestamps. Possible keys are `min` and `max`, with values given as timestamps in ms.
If a string is passed, it is assumed to be the minimum value.
limit (int | None): Maximum number of ExtractionPipelines to return. Defaults to 25. Set to -1, float("inf") or None to return all items.
Returns:
ExtractionPipelineRunList: List of requested extraction pipeline runs
Tip:
The ``created_time`` parameter can also be passed as a string, to support the most typical usage pattern
of fetching the most recent runs, meaning it is implicitly assumed to be the minimum created time. The
format is "N[timeunit]-ago", where timeunit is w,d,h,m (week, day, hour, minute), e.g. "12d-ago".
Examples:
List extraction pipeline runs::
Expand All @@ -247,12 +259,21 @@ def list(
>>> from cognite.client import CogniteClient
>>> c = CogniteClient()
>>> runsList = c.extraction_pipelines.runs.list(external_id="test ext id", statuses=["seen"], statuslimit=5)
Get all failed pipeline runs in the last 24 hours for pipeline 'extId':
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import ExtractionPipelineRun
>>> c = CogniteClient()
>>> res = c.extraction_pipelines.runs.list(external_id="extId", statuses="failure", created_time="24h-ago")
"""
if isinstance(created_time, str):
created_time = TimestampRange(min=timestamp_to_ms(created_time))

if statuses is not None or message_substring is not None or created_time is not None:
filter = ExtractionPipelineRunFilter(
external_id=external_id,
statuses=statuses,
statuses=[statuses] if isinstance(statuses, str) else statuses,
message=StringFilter(substring=message_substring),
created_time=created_time,
).dump(camel_case=True)
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.4.1"
__version__ = "7.4.2"
__api_subversion__ = "V20220125"
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.4.1"
version = "7.4.2"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
61 changes: 58 additions & 3 deletions tests/tests_integration/test_api/test_extraction_pipelines.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from datetime import datetime, timedelta, timezone

import pytest

from cognite.client import CogniteClient
from cognite.client.data_classes import ExtractionPipeline, ExtractionPipelineRun, ExtractionPipelineUpdate
from cognite.client.data_classes.extractionpipelines import ExtractionPipelineContact
from cognite.client.data_classes.extractionpipelines import ExtractionPipelineContact, ExtractionPipelineRunList
from cognite.client.exceptions import CogniteNotFoundError
from cognite.client.utils import datetime_to_ms
from cognite.client.utils._text import random_string
from cognite.client.utils._time import YearAligner


@pytest.fixture
def new_extpipe(cognite_client):
@pytest.fixture(scope="function")
def new_extpipe(cognite_client: CogniteClient):
testid = random_string(50)
dataset = cognite_client.data_sets.list()[0]
extpipe = cognite_client.extraction_pipelines.create(
Expand All @@ -33,6 +37,28 @@ def new_extpipe(cognite_client):
assert cognite_client.extraction_pipelines.retrieve(extpipe.id) is None


@pytest.fixture(scope="function")
def populated_runs(cognite_client: CogniteClient, new_extpipe: ExtractionPipeline) -> ExtractionPipelineRunList:
now = datetime_to_ms(dt_now := datetime.now(timezone.utc))
a_year_ago = datetime_to_ms(YearAligner.add_units(dt_now, -1))
runs = [
ExtractionPipelineRun(
extpipe_external_id=new_extpipe.external_id, status="failure", message="lorem ipsum", created_time=now
),
ExtractionPipelineRun(
extpipe_external_id=new_extpipe.external_id,
status="success",
message="dolor sit amet",
created_time=a_year_ago,
),
]
created = []
for run in runs:
new_run = cognite_client.extraction_pipelines.runs.create(run)
created.append(new_run)
return ExtractionPipelineRunList(created)


class TestExtractionPipelinesAPI:
def test_retrieve(self, cognite_client):
res = cognite_client.extraction_pipelines.list(limit=1)
Expand Down Expand Up @@ -116,3 +142,32 @@ def test_list_extraction_pipeline_runs(
dumped = res.dump(camel_case=False)
for run in dumped:
assert run["external_id"] == new_extpipe.external_id

def test_list_failed_extraction_pipeline_runs(
self,
cognite_client: CogniteClient,
new_extpipe: ExtractionPipeline,
populated_runs: ExtractionPipelineRunList,
) -> None:
expected = ExtractionPipelineRunList([run for run in populated_runs if run.status == "failure"])

filtered = cognite_client.extraction_pipelines.runs.list(
external_id=new_extpipe.external_id, statuses="failure", limit=1
)

assert expected.dump() == filtered.dump()

def test_filter_extraction_pipeline_runs_created_ago(
self,
cognite_client: CogniteClient,
new_extpipe: ExtractionPipeline,
populated_runs: ExtractionPipelineRunList,
) -> None:
yesterday = datetime_to_ms(datetime.now(timezone.utc) - timedelta(days=1))
expected = ExtractionPipelineRunList([run for run in populated_runs if run.created_time > yesterday])

filtered = cognite_client.extraction_pipelines.runs.list(
external_id=new_extpipe.external_id, created_time="24h-ago", limit=1
)

assert expected.dump() == filtered.dump()

0 comments on commit b0bdb05

Please sign in to comment.