Skip to content

Commit

Permalink
Merge branch 'master' into all
Browse files Browse the repository at this point in the history
  • Loading branch information
mathialo authored Dec 3, 2024
2 parents 9fabb95 + 5820486 commit 36232c3
Showing 1 changed file with 87 additions and 14 deletions.
101 changes: 87 additions & 14 deletions cognite/extractorutils/uploader/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,18 @@
from math import ceil
from os import PathLike
from types import TracebackType
from typing import Any, BinaryIO, Callable, Dict, Iterator, List, Optional, Tuple, Type, Union
from typing import (
Any,
BinaryIO,
Callable,
Dict,
Iterator,
List,
Optional,
Tuple,
Type,
Union,
)
from urllib.parse import ParseResult, urlparse

from httpx import URL, Client, Headers, Request, StreamConsumed, SyncByteStream
Expand All @@ -27,7 +38,9 @@
from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata, FileMetadataUpdate
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import CogniteExtractorFileApply
from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import (
CogniteExtractorFileApply,
)
from cognite.client.utils._identifier import IdentifierSequence
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader._base import (
Expand Down Expand Up @@ -97,7 +110,10 @@ def __enter__(self) -> "ChunkedStream":
return super().__enter__()

def __exit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
return super().__exit__(exc_type, exc_val, exc_tb)

Expand Down Expand Up @@ -235,7 +251,8 @@ def __init__(
global _QUEUES, _QUEUES_LOCK
with _QUEUES_LOCK:
self._pool = ThreadPoolExecutor(
max_workers=self.parallelism, thread_name_prefix=f"FileUploadQueue-{_QUEUES}"
max_workers=self.parallelism,
thread_name_prefix=f"FileUploadQueue-{_QUEUES}",
)
_QUEUES += 1

Expand All @@ -251,7 +268,7 @@ def _apply_cognite_file(self, file_apply: CogniteExtractorFileApply) -> NodeId:
node = instance_result.nodes[0]
return node.as_id()

def _upload_empty(
def _upload_only_metadata(
self, file_meta: FileMetadataOrCogniteExtractorFile
) -> tuple[FileMetadataOrCogniteExtractorFile, str]:
if isinstance(file_meta, CogniteExtractorFileApply):
Expand Down Expand Up @@ -281,11 +298,51 @@ def _upload_empty(

return file_meta_response, url

def _upload_empty_file(
self,
file_meta: FileMetadataOrCogniteExtractorFile,
) -> None:
file_meta_response, url = self._upload_only_metadata(file_meta)

self._upload_only_file_reference(file_meta, url)

def _upload_bytes(self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile) -> None:
file_meta, url = self._upload_empty(file_meta)
file_meta, url = self._upload_only_metadata(file_meta)
resp = self._httpx_client.send(self._get_file_upload_request(url, file, size, file_meta.mime_type))
resp.raise_for_status()

def _prepare_request_data_for_empty_file(self, url_str: str) -> Request:
FILE_SIZE = 0 # this path is only entered for an empty file
EMPTY_CONTENT = ""

url = URL(url_str)
base_url = URL(self.cdf_client.config.base_url)

if url.host == base_url.host:
upload_url = url
else:
parsed_url: ParseResult = urlparse(url_str)
parsed_base_url: ParseResult = urlparse(self.cdf_client.config.base_url)
replaced_upload_url = parsed_url._replace(netloc=parsed_base_url.netloc).geturl()
upload_url = URL(replaced_upload_url)

headers = Headers(self._httpx_client.headers)
headers.update(
{
"Accept": "*/*",
"Content-Length": str(FILE_SIZE),
"Host": upload_url.netloc.decode("ascii"),
"x-cdp-app": self.cdf_client._config.client_name,
}
)

return Request(method="PUT", url=upload_url, headers=headers, content=EMPTY_CONTENT)

def _upload_only_file_reference(self, file_meta: FileMetadataOrCogniteExtractorFile, url_str: str) -> None:
request_data = self._prepare_request_data_for_empty_file(url_str)
resp = self._httpx_client.send(request_data)
resp.raise_for_status()

def _upload_multipart(self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile) -> None:
chunks = ChunkedStream(file, self.max_file_chunk_size, size)
self.logger.debug(
Expand Down Expand Up @@ -329,7 +386,10 @@ def _create_multi_part(self, file_meta: FileMetadataOrCogniteExtractorFile, chun
res = self.cdf_client.files._post(
url_path="/files/initmultipartupload",
json=file_meta.dump(camel_case=True),
params={"overwrite": self.overwrite_existing, "parts": chunks.chunk_count},
params={
"overwrite": self.overwrite_existing,
"parts": chunks.chunk_count,
},
)
res.raise_for_status()
return res.json()
Expand All @@ -339,7 +399,10 @@ def add_io_to_upload_queue(
file_meta: FileMetadataOrCogniteExtractorFile,
read_file: Callable[[], BinaryIO],
extra_retries: Optional[
Union[Tuple[Type[Exception], ...], Dict[Type[Exception], Callable[[Any], bool]]]
Union[
Tuple[Type[Exception], ...],
Dict[Type[Exception], Callable[[Any], bool]],
]
] = None,
) -> None:
"""
Expand All @@ -366,12 +429,14 @@ def add_io_to_upload_queue(
max_delay=RETRY_MAX_DELAY,
backoff=RETRY_BACKOFF_FACTOR,
)
def upload_file(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogniteExtractorFile) -> None:
def upload_file(
read_file: Callable[[], BinaryIO],
file_meta: FileMetadataOrCogniteExtractorFile,
) -> None:
with read_file() as file:
size = super_len(file)
if size == 0:
# upload just the file metadata witout data
file_meta, _ = self._upload_empty(file_meta)
self._upload_empty_file(file_meta)
elif size >= self.max_single_chunk_file_size:
# The minimum chunk size is 4000MiB.
self._upload_multipart(size, file, file_meta)
Expand All @@ -388,7 +453,10 @@ def upload_file(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogn
except Exception as e:
self.logger.error("Error in upload callback: %s", str(e))

def wrapped_upload(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogniteExtractorFile) -> None:
def wrapped_upload(
read_file: Callable[[], BinaryIO],
file_meta: FileMetadataOrCogniteExtractorFile,
) -> None:
try:
upload_file(read_file, file_meta)

Expand Down Expand Up @@ -485,7 +553,10 @@ def __enter__(self) -> "IOFileUploadQueue":
return self

def __exit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""
Wraps around stop method, for use as context manager
Expand Down Expand Up @@ -544,7 +615,9 @@ def __init__(
)

def add_to_upload_queue(
self, file_meta: FileMetadataOrCogniteExtractorFile, file_name: Union[str, PathLike]
self,
file_meta: FileMetadataOrCogniteExtractorFile,
file_name: Union[str, PathLike],
) -> None:
"""
Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
Expand Down

0 comments on commit 36232c3

Please sign in to comment.