Skip to content

Commit

Permalink
Re-use the thread pool in file upload queues (#292)
Browse files Browse the repository at this point in the history
Instead of creating a new thread pool in each upload call, re-use a
single one.

Waiting on futures (without a timeout) will still make the call to
`upload()` blocking, so there should be no change in external usage.
  • Loading branch information
mathialo authored Feb 13, 2024
1 parent b02e87e commit 662593d
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 8 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [6.4.1]

### Changed

* File upload queues now reuse a single thread pool across runs instead of creating a new one each time `upload()` is called.

## [6.4.0]

### Added
Expand Down
2 changes: 1 addition & 1 deletion cognite/extractorutils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
Cognite extractor utils is a Python package that simplifies the development of new extractors.
"""

__version__ = "6.4.0"
__version__ = "6.4.1"
from .base import Extractor
19 changes: 15 additions & 4 deletions cognite/extractorutils/uploader/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
)
from cognite.extractorutils.util import retry

_QUEUES: int = 0
_QUEUES_LOCK: threading.RLock = threading.RLock()


class IOFileUploadQueue(AbstractUploadQueue):
"""
Expand Down Expand Up @@ -97,6 +100,13 @@ def __init__(
self.files_written = FILES_UPLOADER_WRITTEN
self.queue_size = FILES_UPLOADER_QUEUE_SIZE

global _QUEUES, _QUEUES_LOCK
with _QUEUES_LOCK:
self._pool = ThreadPoolExecutor(
max_workers=self.parallelism, thread_name_prefix=f"FileUploadQueue-{_QUEUES}"
)
_QUEUES += 1

def add_io_to_upload_queue(self, file_meta: FileMetadata, read_file: Callable[[], BinaryIO]) -> None:
"""
Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
Expand Down Expand Up @@ -171,11 +181,10 @@ def _upload_batch(self) -> None:
# Concurrently execute file-uploads

futures: List[Future] = []
with ThreadPoolExecutor(self.parallelism) as pool:
for i, (file_meta, file_name) in enumerate(self.upload_queue):
futures.append(pool.submit(self._upload_single, i, file_name, file_meta))
for i, (file_meta, file_name) in enumerate(self.upload_queue):
futures.append(self._pool.submit(self._upload_single, i, file_name, file_meta))
for fut in futures:
fut.result(0.0)
fut.result()

def __enter__(self) -> "IOFileUploadQueue":
"""
Expand All @@ -185,6 +194,7 @@ def __enter__(self) -> "IOFileUploadQueue":
self
"""
self.start()
self._pool.__enter__()
return self

def __exit__(
Expand All @@ -198,6 +208,7 @@ def __exit__(
exc_val: Exception value
exc_tb: Traceback
"""
self._pool.__exit__(exc_type, exc_val, exc_tb)
self.stop()

def __len__(self) -> int:
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cognite-extractor-utils"
version = "6.4.0"
version = "6.4.1"
description = "Utilities for easier development of extractors for CDF"
authors = ["Mathias Lohne <[email protected]>"]
license = "Apache-2.0"
Expand Down Expand Up @@ -85,5 +85,5 @@ types-requests = "^2.31.0.20240125"
httpx = "^0.26.0"

[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

0 comments on commit 662593d

Please sign in to comment.