diff --git a/CHANGELOG.md b/CHANGELOG.md
index db83056d..2b0bc279 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,12 @@ Changes are grouped as follows
 - `Fixed` for any bug fixes.
 - `Security` in case of vulnerabilities.
 
+## 7.1.0
+
+### Added
+
+ * The file upload queue is now able to stream files larger than 5GiB.
+
 ## 7.0.5
 
 ### Fixed
diff --git a/cognite/extractorutils/__init__.py b/cognite/extractorutils/__init__.py
index 0fea27f9..aa10c0df 100644
--- a/cognite/extractorutils/__init__.py
+++ b/cognite/extractorutils/__init__.py
@@ -16,5 +16,5 @@
 Cognite extractor utils is a Python package that simplifies the development of new extractors.
 """
 
-__version__ = "7.0.5"
+__version__ = "7.1.0"
 from .base import Extractor
diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py
index fa2c0d25..a9ebf4fe 100644
--- a/cognite/extractorutils/uploader/files.py
+++ b/cognite/extractorutils/uploader/files.py
@@ -14,7 +14,8 @@
 
 import threading
 from concurrent.futures import Future, ThreadPoolExecutor
-from io import BytesIO
+from io import BytesIO, RawIOBase
+from math import ceil
 from os import PathLike
 from types import TracebackType
 from typing import Any, BinaryIO, Callable, Dict, List, Optional, Tuple, Type, Union
@@ -41,6 +42,103 @@
 _QUEUES: int = 0
 _QUEUES_LOCK: threading.RLock = threading.RLock()
 
+# 5 GiB
+_MAX_SINGLE_CHUNK_FILE_SIZE = 5 * 1024 * 1024 * 1024
+# 4000 MiB
+_MAX_FILE_CHUNK_SIZE = 4 * 1024 * 1024 * 1000
+
+
+class ChunkedStream(RawIOBase, BinaryIO):
+    """
+    Wrapper around a read-only stream to allow treating it as a sequence of smaller streams.
+
+    `next_chunk` will return `True` if there is one more chunk. It must be called
+    before this is treated as a stream for the first time, typically in a `while` loop.
+
+    Args:
+        inner: Stream to wrap.
+        max_chunk_size: Maximum size per stream chunk.
+        stream_length: Total (remaining) length of the inner stream. This must be accurate.
+    """
+
+    def __init__(self, inner: BinaryIO, max_chunk_size: int, stream_length: int) -> None:
+        self._inner = inner
+        self._pos = -1
+        self._max_chunk_size = max_chunk_size
+        self._stream_length = stream_length
+        self._chunk_index = -1
+        self._current_chunk_size = -1
+
+    def tell(self) -> int:
+        return self._pos
+
+    # RawIOBase is (stupidly) incompatible with BinaryIO.
+    # Implementing a correct type that inherits from both is impossible,
+    # but Python does so anyway, (ab)using the property that `if bytes()` and `if None`
+    # resolve the same way. These four useless methods with liberal use of Any are
+    # required to satisfy mypy.
+    # This may be solvable by changing the typing in the Python SDK to use typing.Protocol.
+    def writelines(self, __lines: Any) -> None:
+        raise NotImplementedError()
+
+    def write(self, __b: Any) -> int:
+        raise NotImplementedError()
+
+    def __enter__(self) -> "ChunkedStream":
+        return super().__enter__()
+
+    def __exit__(
+        self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
+    ) -> None:
+        return super().__exit__(exc_type, exc_val, exc_tb)
+
+    @property
+    def chunk_count(self) -> int:
+        return ceil(self._stream_length / self._max_chunk_size)
+
+    @property
+    def len(self) -> int:
+        return len(self)
+
+    @property
+    def current_chunk(self) -> int:
+        """
+        Current chunk number, starting at 0 (-1 before `next_chunk` is first called).
+        """
+        return self._chunk_index
+
+    def __len__(self) -> int:
+        return self._current_chunk_size
+
+    def readable(self) -> bool:
+        return True
+
+    def read(self, size: int = -1) -> bytes:
+        if size < 0:
+            size = self._current_chunk_size - self._pos
+
+        size = min(size, self._current_chunk_size - self._pos)
+        if size > 0:
+            self._pos += size
+            return self._inner.read(size)
+        return bytes()
+
+    def next_chunk(self) -> bool:
+        """
+        Step into the next chunk, letting this be read as a stream again.
+
+        Returns `False` if the stream is exhausted.
+        """
+        if self._chunk_index >= self.chunk_count - 1:
+            return False
+
+        self._chunk_index += 1
+        inner_pos = self._inner.tell()
+        self._current_chunk_size = min(self._max_chunk_size, self._stream_length - inner_pos)
+        self._pos = 0
+
+        return True
+
 
 class IOFileUploadQueue(AbstractUploadQueue):
     """
@@ -100,6 +198,9 @@ def __init__(
         self.files_written = FILES_UPLOADER_WRITTEN
         self.queue_size = FILES_UPLOADER_QUEUE_SIZE
 
+        self.max_single_chunk_file_size = _MAX_SINGLE_CHUNK_FILE_SIZE
+        self.max_file_chunk_size = _MAX_FILE_CHUNK_SIZE
+
         self._update_queue_thread = threading.Thread(target=self._remove_done_from_queue, daemon=True)
 
         self._full_queue = threading.Condition()
@@ -160,12 +261,33 @@ def _upload_single(read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -
                     file_meta, _url = self.cdf_client.files.create(
                         file_metadata=file_meta, overwrite=self.overwrite_existing
                     )
-                elif size > 5 * 1024 * 1024 * 1024:
-                    # File bigger than 5Gb
-                    self.logger.warning(
-                        f"File {file_meta.external_id} is larger than 5GiB, file will not be uploaded"
-                    )
-                    file_meta = FileMetadata()
+                elif size >= self.max_single_chunk_file_size:
+                    # The minimum chunk size is 4000 MiB.
+                    chunks = ChunkedStream(file, self.max_file_chunk_size, size)
+                    self.logger.debug(
+                        f"File {file_meta.external_id} is larger than 5GiB ({size})"
+                        f", uploading in {chunks.chunk_count} chunks"
+                    )
+                    with self.cdf_client.files.multipart_upload_session(
+                        file_meta.name if file_meta.name is not None else "",
+                        parts=chunks.chunk_count,
+                        overwrite=self.overwrite_existing,
+                        external_id=file_meta.external_id,
+                        source=file_meta.source,
+                        mime_type=file_meta.mime_type,
+                        metadata=file_meta.metadata,
+                        directory=file_meta.directory,
+                        asset_ids=file_meta.asset_ids,
+                        data_set_id=file_meta.data_set_id,
+                        labels=file_meta.labels,
+                        geo_location=file_meta.geo_location,
+                        source_created_time=file_meta.source_created_time,
+                        source_modified_time=file_meta.source_modified_time,
+                        security_categories=file_meta.security_categories,
+                    ) as session:
+                        while chunks.next_chunk():
+                            session.upload_part(chunks.current_chunk, chunks)
+                        file_meta = session.file_metadata
                 else:
                     file_meta = self.cdf_client.files.upload_bytes(
                         file,
diff --git a/pyproject.toml b/pyproject.toml
index 54cd8f6e..6d434a5c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "cognite-extractor-utils"
-version = "7.0.5"
+version = "7.1.0"
 description = "Utilities for easier development of extractors for CDF"
 authors = ["Mathias Lohne "]
 license = "Apache-2.0"
@@ -51,7 +51,7 @@ exclude = "tests/*"
 
 [tool.poetry.dependencies]
 python = "^3.8.0"
-cognite-sdk = "^7"
+cognite-sdk = "^7.28.1"
 prometheus-client = ">0.7.0, <=1.0.0"
 arrow = "^1.0.0"
 pyyaml = ">=5.3.0, <7"
diff --git a/tests/tests_integration/test_cdf_upload_integration.py b/tests/tests_integration/test_cdf_upload_integration.py
index c95b0759..19c884d1 100644
--- a/tests/tests_integration/test_cdf_upload_integration.py
+++ b/tests/tests_integration/test_cdf_upload_integration.py
@@ -60,6 +60,7 @@ class IntegrationTests(unittest.TestCase):
 
     file1: str = f"util_integration_file_test_1-{test_id}"
     file2: str = f"util_integration_file_test_2-{test_id}"
+    bigfile: str = f"util_integration_file-big-{test_id}"
     empty_file: str = f"util_integration_file_test_3-{test_id}"
 
     def setUp(self):
@@ -114,7 +115,7 @@ def tearDown(self):
         self.client.events.delete(external_id=[self.event1, self.event2, self.event3], ignore_unknown_ids=True)
         self.client.assets.delete(external_id=[self.asset1, self.asset2, self.asset3], ignore_unknown_ids=True)
         # No ignore_unknown_ids in files, so we need to delete them one at a time
-        for file in [self.file1, self.file2]:
+        for file in [self.file1, self.file2, self.bigfile, self.empty_file]:
             try:
                 self.client.files.delete(external_id=file)
             except CogniteNotFoundError:
@@ -336,3 +337,24 @@ def test_bytes_upload_queue(self):
 
         assert file1 == b"bytes content"
         assert file2 == b"other bytes content"
+
+    def test_big_file_upload_queue(self):
+        queue = BytesUploadQueue(cdf_client=self.client, overwrite_existing=True, max_queue_size=1)
+        queue.max_file_chunk_size = 6_000_000
+        queue.max_single_chunk_file_size = 6_000_000
+
+        content = b"large" * 2_000_000
+
+        queue.add_to_upload_queue(content=content, metadata=FileMetadata(external_id=self.bigfile, name=self.bigfile))
+
+        queue.upload()
+
+        for _ in range(10):
+            file = self.client.files.retrieve(external_id=self.bigfile)
+            if file.uploaded:
+                break
+            time.sleep(1)
+
+        bigfile = self.client.files.download_bytes(external_id=self.bigfile)
+
+        assert len(bigfile) == 10_000_000
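Reviewer note: below is a minimal sketch (not part of the patch) of how the new `ChunkedStream` in `cognite/extractorutils/uploader/files.py` is meant to be driven, using only the API introduced above (`next_chunk`, `read`, `chunk_count`, `current_chunk`). The 1 MiB payload and 256 KiB chunk size are illustrative values, not CDF limits; the real upload path in `_upload_single` uses `_MAX_FILE_CHUNK_SIZE` and hands each chunk to `session.upload_part`.

```python
# Sketch only: exercises ChunkedStream locally over an in-memory stream.
# The payload and chunk size below are arbitrary example values, not CDF limits.
from io import BytesIO

from cognite.extractorutils.uploader.files import ChunkedStream

payload = b"x" * (1024 * 1024)  # 1 MiB standing in for a large file
stream = ChunkedStream(BytesIO(payload), max_chunk_size=256 * 1024, stream_length=len(payload))

assert stream.chunk_count == 4  # ceil(1 MiB / 256 KiB)

total = 0
while stream.next_chunk():  # must be called before the first read
    chunk = stream.read()  # reads at most the current chunk, not the rest of the stream
    total += len(chunk)
    print(f"chunk {stream.current_chunk}: {len(chunk)} bytes")

assert total == len(payload)
```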