Support for files larger than 5GiB (#310)
* Add support for files larger than 5GiB

* Bump version

* Make mypy happy

* Require higher sdk version
einarmo authored Mar 13, 2024
1 parent 2a754f3 commit cdd52fd
Showing 5 changed files with 160 additions and 10 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,12 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## 7.1.0

### Added

* The file upload queue is now able to stream files larger than 5GiB.

## 7.0.5

### Fixed
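As a quick illustration of what this changelog entry means in practice, here is a minimal sketch of driving the upload queue, modelled on the integration test added later in this commit. The import path for BytesUploadQueue, the helper function name, and the CogniteClient argument are assumptions for illustration, not part of the diff.

from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata

from cognite.extractorutils.uploader import BytesUploadQueue  # assumed import path


def upload_big_payload(client: CogniteClient, payload: bytes, external_id: str) -> None:
    # Callers do not need to change anything: payloads at or above the 5 GiB
    # single-request limit are now streamed as a multipart upload instead of
    # being skipped with a warning, as the removed code further down did.
    queue = BytesUploadQueue(cdf_client=client, overwrite_existing=True)
    queue.add_to_upload_queue(
        content=payload,
        metadata=FileMetadata(external_id=external_id, name=external_id),
    )
    queue.upload()
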
2 changes: 1 addition & 1 deletion cognite/extractorutils/__init__.py
@@ -16,5 +16,5 @@
Cognite extractor utils is a Python package that simplifies the development of new extractors.
"""

__version__ = "7.0.5"
__version__ = "7.1.0"
from .base import Extractor
134 changes: 128 additions & 6 deletions cognite/extractorutils/uploader/files.py
@@ -14,7 +14,8 @@

import threading
from concurrent.futures import Future, ThreadPoolExecutor
from io import BytesIO
from io import BytesIO, RawIOBase
from math import ceil
from os import PathLike
from types import TracebackType
from typing import Any, BinaryIO, Callable, Dict, List, Optional, Tuple, Type, Union
@@ -41,6 +42,103 @@
_QUEUES: int = 0
_QUEUES_LOCK: threading.RLock = threading.RLock()

# 5 GiB
_MAX_SINGLE_CHUNK_FILE_SIZE = 5 * 1024 * 1024 * 1024
# 4000 MiB
_MAX_FILE_CHUNK_SIZE = 4 * 1024 * 1024 * 1000


class ChunkedStream(RawIOBase, BinaryIO):
"""
Wrapper around a read-only stream to allow treating it as a sequence of smaller streams.
`next_chunk` will return `True` if there is one more chunk. It must be called before this
is read as a stream for the first time, typically in a `while` loop.
Args:
inner: Stream to wrap.
max_chunk_size: Maximum size per stream chunk.
stream_length: Total (remaining) length of the inner stream. This must be accurate.
"""

def __init__(self, inner: BinaryIO, max_chunk_size: int, stream_length: int) -> None:
self._inner = inner
self._pos = -1
self._max_chunk_size = max_chunk_size
self._stream_length = stream_length
self._chunk_index = -1
self._current_chunk_size = -1

def tell(self) -> int:
return self._pos

# RawIOBase is (stupidly) incompatible with BinaryIO
# Implementing a correct type that inherits from both is impossible,
# but python does so anyway, (ab)using the property that if bytes() and if None
# resolve the same way. These four useless methods with liberal use of Any are
# required to satisfy mypy.
# This may be solvable by changing the typing in the python SDK to use typing.Protocol.
def writelines(self, __lines: Any) -> None:
raise NotImplementedError()

def write(self, __b: Any) -> int:
raise NotImplementedError()

def __enter__(self) -> "ChunkedStream":
return super().__enter__()

def __exit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> None:
return super().__exit__(exc_type, exc_val, exc_tb)

@property
def chunk_count(self) -> int:
return ceil(self._stream_length / self._max_chunk_size)

@property
def len(self) -> int:
return len(self)

@property
def current_chunk(self) -> int:
"""
Current chunk number.
"""
return self._chunk_index

def __len__(self) -> int:
return self._current_chunk_size

def readable(self) -> bool:
return True

def read(self, size: int = -1) -> bytes:
if size < 0:
size = self._current_chunk_size - self._pos

size = min(size, self._current_chunk_size - self._pos)
if size > 0:
self._pos += size
return self._inner.read(size)
return bytes()

def next_chunk(self) -> bool:
"""
Step into the next chunk, letting this be read as a stream again.
Returns `False` if the stream is exhausted.
"""
if self._chunk_index >= self.chunk_count - 1:
return False

self._chunk_index += 1
inner_pos = self._inner.tell()
self._current_chunk_size = min(self._max_chunk_size, self._stream_length - inner_pos)
self._pos = 0

return True


class IOFileUploadQueue(AbstractUploadQueue):
"""
@@ -100,6 +198,9 @@ def __init__(
self.files_written = FILES_UPLOADER_WRITTEN
self.queue_size = FILES_UPLOADER_QUEUE_SIZE

self.max_single_chunk_file_size = _MAX_SINGLE_CHUNK_FILE_SIZE
self.max_file_chunk_size = _MAX_FILE_CHUNK_SIZE

self._update_queue_thread = threading.Thread(target=self._remove_done_from_queue, daemon=True)

self._full_queue = threading.Condition()
@@ -160,12 +261,33 @@ def _upload_single(read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -
file_meta, _url = self.cdf_client.files.create(
file_metadata=file_meta, overwrite=self.overwrite_existing
)
elif size > 5 * 1024 * 1024 * 1024:
# File bigger than 5Gb
self.logger.warning(
f"File {file_meta.external_id} is larger than 5GiB, file will not be uploaded"
elif size >= self.max_single_chunk_file_size:
# The minimum chunk size is 4000MiB.
chunks = ChunkedStream(file, self.max_file_chunk_size, size)
self.logger.debug(
f"File {file_meta.external_id} is larger than 5GiB ({size})"
f", uploading in {chunks.chunk_count} chunks"
)
file_meta = FileMetadata()
with self.cdf_client.files.multipart_upload_session(
file_meta.name if file_meta.name is not None else "",
parts=chunks.chunk_count,
overwrite=self.overwrite_existing,
external_id=file_meta.external_id,
source=file_meta.source,
mime_type=file_meta.mime_type,
metadata=file_meta.metadata,
directory=file_meta.directory,
asset_ids=file_meta.asset_ids,
data_set_id=file_meta.data_set_id,
labels=file_meta.labels,
geo_location=file_meta.geo_location,
source_created_time=file_meta.source_created_time,
source_modified_time=file_meta.source_modified_time,
security_categories=file_meta.security_categories,
) as session:
while chunks.next_chunk():
session.upload_part(chunks.current_chunk, chunks)
file_meta = session.file_metadata
else:
file_meta = self.cdf_client.files.upload_bytes(
file,
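For readers of the new ChunkedStream class above, a minimal sketch of the intended calling pattern: the same `while next_chunk()` loop that the upload code uses when feeding session.upload_part. The in-memory source and the tiny chunk size are made up for illustration; only the class itself comes from this commit.

from io import BytesIO

from cognite.extractorutils.uploader.files import ChunkedStream  # class added by this commit

# Ten bytes split into chunks of at most four bytes: ceil(10 / 4) == 3 chunks.
source = BytesIO(b"0123456789")
chunks = ChunkedStream(source, max_chunk_size=4, stream_length=10)

parts = []
while chunks.next_chunk():  # must be called before the first read of every chunk
    parts.append(chunks.read())  # read() without a size returns the rest of the current chunk

assert chunks.chunk_count == 3
assert parts == [b"0123", b"4567", b"89"]
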
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cognite-extractor-utils"
version = "7.0.5"
version = "7.1.0"
description = "Utilities for easier development of extractors for CDF"
authors = ["Mathias Lohne <[email protected]>"]
license = "Apache-2.0"
@@ -51,7 +51,7 @@ exclude = "tests/*"

[tool.poetry.dependencies]
python = "^3.8.0"
cognite-sdk = "^7"
cognite-sdk = "^7.28.1"
prometheus-client = ">0.7.0, <=1.0.0"
arrow = "^1.0.0"
pyyaml = ">=5.3.0, <7"
24 changes: 23 additions & 1 deletion tests/tests_integration/test_cdf_upload_integration.py
@@ -60,6 +60,7 @@ class IntegrationTests(unittest.TestCase):

file1: str = f"util_integration_file_test_1-{test_id}"
file2: str = f"util_integration_file_test_2-{test_id}"
bigfile: str = f"util_integration_file-big-{test_id}"
empty_file: str = f"util_integration_file_test_3-{test_id}"

def setUp(self):
@@ -114,7 +115,7 @@ def tearDown(self):
self.client.events.delete(external_id=[self.event1, self.event2, self.event3], ignore_unknown_ids=True)
self.client.assets.delete(external_id=[self.asset1, self.asset2, self.asset3], ignore_unknown_ids=True)
# No ignore_unknown_ids in files, so we need to delete them one at a time
for file in [self.file1, self.file2]:
for file in [self.file1, self.file2, self.bigfile, self.empty_file]:
try:
self.client.files.delete(external_id=file)
except CogniteNotFoundError:
@@ -336,3 +337,24 @@ def test_bytes_upload_queue(self):

assert file1 == b"bytes content"
assert file2 == b"other bytes content"

def test_big_file_upload_queue(self):
queue = BytesUploadQueue(cdf_client=self.client, overwrite_existing=True, max_queue_size=1)
queue.max_file_chunk_size = 6_000_000
queue.max_single_chunk_file_size = 6_000_000

content = b"large" * 2_000_000

queue.add_to_upload_queue(content=content, metadata=FileMetadata(external_id=self.bigfile, name=self.bigfile))

queue.upload()

for _ in range(10):
file = self.client.files.retrieve(external_id=self.bigfile)
if file.uploaded:
break
time.sleep(1)

bigfile = self.client.files.download_bytes(external_id=self.bigfile)

assert len(bigfile) == 10_000_000
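
A note on the numbers in this test, in case they look arbitrary: lowering max_file_chunk_size and max_single_chunk_file_size to 6 MB makes the 10 MB payload take the multipart branch without needing a real 5 GiB file.

from math import ceil

# b"large" is 5 bytes, so the payload is 5 * 2_000_000 = 10_000_000 bytes.
# That is >= the lowered 6_000_000-byte single-chunk limit, so the queue uses a
# multipart upload session with ceil(10_000_000 / 6_000_000) == 2 parts.
size = len(b"large") * 2_000_000
assert size == 10_000_000
assert ceil(size / 6_000_000) == 2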
