diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py index 9df4b6b..8da265f 100644 --- a/cognite/extractorutils/uploader/files.py +++ b/cognite/extractorutils/uploader/files.py @@ -18,7 +18,18 @@ from math import ceil from os import PathLike from types import TracebackType -from typing import Any, BinaryIO, Callable, Dict, Iterator, List, Optional, Tuple, Type, Union +from typing import ( + Any, + BinaryIO, + Callable, + Dict, + Iterator, + List, + Optional, + Tuple, + Type, + Union, +) from urllib.parse import ParseResult, urlparse from httpx import URL, Client, Headers, Request, StreamConsumed, SyncByteStream @@ -27,7 +38,9 @@ from cognite.client import CogniteClient from cognite.client.data_classes import FileMetadata, FileMetadataUpdate from cognite.client.data_classes.data_modeling import NodeId -from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import CogniteExtractorFileApply +from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import ( + CogniteExtractorFileApply, +) from cognite.client.utils._identifier import IdentifierSequence from cognite.extractorutils.threading import CancellationToken from cognite.extractorutils.uploader._base import ( @@ -97,7 +110,10 @@ def __enter__(self) -> "ChunkedStream": return super().__enter__() def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], ) -> None: return super().__exit__(exc_type, exc_val, exc_tb) @@ -235,7 +251,8 @@ def __init__( global _QUEUES, _QUEUES_LOCK with _QUEUES_LOCK: self._pool = ThreadPoolExecutor( - max_workers=self.parallelism, thread_name_prefix=f"FileUploadQueue-{_QUEUES}" + max_workers=self.parallelism, + thread_name_prefix=f"FileUploadQueue-{_QUEUES}", ) _QUEUES += 1 @@ -251,7 +268,7 @@ def _apply_cognite_file(self, file_apply: CogniteExtractorFileApply) -> NodeId: node = instance_result.nodes[0] return node.as_id() - def _upload_empty( + def _upload_only_metadata( self, file_meta: FileMetadataOrCogniteExtractorFile ) -> tuple[FileMetadataOrCogniteExtractorFile, str]: if isinstance(file_meta, CogniteExtractorFileApply): @@ -281,11 +298,51 @@ def _upload_empty( return file_meta_response, url + def _upload_empty_file( + self, + file_meta: FileMetadataOrCogniteExtractorFile, + ) -> None: + file_meta_response, url = self._upload_only_metadata(file_meta) + + self._upload_only_file_reference(file_meta, url) + def _upload_bytes(self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile) -> None: - file_meta, url = self._upload_empty(file_meta) + file_meta, url = self._upload_only_metadata(file_meta) resp = self._httpx_client.send(self._get_file_upload_request(url, file, size, file_meta.mime_type)) resp.raise_for_status() + def _prepare_request_data_for_empty_file(self, url_str: str) -> Request: + FILE_SIZE = 0 # this path is only entered for an empty file + EMPTY_CONTENT = "" + + url = URL(url_str) + base_url = URL(self.cdf_client.config.base_url) + + if url.host == base_url.host: + upload_url = url + else: + parsed_url: ParseResult = urlparse(url_str) + parsed_base_url: ParseResult = urlparse(self.cdf_client.config.base_url) + replaced_upload_url = parsed_url._replace(netloc=parsed_base_url.netloc).geturl() + upload_url = URL(replaced_upload_url) + + headers = Headers(self._httpx_client.headers) + headers.update( + { + "Accept": "*/*", + "Content-Length": str(FILE_SIZE), + "Host": upload_url.netloc.decode("ascii"), + "x-cdp-app": self.cdf_client._config.client_name, + } + ) + + return Request(method="PUT", url=upload_url, headers=headers, content=EMPTY_CONTENT) + + def _upload_only_file_reference(self, file_meta: FileMetadataOrCogniteExtractorFile, url_str: str) -> None: + request_data = self._prepare_request_data_for_empty_file(url_str) + resp = self._httpx_client.send(request_data) + resp.raise_for_status() + def _upload_multipart(self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile) -> None: chunks = ChunkedStream(file, self.max_file_chunk_size, size) self.logger.debug( @@ -329,7 +386,10 @@ def _create_multi_part(self, file_meta: FileMetadataOrCogniteExtractorFile, chun res = self.cdf_client.files._post( url_path="/files/initmultipartupload", json=file_meta.dump(camel_case=True), - params={"overwrite": self.overwrite_existing, "parts": chunks.chunk_count}, + params={ + "overwrite": self.overwrite_existing, + "parts": chunks.chunk_count, + }, ) res.raise_for_status() return res.json() @@ -339,7 +399,10 @@ def add_io_to_upload_queue( file_meta: FileMetadataOrCogniteExtractorFile, read_file: Callable[[], BinaryIO], extra_retries: Optional[ - Union[Tuple[Type[Exception], ...], Dict[Type[Exception], Callable[[Any], bool]]] + Union[ + Tuple[Type[Exception], ...], + Dict[Type[Exception], Callable[[Any], bool]], + ] ] = None, ) -> None: """ @@ -366,10 +429,15 @@ def add_io_to_upload_queue( max_delay=RETRY_MAX_DELAY, backoff=RETRY_BACKOFF_FACTOR, ) - def upload_file(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogniteExtractorFile) -> None: + def upload_file( + read_file: Callable[[], BinaryIO], + file_meta: FileMetadataOrCogniteExtractorFile, + ) -> None: with read_file() as file: size = super_len(file) - if size >= self.max_single_chunk_file_size: + if size == 0: + self._upload_empty_file(file_meta) + elif size >= self.max_single_chunk_file_size: # The minimum chunk size is 4000MiB. self._upload_multipart(size, file, file_meta) @@ -385,7 +453,10 @@ def upload_file(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogn except Exception as e: self.logger.error("Error in upload callback: %s", str(e)) - def wrapped_upload(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogniteExtractorFile) -> None: + def wrapped_upload( + read_file: Callable[[], BinaryIO], + file_meta: FileMetadataOrCogniteExtractorFile, + ) -> None: try: upload_file(read_file, file_meta) @@ -482,7 +553,10 @@ def __enter__(self) -> "IOFileUploadQueue": return self def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], ) -> None: """ Wraps around stop method, for use as context manager @@ -541,7 +615,9 @@ def __init__( ) def add_to_upload_queue( - self, file_meta: FileMetadataOrCogniteExtractorFile, file_name: Union[str, PathLike] + self, + file_meta: FileMetadataOrCogniteExtractorFile, + file_name: Union[str, PathLike], ) -> None: """ Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold