Improve file retries (#326)
Include external ID when logging failed files.

I also discovered an issue with how the retries were set up, which caused
the uploads not to retry properly. The internal try/catch needed for the
thread pool ended up stopping retries. I split it out so that we have a
'wrap' function which applies the final layer of error handling.
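The difference is easiest to see in isolation. The sketch below is a minimal, self-contained illustration of the failure mode and the fix; the retry decorator, function names, and error list are simplified stand-ins for this example, not the library's actual implementation.

import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def retry(tries: int = 3, delay: float = 0.01) -> Callable:
    # Toy retry decorator: it can only retry if the wrapped call raises.
    def decorator(func: Callable) -> Callable:
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, tries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == tries:
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator


errors: list = []
attempts_broken: list = []
attempts_fixed: list = []


# Broken pattern: the try/except needed to record errors for the thread pool
# sits inside the retried function, so the decorator never sees a failure
# and never retries.
@retry()
def upload_broken() -> None:
    try:
        attempts_broken.append("try")
        raise ConnectionError("transient network error")
    except Exception as e:
        errors.append(e)


# Fixed pattern: the retried function raises freely; a separate wrapper
# applies the final layer of error handling before submission to the pool.
@retry()
def upload_file() -> None:
    attempts_fixed.append("try")
    raise ConnectionError("transient network error")


def wrapped_upload() -> None:
    try:
        upload_file()
    except Exception as e:
        errors.append(e)  # only reached once retries are exhausted


with ThreadPoolExecutor() as pool:
    pool.submit(upload_broken).result()
    pool.submit(wrapped_upload).result()

print(len(attempts_broken))  # 1: the swallowed exception suppressed retries
print(len(attempts_fixed))   # 3: retries run, then the wrapper records the error

The outer wrapper is still needed: without it, an exception raised after the final retry would disappear inside the ThreadPoolExecutor future instead of being recorded in the error list.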
mathialo authored May 10, 2024
1 parent 5f5681e commit aa27c11
Showing 4 changed files with 77 additions and 64 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,17 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.


## 7.1.5

### Fixed

* Fixed an issue preventing retries in file uploads from working properly

### Added

* File external ID is now included when logging failed file uploads

## 7.1.4

### Fixed
2 changes: 1 addition & 1 deletion cognite/extractorutils/__init__.py
@@ -16,5 +16,5 @@
Cognite extractor utils is a Python package that simplifies the development of new extractors.
"""

__version__ = "7.1.4"
__version__ = "7.1.5"
from .base import Extractor
126 changes: 64 additions & 62 deletions cognite/extractorutils/uploader/files.py
@@ -251,70 +251,72 @@ def add_io_to_upload_queue(
max_delay=RETRY_MAX_DELAY,
backoff=RETRY_BACKOFF_FACTOR,
)
def _upload_single(read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None:
def upload_file(read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None:
with read_file() as file:
size = super_len(file)
if size == 0:
                    # upload just the file metadata without data
file_meta, _url = self.cdf_client.files.create(
file_metadata=file_meta, overwrite=self.overwrite_existing
)
elif size >= self.max_single_chunk_file_size:
# The minimum chunk size is 4000MiB.
chunks = ChunkedStream(file, self.max_file_chunk_size, size)
self.logger.debug(
f"File {file_meta.external_id} is larger than 5GiB ({size})"
f", uploading in {chunks.chunk_count} chunks"
)
with self.cdf_client.files.multipart_upload_session(
file_meta.name if file_meta.name is not None else "",
parts=chunks.chunk_count,
overwrite=self.overwrite_existing,
external_id=file_meta.external_id,
source=file_meta.source,
mime_type=file_meta.mime_type,
metadata=file_meta.metadata,
directory=file_meta.directory,
asset_ids=file_meta.asset_ids,
data_set_id=file_meta.data_set_id,
labels=file_meta.labels,
geo_location=file_meta.geo_location,
source_created_time=file_meta.source_created_time,
source_modified_time=file_meta.source_modified_time,
security_categories=file_meta.security_categories,
) as session:
while chunks.next_chunk():
session.upload_part(chunks.current_chunk, chunks)
file_meta = session.file_metadata
else:
file_meta = self.cdf_client.files.upload_bytes(
file,
file_meta.name if file_meta.name is not None else "",
overwrite=self.overwrite_existing,
external_id=file_meta.external_id,
source=file_meta.source,
mime_type=file_meta.mime_type,
metadata=file_meta.metadata,
directory=file_meta.directory,
asset_ids=file_meta.asset_ids,
data_set_id=file_meta.data_set_id,
labels=file_meta.labels,
geo_location=file_meta.geo_location,
source_created_time=file_meta.source_created_time,
source_modified_time=file_meta.source_modified_time,
security_categories=file_meta.security_categories,
)

if self.post_upload_function:
try:
self.post_upload_function([file_meta])
except Exception as e:
self.logger.error("Error in upload callback: %s", str(e))

def wrapped_upload(read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None:
try:
# Upload file
with read_file() as file:
size = super_len(file)
if size == 0:
                        # upload just the file metadata without data
file_meta, _url = self.cdf_client.files.create(
file_metadata=file_meta, overwrite=self.overwrite_existing
)
elif size >= self.max_single_chunk_file_size:
# The minimum chunk size is 4000MiB.
chunks = ChunkedStream(file, self.max_file_chunk_size, size)
self.logger.debug(
f"File {file_meta.external_id} is larger than 5GiB ({size})"
f", uploading in {chunks.chunk_count} chunks"
)
with self.cdf_client.files.multipart_upload_session(
file_meta.name if file_meta.name is not None else "",
parts=chunks.chunk_count,
overwrite=self.overwrite_existing,
external_id=file_meta.external_id,
source=file_meta.source,
mime_type=file_meta.mime_type,
metadata=file_meta.metadata,
directory=file_meta.directory,
asset_ids=file_meta.asset_ids,
data_set_id=file_meta.data_set_id,
labels=file_meta.labels,
geo_location=file_meta.geo_location,
source_created_time=file_meta.source_created_time,
source_modified_time=file_meta.source_modified_time,
security_categories=file_meta.security_categories,
) as session:
while chunks.next_chunk():
session.upload_part(chunks.current_chunk, chunks)
file_meta = session.file_metadata
else:
file_meta = self.cdf_client.files.upload_bytes(
file,
file_meta.name if file_meta.name is not None else "",
overwrite=self.overwrite_existing,
external_id=file_meta.external_id,
source=file_meta.source,
mime_type=file_meta.mime_type,
metadata=file_meta.metadata,
directory=file_meta.directory,
asset_ids=file_meta.asset_ids,
data_set_id=file_meta.data_set_id,
labels=file_meta.labels,
geo_location=file_meta.geo_location,
source_created_time=file_meta.source_created_time,
source_modified_time=file_meta.source_modified_time,
security_categories=file_meta.security_categories,
)

if self.post_upload_function:
try:
self.post_upload_function([file_meta])
except Exception as e:
self.logger.error("Error in upload callback: %s", str(e))
upload_file(read_file, file_meta)

except Exception as e:
self.logger.exception("Unexpected error while uploading file")
self.logger.exception(f"Unexpected error while uploading file: {file_meta.external_id}")
self.errors.append(e)

finally:
@@ -331,7 +333,7 @@ def _upload_single(read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -
pass

with self.lock:
self.upload_queue.append(self._pool.submit(_upload_single, read_file, file_meta))
self.upload_queue.append(self._pool.submit(wrapped_upload, read_file, file_meta))
self.upload_queue_size += 1
self.files_queued.inc()
self.queue_size.set(self.upload_queue_size)
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cognite-extractor-utils"
version = "7.1.4"
version = "7.1.5"
description = "Utilities for easier development of extractors for CDF"
authors = ["Mathias Lohne <[email protected]>"]
license = "Apache-2.0"
