diff --git a/CHANGELOG.md b/CHANGELOG.md index a19ab27b..ef280720 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,17 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## Next + +### Changed + + * The file upload queues have changed behaviour. + - Instead of waiting to upload until a set of conditions, it starts uploading immedeately. + - The `upload()` method now acts more like a `join`, wating on all the uploads in the queue to complete before returning. + - A call to `add_to_upload_queue` when the queue is full will hang until the queue is no longer full before returning, instead of triggering and upload and hanging until everything is uploaded. + - The queues now require to be set up with a max size. The max upload latencey is removed. + As long as you use the queue in as a context (ie, using `with FileUploadQueue(...) as queue:`) you should not have to change anything in your code. The behaviour of the queue will change, it will most likely be much faster, but it will not require any changes from you as a user of the queue. + ## [6.4.1] ### Changed diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py index 90d69a4d..8221e01f 100644 --- a/cognite/extractorutils/uploader/files.py +++ b/cognite/extractorutils/uploader/files.py @@ -17,7 +17,7 @@ from io import BytesIO from os import PathLike from types import TracebackType -from typing import BinaryIO, Callable, List, Optional, Tuple, Type, Union +from typing import BinaryIO, Callable, List, Optional, Type, Union from requests import ConnectionError @@ -53,9 +53,7 @@ class IOFileUploadQueue(AbstractUploadQueue): cdf_client: Cognite Data Fusion client to use post_upload_function: A function that will be called after each upload. The function will be given one argument: A list of the events that were uploaded. - max_queue_size: Maximum size of upload queue. Defaults to no max size. - max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop - methods). + max_queue_size: Maximum size of upload queue. trigger_log_level: Log level to log upload triggers to. thread_name: Thread name of uploader thread. max_parallelism: Maximum number of parallel uploads. If this is greater than 0, @@ -69,7 +67,6 @@ def __init__( cdf_client: CogniteClient, post_upload_function: Optional[Callable[[List[FileMetadata]], None]] = None, max_queue_size: Optional[int] = None, - max_upload_interval: Optional[int] = None, trigger_log_level: str = "DEBUG", thread_name: Optional[str] = None, overwrite_existing: bool = False, @@ -81,13 +78,18 @@ def __init__( cdf_client, post_upload_function, max_queue_size, - max_upload_interval, + None, trigger_log_level, thread_name, cancellation_token, ) - self.upload_queue: List[Tuple[FileMetadata, Union[str, Callable[[], BinaryIO]]]] = [] + if self.threshold <= 0: + raise ValueError("Max queue size must be positive for file upload queues") + + self.upload_queue: List[Future] = [] + self.errors: List[Exception] = [] + self.overwrite_existing = overwrite_existing self.parallelism = self.cdf_client.config.max_workers @@ -100,6 +102,10 @@ def __init__( self.files_written = FILES_UPLOADER_WRITTEN self.queue_size = FILES_UPLOADER_QUEUE_SIZE + self._update_queue_thread = threading.Thread(target=self._remove_done_from_queue, daemon=True) + + self._full_queue = threading.Condition() + global _QUEUES, _QUEUES_LOCK with _QUEUES_LOCK: self._pool = ThreadPoolExecutor( @@ -107,44 +113,45 @@ def __init__( ) _QUEUES += 1 + def _remove_done_from_queue(self) -> None: + while not self.cancellation_token.is_set(): + with self.lock: + self.upload_queue = list(filter(lambda f: f.running(), self.upload_queue)) + + self.cancellation_token.wait(5) + def add_io_to_upload_queue(self, file_meta: FileMetadata, read_file: Callable[[], BinaryIO]) -> None: """ - Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold - specified in the __init__. + Add file to upload queue. The file will start uploading immedeately. If the size of the queue is larger than + the specified max size, this call will block until it's Args: file_meta: File metadata-object file_name: Path to file to be uploaded. If none, the file object will still be created, but no data is uploaded """ + if self.upload_queue_size >= self.threshold: + with self._full_queue: + while not self._full_queue.wait(timeout=2) and not self.cancellation_token.is_set(): + pass + with self.lock: - self.upload_queue.append((file_meta, read_file)) + self.upload_queue.append(self._pool.submit(self._upload_single, read_file, file_meta)) self.upload_queue_size += 1 self.files_queued.inc() self.queue_size.set(self.upload_queue_size) - self._check_triggers() - - def upload(self) -> None: + def upload(self, fail_on_errors: bool = True, timeout: Optional[float] = None) -> None: """ - Trigger an upload of the queue, clears queue afterwards + Wait for all uploads to finish """ - if len(self.upload_queue) == 0: - return - + for future in self.upload_queue: + future.result(timeout=timeout) with self.lock: - self._upload_batch() - - self.files_written.inc(self.upload_queue_size) - - try: - self._post_upload([el[0] for el in self.upload_queue]) - except Exception as e: - self.logger.error("Error in upload callback: %s", str(e)) - self.upload_queue.clear() - self.logger.info(f"Uploaded {self.upload_queue_size} files") - self.upload_queue_size = 0 self.queue_size.set(self.upload_queue_size) + if fail_on_errors and self.errors: + # There might be more errors, but we can only have one as the cause, so pick the first + raise RuntimeError(f"{len(self.errors)} upload(s) finished with errors") from self.errors[0] @retry( exceptions=(CogniteAPIError, ConnectionError), @@ -153,38 +160,45 @@ def upload(self) -> None: max_delay=RETRY_MAX_DELAY, backoff=RETRY_BACKOFF_FACTOR, ) - def _upload_single(self, index: int, read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None: - # Upload file - with read_file() as file: - file_meta = self.cdf_client.files.upload_bytes( - file, - file_meta.name if file_meta.name is not None else "", - overwrite=self.overwrite_existing, - external_id=file_meta.external_id, - source=file_meta.source, - mime_type=file_meta.mime_type, - metadata=file_meta.metadata, - directory=file_meta.directory, - asset_ids=file_meta.asset_ids, - data_set_id=file_meta.data_set_id, - labels=file_meta.labels, - geo_location=file_meta.geo_location, - source_created_time=file_meta.source_created_time, - source_modified_time=file_meta.source_modified_time, - security_categories=file_meta.security_categories, - ) - - # Update meta-object in queue - self.upload_queue[index] = (file_meta, read_file) - - def _upload_batch(self) -> None: - # Concurrently execute file-uploads - - futures: List[Future] = [] - for i, (file_meta, file_name) in enumerate(self.upload_queue): - futures.append(self._pool.submit(self._upload_single, i, file_name, file_meta)) - for fut in futures: - fut.result() + def _upload_single(self, read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None: + try: + # Upload file + with read_file() as file: + file_meta = self.cdf_client.files.upload_bytes( + file, + file_meta.name if file_meta.name is not None else "", + overwrite=self.overwrite_existing, + external_id=file_meta.external_id, + source=file_meta.source, + mime_type=file_meta.mime_type, + metadata=file_meta.metadata, + directory=file_meta.directory, + asset_ids=file_meta.asset_ids, + data_set_id=file_meta.data_set_id, + labels=file_meta.labels, + geo_location=file_meta.geo_location, + source_created_time=file_meta.source_created_time, + source_modified_time=file_meta.source_modified_time, + security_categories=file_meta.security_categories, + ) + + if self.post_upload_function: + try: + self.post_upload_function([file_meta]) + except Exception as e: + self.logger.error("Error in upload callback: %s", str(e)) + + except Exception as e: + self.logger.exception("Unexpected error while uploading file") + self.errors.append(e) + + finally: + with self.lock: + self.files_written.inc() + self.upload_queue_size -= 1 + self.queue_size.set(self.upload_queue_size) + with self._full_queue: + self._full_queue.notify() def __enter__(self) -> "IOFileUploadQueue": """ @@ -195,6 +209,7 @@ def __enter__(self) -> "IOFileUploadQueue": """ self.start() self._pool.__enter__() + self._update_queue_thread.start() return self def __exit__( @@ -229,9 +244,7 @@ class FileUploadQueue(IOFileUploadQueue): cdf_client: Cognite Data Fusion client to use post_upload_function: A function that will be called after each upload. The function will be given one argument: A list of the events that were uploaded. - max_queue_size: Maximum size of upload queue. Defaults to no max size. - max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop - methods). + max_queue_size: Maximum size of upload queue. trigger_log_level: Log level to log upload triggers to. thread_name: Thread name of uploader thread. """ @@ -252,7 +265,6 @@ def __init__( cdf_client, post_upload_function, max_queue_size, - max_upload_interval, trigger_log_level, thread_name, overwrite_existing, @@ -284,9 +296,7 @@ class BytesUploadQueue(IOFileUploadQueue): cdf_client: Cognite Data Fusion client to use post_upload_function: A function that will be called after each upload. The function will be given one argument: A list of the events that were uploaded. - max_queue_size: Maximum size of upload queue. Defaults to no max size. - max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop - methods). + max_queue_size: Maximum size of upload queue. trigger_log_level: Log level to log upload triggers to. thread_name: Thread name of uploader thread. overwrite_existing: If 'overwrite' is set to true, fields for the files found for externalIds can be overwritten @@ -297,7 +307,6 @@ def __init__( cdf_client: CogniteClient, post_upload_function: Optional[Callable[[List[FileMetadata]], None]] = None, max_queue_size: Optional[int] = None, - max_upload_interval: Optional[int] = None, trigger_log_level: str = "DEBUG", thread_name: Optional[str] = None, overwrite_existing: bool = False, @@ -307,7 +316,6 @@ def __init__( cdf_client, post_upload_function, max_queue_size, - max_upload_interval, trigger_log_level, thread_name, overwrite_existing, diff --git a/tests/tests_integration/test_cdf_upload_integration.py b/tests/tests_integration/test_cdf_upload_integration.py index 2ec47129..68e4333a 100644 --- a/tests/tests_integration/test_cdf_upload_integration.py +++ b/tests/tests_integration/test_cdf_upload_integration.py @@ -287,7 +287,7 @@ def test_assets_upload_queue_upsert(self): assert retrieved[2].name == "new name" def test_file_upload_queue(self): - queue = FileUploadQueue(cdf_client=self.client, overwrite_existing=True) + queue = FileUploadQueue(cdf_client=self.client, overwrite_existing=True, max_queue_size=2) current_dir = pathlib.Path(__file__).parent.resolve() @@ -310,7 +310,7 @@ def test_file_upload_queue(self): assert file2 == b"other test content\n" def test_bytes_upload_queue(self): - queue = BytesUploadQueue(cdf_client=self.client, overwrite_existing=True) + queue = BytesUploadQueue(cdf_client=self.client, overwrite_existing=True, max_queue_size=1) queue.add_to_upload_queue( content=b"bytes content", diff --git a/tests/tests_unit/test_cdf_upload_queues.py b/tests/tests_unit/test_cdf_upload_queues.py index 47205942..49f3b59c 100644 --- a/tests/tests_unit/test_cdf_upload_queues.py +++ b/tests/tests_unit/test_cdf_upload_queues.py @@ -282,7 +282,7 @@ def test_file_uploader(self, MockCogniteClient): def post(x): post_upload_test["value"] += 1 - queue = FileUploadQueue(client, max_upload_interval=2, post_upload_function=post) + queue = FileUploadQueue(client, max_queue_size=2, post_upload_function=post) queue.start() current_dir = pathlib.Path(__file__).parent.parent.resolve() @@ -305,7 +305,7 @@ def test_bytes_uploader(self, MockCogniteClient): def post(x): post_upload_test["value"] += 1 - queue = BytesUploadQueue(client, max_upload_interval=2, post_upload_function=post) + queue = BytesUploadQueue(client, max_queue_size=2, post_upload_function=post) queue.start() queue.add_to_upload_queue(b"bytes", FileMetadata(name="example.png"))