Changes for reporting files that have not been uploaded #394

Open: wants to merge 31 commits into base: master

All 31 commits:
- 4a39877 Changes for reporting files that have not been uploaded (Nov 21, 2024)
- 2c99031 Merge branch 'master' into DOG-4448-file-extractor-simplify-extractio… (nithinb, Nov 22, 2024)
- 45a5813 formatting changes (Nov 22, 2024)
- 26af742 Merge branch 'DOG-4448-file-extractor-simplify-extraction-failure-ana… (Nov 22, 2024)
- 353f99c Changes requested on PR (Nov 26, 2024)
- 865d97a Changes suggested on the PR (Dec 6, 2024)
- 998f48a Changes suggested on the PR (Dec 6, 2024)
- ee53fb2 removed datetime as a dependency (Dec 10, 2024)
- 5974f9f Added test case for file upload failure (Dec 11, 2024)
- fde7130 change the method being called in the test case (Dec 11, 2024)
- cb6b534 Undo a minor change (Dec 11, 2024)
- b00bce6 parameter fix (Dec 11, 2024)
- 17475e5 validation changes (Dec 11, 2024)
- a95b1fc minor change to error message (Dec 11, 2024)
- a892e75 path fix for file (Dec 11, 2024)
- 932b4f1 Added file with no permission (Dec 11, 2024)
- bfe9af2 Changes to path for failure log (Dec 11, 2024)
- 1e67733 Flush failure logger (Dec 11, 2024)
- 5ce8b76 Changes to check in-memory (Dec 11, 2024)
- 77b2ada Added sleep for the error logs to get stored (Dec 11, 2024)
- 2ec829f Added print statements (Dec 11, 2024)
- 56cb8a9 stringify the path on joinpath (Dec 11, 2024)
- 2ba4a86 add assert (Dec 11, 2024)
- 27fffb9 raise manual exception (Dec 11, 2024)
- 4ad3083 Added random printing stuff (Dec 11, 2024)
- 2231fcd changes to assert statements (Dec 11, 2024)
- 16f79e7 Handle the exception in upload queue and check for value in failure l… (Dec 11, 2024)
- f7ba15c read jsonlines file and validate it (Dec 11, 2024)
- 344ff62 remove incorrect validation with len (Dec 11, 2024)
- 8cdb5af Delete file with no permission (Dec 11, 2024)
- 37afff9 remove print statements (Dec 11, 2024)
87 changes: 76 additions & 11 deletions cognite/extractorutils/uploader/files.py
@@ -18,7 +18,18 @@
from math import ceil
from os import PathLike
from types import TracebackType
from typing import Any, BinaryIO, Callable, Dict, Iterator, List, Optional, Tuple, Type, Union
from typing import (
Any,
BinaryIO,
Callable,
Dict,
Iterator,
List,
Optional,
Tuple,
Type,
Union,
)
from urllib.parse import ParseResult, urlparse

from httpx import URL, Client, Headers, Request, StreamConsumed, SyncByteStream
@@ -27,7 +38,9 @@
from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata, FileMetadataUpdate
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import CogniteExtractorFileApply
from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import (
CogniteExtractorFileApply,
)
from cognite.client.utils._identifier import IdentifierSequence
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader._base import (
@@ -42,6 +55,7 @@
FILES_UPLOADER_QUEUED,
FILES_UPLOADER_WRITTEN,
)
from cognite.extractorutils.uploader.upload_failure_handler import FileFailureManager
from cognite.extractorutils.util import cognite_exceptions, retry

_QUEUES: int = 0
@@ -54,6 +68,7 @@

_CDF_ALPHA_VERSION_HEADER = {"cdf-version": "alpha"}


FileMetadataOrCogniteExtractorFile = Union[FileMetadata, CogniteExtractorFileApply]


@@ -97,7 +112,10 @@ def __enter__(self) -> "ChunkedStream":
return super().__enter__()

def __exit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
return super().__exit__(exc_type, exc_val, exc_tb)

@@ -193,6 +211,7 @@ def __init__(
overwrite_existing: bool = False,
cancellation_token: Optional[CancellationToken] = None,
max_parallelism: Optional[int] = None,
failure_logging_path: None | str = None,
):
# Super sets post_upload and threshold
super().__init__(
@@ -208,6 +227,9 @@
if self.threshold <= 0:
raise ValueError("Max queue size must be positive for file upload queues")

self.failure_logging_path = failure_logging_path or None
self.initialize_failure_logging()

self.upload_queue: List[Future] = []
self.errors: List[Exception] = []

@@ -235,10 +257,31 @@ def __init__(
global _QUEUES, _QUEUES_LOCK
with _QUEUES_LOCK:
self._pool = ThreadPoolExecutor(
max_workers=self.parallelism, thread_name_prefix=f"FileUploadQueue-{_QUEUES}"
max_workers=self.parallelism,
thread_name_prefix=f"FileUploadQueue-{_QUEUES}",
)
_QUEUES += 1

def initialize_failure_logging(self) -> None:
self._file_failure_manager: FileFailureManager | None = (
FileFailureManager(path_to_file=self.failure_logging_path)
if self.failure_logging_path is not None
else None
)

def get_failure_logger(self) -> FileFailureManager | None:
return self._file_failure_manager

def add_entry_failure_logger(self, file_name: str, error: Exception) -> None:
if self._file_failure_manager is not None:
error_reason = str(error)
self._file_failure_manager.add(file_name=file_name, error_reason=error_reason)

def flush_failure_logger(self) -> None:
if self._file_failure_manager is not None:
self.logger.info("Flushing failure logs")
self._file_failure_manager.write_to_file()

def _remove_done_from_queue(self) -> None:
while not self.cancellation_token.is_cancelled:
with self.lock:
@@ -284,6 +327,7 @@ def _upload_empty(
def _upload_bytes(self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile) -> None:
file_meta, url = self._upload_empty(file_meta)
resp = self._httpx_client.send(self._get_file_upload_request(url, file, size, file_meta.mime_type))

resp.raise_for_status()

def _upload_multipart(self, size: int, file: BinaryIO, file_meta: FileMetadataOrCogniteExtractorFile) -> None:
@@ -329,7 +373,10 @@ def _create_multi_part(self, file_meta: FileMetadataOrCogniteExtractorFile, chun
res = self.cdf_client.files._post(
url_path="/files/initmultipartupload",
json=file_meta.dump(camel_case=True),
params={"overwrite": self.overwrite_existing, "parts": chunks.chunk_count},
params={
"overwrite": self.overwrite_existing,
"parts": chunks.chunk_count,
},
)
res.raise_for_status()
return res.json()
@@ -339,7 +386,10 @@ def add_io_to_upload_queue(
file_meta: FileMetadataOrCogniteExtractorFile,
read_file: Callable[[], BinaryIO],
extra_retries: Optional[
Union[Tuple[Type[Exception], ...], Dict[Type[Exception], Callable[[Any], bool]]]
Union[
Tuple[Type[Exception], ...],
Dict[Type[Exception], Callable[[Any], bool]],
]
] = None,
) -> None:
"""
@@ -366,7 +416,10 @@
max_delay=RETRY_MAX_DELAY,
backoff=RETRY_BACKOFF_FACTOR,
)
def upload_file(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogniteExtractorFile) -> None:
def upload_file(
read_file: Callable[[], BinaryIO],
file_meta: FileMetadataOrCogniteExtractorFile,
) -> None:
with read_file() as file:
size = super_len(file)
if size == 0:
@@ -388,12 +441,18 @@
except Exception as e:
self.logger.error("Error in upload callback: %s", str(e))

def wrapped_upload(read_file: Callable[[], BinaryIO], file_meta: FileMetadataOrCogniteExtractorFile) -> None:
def wrapped_upload(
read_file: Callable[[], BinaryIO],
file_meta: FileMetadataOrCogniteExtractorFile,
) -> None:
try:
upload_file(read_file, file_meta)

except Exception as e:
self.logger.exception(f"Unexpected error while uploading file: {file_meta.external_id}")
self.logger.exception(
f"Unexpected error while uploading file: {file_meta.external_id} {file_meta.name}"
)
self.add_entry_failure_logger(file_name=str(file_meta.name), error=e)
self.errors.append(e)

finally:
@@ -470,6 +529,7 @@ def upload(self, fail_on_errors: bool = True, timeout: Optional[float] = None) -
self.queue_size.set(self.upload_queue_size)
if fail_on_errors and self.errors:
# There might be more errors, but we can only have one as the cause, so pick the first
self.flush_failure_logger()
raise RuntimeError(f"{len(self.errors)} upload(s) finished with errors") from self.errors[0]

def __enter__(self) -> "IOFileUploadQueue":
@@ -485,7 +545,10 @@ def __enter__(self) -> "IOFileUploadQueue":
return self

def __exit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""
Wraps around stop method, for use as context manager
@@ -544,7 +607,9 @@ def __init__(
)

def add_to_upload_queue(
self, file_meta: FileMetadataOrCogniteExtractorFile, file_name: Union[str, PathLike]
self,
file_meta: FileMetadataOrCogniteExtractorFile,
file_name: Union[str, PathLike],
) -> None:
"""
Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
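Taken together, the changes to files.py make failure logging opt-in: passing `failure_logging_path` to the queue constructor creates a `FileFailureManager`, each failed upload is recorded via `add_entry_failure_logger`, and `upload()` flushes the log to disk before re-raising. Below is a minimal usage sketch mirroring the integration test further down, not an official example; it assumes a configured `CogniteClient`, and the file name `doc-1.pdf` is a placeholder.

```python
from pathlib import Path
from typing import BinaryIO

from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata
from cognite.extractorutils.uploader.files import IOFileUploadQueue

client = CogniteClient()  # assumption: credentials configured elsewhere

queue = IOFileUploadQueue(
    cdf_client=client,
    max_queue_size=2,
    failure_logging_path="upload_failures.jsonl",  # opt-in: enables FileFailureManager
)

def read_file() -> BinaryIO:
    # Hypothetical reader; an exception raised here is caught by wrapped_upload
    # and recorded in the failure log instead of crashing the worker thread.
    return open("doc-1.pdf", "rb")

queue.add_io_to_upload_queue(
    file_meta=FileMetadata(external_id="doc-1", name="doc-1.pdf"),
    read_file=read_file,
)

try:
    queue.upload()  # flushes the failure log before raising on errors
except RuntimeError:
    # Each line is one JSON object:
    # {"start_time": ..., "file_error_reason_map": {file_name: error_reason}}
    print(Path("upload_failures.jsonl").read_text())
```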
64 changes: 64 additions & 0 deletions cognite/extractorutils/uploader/upload_failure_handler.py
@@ -0,0 +1,64 @@
from datetime import datetime
from typing import Iterator, List

import jsonlines


class FileErrorMapping:
def __init__(self, file_name: str, error_reason: str) -> None:
self.file_name = file_name
self.error_reason = error_reason

def __iter__(self) -> Iterator[List[str]]:
return iter([[self.file_name, self.error_reason]])


class FileFailureManager:
MAX_QUEUE_SIZE = 500
START_TIME_KEY = "start_time"
FILE_REASON_MAP_KEY = "file_error_reason_map"

def __init__(self, start_time: str | None = None, path_to_file: str | None = None) -> None:
self.failure_logs: dict[str, str] = {}

self.path_to_failure_log: str = self._pre_process_file_extension(path_to_file)
self.start_time = start_time or str(datetime.now())
self._initialize_failure_logs()

def _pre_process_file_extension(self, path_to_file: str | None) -> str:
if path_to_file and not path_to_file.endswith(".jsonl"):
return path_to_file + ".jsonl"
return str(path_to_file)

def _initialize_failure_logs(self) -> None:
self.failure_logs = {}

def __len__(self) -> int:
return len(self.failure_logs)

def clear(self) -> None:
self.failure_logs.clear()
self._initialize_failure_logs()

def add(self, file_name: str, error_reason: str) -> None:
error_file_object = FileErrorMapping(file_name=file_name, error_reason=error_reason)
error_file_dict = dict(error_file_object)

self.failure_logs.update(error_file_dict)

if len(self) >= self.MAX_QUEUE_SIZE:
self.write_to_file()

def write_to_file(self) -> None:
if len(self) == 0:
return

dict_to_write = {
self.START_TIME_KEY: self.start_time,
self.FILE_REASON_MAP_KEY: self.failure_logs,
}

with jsonlines.open(self.path_to_failure_log, mode="a") as writer:
writer.write(dict_to_write)

self.clear()
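On its own, the new `FileFailureManager` buffers failures in an in-memory dict keyed by file name and appends them to a JSON Lines file, either when `write_to_file()` is called explicitly or automatically once `MAX_QUEUE_SIZE` (500) entries accumulate. A sketch of the standalone behavior, with illustrative file names and error strings:

```python
import jsonlines

from cognite.extractorutils.uploader.upload_failure_handler import FileFailureManager

# ".jsonl" is appended by _pre_process_file_extension when missing.
manager = FileFailureManager(path_to_file="failures")

manager.add(file_name="a.txt", error_reason="No permission to read file")
manager.add(file_name="b.txt", error_reason="Connection reset by peer")
assert len(manager) == 2  # buffered in memory; auto-flushes at 500 entries

manager.write_to_file()  # appends one JSON object to failures.jsonl, then clears
assert len(manager) == 0

with jsonlines.open("failures.jsonl", mode="r") as reader:
    for record in reader:
        # {"start_time": "...", "file_error_reason_map": {"a.txt": "...", ...}}
        print(record["file_error_reason_map"])
```

Because `write_to_file()` opens the log in append mode and then clears the buffer, a long-running extractor that flushes repeatedly produces multiple records in the same file, each carrying the run's `start_time`.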
1 change: 1 addition & 0 deletions pyproject.toml
@@ -75,6 +75,7 @@ httpx = "^0.27.0"
pydantic = "^2.8.2"
pyhumps = "^3.8.0"
croniter = "^5.0.0"
jsonlines = "^4.0.0"

[tool.poetry.extras]
experimental = ["cognite-sdk-experimental"]
52 changes: 51 additions & 1 deletion tests/tests_integration/test_file_integration.py
@@ -17,8 +17,9 @@
import pathlib
import random
import time
from typing import Callable, Optional, Tuple
from typing import BinaryIO, Callable, Optional, Tuple

import jsonlines
import pytest

from cognite.client import CogniteClient
@@ -67,6 +68,53 @@ def await_is_uploaded_status(
time.sleep(1)


@pytest.mark.parametrize("functions_runtime", ["true"])
def test_errored_file(set_upload_test: Tuple[CogniteClient, ParamTest], functions_runtime: str) -> None:
LOG_FAILURE_FILE = "integration_test_failure_log.jsonl"
NO_PERMISSION_FILE = "file_with_no_permission.txt"
FILE_REASON_MAP_KEY = "file_error_reason_map"
ERROR_RAISED_ON_FILE_READ = "No permission to read file"

current_dir = pathlib.Path(__file__).parent.resolve()
os.environ["COGNITE_FUNCTION_RUNTIME"] = functions_runtime
client, test_parameter = set_upload_test
fully_qualified_failure_logging_path = str(current_dir.joinpath(LOG_FAILURE_FILE))
queue = IOFileUploadQueue(
cdf_client=client,
overwrite_existing=True,
max_queue_size=2,
failure_logging_path=fully_qualified_failure_logging_path,
)

def load_file_from_path() -> BinaryIO:
raise Exception(ERROR_RAISED_ON_FILE_READ)

# Queue a file whose read callback raises, so the upload fails and is logged
assert test_parameter.external_ids is not None
assert test_parameter.space is not None
queue.add_io_to_upload_queue(
file_meta=FileMetadata(
external_id=test_parameter.external_ids[0],
name=NO_PERMISSION_FILE,
),
read_file=load_file_from_path,
)

try:
queue.upload()

time.sleep(5)
except Exception as e:
failure_logger = queue.get_failure_logger()

with jsonlines.open(fully_qualified_failure_logging_path, "r") as reader:
for failure_logger_run in reader:
assert FILE_REASON_MAP_KEY in failure_logger_run
assert NO_PERMISSION_FILE in failure_logger_run[FILE_REASON_MAP_KEY]
assert ERROR_RAISED_ON_FILE_READ in failure_logger_run[FILE_REASON_MAP_KEY][NO_PERMISSION_FILE]
os.remove(fully_qualified_failure_logging_path)


@pytest.mark.parametrize("functions_runtime", ["true", "false"])
def test_file_upload_queue(set_upload_test: Tuple[CogniteClient, ParamTest], functions_runtime: str) -> None:
os.environ["COGNITE_FUNCTION_RUNTIME"] = functions_runtime
@@ -137,9 +185,11 @@ def test_file_upload_queue(set_upload_test: Tuple[CogniteClient, ParamTest], fun

assert file4 == b"test content\n"
assert file5 == b"other test content\n"

node = client.data_modeling.instances.retrieve_nodes(
NodeId(test_parameter.space, test_parameter.external_ids[8]), node_cls=CogniteExtractorFile
)

assert isinstance(node, CogniteExtractorFile)
assert file6 is not None and file6.instance_id is not None and file6.instance_id.space == test_parameter.space
