diff --git a/cognite/extractorutils/uploader/files.py b/cognite/extractorutils/uploader/files.py
index 116945af..c723d814 100644
--- a/cognite/extractorutils/uploader/files.py
+++ b/cognite/extractorutils/uploader/files.py
@@ -23,7 +23,10 @@
     BinaryIO,
     Callable,
     Iterator,
+    List,
+    Optional,
     Type,
+    Union,
 )
 from urllib.parse import ParseResult, urlparse
 
@@ -50,6 +53,7 @@
     FILES_UPLOADER_QUEUED,
     FILES_UPLOADER_WRITTEN,
 )
+from cognite.extractorutils.uploader.upload_failure_handler import FileFailureManager
 from cognite.extractorutils.util import cognite_exceptions, retry
 
 _QUEUES: int = 0
@@ -62,7 +66,8 @@
 
 _CDF_ALPHA_VERSION_HEADER = {"cdf-version": "alpha"}
 
-FileMetadataOrCogniteExtractorFile = FileMetadata | CogniteExtractorFileApply
+
+FileMetadataOrCogniteExtractorFile = Union[FileMetadata, CogniteExtractorFileApply]
 
 
 class ChunkedStream(RawIOBase, BinaryIO):
@@ -202,8 +207,9 @@ def __init__(
         trigger_log_level: str = "DEBUG",
         thread_name: str | None = None,
         overwrite_existing: bool = False,
-        cancellation_token: CancellationToken | None = None,
-        max_parallelism: int | None = None,
+        cancellation_token: Optional[CancellationToken] = None,
+        max_parallelism: Optional[int] = None,
+        failure_logging_path: Optional[str] = None,
     ):
         # Super sets post_upload and threshold
         super().__init__(
@@ -219,8 +225,11 @@
         if self.threshold <= 0:
             raise ValueError("Max queue size must be positive for file upload queues")
 
-        self.upload_queue: list[Future] = []
-        self.errors: list[Exception] = []
+        self.failure_logging_path = failure_logging_path
+        self.initialize_failure_logging()
+
+        self.upload_queue: List[Future] = []
+        self.errors: List[Exception] = []
 
         self.overwrite_existing = overwrite_existing
 
@@ -251,6 +260,26 @@
         )
         _QUEUES += 1
 
+    def initialize_failure_logging(self) -> None:
+        self._file_failure_manager: Optional[FileFailureManager] = (
+            FileFailureManager(path_to_file=self.failure_logging_path)
+            if self.failure_logging_path is not None
+            else None
+        )
+
+    def get_failure_logger(self) -> Optional[FileFailureManager]:
+        return self._file_failure_manager
+
+    def add_entry_failure_logger(self, file_name: str, error: Exception) -> None:
+        if self._file_failure_manager is not None:
+            error_reason = str(error)
+            self._file_failure_manager.add(file_name=file_name, error_reason=error_reason)
+
+    def flush_failure_logger(self) -> None:
+        if self._file_failure_manager is not None:
+            self.logger.info("Flushing failure logs")
+            self._file_failure_manager.write_to_file()
+
     def _remove_done_from_queue(self) -> None:
         while not self.cancellation_token.is_cancelled:
             with self.lock:
@@ -451,7 +480,10 @@ def wrapped_upload(
                 upload_file(read_file, file_meta)
 
             except Exception as e:
-                self.logger.exception(f"Unexpected error while uploading file: {file_meta.external_id}")
+                self.logger.exception(
+                    f"Unexpected error while uploading file: {file_meta.external_id} {file_meta.name}"
+                )
+                self.add_entry_failure_logger(file_name=str(file_meta.name), error=e)
                 self.errors.append(e)
 
             finally:
@@ -528,6 +560,7 @@ def upload(self, fail_on_errors: bool = True, timeout: float | None = None) -> None:
         self.queue_size.set(self.upload_queue_size)
         if fail_on_errors and self.errors:
             # There might be more errors, but we can only have one as the cause, so pick the first
+            self.flush_failure_logger()
             raise RuntimeError(f"{len(self.errors)} upload(s) finished with errors") from self.errors[0]
 
     def __enter__(self) -> "IOFileUploadQueue":
@@ -544,9 +577,9 @@ def __enter__(self) -> "IOFileUploadQueue":
 
     def __exit__(
         self,
-        exc_type: Type[BaseException] | None,
-        exc_val: BaseException | None,
-        exc_tb: TracebackType | None,
+        exc_type: Optional[Type[BaseException]],
+        exc_val: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
     ) -> None:
         """
         Wraps around stop method, for use as context manager
@@ -607,7 +640,7 @@ def __init__(
     def add_to_upload_queue(
         self,
        file_meta: FileMetadataOrCogniteExtractorFile,
-        file_name: str | PathLike,
+        file_name: Union[str, PathLike],
     ) -> None:
         """
         Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
diff --git a/cognite/extractorutils/uploader/upload_failure_handler.py b/cognite/extractorutils/uploader/upload_failure_handler.py
new file mode 100644
index 00000000..4dfeec76
--- /dev/null
+++ b/cognite/extractorutils/uploader/upload_failure_handler.py
@@ -0,0 +1,67 @@
+from datetime import datetime
+from typing import Dict, Iterator, List, Optional
+
+import jsonlines
+
+
+class FileErrorMapping:
+    def __init__(self, file_name: str, error_reason: str) -> None:
+        self.file_name = file_name
+        self.error_reason = error_reason
+
+    def __iter__(self) -> Iterator[List[str]]:
+        # Yields a single [file_name, error_reason] pair so that
+        # dict(FileErrorMapping(...)) produces {file_name: error_reason}
+        return iter([[self.file_name, self.error_reason]])
+
+
+class FileFailureManager:
+    MAX_QUEUE_SIZE = 500
+    START_TIME_KEY = "start_time"
+    FILE_REASON_MAP_KEY = "file_error_reason_map"
+
+    def __init__(self, start_time: Optional[str] = None, path_to_file: Optional[str] = None) -> None:
+        self.failure_logs: Dict[str, str] = {}
+
+        self.path_to_failure_log: str = self._pre_process_file_extension(path_to_file)
+        self.start_time = start_time or str(datetime.now())
+        self._initialize_failure_logs()
+
+    def _pre_process_file_extension(self, path_to_file: Optional[str]) -> str:
+        # Callers are expected to pass a real path: the upload queue only creates
+        # a FileFailureManager when failure_logging_path is configured
+        if path_to_file and not path_to_file.endswith(".jsonl"):
+            return path_to_file + ".jsonl"
+        return str(path_to_file)
+
+    def _initialize_failure_logs(self) -> None:
+        self.failure_logs = {}
+
+    def __len__(self) -> int:
+        return len(self.failure_logs)
+
+    def clear(self) -> None:
+        self._initialize_failure_logs()
+
+    def add(self, file_name: str, error_reason: str) -> None:
+        error_file_object = FileErrorMapping(file_name=file_name, error_reason=error_reason)
+        error_file_dict = dict(error_file_object)
+
+        self.failure_logs.update(error_file_dict)
+
+        if len(self) >= self.MAX_QUEUE_SIZE:
+            self.write_to_file()
+
+    def write_to_file(self) -> None:
+        if len(self) == 0:
+            return
+
+        dict_to_write = {
+            self.START_TIME_KEY: self.start_time,
+            self.FILE_REASON_MAP_KEY: self.failure_logs,
+        }
+
+        with jsonlines.open(self.path_to_failure_log, mode="a") as writer:
+            writer.write(dict_to_write)
+
+        self.clear()
diff --git a/pyproject.toml b/pyproject.toml
index 0bbb9ec2..759db908 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -75,6 +75,7 @@ httpx = "^0.27.0"
 pydantic = "^2.8.2"
 pyhumps = "^3.8.0"
 croniter = "^5.0.0"
+jsonlines = "^4.0.0"
 
 [tool.poetry.extras]
 experimental = ["cognite-sdk-experimental"]
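Taken together, the hooks above mean a queue configured with `failure_logging_path` records every failed upload and flushes them as JSON lines. A minimal sketch of the new manager used on its own (the path and error strings are illustrative, not from this PR):

```python
from cognite.extractorutils.uploader.upload_failure_handler import FileFailureManager

# Illustrative path; a ".jsonl" extension is appended if missing
manager = FileFailureManager(path_to_file="/tmp/upload_failures")

# Each add() buffers one {file_name: error_reason} entry; the buffer is
# flushed to disk automatically once MAX_QUEUE_SIZE (500) entries accumulate
manager.add(file_name="report.pdf", error_reason="403 Forbidden from source system")
manager.add(file_name="image.png", error_reason="read timed out")

# Explicit flush: appends one JSON line and clears the in-memory buffer
manager.write_to_file()
```

Each flushed line carries the manager's `start_time` alongside the accumulated `file_error_reason_map`, so one extractor run can be correlated with the files it failed to upload.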
diff --git a/tests/tests_integration/test_file_integration.py b/tests/tests_integration/test_file_integration.py
index 8d971b31..105acfc0 100644
--- a/tests/tests_integration/test_file_integration.py
+++ b/tests/tests_integration/test_file_integration.py
@@ -17,8 +17,9 @@
 import pathlib
 import random
 import time
-from typing import Callable
+from typing import BinaryIO, Callable, Tuple
 
+import jsonlines
 import pytest
 
 from cognite.client import CogniteClient
@@ -28,7 +29,11 @@
     CogniteExtractorFile,
     CogniteExtractorFileApply,
 )
-from cognite.extractorutils.uploader.files import BytesUploadQueue, FileUploadQueue, IOFileUploadQueue
+from cognite.extractorutils.uploader.files import (
+    BytesUploadQueue,
+    FileUploadQueue,
+    IOFileUploadQueue,
+)
 
 from tests.conftest import ETestType, ParamTest
 
@@ -53,7 +58,9 @@ def set_test_parameters() -> ParamTest:
 
 
 def await_is_uploaded_status(
-    client: CogniteClient, external_id: str | None = None, instance_id: NodeId | None = None
+    client: CogniteClient,
+    external_id: str | None = None,
+    instance_id: NodeId | None = None,
 ) -> None:
     for _ in range(10):
         if external_id is not None:
@@ -67,6 +74,51 @@
         time.sleep(1)
 
 
+@pytest.mark.parametrize("functions_runtime", ["true"])
+def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], functions_runtime: str) -> None:
+    LOG_FAILURE_FILE = "integration_test_failure_log.jsonl"
+    NO_PERMISSION_FILE = "file_with_no_permission.txt"
+    FILE_REASON_MAP_KEY = "file_error_reason_map"
+    ERROR_RAISED_ON_FILE_READ = "No permission to read file"
+
+    current_dir = pathlib.Path(__file__).parent.resolve()
+    os.environ["COGNITE_FUNCTION_RUNTIME"] = functions_runtime
+    client, test_parameter = set_upload_test
+    fully_qualified_failure_logging_path = str(current_dir.joinpath(LOG_FAILURE_FILE))
+    queue = IOFileUploadQueue(
+        cdf_client=client,
+        overwrite_existing=True,
+        max_queue_size=2,
+        failure_logging_path=fully_qualified_failure_logging_path,
+    )
+
+    def load_file_from_path() -> BinaryIO:
+        raise Exception(ERROR_RAISED_ON_FILE_READ)
+
+    # Queue a single file whose read callback always fails
+    assert test_parameter.external_ids is not None
+    assert test_parameter.space is not None
+    queue.add_io_to_upload_queue(
+        file_meta=FileMetadata(
+            external_id=test_parameter.external_ids[0],
+            name=NO_PERMISSION_FILE,
+        ),
+        read_file=load_file_from_path,
+    )
+
+    # upload() flushes the failure log to disk before raising on errors
+    with pytest.raises(RuntimeError):
+        queue.upload()
+
+    with jsonlines.open(fully_qualified_failure_logging_path, "r") as reader:
+        for failure_logger_run in reader:
+            assert FILE_REASON_MAP_KEY in failure_logger_run
+            assert NO_PERMISSION_FILE in failure_logger_run[FILE_REASON_MAP_KEY]
+            assert ERROR_RAISED_ON_FILE_READ in failure_logger_run[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE]
+
+    os.remove(fully_qualified_failure_logging_path)
+
+
 @pytest.mark.parametrize("functions_runtime", ["true", "false"])
 def test_file_upload_queue(set_upload_test: tuple[CogniteClient, ParamTest], functions_runtime: str) -> None:
     os.environ["COGNITE_FUNCTION_RUNTIME"] = functions_runtime
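For reference, the record the test above reads back via `jsonlines` would look roughly like this (timestamp illustrative):

```python
# Approximate shape of one line in integration_test_failure_log.jsonl
expected_record = {
    "start_time": "2024-01-01 12:00:00.000000",
    "file_error_reason_map": {
        "file_with_no_permission.txt": "No permission to read file",
    },
}
```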
@@ -79,28 +131,41 @@ def test_file_upload_queue(set_upload_test: tuple[CogniteClient, ParamTest], functions_runtime: str) -> None:
     assert test_parameter.external_ids is not None
     assert test_parameter.space is not None
     queue.add_to_upload_queue(
-        file_meta=FileMetadata(external_id=test_parameter.external_ids[0], name=test_parameter.external_ids[0]),
+        file_meta=FileMetadata(
+            external_id=test_parameter.external_ids[0],
+            name=test_parameter.external_ids[0],
+        ),
         file_name=current_dir.joinpath("test_file_1.txt"),
     )
     queue.add_to_upload_queue(
-        file_meta=FileMetadata(external_id=test_parameter.external_ids[1], name=test_parameter.external_ids[1]),
+        file_meta=FileMetadata(
+            external_id=test_parameter.external_ids[1],
+            name=test_parameter.external_ids[1],
+        ),
         file_name=current_dir.joinpath("test_file_2.txt"),
     )
 
     # Upload the Filemetadata of an empty file without trying to upload the "content"
     queue.add_to_upload_queue(
-        file_meta=FileMetadata(external_id=test_parameter.external_ids[3], name=test_parameter.external_ids[3]),
+        file_meta=FileMetadata(
+            external_id=test_parameter.external_ids[3],
+            name=test_parameter.external_ids[3],
+        ),
         file_name=current_dir.joinpath("empty_file.txt"),
     )
     queue.add_to_upload_queue(
         file_meta=CogniteExtractorFileApply(
-            external_id=test_parameter.external_ids[5], name=test_parameter.external_ids[5], space=test_parameter.space
+            external_id=test_parameter.external_ids[5],
+            name=test_parameter.external_ids[5],
+            space=test_parameter.space,
         ),
         file_name=current_dir.joinpath("test_file_1.txt"),
     )
     queue.add_to_upload_queue(
         file_meta=CogniteExtractorFileApply(
-            external_id=test_parameter.external_ids[6], name=test_parameter.external_ids[6], space=test_parameter.space
+            external_id=test_parameter.external_ids[6],
+            name=test_parameter.external_ids[6],
+            space=test_parameter.space,
         ),
         file_name=current_dir.joinpath("test_file_2.txt"),
     )
@@ -137,9 +202,12 @@ def test_file_upload_queue(set_upload_test: tuple[CogniteClient, ParamTest], functions_runtime: str) -> None:
     assert file4 == b"test content\n"
     assert file5 == b"other test content\n"
 
+
     node = client.data_modeling.instances.retrieve_nodes(
-        NodeId(test_parameter.space, test_parameter.external_ids[8]), node_cls=CogniteExtractorFile
+        NodeId(test_parameter.space, test_parameter.external_ids[8]),
+        node_cls=CogniteExtractorFile,
     )
+
     assert isinstance(node, CogniteExtractorFile)
 
     assert file6 is not None and file6.instance_id is not None and file6.instance_id.space == test_parameter.space
@@ -155,23 +223,33 @@ def test_bytes_upload_queue(set_upload_test: tuple[CogniteClient, ParamTest], functions_runtime: str) -> None:
     queue.add_to_upload_queue(
         content=b"bytes content",
-        file_meta=FileMetadata(external_id=test_parameter.external_ids[0], name=test_parameter.external_ids[0]),
+        file_meta=FileMetadata(
+            external_id=test_parameter.external_ids[0],
+            name=test_parameter.external_ids[0],
+        ),
     )
     queue.add_to_upload_queue(
         content=b"other bytes content",
-        file_meta=FileMetadata(external_id=test_parameter.external_ids[1], name=test_parameter.external_ids[1]),
+        file_meta=FileMetadata(
+            external_id=test_parameter.external_ids[1],
+            name=test_parameter.external_ids[1],
+        ),
     )
     queue.add_to_upload_queue(
         content=b"bytes content",
         file_meta=CogniteExtractorFileApply(
-            external_id=test_parameter.external_ids[5], name=test_parameter.external_ids[5], space=test_parameter.space
+            external_id=test_parameter.external_ids[5],
+            name=test_parameter.external_ids[5],
+            space=test_parameter.space,
         ),
     )
     queue.add_to_upload_queue(
         content=b"other bytes content",
         file_meta=CogniteExtractorFileApply(
-            external_id=test_parameter.external_ids[6], name=test_parameter.external_ids[6], space=test_parameter.space
+            external_id=test_parameter.external_ids[6],
+            name=test_parameter.external_ids[6],
+            space=test_parameter.space,
         ),
     )
@@ -207,12 +285,17 @@ def test_big_file_upload_queue(set_upload_test: tuple[CogniteClient, ParamTest], functions_runtime: str) -> None:
     queue.add_to_upload_queue(
         content=content,
-        file_meta=FileMetadata(external_id=test_parameter.external_ids[2], name=test_parameter.external_ids[2]),
+        file_meta=FileMetadata(
+            external_id=test_parameter.external_ids[2],
+            name=test_parameter.external_ids[2],
+        ),
     )
     queue.add_to_upload_queue(
         content=content,
         file_meta=CogniteExtractorFileApply(
-            external_id=test_parameter.external_ids[7], name=test_parameter.external_ids[7], space=test_parameter.space
+            external_id=test_parameter.external_ids[7],
+            name=test_parameter.external_ids[7],
+            space=test_parameter.space,
         ),
     )
@@ -238,7 +321,11 @@ def test_big_file_stream(set_upload_test: tuple[CogniteClient, ParamTest]) -> None:
     class BufferedReadWithLength(io.BufferedReader):
         def __init__(
-            self, raw: io.RawIOBase, buffer_size: int, len: int, on_close: Callable[[], None] | None = None
+            self,
+            raw: io.RawIOBase,
+            buffer_size: int,
+            len: int,
+            on_close: Callable[[], None] | None = None,
         ) -> None:
             super().__init__(raw, buffer_size)
             # Do not remove even if it appears to be unused. :P
@@ -258,12 +345,17 @@ def read_file() -> BufferedReadWithLength:
     assert test_parameter.space is not None
 
     queue.add_io_to_upload_queue(
-        file_meta=FileMetadata(external_id=test_parameter.external_ids[4], name=test_parameter.external_ids[4]),
+        file_meta=FileMetadata(
+            external_id=test_parameter.external_ids[4],
+            name=test_parameter.external_ids[4],
+        ),
         read_file=read_file,
     )
     queue.add_io_to_upload_queue(
         file_meta=CogniteExtractorFileApply(
-            external_id=test_parameter.external_ids[9], name=test_parameter.external_ids[9], space=test_parameter.space
+            external_id=test_parameter.external_ids[9],
+            name=test_parameter.external_ids[9],
+            space=test_parameter.space,
         ),
         read_file=read_file,
     )
@@ -285,7 +377,10 @@ def test_update_files(set_upload_test: tuple[CogniteClient, ParamTest]) -> None:
     queue.add_to_upload_queue(
         content=b"bytes content",
-        file_meta=FileMetadata(external_id=test_parameter.external_ids[0], name=test_parameter.external_ids[0]),
+        file_meta=FileMetadata(
+            external_id=test_parameter.external_ids[0],
+            name=test_parameter.external_ids[0],
+        ),
     )
     queue.add_to_upload_queue(
         content=b"bytes content",
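Finally, a hedged end-to-end sketch of how an extractor might opt into the failure log; the client configuration, file names, and external ID are illustrative, while the queue methods and the `RuntimeError` on failed uploads come from `files.py` above:

```python
from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata

from cognite.extractorutils.uploader.files import IOFileUploadQueue

client = CogniteClient()  # assumes client configuration is provided elsewhere

queue = IOFileUploadQueue(
    cdf_client=client,
    overwrite_existing=True,
    failure_logging_path="failed_uploads",  # written as failed_uploads.jsonl
)

def read_file():  # hypothetical callback returning a binary stream
    return open("some_local_file.bin", "rb")

queue.add_io_to_upload_queue(
    file_meta=FileMetadata(external_id="my-file", name="some_local_file.bin"),
    read_file=read_file,
)

try:
    queue.upload()  # flushes the failure log before raising if any upload failed
except RuntimeError:
    # failed_uploads.jsonl now lists each failed file with its reason, and
    # queue.get_failure_logger() exposes the same FileFailureManager instance
    raise
```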