Skip to content

Commit

Permalink
Don't cap file upload queue parallelism by the client's max paralleli…
Browse files Browse the repository at this point in the history
…sm (#309)

* Don't cap file upload queue parallelism by the client's max parallelism

* Release 7.0.4

* add lock update to changelog

* deflake tests
  • Loading branch information
mathialo authored Mar 6, 2024
1 parent b342b6d commit a3b3d1f
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 9 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## 7.0.4

### Fixed

* Max parallelism in file upload queue properly can set larger values than the `max_workers` in the `ClientConfig` object.
* Storing states with the state store will lock the state store. This fixes an issue where iterating through a changing dict could cause issues.

## 7.0.3

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion cognite/extractorutils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
Cognite extractor utils is a Python package that simplifies the development of new extractors.
"""

__version__ = "7.0.3"
__version__ = "7.0.4"
from .base import Extractor
14 changes: 7 additions & 7 deletions cognite/extractorutils/uploader/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@ class IOFileUploadQueue(AbstractUploadQueue):
max_queue_size: Maximum size of upload queue.
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
max_parallelism: Maximum number of parallel uploads. If this is greater than 0,
the largest of this and client.config.max_workers is used to limit the number
of parallel uploads. This may be important if the IO objects being processed
also load data from an external system.
max_parallelism: Maximum number of parallel uploads. If nothing is given, the parallelism will be capped by the
max_workers of the cognite client.
"""

def __init__(
Expand All @@ -71,7 +69,7 @@ def __init__(
thread_name: Optional[str] = None,
overwrite_existing: bool = False,
cancellation_token: Optional[CancellationToken] = None,
max_parallelism: int = 0,
max_parallelism: Optional[int] = None,
):
# Super sets post_upload and threshold
super().__init__(
Expand All @@ -93,7 +91,7 @@ def __init__(
self.overwrite_existing = overwrite_existing

self.parallelism = self.cdf_client.config.max_workers
if max_parallelism > 0 and max_parallelism < self.parallelism:
if max_parallelism:
self.parallelism = max_parallelism
if self.parallelism <= 0:
self.parallelism = 4
Expand Down Expand Up @@ -164,7 +162,9 @@ def _upload_single(read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -
)
elif size > 5 * 1024 * 1024 * 1024:
# File bigger than 5Gb
self.logger.warning(f"File {file_meta.source} is larger than 5GiB, file will not be uploaded")
self.logger.warning(
f"File {file_meta.external_id} is larger than 5GiB, file will not be uploaded"
)
file_meta = FileMetadata()
else:
file_meta = self.cdf_client.files.upload_bytes(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cognite-extractor-utils"
version = "7.0.3"
version = "7.0.4"
description = "Utilities for easier development of extractors for CDF"
authors = ["Mathias Lohne <[email protected]>"]
license = "Apache-2.0"
Expand Down
2 changes: 2 additions & 0 deletions tests/tests_unit/test_statestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ def test_start_stop(self):

state_store.stop(ensure_synchronize=True)

time.sleep(1)

new_state_store = LocalStateStore(filename, 10)
new_state_store.start()

Expand Down

0 comments on commit a3b3d1f

Please sign in to comment.