diff --git a/cognite/extractorutils/uploader/__init__.py b/cognite/extractorutils/uploader/__init__.py
index 50ce6560..2d3eeb85 100644
--- a/cognite/extractorutils/uploader/__init__.py
+++ b/cognite/extractorutils/uploader/__init__.py
@@ -66,7 +66,7 @@
 from .assets import AssetUploadQueue
 from .events import EventUploadQueue
-from .files import BytesUploadQueue, FileUploadQueue
+from .files import BytesUploadQueue, FileUploadQueue, IOFileUploadQueue
 from .raw import RawUploadQueue
 from .time_series import (
     DataPoint,
diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py
index c6e2a434..508d3218 100644
--- a/cognite/extractorutils/uploader/files.py
+++ b/cognite/extractorutils/uploader/files.py
@@ -13,10 +13,11 @@
 # limitations under the License.

 import threading
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import Future, ThreadPoolExecutor
+from io import BytesIO
 from os import PathLike
 from types import TracebackType
-from typing import Any, Callable, List, Optional, Tuple, Type, Union
+from typing import Any, BinaryIO, Callable, List, Optional, Tuple, Type, Union

 from requests import ConnectionError
@@ -31,9 +32,6 @@
     AbstractUploadQueue,
 )
 from cognite.extractorutils.uploader._metrics import (
-    BYTES_UPLOADER_QUEUE_SIZE,
-    BYTES_UPLOADER_QUEUED,
-    BYTES_UPLOADER_WRITTEN,
     FILES_UPLOADER_QUEUE_SIZE,
     FILES_UPLOADER_QUEUED,
     FILES_UPLOADER_WRITTEN,
@@ -41,9 +39,12 @@
 from cognite.extractorutils.util import retry


-class FileUploadQueue(AbstractUploadQueue):
+class IOFileUploadQueue(AbstractUploadQueue):
     """
-    Upload queue for files
+    Upload queue for files using BinaryIO
+
+    Note that if the upload fails, the stream needs to be restarted, so
+    the enqueued callback needs to produce a new IO object for each call.

     Args:
         cdf_client: Cognite Data Fusion client to use
@@ -78,14 +79,14 @@ def __init__(
             cancellation_token,
         )

-        self.upload_queue: List[Tuple[FileMetadata, Union[str, PathLike]]] = []
+        self.upload_queue: List[Tuple[FileMetadata, Union[str, Callable[[], BinaryIO]]]] = []
         self.overwrite_existing = overwrite_existing

         self.files_queued = FILES_UPLOADER_QUEUED
         self.files_written = FILES_UPLOADER_WRITTEN
         self.queue_size = FILES_UPLOADER_QUEUE_SIZE

-    def add_to_upload_queue(self, file_meta: FileMetadata, file_name: Union[str, PathLike]) -> None:
+    def add_io_to_upload_queue(self, file_meta: FileMetadata, read_file: Callable[[], BinaryIO]) -> None:
         """
         Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
         specified in the __init__.
@@ -96,7 +97,7 @@ def add_to_upload_queue(self, file_meta: FileMetadata, file_name: Union[str, Pat
                 If none, the file object will still be created, but no data is uploaded
         """
         with self.lock:
-            self.upload_queue.append((file_meta, file_name))
+            self.upload_queue.append((file_meta, read_file))
             self.upload_queue_size += 1
             self.files_queued.inc()
             self.queue_size.set(self.upload_queue_size)
@@ -131,21 +132,41 @@ def upload(self) -> None:
         max_delay=RETRY_MAX_DELAY,
         backoff=RETRY_BACKOFF_FACTOR,
     )
-    def _upload_single(self, index: int, file_name: Union[str, PathLike], file_meta: FileMetadata) -> None:
+    def _upload_single(self, index: int, read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None:
         # Upload file
-        file_meta = self.cdf_client.files.upload(str(file_name), overwrite=self.overwrite_existing, **file_meta.dump())  # type: ignore
+        with read_file() as file:
+            file_meta = self.cdf_client.files.upload_bytes(
+                file,
+                file_meta.name if file_meta.name is not None else "",
+                overwrite=self.overwrite_existing,
+                external_id=file_meta.external_id,
+                source=file_meta.source,
+                mime_type=file_meta.mime_type,
+                metadata=file_meta.metadata,
+                directory=file_meta.directory,
+                asset_ids=file_meta.asset_ids,
+                data_set_id=file_meta.data_set_id,
+                labels=file_meta.labels,
+                geo_location=file_meta.geo_location,
+                source_created_time=file_meta.source_created_time,
+                source_modified_time=file_meta.source_modified_time,
+                security_categories=file_meta.security_categories,
+            )

         # Update meta-object in queue
-        self.upload_queue[index] = (file_meta, file_name)
+        self.upload_queue[index] = (file_meta, read_file)

     def _upload_batch(self) -> None:
         # Concurrently execute file-uploads
+        futures: List[Future] = []
         with ThreadPoolExecutor(self.cdf_client.config.max_workers) as pool:
             for i, (file_meta, file_name) in enumerate(self.upload_queue):
-                pool.submit(self._upload_single, i, file_name, file_meta)
+                futures.append(pool.submit(self._upload_single, i, file_name, file_meta))
+        for fut in futures:
+            fut.result(0.0)

-    def __enter__(self) -> "FileUploadQueue":
+    def __enter__(self) -> "IOFileUploadQueue":
         """
         Wraps around start method, for use as context manager
@@ -178,9 +199,9 @@ def __len__(self) -> int:
         return self.upload_queue_size


-class BytesUploadQueue(AbstractUploadQueue):
+class FileUploadQueue(IOFileUploadQueue):
     """
-    Upload queue for bytes
+    Upload queue for files

     Args:
         cdf_client: Cognite Data Fusion client to use
@@ -191,20 +212,20 @@ class BytesUploadQueue(AbstractUploadQueue):
             methods).
         trigger_log_level: Log level to log upload triggers to.
         thread_name: Thread name of uploader thread.
-        overwrite_existing: If 'overwrite' is set to true, fields for the files found for externalIds can be overwritten
     """

     def __init__(
         self,
         cdf_client: CogniteClient,
-        post_upload_function: Optional[Callable[[List[Any]], None]] = None,
+        post_upload_function: Optional[Callable[[List[FileMetadata]], None]] = None,
         max_queue_size: Optional[int] = None,
         max_upload_interval: Optional[int] = None,
         trigger_log_level: str = "DEBUG",
         thread_name: Optional[str] = None,
         overwrite_existing: bool = False,
         cancellation_token: threading.Event = threading.Event(),
-    ) -> None:
+    ):
+        # Super sets post_upload and threshold
         super().__init__(
             cdf_client,
             post_upload_function,
@@ -212,105 +233,75 @@ def __init__(
             max_upload_interval,
             trigger_log_level,
             thread_name,
+            overwrite_existing,
             cancellation_token,
         )
-        self.upload_queue: List[Tuple[bytes, FileMetadata]] = []
-        self.overwrite_existing = overwrite_existing
-        self.upload_queue_size = 0
-        self.bytes_queued = BYTES_UPLOADER_QUEUED
-        self.queue_size = BYTES_UPLOADER_QUEUE_SIZE
-        self.bytes_written = BYTES_UPLOADER_WRITTEN
-
-    def add_to_upload_queue(self, content: bytes, metadata: FileMetadata) -> None:
+    def add_to_upload_queue(self, file_meta: FileMetadata, file_name: Union[str, PathLike]) -> None:
         """
-        Add object to upload queue. The queue will be uploaded if the queue size is larger than the threshold
+        Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
         specified in the __init__.
-        Args:
-            content: bytes object to upload
-            metadata: metadata for the given bytes object
-        """
-        with self.lock:
-            self.upload_queue.append((content, metadata))
-            self.upload_queue_size += 1
-            self.bytes_queued.inc()
-            self.queue_size.set(self.upload_queue_size)
-    def upload(self) -> None:
-        """
-        Trigger an upload of the queue, clears queue afterwards
+        Args:
+            file_meta: File metadata-object
+            file_name: Path to file to be uploaded.
+                If none, the file object will still be created, but no data is uploaded
         """
-        if len(self.upload_queue) == 0:
-            return
-        with self.lock:
-            # Upload frames in batches
-            self._upload_batch()
+        def load_file_from_path() -> BinaryIO:
+            return open(file_name, "rb")

-            # Log stats
-            self.bytes_written.inc(self.upload_queue_size)
+        self.add_io_to_upload_queue(file_meta, load_file_from_path)

-            try:
-                self._post_upload(self.upload_queue)
-            except Exception as e:
-                self.logger.error("Error in upload callback: %s", str(e))
-            # Clear queue
-            self.upload_queue.clear()
-            self.upload_queue_size = 0
-            self.logger.info(f"Uploaded {self.upload_queue_size} files")
-            self.queue_size.set(self.upload_queue_size)

+class BytesUploadQueue(IOFileUploadQueue):
+    """
+    Upload queue for bytes

-    def _upload_batch(self) -> None:
-        # Concurrently execute bytes-uploads
-        with ThreadPoolExecutor(self.cdf_client.config.max_workers) as pool:
-            for i, (frame, metadata) in enumerate(self.upload_queue):
-                pool.submit(self._upload_single, i, frame, metadata)
+    Args:
+        cdf_client: Cognite Data Fusion client to use
+        post_upload_function: A function that will be called after each upload. The function will be given one argument:
+            A list of the files that were uploaded.
+        max_queue_size: Maximum size of upload queue. Defaults to no max size.
+        max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop
+            methods).
+        trigger_log_level: Log level to log upload triggers to.
+        thread_name: Thread name of uploader thread.
+        overwrite_existing: If 'overwrite' is set to true, fields for the files found for externalIds can be overwritten
+    """

-    @retry(
-        exceptions=(CogniteAPIError, ConnectionError),
-        tries=RETRIES,
-        delay=RETRY_DELAY,
-        max_delay=RETRY_MAX_DELAY,
-        backoff=RETRY_BACKOFF_FACTOR,
-    )
-    def _upload_single(self, index: int, content: bytes, metadata: FileMetadata) -> None:
-        # Upload object
-        file_meta_data: FileMetadata = self.cdf_client.files.upload_bytes(
-            content, overwrite=self.overwrite_existing, **metadata.dump()
+    def __init__(
+        self,
+        cdf_client: CogniteClient,
+        post_upload_function: Optional[Callable[[List[Any]], None]] = None,
+        max_queue_size: Optional[int] = None,
+        max_upload_interval: Optional[int] = None,
+        trigger_log_level: str = "DEBUG",
+        thread_name: Optional[str] = None,
+        overwrite_existing: bool = False,
+        cancellation_token: threading.Event = threading.Event(),
+    ) -> None:
+        super().__init__(
+            cdf_client,
+            post_upload_function,
+            max_queue_size,
+            max_upload_interval,
+            trigger_log_level,
+            thread_name,
+            overwrite_existing,
+            cancellation_token,
         )
-        # Update meta-object in queue
-        self.upload_queue[index] = (content, file_meta_data)
-
-    def __enter__(self) -> "BytesUploadQueue":
-        """
-        Wraps around start method, for use as context manager
-
-        Returns:
-            self
-        """
-        self.start()
-        return self
-
-    def __exit__(
-        self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
-    ) -> None:
+    def add_to_upload_queue(self, content: bytes, metadata: FileMetadata) -> None:
         """
-        Wraps around stop method, for use as context manager
-
+        Add object to upload queue. The queue will be uploaded if the queue size is larger than the threshold
+        specified in the __init__.
         Args:
-            exc_type: Exception type
-            exc_val: Exception value
-            exc_tb: Traceback
+            content: bytes object to upload
+            metadata: metadata for the given bytes object
         """
-        self.stop()
-    def __len__(self) -> int:
-        """
-        The size of the upload queue
+        def get_byte_io() -> BinaryIO:
+            return BytesIO(content)
-
-        Returns:
-            Number of events in queue
-        """
-        return self.upload_queue_size
+        self.add_io_to_upload_queue(metadata, get_byte_io)
diff --git a/tests/tests_integration/test_cdf_upload_integration.py b/tests/tests_integration/test_cdf_upload_integration.py
index 03229b2c..2ec47129 100644
--- a/tests/tests_integration/test_cdf_upload_integration.py
+++ b/tests/tests_integration/test_cdf_upload_integration.py
@@ -13,6 +13,7 @@
 # limitations under the License.

 import os
+import pathlib
 import random
 import string
 import time
@@ -24,12 +25,13 @@
 from cognite.client import CogniteClient
 from cognite.client.config import ClientConfig
 from cognite.client.credentials import OAuthClientCredentials
-from cognite.client.data_classes import Event, Row, TimeSeries
+from cognite.client.data_classes import Event, FileMetadata, Row, TimeSeries
 from cognite.client.data_classes.assets import Asset
 from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError
 from cognite.extractorutils.uploader import RawUploadQueue, TimeSeriesUploadQueue
 from cognite.extractorutils.uploader.assets import AssetUploadQueue
 from cognite.extractorutils.uploader.events import EventUploadQueue
+from cognite.extractorutils.uploader.files import BytesUploadQueue, FileUploadQueue

 test_id = random.randint(0, 2**31)
@@ -56,6 +58,9 @@ class IntegrationTests(unittest.TestCase):
     asset2: str = f"util_integration_asset_test_2-{test_id}"
     asset3: str = f"util_integration_asset_test_3-{test_id}"

+    file1: str = f"util_integration_file_test_1-{test_id}"
+    file2: str = f"util_integration_file_test_2-{test_id}"
+
     def setUp(self):
         os.environ["COGNITE_FUNCTION_RUNTIME"] = self.functions_runtime
         cognite_project = os.environ["COGNITE_PROJECT"]
@@ -92,6 +97,13 @@ def setUp(self):
         except CogniteNotFoundError:
             pass

+        # No ignore_unknown_ids in files, so we need to delete them one at a time
+        for file in [self.file1, self.file2]:
+            try:
+                self.client.files.delete(external_id=file)
+            except CogniteNotFoundError:
+                pass
+
     def tearDown(self):
         try:
             self.client.raw.tables.delete(self.database_name, self.table_name)
@@ -100,6 +112,12 @@ def tearDown(self):
         except CogniteNotFoundError:
             pass
         self.client.time_series.delete(external_id=[self.time_series1, self.time_series2], ignore_unknown_ids=True)
         self.client.events.delete(external_id=[self.event1, self.event2, self.event3], ignore_unknown_ids=True)
         self.client.assets.delete(external_id=[self.asset1, self.asset2, self.asset3], ignore_unknown_ids=True)
+        # No ignore_unknown_ids in files, so we need to delete them one at a time
+        for file in [self.file1, self.file2]:
+            try:
+                self.client.files.delete(external_id=file)
+            except CogniteNotFoundError:
+                pass
@@ -267,3 +285,46 @@ def test_assets_upload_queue_upsert(self):
         assert retrieved[2].description == "new desc"
         assert retrieved[1].name == "new name"
         assert retrieved[2].name == "new name"
+
+    def test_file_upload_queue(self):
+        queue = FileUploadQueue(cdf_client=self.client, overwrite_existing=True)
+
+        current_dir = pathlib.Path(__file__).parent.resolve()
+
+        # Upload a pair of actual files
+        queue.add_to_upload_queue(
+            file_meta=FileMetadata(external_id=self.file1, name=self.file1),
+            file_name=current_dir.joinpath("test_file_1.txt"),
+        )
+        queue.add_to_upload_queue(
+            file_meta=FileMetadata(external_id=self.file2, name=self.file2),
+            file_name=current_dir.joinpath("test_file_2.txt"),
+        )
+
+        queue.upload()
+
+        file1 = self.client.files.download_bytes(external_id=self.file1)
+        file2 = self.client.files.download_bytes(external_id=self.file2)
+
+        assert file1 == b"test content\n"
+        assert file2 == b"other test content\n"
+
+    def test_bytes_upload_queue(self):
+        queue = BytesUploadQueue(cdf_client=self.client, overwrite_existing=True)
+
+        queue.add_to_upload_queue(
+            content=b"bytes content",
+            metadata=FileMetadata(external_id=self.file1, name=self.file1),
+        )
+        queue.add_to_upload_queue(
+            content=b"other bytes content",
+            metadata=FileMetadata(external_id=self.file2, name=self.file2),
+        )
+
+        queue.upload()
+
+        file1 = self.client.files.download_bytes(external_id=self.file1)
+        file2 = self.client.files.download_bytes(external_id=self.file2)
+
+        assert file1 == b"bytes content"
+        assert file2 == b"other bytes content"
diff --git a/tests/tests_integration/test_file_1.txt b/tests/tests_integration/test_file_1.txt
new file mode 100644
index 00000000..d670460b
--- /dev/null
+++ b/tests/tests_integration/test_file_1.txt
@@ -0,0 +1 @@
+test content
diff --git a/tests/tests_integration/test_file_2.txt b/tests/tests_integration/test_file_2.txt
new file mode 100644
index 00000000..19290bff
--- /dev/null
+++ b/tests/tests_integration/test_file_2.txt
@@ -0,0 +1 @@
+other test content
diff --git a/tests/tests_unit/test_cdf_upload_queues.py b/tests/tests_unit/test_cdf_upload_queues.py
index ffd39b68..47205942 100644
--- a/tests/tests_unit/test_cdf_upload_queues.py
+++ b/tests/tests_unit/test_cdf_upload_queues.py
@@ -14,6 +14,7 @@

 import datetime
 import math
+import pathlib
 import time
 import unittest
 from unittest.mock import patch
@@ -284,7 +285,11 @@ def post(x):
         queue = FileUploadQueue(client, max_upload_interval=2, post_upload_function=post)
         queue.start()

-        queue.add_to_upload_queue(FileMetadata(name="hello.txt"), None)
+        current_dir = pathlib.Path(__file__).parent.parent.resolve()
+
+        queue.add_to_upload_queue(
+            FileMetadata(name="hello.txt"), current_dir.joinpath("tests_integration/test_file_1.txt")
+        )

         time.sleep(2.1)
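
For reviewers, a minimal usage sketch of the new IOFileUploadQueue introduced by this patch. It is not part of the diff: the client construction, the "my-report" external ID and the read_report factory are illustrative assumptions. The point is the contract called out in the class docstring, namely that the callable must return a fresh BinaryIO on every call, since a failed upload attempt invokes it again rather than reusing a half-consumed stream. FileUploadQueue and BytesUploadQueue are thin wrappers that build such a factory from a path or a bytes object, as the patch shows.

from io import BytesIO
from typing import BinaryIO

from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata

from cognite.extractorutils.uploader.files import IOFileUploadQueue

# Assumption: client credentials/config are set up elsewhere, e.g. as in the
# integration test above (ClientConfig + OAuthClientCredentials).
client = CogniteClient()


def read_report() -> BinaryIO:
    # Return a *new* stream on every call so a retried upload starts from
    # the beginning of the data.
    return BytesIO(b"report body")


queue = IOFileUploadQueue(cdf_client=client, overwrite_existing=True)
queue.add_io_to_upload_queue(
    FileMetadata(external_id="my-report", name="report.txt"),
    read_report,
)
queue.upload()  # uploads the queued files concurrently and clears the queue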