Skip to content

Commit

Permalink
Remove dependency: sortedcontainers (#1566)
Browse files Browse the repository at this point in the history
  • Loading branch information
haakonvt authored Dec 27, 2023
1 parent 126cf44 commit b6cf169
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 104 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.8.6] - 2024-01-03
### Improved
- SDK dependency on the `sortedcontainers` package was dropped.

## [7.8.5] - 2023-12-22
### Fixed
- `DirectRelationReference` is now immutable.
Expand Down
73 changes: 28 additions & 45 deletions cognite/client/_api/datapoint_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import operator as op
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from functools import cached_property
Expand All @@ -14,24 +15,24 @@
Any,
Callable,
ClassVar,
DefaultDict,
Dict,
Generic,
Hashable,
Iterator,
List,
Literal,
MutableSequence,
NoReturn,
Optional,
Sequence,
Tuple,
TypedDict,
TypeVar,
Union,
cast,
)

from google.protobuf.message import Message
from sortedcontainers import SortedDict, SortedList
from typing_extensions import NotRequired
from typing_extensions import NotRequired, TypeAlias

from cognite.client.data_classes.datapoints import NUMPY_IS_AVAILABLE, Aggregate, Datapoints, DatapointsArray
from cognite.client.utils._auxiliary import exactly_one_is_not_none, is_unlimited
Expand Down Expand Up @@ -562,26 +563,6 @@ def custom_from_aggregates(lst: list[str]) -> Callable[[DatapointsAgg], tuple[fl
return op.attrgetter(*lst)


class DefaultSortedDict(SortedDict, Generic[_T]):
    """A SortedDict with defaultdict-style auto-vivification.

    Missing keys are created on first access via ``default_factory`` (like
    ``collections.defaultdict``), while iteration order stays sorted by key
    (from ``sortedcontainers.SortedDict``).
    """

    def __init__(self, default_factory: Callable[[], _T], /, **kw: Any) -> None:
        # Positional-only factory mirrors defaultdict's signature; remaining
        # kwargs are forwarded untouched to SortedDict.
        self.default_factory = default_factory
        super().__init__(**kw)

    def __missing__(self, key: Hashable) -> _T:
        # Store the freshly created default before returning, so the second
        # lookup hands back the exact object now held in the mapping.
        self[key] = self.default_factory()
        return self[key]


def create_dps_container() -> DefaultSortedDict:
    """Initialises a new sorted container for datapoints storage.

    Keys iterate in sorted order, and first access to a missing key creates
    an empty list (defaultdict-like), so callers can append batches directly.
    """
    return DefaultSortedDict(list)


def create_subtask_lst() -> SortedList:
    """Initialises a new sorted list for subtasks.

    Elements are kept ordered by their ``subtask_idx`` attribute; duplicates
    are allowed (SortedList semantics).
    """
    return SortedList(key=op.attrgetter("subtask_idx"))


def ensure_int(val: float, change_nan_to: int = 0) -> int:
if math.isnan(val):
return change_nan_to
Expand Down Expand Up @@ -610,21 +591,28 @@ def get_ts_info_from_proto(res: DataPointListItem) -> dict[str, int | str | bool
}


def create_array_from_dps_container(container: DefaultSortedDict) -> npt.NDArray:
return np.hstack(list(chain.from_iterable(container.values())))
# Maps a subtask's index tuple -> list of fetched datapoint batches. A plain
# defaultdict replaces the previous auto-sorted container; consumers sort the
# keys at read time instead (see datapoints_in_order).
_DataContainer: TypeAlias = DefaultDict[Tuple[float, ...], List]


def datapoints_in_order(container: _DataContainer) -> Iterator[list]:
    """Yield every stored datapoint batch, visiting keys in sorted order.

    The container is an unordered defaultdict; sorting the keys here restores
    the global ordering previously provided by an auto-sorted container.
    """
    for key in sorted(container):
        yield from container[key]

def create_aggregates_arrays_from_dps_container(container: DefaultSortedDict, n_aggs: int) -> list[npt.NDArray]:
all_aggs_arr = np.vstack(list(chain.from_iterable(container.values())))

def create_array_from_dps_container(container: _DataContainer) -> npt.NDArray:
    """Concatenate all fetched datapoint arrays into one array, in sorted-key order."""
    # Gather batches key-by-key (sorted) so the final array is chronological.
    batches = [batch for key in sorted(container) for batch in container[key]]
    return np.hstack(batches)


def create_aggregates_arrays_from_dps_container(container: _DataContainer, n_aggs: int) -> list[npt.NDArray]:
    """Stack all aggregate batches (sorted-key order) and split into one flat array per aggregate.

    Each stored batch holds rows of ``n_aggs`` aggregate values; vstack joins the
    rows, hsplit separates the aggregate columns, and ravel flattens each to 1-D.
    """
    rows = [batch for key in sorted(container) for batch in container[key]]
    stacked = np.vstack(rows)
    return [np.ravel(column) for column in np.hsplit(stacked, n_aggs)]


def create_list_from_dps_container(container: DefaultSortedDict) -> list:
return list(chain.from_iterable(chain.from_iterable(container.values())))
def create_list_from_dps_container(container: _DataContainer) -> list:
    """Flatten every datapoint from every batch into a single list, in sorted-key order."""
    # Two flattening levels: each key maps to a list of batches, each batch to datapoints.
    return [dp for key in sorted(container) for batch in container[key] for dp in batch]


def create_aggregates_list_from_dps_container(container: DefaultSortedDict) -> Iterator[list[list]]:
concatenated = chain.from_iterable(chain.from_iterable(container.values()))
def create_aggregates_list_from_dps_container(container: _DataContainer) -> Iterator[list[list]]:
    """Transpose all fetched aggregate rows (sorted-key order) into one list per aggregate."""
    # Flatten batches into individual rows, then zip(*...) turns rows into columns.
    rows = [row for key in sorted(container) for batch in container[key] for row in batch]
    return map(list, zip(*rows))


Expand All @@ -649,6 +637,7 @@ def __init__(
self.target_unit = target_unit
self.target_unit_system = target_unit_system
self.is_done = False
self.n_dps_fetched = 0

self.static_kwargs = identifier.as_dict()
if target_unit is not None:
Expand All @@ -665,9 +654,6 @@ def store_partial_result(self, res: DataPointListItem) -> list[SplittingFetchSub
...


T_BaseDpsFetchSubtask = TypeVar("T_BaseDpsFetchSubtask", bound=BaseDpsFetchSubtask)


class OutsideDpsFetchSubtask(BaseDpsFetchSubtask):
"""Fetches outside points and stores in parent"""

Expand Down Expand Up @@ -707,7 +693,6 @@ def __init__(
self.aggregates = aggregates
self.granularity = granularity
self.subtask_idx = subtask_idx
self.n_dps_fetched = 0
self.next_start = self.start

if not self.is_raw_query:
Expand Down Expand Up @@ -803,7 +788,7 @@ def _split_self_into_new_subtasks_if_needed(self, last_ts: int) -> list[Splittin
SplittingFetchSubtask(start=start, end=end, subtask_idx=idx, **self._static_params)
for start, end, idx in zip(boundaries[1:-1], boundaries[2:], split_idxs)
]
self.parent.subtasks.update(new_subtasks)
self.parent.subtasks.extend(new_subtasks)
return new_subtasks


Expand All @@ -826,11 +811,9 @@ def __init__(
self._is_done = False
self._final_result: Datapoints | DatapointsArray | None = None

# Only concurrent fetchers really need auto-sorted containers. To keep indexing simple,
# we also use them for serial fetchers (nice for outside points as well):
self.ts_data = create_dps_container()
self.dps_data = create_dps_container()
self.subtasks = create_subtask_lst()
self.ts_data: _DataContainer = defaultdict(list)
self.dps_data: _DataContainer = defaultdict(list)
self.subtasks: list[BaseDpsFetchSubtask] = []

# When running large queries (i.e. not "eager"), all time series have a first batch fetched before
# further subtasks are created. This gives us e.g. outside points for free (if asked for) and ts info:
Expand Down Expand Up @@ -974,7 +957,7 @@ def split_into_subtasks(self, max_workers: int, n_tot_queries: int) -> list[Base
subtask_idx=FIRST_IDX,
)
]
self.subtasks.update(subtasks)
self.subtasks.extend(subtasks)
self._maybe_queue_outside_dps_subtask(subtasks)
return subtasks

Expand Down Expand Up @@ -1080,7 +1063,7 @@ def split_into_subtasks(self, max_workers: int, n_tot_queries: int) -> list[Base
# we hold back to not create too many subtasks:
n_workers_per_queries = max(1, round(max_workers / n_tot_queries))
subtasks: list[BaseDpsFetchSubtask] = self._create_uniformly_split_subtasks(n_workers_per_queries)
self.subtasks.update(subtasks)
self.subtasks.extend(subtasks)
self._maybe_queue_outside_dps_subtask(subtasks)
return subtasks

Expand Down Expand Up @@ -1136,7 +1119,7 @@ def _set_aggregate_vars(self, aggs_camel_case: list[str], use_numpy: bool) -> No
self.float_aggs = aggs_camel_case[:]
self.is_count_query = "count" in self.float_aggs
if self.is_count_query:
self.count_data = create_dps_container()
self.count_data: _DataContainer = defaultdict(list)
self.float_aggs.remove("count") # Only aggregate that is (supposed to be) integer, handle separately

self.has_non_count_aggs = bool(self.float_aggs)
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.8.5"
__version__ = "7.8.6"
__api_subversion__ = "V20220125"
13 changes: 1 addition & 12 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.8.5"
version = "7.8.6"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand All @@ -24,7 +24,6 @@ python = "^3.8"
requests = "^2"
requests_oauthlib = "^1"
msal = "^1"
sortedcontainers = "^2.2"
protobuf = ">=3.16.0"
pip = ">=20.0.0" # make optional once poetry doesn't auto-remove it on "simple install"
typing_extensions = ">= 4"
Expand Down
45 changes: 1 addition & 44 deletions tests/tests_unit/test_api/test_datapoints_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@
from decimal import Decimal

import pytest
from sortedcontainers import SortedKeysView

from cognite.client._api.datapoint_tasks import (
_DatapointsQuery,
_SingleTSQueryValidator,
create_dps_container,
create_subtask_lst,
)
from cognite.client.utils._text import random_string
from tests.utils import random_aggregates, random_cognite_ids, random_gamma_dist_integer, random_granularity
from tests.utils import random_aggregates, random_cognite_ids, random_granularity

LIMIT_KWS = dict(dps_limit_raw=1234, dps_limit_agg=5678)

Expand Down Expand Up @@ -205,43 +202,3 @@ def test_retrieve_aggregates__include_outside_points_raises(self):
user_query = _DatapointsQuery(id=id_dct_lst, include_outside_points=False)
with pytest.raises(ValueError, match="'Include outside points' is not supported for aggregates."):
_SingleTSQueryValidator(user_query, **LIMIT_KWS).validate_and_create_single_queries()


@pytest.fixture
def create_random_int_tuples(n_min=5):
    """Set of unique random int tuples (length 1-5, values in [-5, 5)).

    Generates at least ``n_min`` candidates (usually more, gamma-distributed
    up to ~100); duplicates collapse in the set.
    """
    n_candidates = max(n_min, random_gamma_dist_integer(100))
    tuples = set()
    for _ in range(n_candidates):
        tuples.add(tuple(random.choices(range(-5, 5), k=random.randint(1, 5))))
    return tuples


class TestSortedContainers:
    """Checks that the datapoints/subtask containers keep their contents sorted."""

    def test_dps_container(self, create_random_int_tuples):
        container = create_dps_container()
        for key in create_random_int_tuples:
            container[key] = None
        keys_view = container.keys()
        assert isinstance(keys_view, SortedKeysView)
        assert list(keys_view) == sorted(create_random_int_tuples)

    def test_dps_container__with_duplicates(self, create_random_int_tuples):
        container = create_dps_container()
        unique_keys = list(create_random_int_tuples)
        # Re-inserting the first few keys must not duplicate them in the view:
        for key in unique_keys + unique_keys[:5]:
            container[key].append(None)  # __missing__ creates the list on first access
        keys_view = container.keys()
        assert isinstance(keys_view, SortedKeysView)
        assert list(keys_view) == sorted(create_random_int_tuples)

    @pytest.mark.parametrize("with_duplicates", (False, True))
    def test_subtask_lst(self, with_duplicates, create_random_int_tuples):
        class Foo:
            def __init__(self, idx):
                self.subtask_idx = idx

        keys = create_random_int_tuples
        if with_duplicates:
            keys = list(keys) + list(keys)[:5]

        random_foos = [Foo(key) for key in keys]
        container = create_subtask_lst()
        container.update(random_foos)
        # The container must order elements exactly by their subtask_idx key:
        assert list(container) == sorted(random_foos, key=lambda foo: foo.subtask_idx)

0 comments on commit b6cf169

Please sign in to comment.