Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes for reporting files that have not been uploaded #394

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4a39877
Changes for reporting files that have not been uploaded
Nov 21, 2024
2c99031
Merge branch 'master' into DOG-4448-file-extractor-simplify-extractio…
nithinb Nov 22, 2024
45a5813
formatting changes
Nov 22, 2024
26af742
Merge branch 'DOG-4448-file-extractor-simplify-extraction-failure-ana…
Nov 22, 2024
353f99c
Changes requested on PR
Nov 26, 2024
865d97a
Changes suggested on the PR
Dec 6, 2024
998f48a
Changes suggested on the PR
Dec 6, 2024
ee53fb2
removed datetime as a dependency
Dec 10, 2024
5974f9f
Added test case for file upload failure
Dec 11, 2024
fde7130
change the method being called in the test case
Dec 11, 2024
cb6b534
Undo a minor change
Dec 11, 2024
b00bce6
parameter fix
Dec 11, 2024
17475e5
validation changes
Dec 11, 2024
a95b1fc
minor change to error message
Dec 11, 2024
a892e75
path fix for file
Dec 11, 2024
932b4f1
Added file with no permission
Dec 11, 2024
bfe9af2
Changes to path for failure log
Dec 11, 2024
1e67733
Flush failure logger
Dec 11, 2024
5ce8b76
Changes to check in-memory
Dec 11, 2024
77b2ada
Added sleep for the error logs to get stored
Dec 11, 2024
2ec829f
Added print statements
Dec 11, 2024
56cb8a9
stringify the path on joinpath
Dec 11, 2024
2ba4a86
add assert
Dec 11, 2024
27fffb9
raise manual exception
Dec 11, 2024
4ad3083
Added random printing stuff
Dec 11, 2024
2231fcd
changes to assert statements
Dec 11, 2024
16f79e7
Handle the exception in upload queue and check for value in failure l…
Dec 11, 2024
f7ba15c
read jsonlines file and validate it
Dec 11, 2024
344ff62
remove incorrect validation with len
Dec 11, 2024
8cdb5af
Delete file with no permission
Dec 11, 2024
37afff9
remove print statements
Dec 11, 2024
827452b
Merge branch 'master' of https://github.com/cognitedata/python-extrac…
Jan 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 77 additions & 11 deletions cognite/extractorutils/uploader/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,18 @@
from math import ceil
from os import PathLike
from types import TracebackType
from typing import Any, BinaryIO, Callable, Dict, Iterator, List, Optional, Tuple, Type, Union
from typing import (
Any,
BinaryIO,
Callable,
Dict,
Iterator,
List,
Optional,
Tuple,
Type,
Union,
)
from urllib.parse import ParseResult, urlparse

from httpx import URL, Client, Headers, Request, StreamConsumed, SyncByteStream
Expand All @@ -27,7 +38,9 @@
from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata, FileMetadataUpdate
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import CogniteExtractorFileApply
from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import (
CogniteExtractorFileApply,
)
from cognite.client.utils._identifier import IdentifierSequence
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader._base import (
Expand All @@ -42,6 +55,7 @@
FILES_UPLOADER_QUEUED,
FILES_UPLOADER_WRITTEN,
)
from cognite.extractorutils.uploader.upload_failure_handler import FileFailureManager
from cognite.extractorutils.util import cognite_exceptions, retry

_QUEUES: int = 0
Expand All @@ -54,6 +68,7 @@

_CDF_ALPHA_VERSION_HEADER = {"cdf-version": "alpha"}


FileMetadataOrCogniteExtractorFile = Union[FileMetadata, CogniteExtractorFileApply]


Expand Down Expand Up @@ -97,7 +112,10 @@ def __enter__(self) -> "ChunkedStream":
return super().__enter__()

def __exit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
return super().__exit__(exc_type, exc_val, exc_tb)

Expand Down Expand Up @@ -193,6 +211,7 @@ def __init__(
overwrite_existing: bool = False,
cancellation_token: Optional[CancellationToken] = None,
max_parallelism: Optional[int] = None,
failure_logging_path: None | str = None,
):
# Super sets post_upload and threshold
super().__init__(
Expand All @@ -208,6 +227,9 @@ def __init__(
if self.threshold <= 0:
raise ValueError("Max queue size must be positive for file upload queues")

self.failure_logging_path = failure_logging_path or None
self.initialize_failure_logging()

self.upload_queue: List[Future] = []
self.errors: List[Exception] = []

Expand Down Expand Up @@ -235,10 +257,32 @@ def __init__(
global _QUEUES, _QUEUES_LOCK
with _QUEUES_LOCK:
self._pool = ThreadPoolExecutor(
max_workers=self.parallelism, thread_name_prefix=f"FileUploadQueue-{_QUEUES}"
max_workers=self.parallelism,
thread_name_prefix=f"FileUploadQueue-{_QUEUES}",
)
_QUEUES += 1

def initialize_failure_logging(self) -> None:
self._file_failure_manager: FileFailureManager | None = (
FileFailureManager(path_to_file=self.failure_logging_path)
if self.failure_logging_path is not None
else None
)

def get_failure_logger(self) -> FileFailureManager | None:
return self._file_failure_manager

def add_entry_failure_logger(self, file_name: str, error: Exception) -> None:
if self._file_failure_manager is not None:
error_reason = str(error)
self._file_failure_manager.add(file_name=file_name, error_reason=error_reason)

def flush_failure_logger(self) -> None:
if self._file_failure_manager is not None:
self.logger.info("Flushing failure logs")
self._file_failure_manager.write_to_file()
self._file_failure_manager.clear()

def _remove_done_from_queue(self) -> None:
while not self.cancellation_token.is_cancelled:
with self.lock:
Expand Down Expand Up @@ -284,6 +328,7 @@ def _upload_empty(
def _upload_bytes(self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile) -> None:
file_meta, url = self._upload_empty(file_meta)
resp = self._httpx_client.send(self._get_file_upload_request(url, file, size, file_meta.mime_type))

resp.raise_for_status()

def _upload_multipart(self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile) -> None:
Expand Down Expand Up @@ -329,7 +374,10 @@ def _create_multi_part(self, file_meta: FileMetadataOrCogniteExtractorFile, chun
res = self.cdf_client.files._post(
url_path="/files/initmultipartupload",
json=file_meta.dump(camel_case=True),
params={"overwrite": self.overwrite_existing, "parts": chunks.chunk_count},
params={
"overwrite": self.overwrite_existing,
"parts": chunks.chunk_count,
},
)
res.raise_for_status()
return res.json()
Expand All @@ -339,7 +387,10 @@ def add_io_to_upload_queue(
file_meta: FileMetadataOrCogniteExtractorFile,
read_file: Callable[[], BinaryIO],
extra_retries: Optional[
Union[Tuple[Type[Exception], ...], Dict[Type[Exception], Callable[[Any], bool]]]
Union[
Tuple[Type[Exception], ...],
Dict[Type[Exception], Callable[[Any], bool]],
]
] = None,
) -> None:
"""
Expand All @@ -366,7 +417,10 @@ def add_io_to_upload_queue(
max_delay=RETRY_MAX_DELAY,
backoff=RETRY_BACKOFF_FACTOR,
)
def upload_file(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogniteExtractorFile) -> None:
def upload_file(
read_file: Callable[[], BinaryIO],
file_meta: FileMetadataOrCogniteExtractorFile,
) -> None:
with read_file() as file:
size = super_len(file)
if size == 0:
Expand All @@ -388,12 +442,18 @@ def upload_file(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogn
except Exception as e:
self.logger.error("Error in upload callback: %s", str(e))

def wrapped_upload(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogniteExtractorFile) -> None:
def wrapped_upload(
read_file: Callable[[], BinaryIO],
file_meta: FileMetadataOrCogniteExtractorFile,
) -> None:
try:
upload_file(read_file, file_meta)

except Exception as e:
self.logger.exception(f"Unexpected error while uploading file: {file_meta.external_id}")
self.logger.exception(
f"Unexpected error while uploading file: {file_meta.external_id} {file_meta.name}"
)
self.add_entry_failure_logger(file_name=str(file_meta.name), error=e)
self.errors.append(e)

finally:
Expand Down Expand Up @@ -470,6 +530,7 @@ def upload(self, fail_on_errors: bool = True, timeout: Optional[float] = None) -
self.queue_size.set(self.upload_queue_size)
if fail_on_errors and self.errors:
# There might be more errors, but we can only have one as the cause, so pick the first
self.flush_failure_logger()
raise RuntimeError(f"{len(self.errors)} upload(s) finished with errors") from self.errors[0]

def __enter__(self) -> "IOFileUploadQueue":
Expand All @@ -485,7 +546,10 @@ def __enter__(self) -> "IOFileUploadQueue":
return self

def __exit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""
Wraps around stop method, for use as context manager
Expand Down Expand Up @@ -544,7 +608,9 @@ def __init__(
)

def add_to_upload_queue(
self, file_meta: FileMetadataOrCogniteExtractorFile, file_name: Union[str, PathLike]
self,
file_meta: FileMetadataOrCogniteExtractorFile,
file_name: Union[str, PathLike],
) -> None:
"""
Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
Expand Down
58 changes: 58 additions & 0 deletions cognite/extractorutils/uploader/upload_failure_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from datetime import datetime
from typing import Any, Iterator, List

import jsonlines


class FileErrorMapping:
def __init__(self, file_name: str, error_reason: str) -> None:
self.file_name = file_name
self.error_reason = error_reason

def __iter__(self) -> Iterator[List[str]]:
return iter([[self.file_name, self.error_reason]])


class FileFailureManager:
MAX_QUEUE_SIZE = 500
START_TIME_KEY = "start_time"
FILE_REASON_MAP_KEY = "file_error_reason_map"

def __init__(self, start_time: str | None = None, path_to_file: str | None = None) -> None:
self.failure_logs: dict[str, Any] = {}
nithinb marked this conversation as resolved.
Show resolved Hide resolved

self.path_to_failure_log: str = self._pre_process_file_extension(path_to_file)
self.start_time = start_time or str(datetime.now())
self._initialize_failure_logs()

def _pre_process_file_extension(self, path_to_file: str | None) -> str:
if path_to_file and not path_to_file.endswith(".jsonl"):
return path_to_file + ".jsonl"
return str(path_to_file)

def _initialize_failure_logs(self) -> None:
self.failure_logs[FileFailureManager.START_TIME_KEY] = self.start_time
self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY] = {}

def __len__(self) -> int:
return len(self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY])

def clear(self) -> None:
self.failure_logs.clear()
self._initialize_failure_logs()

def add(self, file_name: str, error_reason: str) -> None:
error_file_object = FileErrorMapping(file_name=file_name, error_reason=error_reason)

self.failure_logs[FileFailureManager.FILE_REASON_MAP_KEY].update(error_file_object)

if len(self) >= self.MAX_QUEUE_SIZE:
self.write_to_file()
nithinb marked this conversation as resolved.
Show resolved Hide resolved
self.clear()

def write_to_file(self) -> None:
nithinb marked this conversation as resolved.
Show resolved Hide resolved
if len(self.failure_logs[self.FILE_REASON_MAP_KEY]) == 0:
return

with jsonlines.open(self.path_to_failure_log, mode="a") as writer:
writer.write(self.failure_logs)
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ httpx = "^0.27.0"
pydantic = "^2.8.2"
pyhumps = "^3.8.0"
croniter = "^5.0.0"
jsonlines = "^4.0.0"
datetime = "5.5"
nithinb marked this conversation as resolved.
Show resolved Hide resolved

[tool.poetry.extras]
experimental = ["cognite-sdk-experimental"]
Expand Down
Loading