Change file upload queue to eager uploads (#293)
There is no real need for the file upload queue to batch uploads like
the other queues; in fact, batching makes it slower and worse.

This changes the file upload queues in some significant ways:

 - Instead of waiting for a set of conditions to be met before
   uploading, the queue starts uploading immediately.
 - The `upload()` method now acts more like a `join`, waiting on all the
   uploads in the queue to complete before returning.
 - A call to `add_to_upload_queue` when the queue is full will block
   until the queue is no longer full before returning, instead of
   triggering an upload and blocking until everything is uploaded.
 - We now require a max queue size, and remove the max upload interval.

As long as you use the queue as a context manager, i.e. using
``` python
with FileUploadQueue(...) as queue:
```
and don't use timed uploads, you should not have to change anything in
your code. The behaviour of the queue will change (it will most likely
be much faster), but it will not require any changes from you as a user
of the queue.
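
For illustration, a minimal sketch of how the queue is used under the
new eager behaviour (the client setup, file path, and metadata below
are illustrative, not part of this change):

``` python
from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata
from cognite.extractorutils.uploader import FileUploadQueue

client = CogniteClient()  # assuming an already-configured client

# max_queue_size is now required, and bounds the number of in-flight uploads
with FileUploadQueue(cdf_client=client, max_queue_size=4) as queue:
    # The file starts uploading immediately. If 4 uploads are already
    # in flight, this call blocks until one of them finishes.
    queue.add_to_upload_queue(
        file_meta=FileMetadata(name="example.txt", external_id="example-txt"),
        file_name="/path/to/example.txt",
    )

    # Optional: wait for all queued uploads to complete, raising if any failed
    queue.upload()
# Leaving the context also waits for outstanding uploads to finish
```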

I propose considering this a breaking change, and bumping the major
version on the next release, even though most users should be able to
just update without changing their code. It's still a big change in the
behaviour of the queue.
mathialo authored Feb 15, 2024
1 parent 662593d commit 9810600
Showing 4 changed files with 92 additions and 73 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,17 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## Next

### Changed

* The file upload queues have changed behaviour:
  - Instead of waiting for a set of conditions to be met before uploading, the queue starts uploading immediately.
  - The `upload()` method now acts more like a `join`, waiting on all the uploads in the queue to complete before returning.
  - A call to `add_to_upload_queue` when the queue is full will block until the queue is no longer full before returning, instead of triggering an upload and blocking until everything is uploaded.
  - The queues now require a max size to be set. The max upload interval is removed.

  As long as you use the queue as a context manager (i.e., using `with FileUploadQueue(...) as queue:`) you should not have to change anything in your code. The behaviour of the queue will change (it will most likely be much faster), but it will not require any changes from you as a user of the queue.

## [6.4.1]

### Changed
146 changes: 77 additions & 69 deletions cognite/extractorutils/uploader/files.py
@@ -17,7 +17,7 @@
from io import BytesIO
from os import PathLike
from types import TracebackType
from typing import BinaryIO, Callable, List, Optional, Tuple, Type, Union
from typing import BinaryIO, Callable, List, Optional, Type, Union

from requests import ConnectionError

@@ -53,9 +53,7 @@ class IOFileUploadQueue(AbstractUploadQueue):
cdf_client: Cognite Data Fusion client to use
post_upload_function: A function that will be called after each upload. The function will be given one argument:
A list of the events that were uploaded.
max_queue_size: Maximum size of upload queue. Defaults to no max size.
max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop
methods).
max_queue_size: Maximum size of upload queue.
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
max_parallelism: Maximum number of parallel uploads. If this is greater than 0,
@@ -69,7 +67,6 @@ def __init__(
cdf_client: CogniteClient,
post_upload_function: Optional[Callable[[List[FileMetadata]], None]] = None,
max_queue_size: Optional[int] = None,
max_upload_interval: Optional[int] = None,
trigger_log_level: str = "DEBUG",
thread_name: Optional[str] = None,
overwrite_existing: bool = False,
@@ -81,13 +78,18 @@ def __init__(
cdf_client,
post_upload_function,
max_queue_size,
max_upload_interval,
None,
trigger_log_level,
thread_name,
cancellation_token,
)

self.upload_queue: List[Tuple[FileMetadata, Union[str, Callable[[], BinaryIO]]]] = []
if self.threshold <= 0:
raise ValueError("Max queue size must be positive for file upload queues")

self.upload_queue: List[Future] = []
self.errors: List[Exception] = []

self.overwrite_existing = overwrite_existing

self.parallelism = self.cdf_client.config.max_workers
@@ -100,51 +102,56 @@ def __init__(
self.files_written = FILES_UPLOADER_WRITTEN
self.queue_size = FILES_UPLOADER_QUEUE_SIZE

self._update_queue_thread = threading.Thread(target=self._remove_done_from_queue, daemon=True)

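# Producers block on this condition when the queue is full; upload workers notify it as uploads complete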
self._full_queue = threading.Condition()

global _QUEUES, _QUEUES_LOCK
with _QUEUES_LOCK:
self._pool = ThreadPoolExecutor(
max_workers=self.parallelism, thread_name_prefix=f"FileUploadQueue-{_QUEUES}"
)
_QUEUES += 1

def _remove_done_from_queue(self) -> None:
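# Periodically filter out futures that are no longer running, so the list does not grow without bound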
while not self.cancellation_token.is_set():
with self.lock:
self.upload_queue = list(filter(lambda f: f.running(), self.upload_queue))

self.cancellation_token.wait(5)

def add_io_to_upload_queue(self, file_meta: FileMetadata, read_file: Callable[[], BinaryIO]) -> None:
"""
Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
specified in the __init__.
Add file to upload queue. The file will start uploading immediately. If the size of the queue is larger than
the specified max size, this call will block until the queue is no longer full.
Args:
file_meta: File metadata-object
read_file: Function that returns a file-like object with the file data to upload
"""
if self.upload_queue_size >= self.threshold:
with self._full_queue:
while not self._full_queue.wait(timeout=2) and not self.cancellation_token.is_set():
pass

with self.lock:
self.upload_queue.append((file_meta, read_file))
self.upload_queue.append(self._pool.submit(self._upload_single, read_file, file_meta))
self.upload_queue_size += 1
self.files_queued.inc()
self.queue_size.set(self.upload_queue_size)

self._check_triggers()

def upload(self) -> None:
def upload(self, fail_on_errors: bool = True, timeout: Optional[float] = None) -> None:
"""
Trigger an upload of the queue, clears queue afterwards
Wait for all uploads to finish

Args:
fail_on_errors: If True, raise a RuntimeError if any upload failed. Defaults to True.
timeout: Maximum time (in seconds) to wait for each upload to complete.
"""
if len(self.upload_queue) == 0:
return

for future in self.upload_queue:
future.result(timeout=timeout)
with self.lock:
self._upload_batch()

self.files_written.inc(self.upload_queue_size)

try:
self._post_upload([el[0] for el in self.upload_queue])
except Exception as e:
self.logger.error("Error in upload callback: %s", str(e))
self.upload_queue.clear()
self.logger.info(f"Uploaded {self.upload_queue_size} files")
self.upload_queue_size = 0
self.queue_size.set(self.upload_queue_size)
if fail_on_errors and self.errors:
# There might be more errors, but we can only have one as the cause, so pick the first
raise RuntimeError(f"{len(self.errors)} upload(s) finished with errors") from self.errors[0]

@retry(
exceptions=(CogniteAPIError, ConnectionError),
@@ -153,38 +160,45 @@ def upload(self) -> None:
max_delay=RETRY_MAX_DELAY,
backoff=RETRY_BACKOFF_FACTOR,
)
def _upload_single(self, index: int, read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None:
# Upload file
with read_file() as file:
file_meta = self.cdf_client.files.upload_bytes(
file,
file_meta.name if file_meta.name is not None else "",
overwrite=self.overwrite_existing,
external_id=file_meta.external_id,
source=file_meta.source,
mime_type=file_meta.mime_type,
metadata=file_meta.metadata,
directory=file_meta.directory,
asset_ids=file_meta.asset_ids,
data_set_id=file_meta.data_set_id,
labels=file_meta.labels,
geo_location=file_meta.geo_location,
source_created_time=file_meta.source_created_time,
source_modified_time=file_meta.source_modified_time,
security_categories=file_meta.security_categories,
)

# Update meta-object in queue
self.upload_queue[index] = (file_meta, read_file)

def _upload_batch(self) -> None:
# Concurrently execute file-uploads

futures: List[Future] = []
for i, (file_meta, file_name) in enumerate(self.upload_queue):
futures.append(self._pool.submit(self._upload_single, i, file_name, file_meta))
for fut in futures:
fut.result()
def _upload_single(self, read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None:
try:
# Upload file
with read_file() as file:
file_meta = self.cdf_client.files.upload_bytes(
file,
file_meta.name if file_meta.name is not None else "",
overwrite=self.overwrite_existing,
external_id=file_meta.external_id,
source=file_meta.source,
mime_type=file_meta.mime_type,
metadata=file_meta.metadata,
directory=file_meta.directory,
asset_ids=file_meta.asset_ids,
data_set_id=file_meta.data_set_id,
labels=file_meta.labels,
geo_location=file_meta.geo_location,
source_created_time=file_meta.source_created_time,
source_modified_time=file_meta.source_modified_time,
security_categories=file_meta.security_categories,
)

if self.post_upload_function:
try:
self.post_upload_function([file_meta])
except Exception as e:
self.logger.error("Error in upload callback: %s", str(e))

except Exception as e:
self.logger.exception("Unexpected error while uploading file")
self.errors.append(e)

finally:
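# Success or failure, update the counters and wake any producer blocked on a full queue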
with self.lock:
self.files_written.inc()
self.upload_queue_size -= 1
self.queue_size.set(self.upload_queue_size)
with self._full_queue:
self._full_queue.notify()

def __enter__(self) -> "IOFileUploadQueue":
"""
@@ -195,6 +209,7 @@ def __enter__(self) -> "IOFileUploadQueue":
"""
self.start()
self._pool.__enter__()
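# Start the background thread that prunes finished uploads from the queue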
self._update_queue_thread.start()
return self

def __exit__(
@@ -229,9 +244,7 @@ class FileUploadQueue(IOFileUploadQueue):
cdf_client: Cognite Data Fusion client to use
post_upload_function: A function that will be called after each upload. The function will be given one argument:
A list of the events that were uploaded.
max_queue_size: Maximum size of upload queue. Defaults to no max size.
max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop
methods).
max_queue_size: Maximum size of upload queue.
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
"""
@@ -252,7 +265,6 @@ def __init__(
cdf_client,
post_upload_function,
max_queue_size,
max_upload_interval,
trigger_log_level,
thread_name,
overwrite_existing,
@@ -284,9 +296,7 @@ class BytesUploadQueue(IOFileUploadQueue):
cdf_client: Cognite Data Fusion client to use
post_upload_function: A function that will be called after each upload. The function will be given one argument:
A list of the events that were uploaded.
max_queue_size: Maximum size of upload queue. Defaults to no max size.
max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop
methods).
max_queue_size: Maximum size of upload queue.
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
overwrite_existing: If 'overwrite' is set to true, fields for the files found for externalIds can be overwritten
@@ -297,7 +307,6 @@ def __init__(
cdf_client: CogniteClient,
post_upload_function: Optional[Callable[[List[FileMetadata]], None]] = None,
max_queue_size: Optional[int] = None,
max_upload_interval: Optional[int] = None,
trigger_log_level: str = "DEBUG",
thread_name: Optional[str] = None,
overwrite_existing: bool = False,
Expand All @@ -307,7 +316,6 @@ def __init__(
cdf_client,
post_upload_function,
max_queue_size,
max_upload_interval,
trigger_log_level,
thread_name,
overwrite_existing,
4 changes: 2 additions & 2 deletions tests/tests_integration/test_cdf_upload_integration.py
@@ -287,7 +287,7 @@ def test_assets_upload_queue_upsert(self):
assert retrieved[2].name == "new name"

def test_file_upload_queue(self):
queue = FileUploadQueue(cdf_client=self.client, overwrite_existing=True)
queue = FileUploadQueue(cdf_client=self.client, overwrite_existing=True, max_queue_size=2)

current_dir = pathlib.Path(__file__).parent.resolve()

@@ -310,7 +310,7 @@ def test_file_upload_queue(self):
assert file2 == b"other test content\n"

def test_bytes_upload_queue(self):
queue = BytesUploadQueue(cdf_client=self.client, overwrite_existing=True)
queue = BytesUploadQueue(cdf_client=self.client, overwrite_existing=True, max_queue_size=1)

queue.add_to_upload_queue(
content=b"bytes content",
4 changes: 2 additions & 2 deletions tests/tests_unit/test_cdf_upload_queues.py
@@ -282,7 +282,7 @@ def test_file_uploader(self, MockCogniteClient):
def post(x):
post_upload_test["value"] += 1

queue = FileUploadQueue(client, max_upload_interval=2, post_upload_function=post)
queue = FileUploadQueue(client, max_queue_size=2, post_upload_function=post)
queue.start()

current_dir = pathlib.Path(__file__).parent.parent.resolve()
@@ -305,7 +305,7 @@ def test_bytes_uploader(self, MockCogniteClient):
def post(x):
post_upload_test["value"] += 1

queue = BytesUploadQueue(client, max_upload_interval=2, post_upload_function=post)
queue = BytesUploadQueue(client, max_queue_size=2, post_upload_function=post)
queue.start()

queue.add_to_upload_queue(b"bytes", FileMetadata(name="example.png"))
