Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change file upload queue to eager uploads #293

Merged
merged 2 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## Next

### Changed

* The file upload queues have changed behaviour.
- Instead of waiting to upload until a set of conditions, it starts uploading immedeately.
- The `upload()` method now acts more like a `join`, wating on all the uploads in the queue to complete before returning.
- A call to `add_to_upload_queue` when the queue is full will hang until the queue is no longer full before returning, instead of triggering and upload and hanging until everything is uploaded.
- The queues now require to be set up with a max size. The max upload latencey is removed.
As long as you use the queue in as a context (ie, using `with FileUploadQueue(...) as queue:`) you should not have to change anything in your code. The behaviour of the queue will change, it will most likely be much faster, but it will not require any changes from you as a user of the queue.

## [6.4.1]

### Changed
Expand Down
146 changes: 77 additions & 69 deletions cognite/extractorutils/uploader/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from io import BytesIO
from os import PathLike
from types import TracebackType
from typing import BinaryIO, Callable, List, Optional, Tuple, Type, Union
from typing import BinaryIO, Callable, List, Optional, Type, Union

from requests import ConnectionError

Expand Down Expand Up @@ -53,9 +53,7 @@ class IOFileUploadQueue(AbstractUploadQueue):
cdf_client: Cognite Data Fusion client to use
post_upload_function: A function that will be called after each upload. The function will be given one argument:
A list of the events that were uploaded.
max_queue_size: Maximum size of upload queue. Defaults to no max size.
max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop
methods).
max_queue_size: Maximum size of upload queue.
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
max_parallelism: Maximum number of parallel uploads. If this is greater than 0,
Expand All @@ -69,7 +67,6 @@ def __init__(
cdf_client: CogniteClient,
post_upload_function: Optional[Callable[[List[FileMetadata]], None]] = None,
max_queue_size: Optional[int] = None,
max_upload_interval: Optional[int] = None,
trigger_log_level: str = "DEBUG",
thread_name: Optional[str] = None,
overwrite_existing: bool = False,
Expand All @@ -81,13 +78,18 @@ def __init__(
cdf_client,
post_upload_function,
max_queue_size,
max_upload_interval,
None,
trigger_log_level,
thread_name,
cancellation_token,
)

self.upload_queue: List[Tuple[FileMetadata, Union[str, Callable[[], BinaryIO]]]] = []
if self.threshold <= 0:
raise ValueError("Max queue size must be positive for file upload queues")

self.upload_queue: List[Future] = []
self.errors: List[Exception] = []

self.overwrite_existing = overwrite_existing

self.parallelism = self.cdf_client.config.max_workers
Expand All @@ -100,51 +102,56 @@ def __init__(
self.files_written = FILES_UPLOADER_WRITTEN
self.queue_size = FILES_UPLOADER_QUEUE_SIZE

self._update_queue_thread = threading.Thread(target=self._remove_done_from_queue, daemon=True)

self._full_queue = threading.Condition()

global _QUEUES, _QUEUES_LOCK
with _QUEUES_LOCK:
self._pool = ThreadPoolExecutor(
max_workers=self.parallelism, thread_name_prefix=f"FileUploadQueue-{_QUEUES}"
)
_QUEUES += 1

def _remove_done_from_queue(self) -> None:
while not self.cancellation_token.is_set():
with self.lock:
self.upload_queue = list(filter(lambda f: f.running(), self.upload_queue))

self.cancellation_token.wait(5)

def add_io_to_upload_queue(self, file_meta: FileMetadata, read_file: Callable[[], BinaryIO]) -> None:
"""
Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
specified in the __init__.
Add file to upload queue. The file will start uploading immedeately. If the size of the queue is larger than
the specified max size, this call will block until it's

Args:
file_meta: File metadata-object
file_name: Path to file to be uploaded.
If none, the file object will still be created, but no data is uploaded
"""
if self.upload_queue_size >= self.threshold:
with self._full_queue:
while not self._full_queue.wait(timeout=2) and not self.cancellation_token.is_set():
pass

with self.lock:
self.upload_queue.append((file_meta, read_file))
self.upload_queue.append(self._pool.submit(self._upload_single, read_file, file_meta))
self.upload_queue_size += 1
self.files_queued.inc()
self.queue_size.set(self.upload_queue_size)

self._check_triggers()

def upload(self) -> None:
def upload(self, fail_on_errors: bool = True, timeout: Optional[float] = None) -> None:
"""
Trigger an upload of the queue, clears queue afterwards
Wait for all uploads to finish
"""
if len(self.upload_queue) == 0:
return

for future in self.upload_queue:
future.result(timeout=timeout)
with self.lock:
self._upload_batch()

self.files_written.inc(self.upload_queue_size)

try:
self._post_upload([el[0] for el in self.upload_queue])
except Exception as e:
self.logger.error("Error in upload callback: %s", str(e))
self.upload_queue.clear()
self.logger.info(f"Uploaded {self.upload_queue_size} files")
self.upload_queue_size = 0
self.queue_size.set(self.upload_queue_size)
if fail_on_errors and self.errors:
# There might be more errors, but we can only have one as the cause, so pick the first
raise RuntimeError(f"{len(self.errors)} upload(s) finished with errors") from self.errors[0]

@retry(
exceptions=(CogniteAPIError, ConnectionError),
Expand All @@ -153,38 +160,45 @@ def upload(self) -> None:
max_delay=RETRY_MAX_DELAY,
backoff=RETRY_BACKOFF_FACTOR,
)
def _upload_single(self, index: int, read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None:
# Upload file
with read_file() as file:
file_meta = self.cdf_client.files.upload_bytes(
file,
file_meta.name if file_meta.name is not None else "",
overwrite=self.overwrite_existing,
external_id=file_meta.external_id,
source=file_meta.source,
mime_type=file_meta.mime_type,
metadata=file_meta.metadata,
directory=file_meta.directory,
asset_ids=file_meta.asset_ids,
data_set_id=file_meta.data_set_id,
labels=file_meta.labels,
geo_location=file_meta.geo_location,
source_created_time=file_meta.source_created_time,
source_modified_time=file_meta.source_modified_time,
security_categories=file_meta.security_categories,
)

# Update meta-object in queue
self.upload_queue[index] = (file_meta, read_file)

def _upload_batch(self) -> None:
# Concurrently execute file-uploads

futures: List[Future] = []
for i, (file_meta, file_name) in enumerate(self.upload_queue):
futures.append(self._pool.submit(self._upload_single, i, file_name, file_meta))
for fut in futures:
fut.result()
def _upload_single(self, read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None:
try:
# Upload file
with read_file() as file:
file_meta = self.cdf_client.files.upload_bytes(
file,
file_meta.name if file_meta.name is not None else "",
overwrite=self.overwrite_existing,
external_id=file_meta.external_id,
source=file_meta.source,
mime_type=file_meta.mime_type,
metadata=file_meta.metadata,
directory=file_meta.directory,
asset_ids=file_meta.asset_ids,
data_set_id=file_meta.data_set_id,
labels=file_meta.labels,
geo_location=file_meta.geo_location,
source_created_time=file_meta.source_created_time,
source_modified_time=file_meta.source_modified_time,
security_categories=file_meta.security_categories,
)

if self.post_upload_function:
try:
self.post_upload_function([file_meta])
except Exception as e:
self.logger.error("Error in upload callback: %s", str(e))

except Exception as e:
self.logger.exception("Unexpected error while uploading file")
self.errors.append(e)

finally:
with self.lock:
self.files_written.inc()
self.upload_queue_size -= 1
self.queue_size.set(self.upload_queue_size)
with self._full_queue:
self._full_queue.notify()

def __enter__(self) -> "IOFileUploadQueue":
"""
Expand All @@ -195,6 +209,7 @@ def __enter__(self) -> "IOFileUploadQueue":
"""
self.start()
self._pool.__enter__()
self._update_queue_thread.start()
return self

def __exit__(
Expand Down Expand Up @@ -229,9 +244,7 @@ class FileUploadQueue(IOFileUploadQueue):
cdf_client: Cognite Data Fusion client to use
post_upload_function: A function that will be called after each upload. The function will be given one argument:
A list of the events that were uploaded.
max_queue_size: Maximum size of upload queue. Defaults to no max size.
max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop
methods).
max_queue_size: Maximum size of upload queue.
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
"""
Expand All @@ -252,7 +265,6 @@ def __init__(
cdf_client,
post_upload_function,
max_queue_size,
max_upload_interval,
trigger_log_level,
thread_name,
overwrite_existing,
Expand Down Expand Up @@ -284,9 +296,7 @@ class BytesUploadQueue(IOFileUploadQueue):
cdf_client: Cognite Data Fusion client to use
post_upload_function: A function that will be called after each upload. The function will be given one argument:
A list of the events that were uploaded.
max_queue_size: Maximum size of upload queue. Defaults to no max size.
max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop
methods).
max_queue_size: Maximum size of upload queue.
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
overwrite_existing: If 'overwrite' is set to true, fields for the files found for externalIds can be overwritten
Expand All @@ -297,7 +307,6 @@ def __init__(
cdf_client: CogniteClient,
post_upload_function: Optional[Callable[[List[FileMetadata]], None]] = None,
max_queue_size: Optional[int] = None,
max_upload_interval: Optional[int] = None,
trigger_log_level: str = "DEBUG",
thread_name: Optional[str] = None,
overwrite_existing: bool = False,
Expand All @@ -307,7 +316,6 @@ def __init__(
cdf_client,
post_upload_function,
max_queue_size,
max_upload_interval,
trigger_log_level,
thread_name,
overwrite_existing,
Expand Down
4 changes: 2 additions & 2 deletions tests/tests_integration/test_cdf_upload_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def test_assets_upload_queue_upsert(self):
assert retrieved[2].name == "new name"

def test_file_upload_queue(self):
queue = FileUploadQueue(cdf_client=self.client, overwrite_existing=True)
queue = FileUploadQueue(cdf_client=self.client, overwrite_existing=True, max_queue_size=2)

current_dir = pathlib.Path(__file__).parent.resolve()

Expand All @@ -310,7 +310,7 @@ def test_file_upload_queue(self):
assert file2 == b"other test content\n"

def test_bytes_upload_queue(self):
queue = BytesUploadQueue(cdf_client=self.client, overwrite_existing=True)
queue = BytesUploadQueue(cdf_client=self.client, overwrite_existing=True, max_queue_size=1)

queue.add_to_upload_queue(
content=b"bytes content",
Expand Down
4 changes: 2 additions & 2 deletions tests/tests_unit/test_cdf_upload_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def test_file_uploader(self, MockCogniteClient):
def post(x):
post_upload_test["value"] += 1

queue = FileUploadQueue(client, max_upload_interval=2, post_upload_function=post)
queue = FileUploadQueue(client, max_queue_size=2, post_upload_function=post)
queue.start()

current_dir = pathlib.Path(__file__).parent.parent.resolve()
Expand All @@ -305,7 +305,7 @@ def test_bytes_uploader(self, MockCogniteClient):
def post(x):
post_upload_test["value"] += 1

queue = BytesUploadQueue(client, max_upload_interval=2, post_upload_function=post)
queue = BytesUploadQueue(client, max_queue_size=2, post_upload_function=post)
queue.start()

queue.add_to_upload_queue(b"bytes", FileMetadata(name="example.png"))
Expand Down