From 4a3987773abb7e2c0b7e4e474ba9b530b0451f56 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Thu, 21 Nov 2024 17:45:05 +0530 Subject: [PATCH 01/29] Changes for reporting files that have not been uploaded --- cognite/extractorutils/uploader/files.py | 180 ++++++++++++++---- .../uploader/upload_failure_handler.py | 61 ++++++ 2 files changed, 207 insertions(+), 34 deletions(-) create mode 100644 cognite/extractorutils/uploader/upload_failure_handler.py diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py index bfdda308..9c4ce432 100644 --- a/cognite/extractorutils/uploader/files.py +++ b/cognite/extractorutils/uploader/files.py @@ -18,7 +18,18 @@ from math import ceil from os import PathLike from types import TracebackType -from typing import Any, BinaryIO, Callable, Dict, Iterator, List, Optional, Tuple, Type, Union +from typing import ( + Any, + BinaryIO, + Callable, + Dict, + Iterator, + List, + Optional, + Tuple, + Type, + Union, +) from urllib.parse import ParseResult, urlparse from httpx import URL, Client, Headers, Request, StreamConsumed, SyncByteStream @@ -27,7 +38,9 @@ from cognite.client import CogniteClient from cognite.client.data_classes import FileMetadata, FileMetadataUpdate from cognite.client.data_classes.data_modeling import NodeId -from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import CogniteExtractorFileApply +from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import ( + CogniteExtractorFileApply, +) from cognite.client.utils._identifier import IdentifierSequence from cognite.extractorutils.threading import CancellationToken from cognite.extractorutils.uploader._base import ( @@ -42,6 +55,7 @@ FILES_UPLOADER_QUEUED, FILES_UPLOADER_WRITTEN, ) +from cognite.extractorutils.uploader.upload_failure_handler import FileFailureManager from cognite.extractorutils.util import cognite_exceptions, retry _QUEUES: int = 0 @@ -56,7 +70,6 @@ FileMetadataOrCogniteExtractorFile = Union[FileMetadata, CogniteExtractorFileApply] - class ChunkedStream(RawIOBase, BinaryIO): """ Wrapper around a read-only stream to allow treating it as a sequence of smaller streams. @@ -70,7 +83,9 @@ class ChunkedStream(RawIOBase, BinaryIO): stream_length: Total (remaining) length of the inner stream. This must be accurate. """ - def __init__(self, inner: BinaryIO, max_chunk_size: int, stream_length: int) -> None: + def __init__( + self, inner: BinaryIO, max_chunk_size: int, stream_length: int + ) -> None: self._inner = inner self._pos = -1 self._max_chunk_size = max_chunk_size @@ -97,7 +112,10 @@ def __enter__(self) -> "ChunkedStream": return super().__enter__() def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], ) -> None: return super().__exit__(exc_type, exc_val, exc_tb) @@ -143,7 +161,9 @@ def next_chunk(self) -> bool: self._chunk_index += 1 inner_pos = self._inner.tell() - self._current_chunk_size = min(self._max_chunk_size, self._stream_length - inner_pos) + self._current_chunk_size = min( + self._max_chunk_size, self._stream_length - inner_pos + ) self._pos = 0 return True @@ -186,13 +206,16 @@ class IOFileUploadQueue(AbstractUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[FileMetadataOrCogniteExtractorFile]], None]] = None, + post_upload_function: Optional[ + Callable[[List[FileMetadataOrCogniteExtractorFile]], None] + ] = None, max_queue_size: Optional[int] = None, trigger_log_level: str = "DEBUG", thread_name: Optional[str] = None, overwrite_existing: bool = False, cancellation_token: Optional[CancellationToken] = None, max_parallelism: Optional[int] = None, + failure_logging_path: None | str = None, ): # Super sets post_upload and threshold super().__init__( @@ -208,6 +231,9 @@ def __init__( if self.threshold <= 0: raise ValueError("Max queue size must be positive for file upload queues") + self.failure_logging_path = failure_logging_path or None + self.initialize_failure_logging() + self.upload_queue: List[Future] = [] self.errors: List[Exception] = [] @@ -226,23 +252,54 @@ def __init__( self.max_single_chunk_file_size = _MAX_SINGLE_CHUNK_FILE_SIZE self.max_file_chunk_size = _MAX_FILE_CHUNK_SIZE - self._update_queue_thread = threading.Thread(target=self._remove_done_from_queue, daemon=True) + self._update_queue_thread = threading.Thread( + target=self._remove_done_from_queue, daemon=True + ) self._full_queue = threading.Condition() - self._httpx_client = Client(follow_redirects=True, timeout=cdf_client.config.file_transfer_timeout) + self._httpx_client = Client( + follow_redirects=True, timeout=cdf_client.config.file_transfer_timeout + ) global _QUEUES, _QUEUES_LOCK with _QUEUES_LOCK: self._pool = ThreadPoolExecutor( - max_workers=self.parallelism, thread_name_prefix=f"FileUploadQueue-{_QUEUES}" + max_workers=self.parallelism, + thread_name_prefix=f"FileUploadQueue-{_QUEUES}", ) _QUEUES += 1 + def initialize_failure_logging(self): + if self.failure_logging_path: + self._file_failure_manager = FileFailureManager( + path_to_file=self.failure_logging_path + ) + else: + self._file_failure_manager = None + + def get_failure_logger(self) -> FileFailureManager: + return self._file_failure_manager + + def add_entry_failure_logger(self, file_name: str, error: Exception) -> None: + if self._file_failure_manager is not None: + error_reason = str(error) + self._file_failure_manager.add( + file_name=file_name, error_reason=error_reason + ) + + def flush_failure_logger(self): + if self._file_failure_manager is not None: + self.logger.info("Flushing failure logs") + self._file_failure_manager.write_to_file() + self._file_failure_manager.clear() + def _remove_done_from_queue(self) -> None: while not self.cancellation_token.is_cancelled: with self.lock: - self.upload_queue = list(filter(lambda f: f.running(), self.upload_queue)) + self.upload_queue = list( + filter(lambda f: f.running(), self.upload_queue) + ) self.cancellation_token.wait(5) @@ -265,7 +322,8 @@ def _upload_empty( # The files API for whatever reason doesn't update directory or source when you overwrite, # so we need to update those later. any_unchaged = ( - file_meta_response.directory != file_meta.directory or file_meta_response.source != file_meta.source + file_meta_response.directory != file_meta.directory + or file_meta_response.source != file_meta.source ) if any_unchaged: update = FileMetadataUpdate(external_id=file_meta.external_id) @@ -281,15 +339,23 @@ def _upload_empty( return file_meta_response, url - def _upload_bytes(self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile) -> None: + def _upload_bytes( + self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile + ) -> None: file_meta, url = self._upload_empty(file_meta) - resp = self._httpx_client.send(self._get_file_upload_request(url, file, size, file_meta.mime_type)) + resp = self._httpx_client.send( + self._get_file_upload_request(url, file, size, file_meta.mime_type) + ) + resp.raise_for_status() - def _upload_multipart(self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile) -> None: + def _upload_multipart( + self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile + ) -> None: chunks = ChunkedStream(file, self.max_file_chunk_size, size) self.logger.debug( - f"File {file_meta.external_id} is larger than 5GiB ({size})" f", uploading in {chunks.chunk_count} chunks" + f"File {file_meta.external_id} is larger than 5GiB ({size})" + f", uploading in {chunks.chunk_count} chunks" ) returned_file_metadata = self._create_multi_part(file_meta, chunks) @@ -299,11 +365,17 @@ def _upload_multipart(self, size: int, file: BinaryIO, file_meta: FileMetadataOr for url in upload_urls: chunks.next_chunk() - resp = self._httpx_client.send(self._get_file_upload_request(url, chunks, len(chunks), file_meta.mime_type)) + resp = self._httpx_client.send( + self._get_file_upload_request( + url, chunks, len(chunks), file_meta.mime_type + ) + ) resp.raise_for_status() completed_headers = ( - _CDF_ALPHA_VERSION_HEADER if isinstance(file_meta, CogniteExtractorFileApply) is not None else None + _CDF_ALPHA_VERSION_HEADER + if isinstance(file_meta, CogniteExtractorFileApply) is not None + else None ) res = self.cdf_client.files._post( @@ -313,7 +385,9 @@ def _upload_multipart(self, size: int, file: BinaryIO, file_meta: FileMetadataOr ) res.raise_for_status() - def _create_multi_part(self, file_meta: FileMetadataOrCogniteExtractorFile, chunks: ChunkedStream) -> dict: + def _create_multi_part( + self, file_meta: FileMetadataOrCogniteExtractorFile, chunks: ChunkedStream + ) -> dict: if isinstance(file_meta, CogniteExtractorFileApply): node_id = self._apply_cognite_file(file_meta) identifiers = IdentifierSequence.load(instance_ids=node_id).as_singleton() @@ -329,7 +403,10 @@ def _create_multi_part(self, file_meta: FileMetadataOrCogniteExtractorFile, chun res = self.cdf_client.files._post( url_path="/files/initmultipartupload", json=file_meta.dump(camel_case=True), - params={"overwrite": self.overwrite_existing, "parts": chunks.chunk_count}, + params={ + "overwrite": self.overwrite_existing, + "parts": chunks.chunk_count, + }, ) res.raise_for_status() return res.json() @@ -339,7 +416,10 @@ def add_io_to_upload_queue( file_meta: FileMetadataOrCogniteExtractorFile, read_file: Callable[[], BinaryIO], extra_retries: Optional[ - Union[Tuple[Type[Exception], ...], Dict[Type[Exception], Callable[[Any], bool]]] + Union[ + Tuple[Type[Exception], ...], + Dict[Type[Exception], Callable[[Any], bool]], + ] ] = None, ) -> None: """ @@ -366,7 +446,10 @@ def add_io_to_upload_queue( max_delay=RETRY_MAX_DELAY, backoff=RETRY_BACKOFF_FACTOR, ) - def upload_file(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogniteExtractorFile) -> None: + def upload_file( + read_file: Callable[[], BinaryIO], + file_meta: FileMetadataOrCogniteExtractorFile, + ) -> None: with read_file() as file: size = super_len(file) if size == 0: @@ -388,12 +471,18 @@ def upload_file(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogn except Exception as e: self.logger.error("Error in upload callback: %s", str(e)) - def wrapped_upload(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogniteExtractorFile) -> None: + def wrapped_upload( + read_file: Callable[[], BinaryIO], + file_meta: FileMetadataOrCogniteExtractorFile, + ) -> None: try: upload_file(read_file, file_meta) except Exception as e: - self.logger.exception(f"Unexpected error while uploading file: {file_meta.external_id}") + self.logger.exception( + f"Unexpected error while uploading file: {file_meta.external_id} {file_meta.name}" + ) + self.add_entry_failure_logger(file_name=file_meta.name, error=e) self.errors.append(e) finally: @@ -406,11 +495,16 @@ def wrapped_upload(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrC if self.upload_queue_size >= self.threshold: with self._full_queue: - while not self._full_queue.wait(timeout=2) and not self.cancellation_token.is_cancelled: + while ( + not self._full_queue.wait(timeout=2) + and not self.cancellation_token.is_cancelled + ): pass with self.lock: - self.upload_queue.append(self._pool.submit(wrapped_upload, read_file, file_meta)) + self.upload_queue.append( + self._pool.submit(wrapped_upload, read_file, file_meta) + ) self.upload_queue_size += 1 self.files_queued.inc() self.queue_size.set(self.upload_queue_size) @@ -426,7 +520,9 @@ def _get_file_upload_request( else: parsed_url: ParseResult = urlparse(url_str) parsed_base_url: ParseResult = urlparse(self.cdf_client.config.base_url) - replaced_upload_url = parsed_url._replace(netloc=parsed_base_url.netloc).geturl() + replaced_upload_url = parsed_url._replace( + netloc=parsed_base_url.netloc + ).geturl() upload_url = URL(replaced_upload_url) headers = Headers(self._httpx_client.headers) @@ -460,7 +556,9 @@ def _create_cdm(self, instance_id: NodeId) -> tuple[FileMetadata, str]: resp_json = res.json()["items"][0] return FileMetadata.load(resp_json), resp_json["uploadUrl"] - def upload(self, fail_on_errors: bool = True, timeout: Optional[float] = None) -> None: + def upload( + self, fail_on_errors: bool = True, timeout: Optional[float] = None + ) -> None: """ Wait for all uploads to finish """ @@ -470,7 +568,10 @@ def upload(self, fail_on_errors: bool = True, timeout: Optional[float] = None) - self.queue_size.set(self.upload_queue_size) if fail_on_errors and self.errors: # There might be more errors, but we can only have one as the cause, so pick the first - raise RuntimeError(f"{len(self.errors)} upload(s) finished with errors") from self.errors[0] + self.flush_failure_logger() + raise RuntimeError( + f"{len(self.errors)} upload(s) finished with errors" + ) from self.errors[0] def __enter__(self) -> "IOFileUploadQueue": """ @@ -485,7 +586,10 @@ def __enter__(self) -> "IOFileUploadQueue": return self def __exit__( - self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], ) -> None: """ Wraps around stop method, for use as context manager @@ -524,7 +628,9 @@ class FileUploadQueue(IOFileUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[FileMetadataOrCogniteExtractorFile]], None]] = None, + post_upload_function: Optional[ + Callable[[List[FileMetadataOrCogniteExtractorFile]], None] + ] = None, max_queue_size: Optional[int] = None, max_upload_interval: Optional[int] = None, trigger_log_level: str = "DEBUG", @@ -544,7 +650,9 @@ def __init__( ) def add_to_upload_queue( - self, file_meta: FileMetadataOrCogniteExtractorFile, file_name: Union[str, PathLike] + self, + file_meta: FileMetadataOrCogniteExtractorFile, + file_name: Union[str, PathLike], ) -> None: """ Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold @@ -579,7 +687,9 @@ class BytesUploadQueue(IOFileUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[Callable[[List[FileMetadataOrCogniteExtractorFile]], None]] = None, + post_upload_function: Optional[ + Callable[[List[FileMetadataOrCogniteExtractorFile]], None] + ] = None, max_queue_size: Optional[int] = None, trigger_log_level: str = "DEBUG", thread_name: Optional[str] = None, @@ -596,7 +706,9 @@ def __init__( cancellation_token, ) - def add_to_upload_queue(self, content: bytes, file_meta: FileMetadataOrCogniteExtractorFile) -> None: + def add_to_upload_queue( + self, content: bytes, file_meta: FileMetadataOrCogniteExtractorFile + ) -> None: """ Add object to upload queue. The queue will be uploaded if the queue size is larger than the threshold specified in the __init__. diff --git a/cognite/extractorutils/uploader/upload_failure_handler.py b/cognite/extractorutils/uploader/upload_failure_handler.py new file mode 100644 index 00000000..884df684 --- /dev/null +++ b/cognite/extractorutils/uploader/upload_failure_handler.py @@ -0,0 +1,61 @@ +from datetime import datetime +import jsonlines + + +class FileErrorMapping: + def __init__(self, file_name: str, error_reason: str): + self.file_name = file_name + self.error_reason = error_reason + + def __iter__(self): + return iter([[self.file_name, self.error_reason]]) + + +class FileFailureManager: + MAX_QUEUE_SIZE = 500 + START_TIME_KEY = "start_time" + FILE_REASON_MAP_KEY = "file_error_reason_map" + + def __init__(self, start_time=None, path_to_file=None): + self.failure_logs = {} + + self.path_to_failure_log = self._pre_process_file_extension(path_to_file) + self.start_time = start_time or str(datetime.now()) + self._initialize_failure_logs() + + def _pre_process_file_extension(self, path_to_file): + if path_to_file and not path_to_file.endswith(".jsonl"): + return path_to_file + ".jsonl" + return path_to_file + + def _initialize_failure_logs(self): + self.failure_logs[FileFailureManager.START_TIME_KEY] = self.start_time + self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY] = {} + + def __len__(self): + return len(self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY]) + + def clear(self): + self.failure_logs.clear() + self._initialize_failure_logs() + + def add(self, file_name: str, error_reason: str): + error_file_object = FileErrorMapping( + file_name=file_name, error_reason=error_reason + ) + + self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY].update( + error_file_object + ) + + if len(self) >= self.MAX_QUEUE_SIZE: + self.write_to_file() + self.clear() + + def write_to_file(self): + + if len(self.failure_logs[self.FILE_REASON_MAP_KEY]) == 0: + return + + with jsonlines.open(self.path_to_failure_log, mode="a") as writer: + writer.write(self.failure_logs) From 45a58136a4d015ba87442feb917b1759c9825d80 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Fri, 22 Nov 2024 10:42:34 +0530 Subject: [PATCH 02/29] formatting changes --- cognite/extractorutils/uploader/files.py | 114 ++++++------------ .../uploader/upload_failure_handler.py | 35 +++--- pyproject.toml | 2 + 3 files changed, 52 insertions(+), 99 deletions(-) diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py index 9c4ce432..c9f79507 100644 --- a/cognite/extractorutils/uploader/files.py +++ b/cognite/extractorutils/uploader/files.py @@ -68,8 +68,10 @@ _CDF_ALPHA_VERSION_HEADER = {"cdf-version": "alpha"} + FileMetadataOrCogniteExtractorFile = Union[FileMetadata, CogniteExtractorFileApply] + class ChunkedStream(RawIOBase, BinaryIO): """ Wrapper around a read-only stream to allow treating it as a sequence of smaller streams. @@ -83,9 +85,7 @@ class ChunkedStream(RawIOBase, BinaryIO): stream_length: Total (remaining) length of the inner stream. This must be accurate. """ - def __init__( - self, inner: BinaryIO, max_chunk_size: int, stream_length: int - ) -> None: + def __init__(self, inner: BinaryIO, max_chunk_size: int, stream_length: int) -> None: self._inner = inner self._pos = -1 self._max_chunk_size = max_chunk_size @@ -161,9 +161,7 @@ def next_chunk(self) -> bool: self._chunk_index += 1 inner_pos = self._inner.tell() - self._current_chunk_size = min( - self._max_chunk_size, self._stream_length - inner_pos - ) + self._current_chunk_size = min(self._max_chunk_size, self._stream_length - inner_pos) self._pos = 0 return True @@ -206,9 +204,7 @@ class IOFileUploadQueue(AbstractUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[ - Callable[[List[FileMetadataOrCogniteExtractorFile]], None] - ] = None, + post_upload_function: Optional[Callable[[List[FileMetadataOrCogniteExtractorFile]], None]] = None, max_queue_size: Optional[int] = None, trigger_log_level: str = "DEBUG", thread_name: Optional[str] = None, @@ -252,15 +248,11 @@ def __init__( self.max_single_chunk_file_size = _MAX_SINGLE_CHUNK_FILE_SIZE self.max_file_chunk_size = _MAX_FILE_CHUNK_SIZE - self._update_queue_thread = threading.Thread( - target=self._remove_done_from_queue, daemon=True - ) + self._update_queue_thread = threading.Thread(target=self._remove_done_from_queue, daemon=True) self._full_queue = threading.Condition() - self._httpx_client = Client( - follow_redirects=True, timeout=cdf_client.config.file_transfer_timeout - ) + self._httpx_client = Client(follow_redirects=True, timeout=cdf_client.config.file_transfer_timeout) global _QUEUES, _QUEUES_LOCK with _QUEUES_LOCK: @@ -270,25 +262,22 @@ def __init__( ) _QUEUES += 1 - def initialize_failure_logging(self): - if self.failure_logging_path: - self._file_failure_manager = FileFailureManager( - path_to_file=self.failure_logging_path - ) - else: - self._file_failure_manager = None + def initialize_failure_logging(self) -> None: + self._file_failure_manager: FileFailureManager | None = ( + FileFailureManager(path_to_file=self.failure_logging_path) + if self.failure_logging_path is not None + else None + ) - def get_failure_logger(self) -> FileFailureManager: + def get_failure_logger(self) -> FileFailureManager | None: return self._file_failure_manager def add_entry_failure_logger(self, file_name: str, error: Exception) -> None: if self._file_failure_manager is not None: error_reason = str(error) - self._file_failure_manager.add( - file_name=file_name, error_reason=error_reason - ) + self._file_failure_manager.add(file_name=file_name, error_reason=error_reason) - def flush_failure_logger(self): + def flush_failure_logger(self) -> None: if self._file_failure_manager is not None: self.logger.info("Flushing failure logs") self._file_failure_manager.write_to_file() @@ -297,9 +286,7 @@ def flush_failure_logger(self): def _remove_done_from_queue(self) -> None: while not self.cancellation_token.is_cancelled: with self.lock: - self.upload_queue = list( - filter(lambda f: f.running(), self.upload_queue) - ) + self.upload_queue = list(filter(lambda f: f.running(), self.upload_queue)) self.cancellation_token.wait(5) @@ -322,8 +309,7 @@ def _upload_empty( # The files API for whatever reason doesn't update directory or source when you overwrite, # so we need to update those later. any_unchaged = ( - file_meta_response.directory != file_meta.directory - or file_meta_response.source != file_meta.source + file_meta_response.directory != file_meta.directory or file_meta_response.source != file_meta.source ) if any_unchaged: update = FileMetadataUpdate(external_id=file_meta.external_id) @@ -339,23 +325,16 @@ def _upload_empty( return file_meta_response, url - def _upload_bytes( - self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile - ) -> None: + def _upload_bytes(self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile) -> None: file_meta, url = self._upload_empty(file_meta) - resp = self._httpx_client.send( - self._get_file_upload_request(url, file, size, file_meta.mime_type) - ) + resp = self._httpx_client.send(self._get_file_upload_request(url, file, size, file_meta.mime_type)) resp.raise_for_status() - def _upload_multipart( - self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile - ) -> None: + def _upload_multipart(self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile) -> None: chunks = ChunkedStream(file, self.max_file_chunk_size, size) self.logger.debug( - f"File {file_meta.external_id} is larger than 5GiB ({size})" - f", uploading in {chunks.chunk_count} chunks" + f"File {file_meta.external_id} is larger than 5GiB ({size})" f", uploading in {chunks.chunk_count} chunks" ) returned_file_metadata = self._create_multi_part(file_meta, chunks) @@ -365,17 +344,11 @@ def _upload_multipart( for url in upload_urls: chunks.next_chunk() - resp = self._httpx_client.send( - self._get_file_upload_request( - url, chunks, len(chunks), file_meta.mime_type - ) - ) + resp = self._httpx_client.send(self._get_file_upload_request(url, chunks, len(chunks), file_meta.mime_type)) resp.raise_for_status() completed_headers = ( - _CDF_ALPHA_VERSION_HEADER - if isinstance(file_meta, CogniteExtractorFileApply) is not None - else None + _CDF_ALPHA_VERSION_HEADER if isinstance(file_meta, CogniteExtractorFileApply) is not None else None ) res = self.cdf_client.files._post( @@ -385,9 +358,7 @@ def _upload_multipart( ) res.raise_for_status() - def _create_multi_part( - self, file_meta: FileMetadataOrCogniteExtractorFile, chunks: ChunkedStream - ) -> dict: + def _create_multi_part(self, file_meta: FileMetadataOrCogniteExtractorFile, chunks: ChunkedStream) -> dict: if isinstance(file_meta, CogniteExtractorFileApply): node_id = self._apply_cognite_file(file_meta) identifiers = IdentifierSequence.load(instance_ids=node_id).as_singleton() @@ -482,7 +453,7 @@ def wrapped_upload( self.logger.exception( f"Unexpected error while uploading file: {file_meta.external_id} {file_meta.name}" ) - self.add_entry_failure_logger(file_name=file_meta.name, error=e) + self.add_entry_failure_logger(file_name=str(file_meta.name), error=e) self.errors.append(e) finally: @@ -495,16 +466,11 @@ def wrapped_upload( if self.upload_queue_size >= self.threshold: with self._full_queue: - while ( - not self._full_queue.wait(timeout=2) - and not self.cancellation_token.is_cancelled - ): + while not self._full_queue.wait(timeout=2) and not self.cancellation_token.is_cancelled: pass with self.lock: - self.upload_queue.append( - self._pool.submit(wrapped_upload, read_file, file_meta) - ) + self.upload_queue.append(self._pool.submit(wrapped_upload, read_file, file_meta)) self.upload_queue_size += 1 self.files_queued.inc() self.queue_size.set(self.upload_queue_size) @@ -520,9 +486,7 @@ def _get_file_upload_request( else: parsed_url: ParseResult = urlparse(url_str) parsed_base_url: ParseResult = urlparse(self.cdf_client.config.base_url) - replaced_upload_url = parsed_url._replace( - netloc=parsed_base_url.netloc - ).geturl() + replaced_upload_url = parsed_url._replace(netloc=parsed_base_url.netloc).geturl() upload_url = URL(replaced_upload_url) headers = Headers(self._httpx_client.headers) @@ -556,9 +520,7 @@ def _create_cdm(self, instance_id: NodeId) -> tuple[FileMetadata, str]: resp_json = res.json()["items"][0] return FileMetadata.load(resp_json), resp_json["uploadUrl"] - def upload( - self, fail_on_errors: bool = True, timeout: Optional[float] = None - ) -> None: + def upload(self, fail_on_errors: bool = True, timeout: Optional[float] = None) -> None: """ Wait for all uploads to finish """ @@ -569,9 +531,7 @@ def upload( if fail_on_errors and self.errors: # There might be more errors, but we can only have one as the cause, so pick the first self.flush_failure_logger() - raise RuntimeError( - f"{len(self.errors)} upload(s) finished with errors" - ) from self.errors[0] + raise RuntimeError(f"{len(self.errors)} upload(s) finished with errors") from self.errors[0] def __enter__(self) -> "IOFileUploadQueue": """ @@ -628,9 +588,7 @@ class FileUploadQueue(IOFileUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[ - Callable[[List[FileMetadataOrCogniteExtractorFile]], None] - ] = None, + post_upload_function: Optional[Callable[[List[FileMetadataOrCogniteExtractorFile]], None]] = None, max_queue_size: Optional[int] = None, max_upload_interval: Optional[int] = None, trigger_log_level: str = "DEBUG", @@ -687,9 +645,7 @@ class BytesUploadQueue(IOFileUploadQueue): def __init__( self, cdf_client: CogniteClient, - post_upload_function: Optional[ - Callable[[List[FileMetadataOrCogniteExtractorFile]], None] - ] = None, + post_upload_function: Optional[Callable[[List[FileMetadataOrCogniteExtractorFile]], None]] = None, max_queue_size: Optional[int] = None, trigger_log_level: str = "DEBUG", thread_name: Optional[str] = None, @@ -706,9 +662,7 @@ def __init__( cancellation_token, ) - def add_to_upload_queue( - self, content: bytes, file_meta: FileMetadataOrCogniteExtractorFile - ) -> None: + def add_to_upload_queue(self, content: bytes, file_meta: FileMetadataOrCogniteExtractorFile) -> None: """ Add object to upload queue. The queue will be uploaded if the queue size is larger than the threshold specified in the __init__. diff --git a/cognite/extractorutils/uploader/upload_failure_handler.py b/cognite/extractorutils/uploader/upload_failure_handler.py index 884df684..b0a1e47b 100644 --- a/cognite/extractorutils/uploader/upload_failure_handler.py +++ b/cognite/extractorutils/uploader/upload_failure_handler.py @@ -1,13 +1,15 @@ from datetime import datetime +from typing import Any, Iterator, List + import jsonlines class FileErrorMapping: - def __init__(self, file_name: str, error_reason: str): + def __init__(self, file_name: str, error_reason: str) -> None: self.file_name = file_name self.error_reason = error_reason - def __iter__(self): + def __iter__(self) -> Iterator[List[str]]: return iter([[self.file_name, self.error_reason]]) @@ -16,44 +18,39 @@ class FileFailureManager: START_TIME_KEY = "start_time" FILE_REASON_MAP_KEY = "file_error_reason_map" - def __init__(self, start_time=None, path_to_file=None): - self.failure_logs = {} + def __init__(self, start_time: str | None = None, path_to_file: str | None = None) -> None: + self.failure_logs: dict[str, Any] = {} - self.path_to_failure_log = self._pre_process_file_extension(path_to_file) + self.path_to_failure_log: str = self._pre_process_file_extension(path_to_file) self.start_time = start_time or str(datetime.now()) self._initialize_failure_logs() - def _pre_process_file_extension(self, path_to_file): + def _pre_process_file_extension(self, path_to_file: str | None) -> str: if path_to_file and not path_to_file.endswith(".jsonl"): return path_to_file + ".jsonl" - return path_to_file + return str(path_to_file) - def _initialize_failure_logs(self): + def _initialize_failure_logs(self) -> None: self.failure_logs[FileFailureManager.START_TIME_KEY] = self.start_time self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY] = {} - def __len__(self): + def __len__(self) -> int: return len(self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY]) - def clear(self): + def clear(self) -> None: self.failure_logs.clear() self._initialize_failure_logs() - def add(self, file_name: str, error_reason: str): - error_file_object = FileErrorMapping( - file_name=file_name, error_reason=error_reason - ) + def add(self, file_name: str, error_reason: str) -> None: + error_file_object = FileErrorMapping(file_name=file_name, error_reason=error_reason) - self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY].update( - error_file_object - ) + self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY].update(error_file_object) if len(self) >= self.MAX_QUEUE_SIZE: self.write_to_file() self.clear() - def write_to_file(self): - + def write_to_file(self) -> None: if len(self.failure_logs[self.FILE_REASON_MAP_KEY]) == 0: return diff --git a/pyproject.toml b/pyproject.toml index c6cf6f4c..73b3ac59 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,6 +75,8 @@ httpx = "^0.27.0" pydantic = "^2.8.2" pyhumps = "^3.8.0" croniter = "^5.0.0" +jsonlines = "^4.0.0" +datetime = "5.5" [tool.poetry.extras] experimental = ["cognite-sdk-experimental"] From 353f99cd1990473b3bbb496aea8d484be535f242 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Tue, 26 Nov 2024 11:16:19 +0530 Subject: [PATCH 03/29] Changes requested on PR --- cognite/extractorutils/uploader/files.py | 1 - cognite/extractorutils/uploader/upload_failure_handler.py | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py index c9f79507..42bda7e3 100644 --- a/cognite/extractorutils/uploader/files.py +++ b/cognite/extractorutils/uploader/files.py @@ -281,7 +281,6 @@ def flush_failure_logger(self) -> None: if self._file_failure_manager is not None: self.logger.info("Flushing failure logs") self._file_failure_manager.write_to_file() - self._file_failure_manager.clear() def _remove_done_from_queue(self) -> None: while not self.cancellation_token.is_cancelled: diff --git a/cognite/extractorutils/uploader/upload_failure_handler.py b/cognite/extractorutils/uploader/upload_failure_handler.py index b0a1e47b..4ef972a5 100644 --- a/cognite/extractorutils/uploader/upload_failure_handler.py +++ b/cognite/extractorutils/uploader/upload_failure_handler.py @@ -48,7 +48,6 @@ def add(self, file_name: str, error_reason: str) -> None: if len(self) >= self.MAX_QUEUE_SIZE: self.write_to_file() - self.clear() def write_to_file(self) -> None: if len(self.failure_logs[self.FILE_REASON_MAP_KEY]) == 0: @@ -56,3 +55,5 @@ def write_to_file(self) -> None: with jsonlines.open(self.path_to_failure_log, mode="a") as writer: writer.write(self.failure_logs) + + self.clear() From 865d97a5214c7982fb076593c95afab3b4395cea Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Fri, 6 Dec 2024 11:52:19 +0530 Subject: [PATCH 04/29] Changes suggested on the PR --- .../uploader/upload_failure_handler.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/cognite/extractorutils/uploader/upload_failure_handler.py b/cognite/extractorutils/uploader/upload_failure_handler.py index 4ef972a5..83a3c6dc 100644 --- a/cognite/extractorutils/uploader/upload_failure_handler.py +++ b/cognite/extractorutils/uploader/upload_failure_handler.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Any, Iterator, List +from typing import Iterator, List import jsonlines @@ -19,7 +19,7 @@ class FileFailureManager: FILE_REASON_MAP_KEY = "file_error_reason_map" def __init__(self, start_time: str | None = None, path_to_file: str | None = None) -> None: - self.failure_logs: dict[str, Any] = {} + self.failure_logs: dict[str, str] = {} self.path_to_failure_log: str = self._pre_process_file_extension(path_to_file) self.start_time = start_time or str(datetime.now()) @@ -31,11 +31,10 @@ def _pre_process_file_extension(self, path_to_file: str | None) -> str: return str(path_to_file) def _initialize_failure_logs(self) -> None: - self.failure_logs[FileFailureManager.START_TIME_KEY] = self.start_time - self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY] = {} + self.failure_logs = {} def __len__(self) -> int: - return len(self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY]) + return len(self.failure_logs) def clear(self) -> None: self.failure_logs.clear() @@ -44,16 +43,21 @@ def clear(self) -> None: def add(self, file_name: str, error_reason: str) -> None: error_file_object = FileErrorMapping(file_name=file_name, error_reason=error_reason) - self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY].update(error_file_object) + self.failure_logs.update(dict(error_file_object)) if len(self) >= self.MAX_QUEUE_SIZE: self.write_to_file() def write_to_file(self) -> None: - if len(self.failure_logs[self.FILE_REASON_MAP_KEY]) == 0: + if len(self) == 0: return + dict_to_write = { + self.START_TIME_KEY: self.start_time, + self.FILE_REASON_MAP_KEY: self.failure_logs, + } + with jsonlines.open(self.path_to_failure_log, mode="a") as writer: - writer.write(self.failure_logs) + writer.write(dict_to_write) self.clear() From 998f48a06c622a7cae9200bd004a5c3300198241 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Fri, 6 Dec 2024 11:52:54 +0530 Subject: [PATCH 05/29] Changes suggested on the PR --- cognite/extractorutils/uploader/upload_failure_handler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cognite/extractorutils/uploader/upload_failure_handler.py b/cognite/extractorutils/uploader/upload_failure_handler.py index 83a3c6dc..4dfeec76 100644 --- a/cognite/extractorutils/uploader/upload_failure_handler.py +++ b/cognite/extractorutils/uploader/upload_failure_handler.py @@ -42,8 +42,9 @@ def clear(self) -> None: def add(self, file_name: str, error_reason: str) -> None: error_file_object = FileErrorMapping(file_name=file_name, error_reason=error_reason) + error_file_dict = dict(error_file_object) - self.failure_logs.update(dict(error_file_object)) + self.failure_logs.update(error_file_dict) if len(self) >= self.MAX_QUEUE_SIZE: self.write_to_file() From ee53fb2e47cb6c66164a4a4229de49610f9d8674 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Tue, 10 Dec 2024 16:11:17 +0530 Subject: [PATCH 06/29] removed datetime as a dependency --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 73b3ac59..faa1e10f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,7 +76,6 @@ pydantic = "^2.8.2" pyhumps = "^3.8.0" croniter = "^5.0.0" jsonlines = "^4.0.0" -datetime = "5.5" [tool.poetry.extras] experimental = ["cognite-sdk-experimental"] From 5974f9f38564edab2a1ac1591dc3d70b33ba5fc0 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 12:30:57 +0530 Subject: [PATCH 07/29] Added test case for file upload failure --- .../file_with_no_permission.txt | 0 .../test_file_integration.py | 45 +++++++++++++++++++ 2 files changed, 45 insertions(+) create mode 100755 tests/tests_integration/file_with_no_permission.txt diff --git a/tests/tests_integration/file_with_no_permission.txt b/tests/tests_integration/file_with_no_permission.txt new file mode 100755 index 00000000..e69de29b diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index 92fc68d1..e40885a1 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -19,6 +19,7 @@ import time from typing import Callable, Optional, Tuple +import jsonlines import pytest from cognite.client import CogniteClient @@ -67,6 +68,48 @@ def await_is_uploaded_status( time.sleep(1) +@pytest.mark.parametrize("functions_runtime", ["true", "false"]) +def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], functions_runtime: str) -> None: + LOG_FAILURE_FILE = "integration_test_failure_log.jsonl" + NO_PERMISSION_FILE = "file_with_no_permission.txt" + FILE_REASON_MAP_KEY = "file_error_reason_map" + + os.environ["COGNITE_FUNCTION_RUNTIME"] = functions_runtime + client, test_parameter = set_upload_test + queue = IOFileUploadQueue( + cdf_client=client, + overwrite_existing=True, + max_queue_size=2, + failure_logging_path=LOG_FAILURE_FILE, + ) + + current_dir = pathlib.Path(__file__).parent.resolve() + + test_parameter.space = None + + # Upload a pair of actual files + assert test_parameter.external_ids is not None + assert test_parameter.space is None + queue.add_to_upload_queue( + file_meta=FileMetadata( + external_id=test_parameter.external_ids[0], + name=test_parameter.external_ids[0], + ), + file_name=current_dir.joinpath(NO_PERMISSION_FILE), + ) + + queue.upload() + + assert os.path.isfile(LOG_FAILURE_FILE) + + with jsonlines.open(LOG_FAILURE_FILE) as reader: + for obj in reader: + assert FILE_REASON_MAP_KEY in obj + assert "Permission denied" in obj[FILE_REASON_MAP_KEY] + + os.remove(LOG_FAILURE_FILE) + + @pytest.mark.parametrize("functions_runtime", ["true", "false"]) def test_file_upload_queue(set_upload_test: Tuple[CogniteClient, ParamTest], functions_runtime: str) -> None: os.environ["COGNITE_FUNCTION_RUNTIME"] = functions_runtime @@ -137,9 +180,11 @@ def test_file_upload_queue(set_upload_test: Tuple[CogniteClient, ParamTest], fun assert file4 == b"test content\n" assert file5 == b"other test content\n" + node = client.data_modeling.instances.retrieve_nodes( NodeId(test_parameter.space, test_parameter.external_ids[8]), node_cls=CogniteExtractorFile ) + assert isinstance(node, CogniteExtractorFile) assert file6 is not None and file6.instance_id is not None and file6.instance_id.space == test_parameter.space From fde7130e00624b0516be85fa13e27ac6658cfc8c Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 12:46:51 +0530 Subject: [PATCH 08/29] change the method being called in the test case --- tests/tests_integration/test_file_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index e40885a1..1b02cd8c 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -90,7 +90,7 @@ def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], function # Upload a pair of actual files assert test_parameter.external_ids is not None assert test_parameter.space is None - queue.add_to_upload_queue( + queue.add_io_to_upload_queue( file_meta=FileMetadata( external_id=test_parameter.external_ids[0], name=test_parameter.external_ids[0], From cb6b5349905d7a055980314e10d4842918eb4394 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 12:53:13 +0530 Subject: [PATCH 09/29] Undo a minor change --- tests/tests_integration/test_file_integration.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index 1b02cd8c..983de70b 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -85,11 +85,9 @@ def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], function current_dir = pathlib.Path(__file__).parent.resolve() - test_parameter.space = None - # Upload a pair of actual files assert test_parameter.external_ids is not None - assert test_parameter.space is None + assert test_parameter.space is not None queue.add_io_to_upload_queue( file_meta=FileMetadata( external_id=test_parameter.external_ids[0], From b00bce6c9b82cc2262810b049ac3e9492ee2179d Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 13:06:21 +0530 Subject: [PATCH 10/29] parameter fix --- tests/tests_integration/test_file_integration.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index 983de70b..52a3d799 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -17,7 +17,7 @@ import pathlib import random import time -from typing import Callable, Optional, Tuple +from typing import BinaryIO, Callable, Optional, Tuple import jsonlines import pytest @@ -83,6 +83,9 @@ def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], function failure_logging_path=LOG_FAILURE_FILE, ) + def load_file_from_path() -> BinaryIO: + return open(NO_PERMISSION_FILE, "rb") + current_dir = pathlib.Path(__file__).parent.resolve() # Upload a pair of actual files @@ -93,7 +96,7 @@ def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], function external_id=test_parameter.external_ids[0], name=test_parameter.external_ids[0], ), - file_name=current_dir.joinpath(NO_PERMISSION_FILE), + read_file=load_file_from_path, ) queue.upload() From 17475e5e9bc3dae92346855a609ae4a63125957e Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 13:10:48 +0530 Subject: [PATCH 11/29] validation changes --- tests/tests_integration/test_file_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index 52a3d799..b5c68e6a 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -106,7 +106,7 @@ def load_file_from_path() -> BinaryIO: with jsonlines.open(LOG_FAILURE_FILE) as reader: for obj in reader: assert FILE_REASON_MAP_KEY in obj - assert "Permission denied" in obj[FILE_REASON_MAP_KEY] + assert "Permission denied" in obj[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] os.remove(LOG_FAILURE_FILE) From a95b1fcc1f547cca98e0600c880f528fa5333a19 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 13:14:24 +0530 Subject: [PATCH 12/29] minor change to error message --- tests/tests_integration/file_with_no_permission.txt | 0 tests/tests_integration/test_file_integration.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) delete mode 100755 tests/tests_integration/file_with_no_permission.txt diff --git a/tests/tests_integration/file_with_no_permission.txt b/tests/tests_integration/file_with_no_permission.txt deleted file mode 100755 index e69de29b..00000000 diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index b5c68e6a..afb401ea 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -106,7 +106,7 @@ def load_file_from_path() -> BinaryIO: with jsonlines.open(LOG_FAILURE_FILE) as reader: for obj in reader: assert FILE_REASON_MAP_KEY in obj - assert "Permission denied" in obj[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] + assert "FileNotFoundError" in obj[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] os.remove(LOG_FAILURE_FILE) From a892e75baecb79db344b3aa347b3cc6e58f53ccc Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 13:23:43 +0530 Subject: [PATCH 13/29] path fix for file --- tests/tests_integration/test_file_integration.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index afb401ea..c77180fd 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -83,11 +83,11 @@ def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], function failure_logging_path=LOG_FAILURE_FILE, ) - def load_file_from_path() -> BinaryIO: - return open(NO_PERMISSION_FILE, "rb") - current_dir = pathlib.Path(__file__).parent.resolve() + def load_file_from_path() -> BinaryIO: + return open(current_dir.joinpath(NO_PERMISSION_FILE), "rb") + # Upload a pair of actual files assert test_parameter.external_ids is not None assert test_parameter.space is not None @@ -101,14 +101,14 @@ def load_file_from_path() -> BinaryIO: queue.upload() - assert os.path.isfile(LOG_FAILURE_FILE) + assert os.path.isfile(current_dir.joinpath(LOG_FAILURE_FILE)) - with jsonlines.open(LOG_FAILURE_FILE) as reader: + with jsonlines.open(current_dir.joinpath(LOG_FAILURE_FILE)) as reader: for obj in reader: assert FILE_REASON_MAP_KEY in obj assert "FileNotFoundError" in obj[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] - os.remove(LOG_FAILURE_FILE) + os.remove(current_dir.joinpath(LOG_FAILURE_FILE)) @pytest.mark.parametrize("functions_runtime", ["true", "false"]) From 932b4f13ef52156e20536c3827ee05512aa4bb52 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 13:32:19 +0530 Subject: [PATCH 14/29] Added file with no permission --- tests/tests_integration/file_with_no_permission.txt | 0 tests/tests_integration/test_file_integration.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100755 tests/tests_integration/file_with_no_permission.txt diff --git a/tests/tests_integration/file_with_no_permission.txt b/tests/tests_integration/file_with_no_permission.txt new file mode 100755 index 00000000..e69de29b diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index c77180fd..966ad533 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -106,7 +106,7 @@ def load_file_from_path() -> BinaryIO: with jsonlines.open(current_dir.joinpath(LOG_FAILURE_FILE)) as reader: for obj in reader: assert FILE_REASON_MAP_KEY in obj - assert "FileNotFoundError" in obj[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] + assert "Permission denied" in obj[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] os.remove(current_dir.joinpath(LOG_FAILURE_FILE)) From bfe9af22b20681e7f7b0b10ee75bf98950b98436 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 13:58:32 +0530 Subject: [PATCH 15/29] Changes to path for failure log --- tests/tests_integration/test_file_integration.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index 966ad533..970fb53a 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -101,14 +101,14 @@ def load_file_from_path() -> BinaryIO: queue.upload() - assert os.path.isfile(current_dir.joinpath(LOG_FAILURE_FILE)) + assert os.path.isfile(LOG_FAILURE_FILE) - with jsonlines.open(current_dir.joinpath(LOG_FAILURE_FILE)) as reader: + with jsonlines.open(LOG_FAILURE_FILE) as reader: for obj in reader: assert FILE_REASON_MAP_KEY in obj assert "Permission denied" in obj[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] - os.remove(current_dir.joinpath(LOG_FAILURE_FILE)) + os.remove(LOG_FAILURE_FILE) @pytest.mark.parametrize("functions_runtime", ["true", "false"]) From 1e67733f3549d8b3d340af3703694076c3bb918d Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 14:08:16 +0530 Subject: [PATCH 16/29] Flush failure logger --- tests/tests_integration/test_file_integration.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index 970fb53a..e21db07d 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -100,6 +100,7 @@ def load_file_from_path() -> BinaryIO: ) queue.upload() + queue.flush_failure_logger() assert os.path.isfile(LOG_FAILURE_FILE) From 5ce8b76a87020b8c4a59e34587351213f69ee94a Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 14:57:03 +0530 Subject: [PATCH 17/29] Changes to check in-memory --- tests/tests_integration/test_file_integration.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index e21db07d..66e3a7f3 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -19,7 +19,6 @@ import time from typing import BinaryIO, Callable, Optional, Tuple -import jsonlines import pytest from cognite.client import CogniteClient @@ -100,16 +99,11 @@ def load_file_from_path() -> BinaryIO: ) queue.upload() - queue.flush_failure_logger() - assert os.path.isfile(LOG_FAILURE_FILE) - - with jsonlines.open(LOG_FAILURE_FILE) as reader: - for obj in reader: - assert FILE_REASON_MAP_KEY in obj - assert "Permission denied" in obj[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] - - os.remove(LOG_FAILURE_FILE) + failure_logger = queue.get_failure_logger() + assert len(failure_logger) == 1 + assert NO_PERMISSION_FILE in failure_logger.failure_logs + assert "Permission denied" in failure_logger.failure_logs[NO_PERMISSION_FILE] @pytest.mark.parametrize("functions_runtime", ["true", "false"]) From 77b2ada6d231ce35ec0e67a5341a1edaafc81b95 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 15:10:45 +0530 Subject: [PATCH 18/29] Added sleep for the error logs to get stored --- cognite/extractorutils/uploader/files.py | 2 ++ tests/tests_integration/test_file_integration.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py index 42bda7e3..5cb02114 100644 --- a/cognite/extractorutils/uploader/files.py +++ b/cognite/extractorutils/uploader/files.py @@ -452,6 +452,8 @@ def wrapped_upload( self.logger.exception( f"Unexpected error while uploading file: {file_meta.external_id} {file_meta.name}" ) + # TODO: Remove after integration test passes + print("faced error while uploading error") # noqa self.add_entry_failure_logger(file_name=str(file_meta.name), error=e) self.errors.append(e) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index 66e3a7f3..95a9024d 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -100,6 +100,8 @@ def load_file_from_path() -> BinaryIO: queue.upload() + time.sleep(5) + failure_logger = queue.get_failure_logger() assert len(failure_logger) == 1 assert NO_PERMISSION_FILE in failure_logger.failure_logs From 2ec829ff7034ed30261cd4f0a7822b1afc76d3f2 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 15:23:36 +0530 Subject: [PATCH 19/29] Added print statements --- tests/tests_integration/test_file_integration.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index 95a9024d..d0107ba6 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -67,23 +67,22 @@ def await_is_uploaded_status( time.sleep(1) -@pytest.mark.parametrize("functions_runtime", ["true", "false"]) +@pytest.mark.parametrize("functions_runtime", ["true"]) def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], functions_runtime: str) -> None: LOG_FAILURE_FILE = "integration_test_failure_log.jsonl" NO_PERMISSION_FILE = "file_with_no_permission.txt" FILE_REASON_MAP_KEY = "file_error_reason_map" + current_dir = pathlib.Path(__file__).parent.resolve() os.environ["COGNITE_FUNCTION_RUNTIME"] = functions_runtime client, test_parameter = set_upload_test queue = IOFileUploadQueue( cdf_client=client, overwrite_existing=True, max_queue_size=2, - failure_logging_path=LOG_FAILURE_FILE, + failure_logging_path=current_dir.joinpath(LOG_FAILURE_FILE), ) - current_dir = pathlib.Path(__file__).parent.resolve() - def load_file_from_path() -> BinaryIO: return open(current_dir.joinpath(NO_PERMISSION_FILE), "rb") @@ -103,9 +102,8 @@ def load_file_from_path() -> BinaryIO: time.sleep(5) failure_logger = queue.get_failure_logger() - assert len(failure_logger) == 1 - assert NO_PERMISSION_FILE in failure_logger.failure_logs - assert "Permission denied" in failure_logger.failure_logs[NO_PERMISSION_FILE] + print(f"Failure logs: {failure_logger.failure_logs}") + print(f"Check for file: {os.path.isfile(current_dir.joinpath(LOG_FAILURE_FILE))}") @pytest.mark.parametrize("functions_runtime", ["true", "false"]) From 56cb8a9d01a46d429a71de14a3be5f9bdc097eed Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 15:33:11 +0530 Subject: [PATCH 20/29] stringify the path on joinpath --- tests/tests_integration/test_file_integration.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index d0107ba6..1092b4a2 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -80,11 +80,11 @@ def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], function cdf_client=client, overwrite_existing=True, max_queue_size=2, - failure_logging_path=current_dir.joinpath(LOG_FAILURE_FILE), + failure_logging_path=str(current_dir.joinpath(LOG_FAILURE_FILE)), ) def load_file_from_path() -> BinaryIO: - return open(current_dir.joinpath(NO_PERMISSION_FILE), "rb") + return open(str(current_dir.joinpath(NO_PERMISSION_FILE)), "rb") # Upload a pair of actual files assert test_parameter.external_ids is not None @@ -103,7 +103,7 @@ def load_file_from_path() -> BinaryIO: failure_logger = queue.get_failure_logger() print(f"Failure logs: {failure_logger.failure_logs}") - print(f"Check for file: {os.path.isfile(current_dir.joinpath(LOG_FAILURE_FILE))}") + print(f"Check for file: {os.path.isfile(str(current_dir.joinpath(LOG_FAILURE_FILE)))}") @pytest.mark.parametrize("functions_runtime", ["true", "false"]) From 2ba4a8606a6ad59c0b41f3aad4f62290dbcc25f9 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 15:37:48 +0530 Subject: [PATCH 21/29] add assert --- tests/tests_integration/test_file_integration.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index 1092b4a2..e5d2a7ec 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -105,6 +105,8 @@ def load_file_from_path() -> BinaryIO: print(f"Failure logs: {failure_logger.failure_logs}") print(f"Check for file: {os.path.isfile(str(current_dir.joinpath(LOG_FAILURE_FILE)))}") + assert os.path.isfile(str(current_dir.joinpath(LOG_FAILURE_FILE))) + @pytest.mark.parametrize("functions_runtime", ["true", "false"]) def test_file_upload_queue(set_upload_test: Tuple[CogniteClient, ParamTest], functions_runtime: str) -> None: From 27fffb974fb34893ef3d1c2e9b95bf6c04c37d74 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 16:10:44 +0530 Subject: [PATCH 22/29] raise manual exception --- tests/tests_integration/test_file_integration.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index e5d2a7ec..f632efdb 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -76,15 +76,16 @@ def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], function current_dir = pathlib.Path(__file__).parent.resolve() os.environ["COGNITE_FUNCTION_RUNTIME"] = functions_runtime client, test_parameter = set_upload_test + qualified_failure_logging_path = str(current_dir.joinpath(LOG_FAILURE_FILE)) queue = IOFileUploadQueue( cdf_client=client, overwrite_existing=True, max_queue_size=2, - failure_logging_path=str(current_dir.joinpath(LOG_FAILURE_FILE)), + failure_logging_path=qualified_failure_logging_path, ) def load_file_from_path() -> BinaryIO: - return open(str(current_dir.joinpath(NO_PERMISSION_FILE)), "rb") + raise Exception("No permission to read file") # Upload a pair of actual files assert test_parameter.external_ids is not None @@ -103,9 +104,9 @@ def load_file_from_path() -> BinaryIO: failure_logger = queue.get_failure_logger() print(f"Failure logs: {failure_logger.failure_logs}") - print(f"Check for file: {os.path.isfile(str(current_dir.joinpath(LOG_FAILURE_FILE)))}") + print(f"Check for file: {os.path.isfile(qualified_failure_logging_path)}") - assert os.path.isfile(str(current_dir.joinpath(LOG_FAILURE_FILE))) + assert os.path.isfile(qualified_failure_logging_path) @pytest.mark.parametrize("functions_runtime", ["true", "false"]) From 4ad3083d425d9532ec06f305c7dc34fe3d732449 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Wed, 11 Dec 2024 16:19:00 +0530 Subject: [PATCH 23/29] Added random printing stuff --- cognite/extractorutils/uploader/files.py | 1 + cognite/extractorutils/uploader/upload_failure_handler.py | 1 + 2 files changed, 2 insertions(+) diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py index 5cb02114..4befd413 100644 --- a/cognite/extractorutils/uploader/files.py +++ b/cognite/extractorutils/uploader/files.py @@ -273,6 +273,7 @@ def get_failure_logger(self) -> FileFailureManager | None: return self._file_failure_manager def add_entry_failure_logger(self, file_name: str, error: Exception) -> None: + print(f"is {self._file_failure_manager} is none?") # noqa if self._file_failure_manager is not None: error_reason = str(error) self._file_failure_manager.add(file_name=file_name, error_reason=error_reason) diff --git a/cognite/extractorutils/uploader/upload_failure_handler.py b/cognite/extractorutils/uploader/upload_failure_handler.py index 4dfeec76..a929b8a2 100644 --- a/cognite/extractorutils/uploader/upload_failure_handler.py +++ b/cognite/extractorutils/uploader/upload_failure_handler.py @@ -41,6 +41,7 @@ def clear(self) -> None: self._initialize_failure_logs() def add(self, file_name: str, error_reason: str) -> None: + print(f"file_name: {file_name}, error_reason: {error_reason}") # noqa error_file_object = FileErrorMapping(file_name=file_name, error_reason=error_reason) error_file_dict = dict(error_file_object) From 2231fcd1f74cd56d0e732ed2b256d18d66cbc195 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Thu, 12 Dec 2024 03:46:14 +0530 Subject: [PATCH 24/29] changes to assert statements --- tests/tests_integration/test_file_integration.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index f632efdb..c99b2bda 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -72,6 +72,7 @@ def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], function LOG_FAILURE_FILE = "integration_test_failure_log.jsonl" NO_PERMISSION_FILE = "file_with_no_permission.txt" FILE_REASON_MAP_KEY = "file_error_reason_map" + ERROR_RAISED_ON_FILE_READ = "No permission to read file" current_dir = pathlib.Path(__file__).parent.resolve() os.environ["COGNITE_FUNCTION_RUNTIME"] = functions_runtime @@ -85,7 +86,7 @@ def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], function ) def load_file_from_path() -> BinaryIO: - raise Exception("No permission to read file") + raise Exception(ERROR_RAISED_ON_FILE_READ) # Upload a pair of actual files assert test_parameter.external_ids is not None @@ -106,7 +107,10 @@ def load_file_from_path() -> BinaryIO: print(f"Failure logs: {failure_logger.failure_logs}") print(f"Check for file: {os.path.isfile(qualified_failure_logging_path)}") - assert os.path.isfile(qualified_failure_logging_path) + assert len(failure_logger) == 1 + assert FILE_REASON_MAP_KEY in failure_logger + assert NO_PERMISSION_FILE in failure_logger[FILE_REASON_MAP_KEY] + assert ERROR_RAISED_ON_FILE_READ in failure_logger[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] @pytest.mark.parametrize("functions_runtime", ["true", "false"]) From 16f79e7c67c1b0a987d15ed05da02293b4f3f1fe Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Thu, 12 Dec 2024 03:55:23 +0530 Subject: [PATCH 25/29] Handle the exception in upload queue and check for value in failure logger --- .../test_file_integration.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index c99b2bda..704cc1cb 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -99,18 +99,19 @@ def load_file_from_path() -> BinaryIO: read_file=load_file_from_path, ) - queue.upload() - - time.sleep(5) - - failure_logger = queue.get_failure_logger() - print(f"Failure logs: {failure_logger.failure_logs}") - print(f"Check for file: {os.path.isfile(qualified_failure_logging_path)}") - - assert len(failure_logger) == 1 - assert FILE_REASON_MAP_KEY in failure_logger - assert NO_PERMISSION_FILE in failure_logger[FILE_REASON_MAP_KEY] - assert ERROR_RAISED_ON_FILE_READ in failure_logger[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] + try: + queue.upload() + + time.sleep(5) + except Exception as e: + failure_logger = queue.get_failure_logger() + print(f"Failure logs: {failure_logger.failure_logs}") + print(f"Check for file: {os.path.isfile(qualified_failure_logging_path)}") + + assert len(failure_logger) == 1 + assert FILE_REASON_MAP_KEY in failure_logger + assert NO_PERMISSION_FILE in failure_logger[FILE_REASON_MAP_KEY] + assert ERROR_RAISED_ON_FILE_READ in failure_logger[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] @pytest.mark.parametrize("functions_runtime", ["true", "false"]) From f7ba15c12ab7fa4722f4d4cf9c4ddf91ef898355 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Thu, 12 Dec 2024 04:06:37 +0530 Subject: [PATCH 26/29] read jsonlines file and validate it --- .../test_file_integration.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index 704cc1cb..9ba8eb10 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -19,6 +19,7 @@ import time from typing import BinaryIO, Callable, Optional, Tuple +import jsonlines import pytest from cognite.client import CogniteClient @@ -77,12 +78,12 @@ def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], function current_dir = pathlib.Path(__file__).parent.resolve() os.environ["COGNITE_FUNCTION_RUNTIME"] = functions_runtime client, test_parameter = set_upload_test - qualified_failure_logging_path = str(current_dir.joinpath(LOG_FAILURE_FILE)) + fully_qualified_failure_logging_path = str(current_dir.joinpath(LOG_FAILURE_FILE)) queue = IOFileUploadQueue( cdf_client=client, overwrite_existing=True, max_queue_size=2, - failure_logging_path=qualified_failure_logging_path, + failure_logging_path=fully_qualified_failure_logging_path, ) def load_file_from_path() -> BinaryIO: @@ -106,12 +107,15 @@ def load_file_from_path() -> BinaryIO: except Exception as e: failure_logger = queue.get_failure_logger() print(f"Failure logs: {failure_logger.failure_logs}") - print(f"Check for file: {os.path.isfile(qualified_failure_logging_path)}") - - assert len(failure_logger) == 1 - assert FILE_REASON_MAP_KEY in failure_logger - assert NO_PERMISSION_FILE in failure_logger[FILE_REASON_MAP_KEY] - assert ERROR_RAISED_ON_FILE_READ in failure_logger[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] + print(f"Check for file: {os.path.isfile(fully_qualified_failure_logging_path)}") + + with jsonlines.open(fully_qualified_failure_logging_path, "r") as reader: + for failure_logger_run in reader: + assert len(failure_logger_run) == 1 + assert FILE_REASON_MAP_KEY in failure_logger_run + assert NO_PERMISSION_FILE in failure_logger_run[FILE_REASON_MAP_KEY] + assert ERROR_RAISED_ON_FILE_READ in failure_logger_run[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] + os.remove(fully_qualified_failure_logging_path) @pytest.mark.parametrize("functions_runtime", ["true", "false"]) From 344ff6268c231460c276027aa2a11dcf9be1b51b Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Thu, 12 Dec 2024 04:12:18 +0530 Subject: [PATCH 27/29] remove incorrect validation with len --- tests/tests_integration/test_file_integration.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index 9ba8eb10..d07ebc9d 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -111,7 +111,6 @@ def load_file_from_path() -> BinaryIO: with jsonlines.open(fully_qualified_failure_logging_path, "r") as reader: for failure_logger_run in reader: - assert len(failure_logger_run) == 1 assert FILE_REASON_MAP_KEY in failure_logger_run assert NO_PERMISSION_FILE in failure_logger_run[FILE_REASON_MAP_KEY] assert ERROR_RAISED_ON_FILE_READ in failure_logger_run[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE] From 8cdb5afde66b45f639578860057012a535ee1ea6 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Thu, 12 Dec 2024 04:13:40 +0530 Subject: [PATCH 28/29] Delete file with no permission --- tests/tests_integration/file_with_no_permission.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100755 tests/tests_integration/file_with_no_permission.txt diff --git a/tests/tests_integration/file_with_no_permission.txt b/tests/tests_integration/file_with_no_permission.txt deleted file mode 100755 index e69de29b..00000000 From 37afff9d586fd0b008647ed98f149443256e83c0 Mon Sep 17 00:00:00 2001 From: Nithin Bodanapu Date: Thu, 12 Dec 2024 04:21:24 +0530 Subject: [PATCH 29/29] remove print statements --- cognite/extractorutils/uploader/files.py | 3 --- cognite/extractorutils/uploader/upload_failure_handler.py | 1 - tests/tests_integration/test_file_integration.py | 4 +--- 3 files changed, 1 insertion(+), 7 deletions(-) diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py index 4befd413..42bda7e3 100644 --- a/cognite/extractorutils/uploader/files.py +++ b/cognite/extractorutils/uploader/files.py @@ -273,7 +273,6 @@ def get_failure_logger(self) -> FileFailureManager | None: return self._file_failure_manager def add_entry_failure_logger(self, file_name: str, error: Exception) -> None: - print(f"is {self._file_failure_manager} is none?") # noqa if self._file_failure_manager is not None: error_reason = str(error) self._file_failure_manager.add(file_name=file_name, error_reason=error_reason) @@ -453,8 +452,6 @@ def wrapped_upload( self.logger.exception( f"Unexpected error while uploading file: {file_meta.external_id} {file_meta.name}" ) - # TODO: Remove after integration test passes - print("faced error while uploading error") # noqa self.add_entry_failure_logger(file_name=str(file_meta.name), error=e) self.errors.append(e) diff --git a/cognite/extractorutils/uploader/upload_failure_handler.py b/cognite/extractorutils/uploader/upload_failure_handler.py index a929b8a2..4dfeec76 100644 --- a/cognite/extractorutils/uploader/upload_failure_handler.py +++ b/cognite/extractorutils/uploader/upload_failure_handler.py @@ -41,7 +41,6 @@ def clear(self) -> None: self._initialize_failure_logs() def add(self, file_name: str, error_reason: str) -> None: - print(f"file_name: {file_name}, error_reason: {error_reason}") # noqa error_file_object = FileErrorMapping(file_name=file_name, error_reason=error_reason) error_file_dict = dict(error_file_object) diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py index d07ebc9d..dd06cbcd 100644 --- a/tests/tests_integration/test_file_integration.py +++ b/tests/tests_integration/test_file_integration.py @@ -95,7 +95,7 @@ def load_file_from_path() -> BinaryIO: queue.add_io_to_upload_queue( file_meta=FileMetadata( external_id=test_parameter.external_ids[0], - name=test_parameter.external_ids[0], + name=NO_PERMISSION_FILE, ), read_file=load_file_from_path, ) @@ -106,8 +106,6 @@ def load_file_from_path() -> BinaryIO: time.sleep(5) except Exception as e: failure_logger = queue.get_failure_logger() - print(f"Failure logs: {failure_logger.failure_logs}") - print(f"Check for file: {os.path.isfile(fully_qualified_failure_logging_path)}") with jsonlines.open(fully_qualified_failure_logging_path, "r") as reader: for failure_logger_run in reader: