Skip to content

Commit

Permalink
Bump version and add option for max parallelism in IO upload queue (#278)
Browse files Browse the repository at this point in the history
  • Loading branch information
einarmo authored Jan 3, 2024
1 parent 8bf2a19 commit acff7aa
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 3 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [6.2.0]

### Added

* Added `IOFileUploadQueue` as a base class of both `FileUploadQueue` and `BytesUploadQueue`.
This is an upload queue for functions that produce `BinaryIO` to CDF Files.

## [6.1.1]

Expand Down
2 changes: 1 addition & 1 deletion cognite/extractorutils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
Cognite extractor utils is a Python package that simplifies the development of new extractors.
"""

__version__ = "6.1.1"
__version__ = "6.2.0"
from .base import Extractor
13 changes: 12 additions & 1 deletion cognite/extractorutils/uploader/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ class IOFileUploadQueue(AbstractUploadQueue):
methods).
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
max_parallelism: Maximum number of parallel uploads. If this is greater than 0,
the smallest of this and client.config.max_workers is used to limit the number
of parallel uploads. This may be important if the IO objects being processed
also load data from an external system.
"""

def __init__(
Expand All @@ -67,6 +71,7 @@ def __init__(
thread_name: Optional[str] = None,
overwrite_existing: bool = False,
cancellation_token: threading.Event = threading.Event(),
max_parallelism: int = 0,
):
# Super sets post_upload and threshold
super().__init__(
Expand All @@ -82,6 +87,12 @@ def __init__(
self.upload_queue: List[Tuple[FileMetadata, Union[str, Callable[[], BinaryIO]]]] = []
self.overwrite_existing = overwrite_existing

self.parallelism = self.cdf_client.config.max_workers
if max_parallelism > 0 and max_parallelism < self.parallelism:
self.parallelism = max_parallelism
if self.parallelism <= 0:
self.parallelism = 4

self.files_queued = FILES_UPLOADER_QUEUED
self.files_written = FILES_UPLOADER_WRITTEN
self.queue_size = FILES_UPLOADER_QUEUE_SIZE
Expand Down Expand Up @@ -160,7 +171,7 @@ def _upload_batch(self) -> None:
# Concurrently execute file-uploads

futures: List[Future] = []
with ThreadPoolExecutor(self.cdf_client.config.max_workers) as pool:
with ThreadPoolExecutor(self.parallelism) as pool:
for i, (file_meta, file_name) in enumerate(self.upload_queue):
futures.append(pool.submit(self._upload_single, i, file_name, file_meta))
for fut in futures:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cognite-extractor-utils"
version = "6.1.1"
version = "6.2.0"
description = "Utilities for easier development of extractors for CDF"
authors = ["Mathias Lohne <[email protected]>"]
license = "Apache-2.0"
Expand Down

0 comments on commit acff7aa

Please sign in to comment.