[DAR-3513][DAR-3514][DAR-3515][DAR-3516][DAR-3517][External] Multi-file push #923

Merged: 13 commits, Sep 26, 2024
1 change: 1 addition & 0 deletions darwin/cli.py
@@ -126,6 +126,7 @@ def _run(args: Namespace, parser: ArgumentParser) -> None:
args.extract_views,
args.preserve_folders,
args.verbose,
args.item_merge_mode,
)
# Remove a project (remotely)
elif args.action == "remove":
42 changes: 28 additions & 14 deletions darwin/cli_functions.py
@@ -656,6 +656,7 @@ def upload_data(
extract_views: bool = False,
preserve_folders: bool = False,
verbose: bool = False,
item_merge_mode: Optional[str] = None,
) -> None:
"""
Uploads the provided files to the remote dataset.
@@ -684,6 +685,14 @@
Specify whether or not to preserve folder paths when uploading.
verbose : bool
Specify whether to have full traces print when uploading files or not.
item_merge_mode : Optional[str]
If set, each file path passed to `files_to_upload` behaves as follows:
- Paths pointing directly to individual files are ignored
- Paths pointing to folders of files will be uploaded according to the following mode rules.
Note that folders will not be recursively searched, so only files in the first level of the folder will be uploaded:
- "slots": Each file in the folder will be uploaded to a different slot of the same item.
- "series": All `.dcm` files in the folder will be concatenated into a single slot. All other files are ignored.
- "channels": Each file in the folder will be uploaded to a different channel of the same item.
"""
client: Client = _load_client()
try:
@@ -773,6 +782,7 @@ def file_upload_callback(
preserve_folders=preserve_folders,
progress_callback=progress_callback,
file_upload_callback=file_upload_callback,
item_merge_mode=item_merge_mode,
)
console = Console(theme=_console_theme())

@@ -788,10 +798,13 @@ def file_upload_callback(
already_existing_items = []
other_skipped_items = []
for item in upload_manager.blocked_items:
if (item.reason is not None) and (item.reason.upper() == "ALREADY_EXISTS"):
already_existing_items.append(item)
else:
other_skipped_items.append(item)
for slot in item.slots:
if (slot["reason"] is not None) and (
slot["reason"].upper() == "ALREADY_EXISTS"
):
already_existing_items.append(item)
else:
other_skipped_items.append(item)

Review comment:
I know this was pre-existing, but with strings such as "ALREADY_EXISTS" and "UPLOAD_REQUEST" it's too easy to make a typo. I usually create a variable to hold such constants.

Author reply:
Good point. I'll move reasons into a const based on their BE values.

Author reply:
Also, "UPLOAD_REQUEST" doesn't appear to be a BE constant. Instead, it looks like an arbitrarily chosen string representing the stage of the upload where the error occurred.

Section of PR where it was introduced

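A minimal sketch of the constant-based approach discussed in the thread above; the constant names are hypothetical, and the final PR may organise them differently (for example as an enum based on the BE values):

BLOCKED_REASON_ALREADY_EXISTS = "ALREADY_EXISTS"  # hypothetical constant mirroring the BE reason string
UPLOAD_REQUEST_STAGE = "UPLOAD_REQUEST"           # hypothetical label for the upload stage shown in the error table

for item in upload_manager.blocked_items:
    for slot in item.slots:
        reason = slot["reason"]
        if reason is not None and reason.upper() == BLOCKED_REASON_ALREADY_EXISTS:
            already_existing_items.append(item)
        else:
            other_skipped_items.append(item)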
if already_existing_items:
console.print(
@@ -821,14 +834,15 @@ def file_upload_callback(
)

for item in upload_manager.blocked_items:
if item.reason != "ALREADY_EXISTS":
error_table.add_row(
str(item.dataset_item_id),
item.filename,
item.path,
"UPLOAD_REQUEST",
item.reason,
)
for slot in item.slots:
if slot["reason"] != "ALREADY_EXISTS":

error_table.add_row(
str(item.dataset_item_id),
item.filename,
item.path,
"UPLOAD_REQUEST",
slot["reason"],
)

Review comment:
This check is different from what we do at line 802. Do we expect reason to always be defined and uppercase at this point?

Author reply:
From looking at the relevant BE section, yes: every blocked item will have a reason, and all the reasons are uppercase. However, I don't see a downside to introducing handling for potentially missing or lowercase reasons, so I can adjust this section to function as above.

for error in upload_manager.errors:
for local_file in upload_manager.local_files:
@@ -855,8 +869,8 @@
_error(f"No dataset with name '{e.name}'")
except UnsupportedFileType as e:
_error(f"Unsupported file type {e.path.suffix} ({e.path.name})")
except ValueError:
_error("No files found")
except ValueError as e:
_error(f"{e}")


def dataset_import(
1 change: 1 addition & 0 deletions darwin/dataset/remote_dataset.py
@@ -138,6 +138,7 @@ def push(
preserve_folders: bool = False,
progress_callback: Optional[ProgressCallback] = None,
file_upload_callback: Optional[FileUploadCallback] = None,
item_merge_mode: Optional[str] = None,
) -> UploadHandler:
pass

229 changes: 191 additions & 38 deletions darwin/dataset/remote_dataset_v2.py
@@ -1,4 +1,5 @@
import json
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
@@ -18,7 +19,9 @@
from darwin.dataset.release import Release
from darwin.dataset.upload_manager import (
FileUploadCallback,
ItemMergeMode,
LocalFile,
MultiFileItem,
ProgressCallback,
UploadHandler,
UploadHandlerV2,
@@ -166,14 +169,16 @@ def push(
preserve_folders: bool = False,
progress_callback: Optional[ProgressCallback] = None,
file_upload_callback: Optional[FileUploadCallback] = None,
item_merge_mode: Optional[str] = None,
) -> UploadHandler:
"""
Uploads a local dataset (images ONLY) in the datasets directory.

Parameters
----------
files_to_upload : Optional[List[Union[PathLike, LocalFile]]]
List of files to upload. Those can be folders.
List of files to upload. These can be folders.
If `item_merge_mode` is set, these paths must be folders.
blocking : bool, default: True
If False, the dataset is not uploaded and a generator function is returned instead.
multi_threaded : bool, default: True
@@ -188,7 +193,7 @@
extract_views: bool, default: False
When the uploading file is a volume, specify whether it's going to be split into orthogonal views.
files_to_exclude : Optional[PathLike]], default: None
Optional list of files to exclude from the file scan. Those can be folders.
Optional list of files to exclude from the file scan. These can be folders.
path: Optional[str], default: None
Optional path to store the files in.
preserve_folders : bool, default: False
@@ -197,11 +202,18 @@
Optional callback, called every time the progress of an uploading files is reported.
file_upload_callback: Optional[FileUploadCallback], default: None
Optional callback, called every time a file chunk is uploaded.

item_merge_mode : Optional[str]
If set, each file path passed to `files_to_upload` behaves as follows:
- Paths pointing directly to individual files are ignored
- Paths pointing to folders of files will be uploaded according to the following mode rules.
Note that folders will not be recursively searched, so only files in the first level of the folder will be uploaded:
- "slots": Each file in the folder will be uploaded to a different slot of the same item.
- "series": All `.dcm` files in the folder will be concatenated into a single slot. All other files are ignored.
- "channels": Each file in the folder will be uploaded to a different channel of the same item.
Returns
-------
handler : UploadHandler
Class for handling uploads, progress and error messages.
Class for handling uploads, progress and error messages.

Raises
------
@@ -210,53 +222,61 @@
- If a path is specified when uploading a LocalFile object.
- If there are no files to upload (because path is wrong or the exclude filter excludes everything).
"""
merge_incompatible_args = {
"preserve_folders": preserve_folders,
"as_frames": as_frames,
"extract_views": extract_views,
}

Review comment:
Also here, IMO there's a high risk of typos. I'd do:

# Define constants for the keys
PRESERVE_FOLDERS_KEY = "preserve_folders"
AS_FRAMES_KEY = "as_frames"
EXTRACT_VIEWS_KEY = "extract_views"

# Use the constants in the dictionary
merge_incompatible_args = {
    PRESERVE_FOLDERS_KEY: preserve_folders,
    AS_FRAMES_KEY: as_frames,
    EXTRACT_VIEWS_KEY: extract_views,
}

if files_to_exclude is None:
files_to_exclude = []

if files_to_upload is None:
raise ValueError("No files or directory specified.")

uploading_files = [
item for item in files_to_upload if isinstance(item, LocalFile)
]
if item_merge_mode:
try:
ItemMergeMode(item_merge_mode)
except ValueError:
raise ValueError(
f"Invalid item merge mode: {item_merge_mode}. Valid options are: 'slots', 'series', 'channels'"
)
incompatible_args = [
arg for arg, value in merge_incompatible_args.items() if value
]

if incompatible_args:
incompatible_args_str = ", ".join(incompatible_args)
raise TypeError(
f"`item_merge_mode` does not support the following incompatible arguments: {incompatible_args_str}."
)
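For reference, a hedged sketch of what the ItemMergeMode enum imported above might look like; its real definition lives in darwin/dataset/upload_manager.py and is not shown in this diff, so the member names are assumed from the accepted string values:

from enum import Enum


class ItemMergeMode(Enum):
    SLOTS = "slots"        # each file in the folder goes to its own slot
    SERIES = "series"      # all .dcm files are concatenated into a single slot
    CHANNELS = "channels"  # each file in the folder goes to its own channel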

# Direct file paths
local_files = [item for item in files_to_upload if isinstance(item, LocalFile)]

# Folder paths
search_files = [
item for item in files_to_upload if not isinstance(item, LocalFile)
]

generic_parameters_specified = (
path is not None or fps != 0 or as_frames is not False
)
if uploading_files and generic_parameters_specified:
raise ValueError("Cannot specify a path when uploading a LocalFile object.")

for found_file in find_files(search_files, files_to_exclude=files_to_exclude):
local_path = path
if preserve_folders:
source_files = [
source_file
for source_file in search_files
if is_relative_to(found_file, source_file)
]
if source_files:
local_path = str(
found_file.relative_to(source_files[0]).parent.as_posix()
)
uploading_files.append(
LocalFile(
found_file,
fps=fps,
as_frames=as_frames,
extract_views=extract_views,
path=local_path,
)
if item_merge_mode:
local_files, multi_file_items = _find_files_to_upload_merging(
search_files, files_to_exclude, fps, item_merge_mode
)

if not uploading_files:
raise ValueError(
"No files to upload, check your path, exclusion filters and resume flag"
handler = UploadHandlerV2(self, local_files, multi_file_items)
else:
local_files = _find_files_to_upload_no_merging(
search_files,
files_to_exclude,
path,
fps,
as_frames,
extract_views,
preserve_folders,
local_files,
)
handler = UploadHandlerV2(self, local_files)

Review comment:
• I usually avoid having the same variable name, local_files, as both the input and output of a function; it's easy to make a mistake afterwards by assuming the content, and it helps me to reason as if data were immutable.
• _find_files_to_upload_no_merging takes local_files as input whilst _find_files_to_upload_merging does not. Read from the outside, it smells a bit. I would expect (without knowing much) that those two functions take the same args but return different files depending on the merging. local_files = [item for item in files_to_upload if isinstance(item, LocalFile)] is only used by this function, so it might as well be inside the function?

Author reply (@JBWilkie, Sep 10, 2024):
This is good feedback. I will turn these into pure functions; you're right that this is a little messy.

Author reply:
The only point I'd add here is that I don't think it makes sense for both functions to take exactly the same arguments, because preserve_folders, extract_views, and as_frames are unsupported by multi-file push and so shouldn't be passed to _find_files_to_upload_merging.


handler = UploadHandlerV2(self, uploading_files)
if blocking:
handler.upload(
max_workers=max_workers,
@@ -842,3 +862,136 @@ def register_multi_slotted(
print(f" - {item}")
print(f"Reistration complete. Check your items in the dataset: {self.slug}")
return results


def _find_files_to_upload_merging(

Review comment:
nit: _find_files_to_upload_with_merging; I think it reads a bit better.
Looking at the function below, this could also be _find_files_to_upload_as_multislot?

Author reply:
Addressed in the comment below; there are better, more generic names I'll use.

search_files: List[PathLike],
files_to_exclude: List[PathLike],
fps: int,
item_merge_mode: str,
) -> Tuple[List[LocalFile], List[MultiFileItem]]:
"""
Finds files to upload as either:
- Multi-slotted items
- Multi-channel items
- Single-slotted items containing multiple `.dcm` files

Review comment:
Is this too specific? If we add an additional modality we'll have to modify the doc as well. To be fair, it's not a big deal, but it's easy to forget to update the doc. We could say: find files to upload according to the item_merge_mode?


Does not search each directory recursively, only considers files in the first level of each directory.

Parameters
----------
search_files : List[PathLike]
List of directories to search for files.
files_to_exclude : List[PathLike]
List of files to exclude from the file scan.
item_merge_mode : str
Mode to merge the files in the folders. Valid options are: 'slots', 'series', 'channels'.
fps : int
When uploading video files, specify the framerate

Returns
-------
List[LocalFile]
List of `LocalFile` objects contained within each `MultiFileItem`
List[MultiFileItem]
List of `MultiFileItem` objects to be uploaded
"""
multi_file_items, local_files = [], []
for directory in search_files:
files_in_directory = list(
find_files(
[directory],
files_to_exclude=files_to_exclude,
recursive=False,
sort=True,
)
)
if not files_in_directory:
print(
f"Warning: There are no files in the first level of {directory}, skipping directory"
)
continue
multi_file_item = MultiFileItem(
Path(directory), files_in_directory, ItemMergeMode(item_merge_mode), fps
)
multi_file_items.append(multi_file_item)
local_files.extend(multi_file_item.files)

if not multi_file_items:
raise ValueError(
"No valid folders to upload after searching the passed directories for files"
)
return local_files, multi_file_items
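A hedged usage sketch of this helper, assuming two folders of DICOM slices on disk; the paths are illustrative:

from pathlib import Path

local_files, multi_file_items = _find_files_to_upload_merging(
    search_files=[Path("scans/patient_01"), Path("scans/patient_02")],
    files_to_exclude=[],
    fps=0,
    item_merge_mode="series",
)
# Each folder becomes one MultiFileItem whose .dcm files share a single slot;
# local_files collects the LocalFile objects contained in every MultiFileItem.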


def _find_files_to_upload_no_merging(

Review comment:
_find_files_to_upload_as_singleslot reads a bit better. I usually avoid defining something by negating something else.

Author reply:
How about naming the pair of functions:

• _find_files_to_upload_as_single_file_items
• _find_files_to_upload_as_multi_file_items

This is generic enough that we can add modalities in the future.

search_files: List[PathLike],
files_to_exclude: List[PathLike],
path: Optional[str],
fps: int,
as_frames: bool,
extract_views: bool,
preserve_folders: bool,
uploading_files: List[LocalFile],
) -> List[LocalFile]:
"""
Finds files to upload as single-slotted dataset items. Recursively searches the passed directories for files.

Parameters
----------
search_files : List[PathLike]
List of directories to search for files.
files_to_exclude : Optional[List[PathLike]]
List of files to exclude from the file scan.
path : Optional[str]
Path to store the files in.
fps: int
When uploading video files, specify the framerate.
as_frames: bool
When uploading video files, specify whether to upload as a list of frames.
extract_views: bool
When uploading volume files, specify whether to split into orthogonal views.
preserve_folders: bool
Specify whether or not to preserve folder paths when uploading.
uploading_files : List[LocalFile]
List of files to upload.

Returns
-------
List[LocalFile]
List of files to upload.
"""
generic_parameters_specified = (
path is not None or fps != 0 or as_frames is not False
)
if uploading_files and generic_parameters_specified:
raise ValueError("Cannot specify a path when uploading a LocalFile object.")

for found_file in find_files(search_files, files_to_exclude=files_to_exclude):
local_path = path
if preserve_folders:
source_files = [
source_file
for source_file in search_files
if is_relative_to(found_file, source_file)
]
if source_files:
local_path = str(
found_file.relative_to(source_files[0]).parent.as_posix()
)
Comment on lines +979 to +988

Review comment:
I'm having a hard time interpreting what's happening here.

Author reply:
This is the pre-existing push logic, which I placed unchanged into a function to separate it from multi-file push. is_relative_to() is basically asking: does the 2nd path sit somewhere inside the 1st? If not, we can't push it with preserve_folders, because there would be no difference between the two paths (which is what preserve_folders represents in the dataset).

The reason this list comprehension is necessary is that search_files can contain multiple directories, and we need to select the correct source_file directory to calculate the relative path from. For example:

If search_files contains two directories, dir1 and dir2, and found_file is dir2/subdir/file.png, the list comprehension ensures that the relative path is calculated with respect to dir2, not dir1.

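A standalone illustration of the dir1/dir2 example above, using pathlib's Path.is_relative_to (Python 3.9+) for brevity; the code itself calls darwin's own is_relative_to helper with the same semantics:

from pathlib import Path

search_files = [Path("dir1"), Path("dir2")]
found_file = Path("dir2/subdir/file.png")

# Pick the search directory that actually contains the file, so the relative
# path is computed against dir2 rather than dir1.
source_files = [s for s in search_files if found_file.is_relative_to(s)]
local_path = str(found_file.relative_to(source_files[0]).parent.as_posix())
# local_path == "subdir"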
uploading_files.append(
LocalFile(
found_file,
fps=fps,
as_frames=as_frames,
extract_views=extract_views,
path=local_path,
)
)

if not uploading_files:
raise ValueError(
"No files to upload, check your path, exclusion filters and resume flag"
)

return uploading_files
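Putting the pieces together, a hedged sketch of how a multi-file push might be invoked from the SDK once this change lands, assuming the usual Client.local()/get_remote_dataset() flow; the team/dataset identifier and folder paths are illustrative:

from darwin.client import Client

client = Client.local()
dataset = client.get_remote_dataset("my-team/my-dataset")

# With item_merge_mode="slots", each folder becomes one item and every file in
# the first level of that folder is uploaded to its own slot.
handler = dataset.push(
    files_to_upload=["scans/patient_01", "scans/patient_02"],
    item_merge_mode="slots",
)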