diff --git a/CHANGELOG.md b/CHANGELOG.md index c321ebe1..2397fc7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,12 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## [6.2.0] + +### Added + + * Added `IOFileUploadQueue` as a base class of both `FileUploadQueue` and `BytesUploadQueue`. + This is an upload queue for functions that produce `BinaryIO` to CDF Files. ## [6.1.1] diff --git a/cognite/extractorutils/__init__.py b/cognite/extractorutils/__init__.py index 6020eb50..f9e8b7a1 100644 --- a/cognite/extractorutils/__init__.py +++ b/cognite/extractorutils/__init__.py @@ -16,5 +16,5 @@ Cognite extractor utils is a Python package that simplifies the development of new extractors. """ -__version__ = "6.1.1" +__version__ = "6.2.0" from .base import Extractor diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py index 508d3218..ed615bf8 100644 --- a/cognite/extractorutils/uploader/files.py +++ b/cognite/extractorutils/uploader/files.py @@ -55,6 +55,10 @@ class IOFileUploadQueue(AbstractUploadQueue): methods). trigger_log_level: Log level to log upload triggers to. thread_name: Thread name of uploader thread. + max_parallelism: Maximum number of parallel uploads. If this is greater than 0, + the largest of this and client.config.max_workers is used to limit the number + of parallel uploads. This may be important if the IO objects being processed + also load data from an external system. """ def __init__( @@ -67,6 +71,7 @@ def __init__( thread_name: Optional[str] = None, overwrite_existing: bool = False, cancellation_token: threading.Event = threading.Event(), + max_parallelism: int = 0, ): # Super sets post_upload and threshold super().__init__( @@ -82,6 +87,12 @@ def __init__( self.upload_queue: List[Tuple[FileMetadata, Union[str, Callable[[], BinaryIO]]]] = [] self.overwrite_existing = overwrite_existing + self.parallelism = self.cdf_client.config.max_workers + if max_parallelism > 0 and max_parallelism < self.parallelism: + self.parallelism = max_parallelism + if self.parallelism <= 0: + self.parallelism = 4 + self.files_queued = FILES_UPLOADER_QUEUED self.files_written = FILES_UPLOADER_WRITTEN self.queue_size = FILES_UPLOADER_QUEUE_SIZE @@ -160,7 +171,7 @@ def _upload_batch(self) -> None: # Concurrently execute file-uploads futures: List[Future] = [] - with ThreadPoolExecutor(self.cdf_client.config.max_workers) as pool: + with ThreadPoolExecutor(self.parallelism) as pool: for i, (file_meta, file_name) in enumerate(self.upload_queue): futures.append(pool.submit(self._upload_single, i, file_name, file_meta)) for fut in futures: diff --git a/pyproject.toml b/pyproject.toml index 35117400..1879dc49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "cognite-extractor-utils" -version = "6.1.1" +version = "6.2.0" description = "Utilities for easier development of extractors for CDF" authors = ["Mathias Lohne "] license = "Apache-2.0"