Skip to content

Commit

Permalink
Add helper function to raise first encountered exception in `TasksSum…
Browse files Browse the repository at this point in the history
…mary` (#1396)
  • Loading branch information
pratuat authored Oct 5, 2023
1 parent c37e4f7 commit f5fe440
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 15 deletions.
4 changes: 2 additions & 2 deletions cognite/client/_api/datapoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -1637,6 +1637,6 @@ def fetch_datapoints(self) -> list[dict[str, Any]]:
for chunk in split_into_chunks(self._all_identifiers, self.dps_client._RETRIEVE_LATEST_LIMIT)
]
tasks_summary = execute_tasks(self.dps_client._post, tasks, max_workers=self.dps_client._config.max_workers)
if tasks_summary.exceptions:
raise tasks_summary.exceptions[0]
tasks_summary.raise_first_encountered_exception()

return tasks_summary.joined_results(lambda res: res.json()["items"])
4 changes: 2 additions & 2 deletions cognite/client/_api/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,6 @@ def list(
for cursor in cursors
]
summary = utils._concurrency.execute_tasks(self._list, tasks, max_workers=self._config.max_workers)
if summary.exceptions:
raise summary.exceptions[0]
summary.raise_first_encountered_exception()

return RowList(summary.joined_results())
4 changes: 2 additions & 2 deletions cognite/client/_api/relationships.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ def list(
tasks,
max_workers=self._config.max_workers,
)
if tasks_summary.exceptions:
raise tasks_summary.exceptions[0]
tasks_summary.raise_first_encountered_exception()

return RelationshipList(tasks_summary.joined_results())
return self._list(
list_cls=RelationshipList,
Expand Down
3 changes: 1 addition & 2 deletions cognite/client/_api/sequences.py
Original file line number Diff line number Diff line change
Expand Up @@ -1058,8 +1058,7 @@ def _fetch_sequence(post_obj: dict[str, Any]) -> SequenceData:
tasks_summary = utils._concurrency.execute_tasks(
_fetch_sequence, [(x,) for x in post_objs], max_workers=self._config.max_workers
)
if tasks_summary.exceptions:
raise tasks_summary.exceptions[0]
tasks_summary.raise_first_encountered_exception()
results = tasks_summary.joined_results()
if len(post_objs) == 1:
return results[0]
Expand Down
4 changes: 1 addition & 3 deletions cognite/client/_api/synthetic_time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ def query(
datapoints_summary = utils._concurrency.execute_tasks(
self._fetch_datapoints, tasks, max_workers=self._config.max_workers
)

if datapoints_summary.exceptions:
raise datapoints_summary.exceptions[0]
datapoints_summary.raise_first_encountered_exception()

return (
DatapointsList(datapoints_summary.results, cognite_client=self._cognite_client)
Expand Down
7 changes: 3 additions & 4 deletions cognite/client/_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,7 @@ def get_partition(partition_num: int) -> list[dict[str, Any]]:
tasks_summary = utils._concurrency.execute_tasks(
get_partition, [(partition,) for partition in next_cursors], max_workers=partitions
)
if tasks_summary.exceptions:
raise tasks_summary.exceptions[0]
tasks_summary.raise_first_encountered_exception()

for item in tasks_summary.joined_results():
yield resource_cls._load(item, cognite_client=self._cognite_client)
Expand Down Expand Up @@ -619,8 +618,8 @@ def get_partition(partition: int) -> list[dict[str, Any]]:

tasks = [(f"{i + 1}/{partitions}",) for i in range(partitions)]
tasks_summary = utils._concurrency.execute_tasks(get_partition, tasks, max_workers=partitions)
if tasks_summary.exceptions:
raise tasks_summary.exceptions[0]
tasks_summary.raise_first_encountered_exception()

return list_cls._load(tasks_summary.joined_results(), cognite_client=self._cognite_client)

def _aggregate(
Expand Down
4 changes: 4 additions & 0 deletions cognite/client/utils/_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ def raise_compound_exception_if_failed_tasks(
self.exceptions, successful=successful, failed=failed, unknown=unknown, unwrap_fn=str_format_element_fn
)

def raise_first_encountered_exception(self) -> None:
if self.exceptions:
raise self.exceptions[0]


def collect_exc_info_and_raise(
exceptions: list[Exception],
Expand Down

0 comments on commit f5fe440

Please sign in to comment.