
Use a shared base class for the file upload queues
This class can also be used directly to load data that is neither a
local file nor an in-memory byte buffer, for example when data is
actively being streamed from somewhere else.
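This also means the base class can back a streaming extractor directly. Below is a hypothetical sketch, not part of this commit: the endpoint URL, metadata values, and use of requests are illustrative assumptions; only IOFileUploadQueue, add_io_to_upload_queue, and upload() come from this change.

    from typing import BinaryIO

    import requests

    from cognite.client import CogniteClient
    from cognite.client.data_classes import FileMetadata
    from cognite.extractorutils.uploader import IOFileUploadQueue

    client = CogniteClient()
    queue = IOFileUploadQueue(client)

    def stream_remote_file() -> BinaryIO:
        # Must return a *fresh* stream on every call: a failed upload is
        # retried by re-invoking the callback, not by seeking the old stream.
        response = requests.get("https://example.com/data.bin", stream=True)  # hypothetical source
        response.raise_for_status()
        return response.raw

    queue.add_io_to_upload_queue(
        FileMetadata(name="data.bin", external_id="streamed-data-bin"),
        stream_remote_file,
    )
    queue.upload()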
einarmo committed Nov 21, 2023
1 parent eeed8fa commit 552c756
Showing 5 changed files with 151 additions and 104 deletions.
2 changes: 1 addition & 1 deletion cognite/extractorutils/uploader/__init__.py
@@ -66,7 +66,7 @@

from .assets import AssetUploadQueue
from .events import EventUploadQueue
- from .files import BytesUploadQueue, FileUploadQueue
+ from .files import BytesUploadQueue, FileUploadQueue, IOFileUploadQueue
from .raw import RawUploadQueue
from .time_series import (
DataPoint,
195 changes: 93 additions & 102 deletions cognite/extractorutils/uploader/files.py
@@ -13,10 +13,11 @@
# limitations under the License.

import threading
- from concurrent.futures import ThreadPoolExecutor
+ from concurrent.futures import Future, ThreadPoolExecutor
+ from io import BytesIO
from os import PathLike
from types import TracebackType
- from typing import Any, Callable, List, Optional, Tuple, Type, Union
+ from typing import Any, BinaryIO, Callable, List, Optional, Tuple, Type, Union

from requests import ConnectionError

@@ -31,19 +32,19 @@
AbstractUploadQueue,
)
from cognite.extractorutils.uploader._metrics import (
BYTES_UPLOADER_QUEUE_SIZE,
BYTES_UPLOADER_QUEUED,
BYTES_UPLOADER_WRITTEN,
FILES_UPLOADER_QUEUE_SIZE,
FILES_UPLOADER_QUEUED,
FILES_UPLOADER_WRITTEN,
)
from cognite.extractorutils.util import retry


- class FileUploadQueue(AbstractUploadQueue):
+ class IOFileUploadQueue(AbstractUploadQueue):
"""
- Upload queue for files
+ Upload queue for files using BinaryIO
+ Note that if the upload fails, the stream needs to be restarted, so
+ the enqueued callback needs to produce a new IO object for each call.
Args:
cdf_client: Cognite Data Fusion client to use
@@ -78,14 +79,14 @@ def __init__(
cancellation_token,
)

- self.upload_queue: List[Tuple[FileMetadata, Union[str, PathLike]]] = []
+ self.upload_queue: List[Tuple[FileMetadata, Union[str, Callable[[], BinaryIO]]]] = []
self.overwrite_existing = overwrite_existing

self.files_queued = FILES_UPLOADER_QUEUED
self.files_written = FILES_UPLOADER_WRITTEN
self.queue_size = FILES_UPLOADER_QUEUE_SIZE

- def add_to_upload_queue(self, file_meta: FileMetadata, file_name: Union[str, PathLike]) -> None:
+ def add_io_to_upload_queue(self, file_meta: FileMetadata, read_file: Callable[[], BinaryIO]) -> None:
"""
Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
specified in the __init__.
@@ -96,7 +97,7 @@ def add_to_upload_queue(self, file_meta: FileMetadata, file_name: Union[str, Pat
If none, the file object will still be created, but no data is uploaded
"""
with self.lock:
- self.upload_queue.append((file_meta, file_name))
+ self.upload_queue.append((file_meta, read_file))
self.upload_queue_size += 1
self.files_queued.inc()
self.queue_size.set(self.upload_queue_size)
@@ -131,21 +132,41 @@ def upload(self) -> None:
max_delay=RETRY_MAX_DELAY,
backoff=RETRY_BACKOFF_FACTOR,
)
- def _upload_single(self, index: int, file_name: Union[str, PathLike], file_meta: FileMetadata) -> None:
+ def _upload_single(self, index: int, read_file: Callable[[], BinaryIO], file_meta: FileMetadata) -> None:
# Upload file
- file_meta = self.cdf_client.files.upload(str(file_name), overwrite=self.overwrite_existing, **file_meta.dump()) # type: ignore
+ with read_file() as file:
+ file_meta = self.cdf_client.files.upload_bytes(
+ file,
+ file_meta.name if file_meta.name is not None else "",
+ overwrite=self.overwrite_existing,
+ external_id=file_meta.external_id,
+ source=file_meta.source,
+ mime_type=file_meta.mime_type,
+ metadata=file_meta.metadata,
+ directory=file_meta.directory,
+ asset_ids=file_meta.asset_ids,
+ data_set_id=file_meta.data_set_id,
+ labels=file_meta.labels,
+ geo_location=file_meta.geo_location,
+ source_created_time=file_meta.source_created_time,
+ source_modified_time=file_meta.source_modified_time,
+ security_categories=file_meta.security_categories,
+ )

# Update meta-object in queue
- self.upload_queue[index] = (file_meta, file_name)
+ self.upload_queue[index] = (file_meta, read_file)

def _upload_batch(self) -> None:
# Concurrently execute file-uploads

+ futures: list[Future] = []
with ThreadPoolExecutor(self.cdf_client.config.max_workers) as pool:
for i, (file_meta, file_name) in enumerate(self.upload_queue):
- pool.submit(self._upload_single, i, file_name, file_meta)
+ futures.append(pool.submit(self._upload_single, i, file_name, file_meta))
+ for fut in futures:
+ fut.result(0.0)

def __enter__(self) -> "FileUploadQueue":
def __enter__(self) -> "IOFileUploadQueue":
"""
Wraps around start method, for use as context manager
@@ -178,9 +199,9 @@ def __len__(self) -> int:
return self.upload_queue_size


- class BytesUploadQueue(AbstractUploadQueue):
+ class FileUploadQueue(IOFileUploadQueue):
"""
- Upload queue for bytes
+ Upload queue for files
Args:
cdf_client: Cognite Data Fusion client to use
@@ -191,126 +212,96 @@ class BytesUploadQueue(AbstractUploadQueue):
methods).
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
overwrite_existing: If 'overwrite' is set to true, fields for the files found for externalIds can be overwritten
"""

def __init__(
self,
cdf_client: CogniteClient,
- post_upload_function: Optional[Callable[[List[Any]], None]] = None,
+ post_upload_function: Optional[Callable[[List[Event]], None]] = None,
max_queue_size: Optional[int] = None,
max_upload_interval: Optional[int] = None,
trigger_log_level: str = "DEBUG",
thread_name: Optional[str] = None,
overwrite_existing: bool = False,
cancellation_token: threading.Event = threading.Event(),
- ) -> None:
+ ):
# Super sets post_upload and threshold
super().__init__(
cdf_client,
post_upload_function,
max_queue_size,
max_upload_interval,
trigger_log_level,
thread_name,
overwrite_existing,
cancellation_token,
)
- self.upload_queue: List[Tuple[bytes, FileMetadata]] = []
- self.overwrite_existing = overwrite_existing
- self.upload_queue_size = 0

- self.bytes_queued = BYTES_UPLOADER_QUEUED
- self.queue_size = BYTES_UPLOADER_QUEUE_SIZE
- self.bytes_written = BYTES_UPLOADER_WRITTEN

- def add_to_upload_queue(self, content: bytes, metadata: FileMetadata) -> None:
+ def add_to_upload_queue(self, file_meta: FileMetadata, file_name: Union[str, PathLike]) -> None:
"""
- Add object to upload queue. The queue will be uploaded if the queue size is larger than the threshold
+ Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
specified in the __init__.
Args:
- content: bytes object to upload
- metadata: metadata for the given bytes object
"""
- with self.lock:
- self.upload_queue.append((content, metadata))
- self.upload_queue_size += 1
- self.bytes_queued.inc()
- self.queue_size.set(self.upload_queue_size)
- def upload(self) -> None:
- """
- Trigger an upload of the queue, clears queue afterwards
Args:
+ file_meta: File metadata-object
+ file_name: Path to file to be uploaded.
+ If none, the file object will still be created, but no data is uploaded
"""
- if len(self.upload_queue) == 0:
- return

- with self.lock:
- # Upload frames in batches
- self._upload_batch()
+ def load_file_from_path() -> BinaryIO:
+ return open(file_name, "rb")

- # Log stats
- self.bytes_written.inc(self.upload_queue_size)
+ self.add_io_to_upload_queue(file_meta, load_file_from_path)

- try:
- self._post_upload(self.upload_queue)
- except Exception as e:
- self.logger.error("Error in upload callback: %s", str(e))

- # Clear queue
- self.upload_queue.clear()
- self.upload_queue_size = 0
- self.logger.info(f"Uploaded {self.upload_queue_size} files")
- self.queue_size.set(self.upload_queue_size)
+ class BytesUploadQueue(IOFileUploadQueue):
"""
+ Upload queue for bytes
- def _upload_batch(self) -> None:
- # Concurrently execute bytes-uploads
- with ThreadPoolExecutor(self.cdf_client.config.max_workers) as pool:
- for i, (frame, metadata) in enumerate(self.upload_queue):
- pool.submit(self._upload_single, i, frame, metadata)
Args:
cdf_client: Cognite Data Fusion client to use
post_upload_function: A function that will be called after each upload. The function will be given one argument:
A list of the events that were uploaded.
max_queue_size: Maximum size of upload queue. Defaults to no max size.
max_upload_interval: Automatically trigger an upload each m seconds when run as a thread (use start/stop
methods).
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
overwrite_existing: If 'overwrite' is set to true, fields for the files found for externalIds can be overwritten
"""

- @retry(
- exceptions=(CogniteAPIError, ConnectionError),
- tries=RETRIES,
- delay=RETRY_DELAY,
- max_delay=RETRY_MAX_DELAY,
- backoff=RETRY_BACKOFF_FACTOR,
- )
- def _upload_single(self, index: int, content: bytes, metadata: FileMetadata) -> None:
- # Upload object
- file_meta_data: FileMetadata = self.cdf_client.files.upload_bytes(
- content, overwrite=self.overwrite_existing, **metadata.dump()
+ def __init__(
+ self,
+ cdf_client: CogniteClient,
+ post_upload_function: Optional[Callable[[List[Any]], None]] = None,
+ max_queue_size: Optional[int] = None,
+ max_upload_interval: Optional[int] = None,
+ trigger_log_level: str = "DEBUG",
+ thread_name: Optional[str] = None,
+ overwrite_existing: bool = False,
+ cancellation_token: threading.Event = threading.Event(),
+ ) -> None:
+ super().__init__(
+ cdf_client,
+ post_upload_function,
+ max_queue_size,
+ max_upload_interval,
+ trigger_log_level,
+ thread_name,
+ overwrite_existing,
+ cancellation_token,
+ )

- # Update meta-object in queue
- self.upload_queue[index] = (content, file_meta_data)

- def __enter__(self) -> "BytesUploadQueue":
- """
- Wraps around start method, for use as context manager
- Returns:
- self
- """
- self.start()
- return self

- def __exit__(
- self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
- ) -> None:
+ def add_to_upload_queue(self, content: bytes, metadata: FileMetadata) -> None:
"""
- Wraps around stop method, for use as context manager
+ Add object to upload queue. The queue will be uploaded if the queue size is larger than the threshold
+ specified in the __init__.
Args:
- exc_type: Exception type
- exc_val: Exception value
- exc_tb: Traceback
+ content: bytes object to upload
+ metadata: metadata for the given bytes object
"""
- self.stop()

- def __len__(self) -> int:
- """
- The size of the upload queue
+ def get_byte_io() -> BinaryIO:
+ return BytesIO(content)

- Returns:
- Number of events in queue
- """
- return self.upload_queue_size
+ self.add_io_to_upload_queue(metadata, get_byte_io)
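After this refactor, both concrete queues are thin adapters over IOFileUploadQueue: FileUploadQueue wraps a path in an open() callback, and BytesUploadQueue wraps a buffer in a BytesIO callback. A minimal usage sketch under that assumption follows; the paths, names, and external IDs are illustrative, not from this commit. Note that the two add_to_upload_queue methods take their arguments in opposite order, as in the diff above.

    from cognite.client import CogniteClient
    from cognite.client.data_classes import FileMetadata
    from cognite.extractorutils.uploader import BytesUploadQueue, FileUploadQueue

    client = CogniteClient()

    # File on disk: the queue opens the path lazily, once per upload attempt.
    file_queue = FileUploadQueue(client, overwrite_existing=True)
    file_queue.add_to_upload_queue(
        FileMetadata(name="report.pdf", external_id="report-pdf"),  # metadata first
        "/tmp/report.pdf",
    )
    file_queue.upload()

    # In-memory payload: wrapped in a fresh BytesIO for each upload attempt.
    bytes_queue = BytesUploadQueue(client)
    bytes_queue.add_to_upload_queue(
        b"hello, world",  # content first
        FileMetadata(name="hello.txt", external_id="hello-txt"),
    )
    bytes_queue.upload()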