Commit e72ae96

Simplify datapoints queries with limit (#1425)
haakonvt authored Oct 17, 2023
1 parent 513f5a8 commit e72ae96
Showing 47 changed files with 738 additions and 1,010 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
```diff
@@ -65,4 +65,4 @@ repos:
       - --style=google
       - --exclude=tests/|scripts/
       - --quiet  # otherwise prints all checked files that are ok...
-      - --skip-checking-raises=true  # TODO: Fix in other PR
+      - --skip-checking-raises=true
```
8 changes: 8 additions & 0 deletions CHANGELOG.md
```diff
@@ -17,6 +17,14 @@ Changes are grouped as follows
 - `Fixed` for any bug fixes.
 - `Security` in case of vulnerabilities.
 
+## [6.33.2] - 2023-10-16
+### Fixed
+- When fetching datapoints from "a few time series" (implementation detail), all missing, non-ignorable time series
+  are now raised together in a `CogniteNotFoundError` rather than only the first encountered.
+
+### Improved
+- Datapoints fetching has a lower peak memory consumption when fetching from multiple time series simultaneously.
+
 ## [6.33.1] - 2023-10-14
 ### Fixed
 - `Function.list_schedules()` would return schedules unrelated to the function if the function did not have an external id.
```
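The `Fixed` entry above changes how missing time series are reported: every missing, non-ignorable identifier is collected before raising, so the exception lists them all at once. A minimal sketch of that pattern, assuming the `not_found` keyword of `CogniteNotFoundError`; the helper name and signature are illustrative, not the SDK's internals:

```python
from cognite.client.exceptions import CogniteNotFoundError

def raise_missing_together(requested: list[dict], found: list[dict]) -> None:
    # Gather every missing identifier first, then raise once with the full
    # list, instead of raising on the first miss encountered.
    missing = [ident for ident in requested if ident not in found]
    if missing:
        raise CogniteNotFoundError(not_found=missing)
```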
2 changes: 1 addition & 1 deletion cognite/client/_api/annotations.py
```diff
@@ -9,9 +9,9 @@
 from cognite.client.data_classes._base import CogniteResource, PropertySpec
 from cognite.client.data_classes.annotations import AnnotationReverseLookupFilter
 from cognite.client.data_classes.contextualization import ResourceReference, ResourceReferenceList
-from cognite.client.utils._auxiliary import assert_type
 from cognite.client.utils._identifier import IdentifierSequence
 from cognite.client.utils._text import to_camel_case
+from cognite.client.utils._validation import assert_type
 
 
 class AnnotationsAPI(APIClient):
```
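The only change here is that `assert_type` now comes from `cognite.client.utils._validation` rather than `_auxiliary`. For context, a rough reconstruction of what such a type-assertion helper does; this is illustrative, not the SDK's exact code:

```python
from typing import Any

def assert_type(var: Any, var_name: str, types: list[type], allow_none: bool = False) -> None:
    # Raise early with a readable message when an argument has the wrong type.
    if var is None and allow_none:
        return
    if not isinstance(var, tuple(types)):
        raise TypeError(f"{var_name} must be one of types {types}, not {type(var)}")

assert_type([1, 2, 3], "ids", [list])  # passes silently
```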
55 changes: 27 additions & 28 deletions cognite/client/_api/assets.py
```diff
@@ -48,17 +48,22 @@
 from cognite.client.data_classes.filters import Filter, _validate_filter
 from cognite.client.data_classes.shared import AggregateBucketResult
 from cognite.client.exceptions import CogniteAPIError
-from cognite.client.utils._auxiliary import assert_type, split_into_chunks, split_into_n_parts
-from cognite.client.utils._concurrency import classify_error, execute_tasks, get_priority_executor
+from cognite.client.utils._auxiliary import split_into_chunks, split_into_n_parts
+from cognite.client.utils._concurrency import classify_error, execute_tasks, get_executor
 from cognite.client.utils._identifier import IdentifierSequence
+from cognite.client.utils._importing import import_as_completed
 from cognite.client.utils._text import to_camel_case
-from cognite.client.utils._validation import prepare_filter_sort, process_asset_subtree_ids, process_data_set_ids
+from cognite.client.utils._validation import (
+    assert_type,
+    prepare_filter_sort,
+    process_asset_subtree_ids,
+    process_data_set_ids,
+)
 
 if TYPE_CHECKING:
-    from concurrent.futures import Future
-
-    from cognite.client.utils._priority_tpe import PriorityThreadPoolExecutor
+    from concurrent.futures import Future, ThreadPoolExecutor
+
+as_completed = import_as_completed()
 
 SortSpec: TypeAlias = Union[
     AssetSort,
```
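This import hunk is the heart of the refactor: the custom `PriorityThreadPoolExecutor` is dropped in favour of the standard-library `ThreadPoolExecutor`, with `as_completed` bound at module level via the SDK-internal `import_as_completed()` helper. A stdlib-only sketch of the resulting pattern, with `fetch` as a stand-in for a real request function:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(n: int) -> int:
    # Stand-in for a create/fetch request against the API.
    return n * n

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {pool.submit(fetch, n) for n in range(10)}
    results = [fut.result() for fut in as_completed(futures)]  # completion order
```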
```diff
@@ -1131,11 +1136,6 @@ def list(
         )
 
 
-class _CreateTask(NamedTuple):
-    items: set[Asset]
-    priority: int
-
-
 class _TaskResult(NamedTuple):
     successful: list[Asset]
     failed: list[Asset]
```
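With the priority executor gone, a create-task no longer needs to carry an explicit priority, so the `_CreateTask` NamedTuple is deleted and a task becomes a plain `set[Asset]`. The before/after shape, sketched with strings standing in for `Asset`:

```python
from typing import NamedTuple

class _CreateTask(NamedTuple):  # the removed wrapper
    items: set[str]
    priority: int

old_task = _CreateTask(items={"asset-1", "asset-2"}, priority=3)
new_task = {"asset-1", "asset-2"}  # after: the set *is* the task
```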
```diff
@@ -1156,23 +1156,23 @@ def __init__(self, hierarchy: AssetHierarchy, assets_api: AssetsAPI) -> None:
         # Each thread needs to store its latest exception:
         self.latest_exception: dict[int, Exception | None] = {}
 
-        self.__counter = itertools.count().__next__
+        self._counter = itertools.count().__next__
 
     def create(self, upsert: bool, upsert_mode: Literal["patch", "replace"]) -> AssetList:
         insert_fn = functools.partial(self._insert, upsert=upsert, upsert_mode=upsert_mode)
         insert_dct = self.hierarchy.groupby_parent_xid()
         subtree_count = self.hierarchy.count_subtree(insert_dct)
 
-        with get_priority_executor(max_workers=self.max_workers) as pool:
-            created_assets = self._create(pool, insert_fn, insert_dct, subtree_count)
+        pool = get_executor(max_workers=self.max_workers)
+        created_assets = self._create(pool, insert_fn, insert_dct, subtree_count)  # type: ignore [arg-type]
 
         if all_exceptions := [exc for exc in self.latest_exception.values() if exc is not None]:
             self._raise_latest_exception(all_exceptions, created_assets)
         return AssetList(created_assets, cognite_client=self.assets_api._cognite_client)
 
     def _create(
         self,
-        pool: PriorityThreadPoolExecutor,
+        pool: ThreadPoolExecutor,
         insert_fn: Callable[[list[Asset]], _TaskResult],
         insert_dct: dict[str | None, list[Asset]],
         subtree_count: dict[str, int],
```
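A small detail in this hunk: `self.__counter` becomes `self._counter`. Double-underscore attributes are name-mangled per class (`__counter` is stored as `_ClassName__counter`), which complicates subclassing and debugging, and is likely why the rename accompanies the refactor. A minimal demo of the mangling:

```python
class A:
    def __init__(self) -> None:
        self.__x = 1  # actually stored as _A__x
        self._y = 2   # stored as _y, no mangling

a = A()
print(hasattr(a, "_A__x"), hasattr(a, "_y"))  # True True
```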
```diff
@@ -1192,7 +1192,7 @@ def _create(
         futures = queue_fn(insert_dct.pop(None))
 
         while futures:
-            futures.remove(fut := next(pool.as_completed(futures)))
+            futures.remove(fut := next(as_completed(futures)))
             new_assets, failed, unknown = fut.result()
             created_assets.extend(new_assets)
             if unknown or failed:
```
```diff
@@ -1209,15 +1209,15 @@ def _queue_tasks(
         self,
         assets: list[Asset],
         *,
-        pool: PriorityThreadPoolExecutor,
+        pool: ThreadPoolExecutor,
         insert_fn: Callable,
         insert_dct: dict[str | None, list[Asset]],
         subtree_count: dict[str, int],
     ) -> set[Future]:
         if not assets:
             return set()
         return {
-            pool.submit(insert_fn, task.items, priority=self.n_assets - task.priority)
+            pool.submit(insert_fn, task)
             for task in self._split_and_prioritise_assets(assets, insert_dct, subtree_count)
         }
```
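Note the simplified `submit` call: the standard-library executor's `submit(fn, *args, **kwargs)` has no `priority=` parameter, so ordering now comes solely from the order in which `_split_and_prioritise_assets` yields tasks. A sketch of the new call shape, with `insert_fn` and the task sets as stand-ins for the SDK's internals:

```python
from concurrent.futures import ThreadPoolExecutor

def insert_fn(assets: set[str]) -> int:
    # Stand-in for the SDK's create-request function.
    return len(assets)

tasks = [{"a", "b"}, {"c"}]  # already yielded in priority order
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = {pool.submit(insert_fn, task) for task in tasks}
    print(sorted(fut.result() for fut in futures))  # [1, 2]
```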

```diff
@@ -1326,7 +1326,7 @@ def _split_and_prioritise_assets(
         to_create: list[Asset],
         insert_dct: dict[str | None, list[Asset]],
         subtree_count: dict[str, int],
-    ) -> Iterator[_CreateTask]:
+    ) -> Iterator[set[Asset]]:
         # We want to dive as deep down the hierarchy as possible while prioritising assets with the biggest
         # subtree, that way we more quickly get into a state with enough unblocked parents to always keep
         # our worker threads fed with create-requests.
```
```diff
@@ -1337,33 +1337,32 @@ def _split_and_prioritise_assets(
             for chunk in split_into_n_parts(to_create, n=n_parts)
         ]
         # Also, to not waste worker threads on tiny requests, we might recombine:
-        tasks.sort(key=lambda task: len(task.items))
+        tasks.sort(key=len)
         yield from self._recombine_chunks(tasks, limit=self.create_limit)
 
     @staticmethod
     def _dump_assets(assets: Sequence[Asset] | Sequence[AssetUpdate]) -> dict[str, list[dict]]:
         return {"items": [asset.dump(camel_case=True) for asset in assets]}
 
     @staticmethod
-    def _recombine_chunks(lst: list[_CreateTask], limit: int) -> Iterator[_CreateTask]:
+    def _recombine_chunks(lst: list[set[Asset]], limit: int) -> Iterator[set[Asset]]:
         task = lst[0]
         for next_task in lst[1:]:
-            if len(task.items) + len(next_task.items) > limit:
+            if len(task) + len(next_task) > limit:
                 yield task
                 task = next_task
             else:
-                task = _CreateTask(task.items | next_task.items, max(task.priority, next_task.priority))
+                task |= next_task
         yield task
 
     def _extend_with_unblocked_from_subtree(
         self,
         to_create: set[Asset],
         insert_dct: dict[str | None, list[Asset]],
         subtree_count: dict[str, int],
-    ) -> _CreateTask:
-        pri_q = [(-subtree_count[cast(str, asset.external_id)], self.__counter(), asset) for asset in to_create]
+    ) -> set[Asset]:
+        pri_q = [(-subtree_count[cast(str, asset.external_id)], self._counter(), asset) for asset in to_create]
         heapq.heapify(pri_q)
-        priority = -pri_q[0][0]  # No child asset can have a larger subtree than its parent
 
         while pri_q:  # Queue should seriously be spelled q
             *_, asset = heapq.heappop(pri_q)
```
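Dropping `_CreateTask` also reduces `_recombine_chunks` to pure set operations. A self-contained sketch of the greedy merge it performs, with strings standing in for assets:

```python
from collections.abc import Iterator

def recombine_chunks(chunks: list[set[str]], limit: int) -> Iterator[set[str]]:
    # Greedily merge neighbouring small chunks while the combined size
    # stays within `limit`, so worker threads are not wasted on tiny requests.
    task = chunks[0]
    for next_task in chunks[1:]:
        if len(task) + len(next_task) > limit:
            yield task
            task = next_task
        else:
            task = task | next_task
    yield task

print([sorted(c) for c in recombine_chunks([{"a"}, {"b"}, {"c", "d", "e"}], limit=3)])
# [['a', 'b'], ['c', 'd', 'e']]
```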
```diff
@@ -1373,9 +1372,9 @@ def _extend_with_unblocked_from_subtree(
             if len(to_create) == self.create_limit:
                 break
             for child in insert_dct.get(asset.external_id, []):
-                heapq.heappush(pri_q, (-subtree_count[cast(str, child.external_id)], self.__counter(), child))
+                heapq.heappush(pri_q, (-subtree_count[cast(str, child.external_id)], self._counter(), child))
 
-        return _CreateTask(to_create, priority)
+        return to_create
 
     @staticmethod
     def _pop_child_assets(assets: Iterable[Asset], insert_dct: dict[str | None, list[Asset]]) -> Iterator[Asset]:
```
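The priority queue itself survives unchanged: a max-heap ordered by subtree size, built by negating the counts, with a monotonically increasing counter as tie-breaker so that unorderable `Asset` objects are never compared. A self-contained demo of the pattern:

```python
import heapq
import itertools

counter = itertools.count().__next__
subtree_count = {"root": 5, "child-a": 3, "child-b": 1}

# Negate the counts to turn Python's min-heap into a max-heap; the counter
# breaks ties so the third tuple element is never compared.
pri_q = [(-subtree_count[xid], counter(), xid) for xid in subtree_count]
heapq.heapify(pri_q)
while pri_q:
    *_, xid = heapq.heappop(pri_q)
    print(xid)  # root, child-a, child-b — largest subtree first
```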
(Diffs for the remaining 43 changed files are not shown.)
