Added prefetch to export files in parallel #923

Open · wants to merge 6 commits into main
Conversation

@ilongin (Contributor) commented Feb 13, 2025

Adds a num_workers argument to the DataChain.to_storage(...) function to speed up exporting, which until now was done one file at a time.

Performance results (tested on 200 images from s3://ldb-public/remote/data-lakes/dogs-and-cats/, no cache):

Num workers | 1   | 2   | 5 (default) | 10  | 15
Time        | 99s | 52s | 23s         | 13s | 10.5s
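
For context, a minimal usage sketch of the new argument (a sketch only: the top-level import and the exact call signature, including the final name of the new argument, follow this description and the discussion below and may differ):

from datachain import DataChain  # top-level import assumed

# The benchmark dataset used in the table above.
chain = DataChain.from_storage("s3://ldb-public/remote/data-lakes/dogs-and-cats/")

# num_workers is the argument added in this PR; with 10 workers the 200-image
# export dropped from ~99s (serial) to ~13s in the measurements above.
chain.to_storage("./dogs-and-cats", num_workers=10)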

@ilongin ilongin linked an issue Feb 13, 2025 that may be closed by this pull request

codecov bot commented Feb 13, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 87.74%. Comparing base (491aab4) to head (5204fa2).

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #923      +/-   ##
==========================================
+ Coverage   87.68%   87.74%   +0.06%     
==========================================
  Files         130      130              
  Lines       11714    11731      +17     
  Branches     1594     1594              
==========================================
+ Hits        10271    10293      +22     
+ Misses       1043     1039       -4     
+ Partials      400      399       -1     
Flag        Coverage Δ
datachain   87.66% <100.00%> (+0.06%) ⬆️


@skshetry (Member)

I like the simplicity of the implementation, @ilongin! However, prefetching uses a temporary cache when cache=False, so this solution might not be ideal in that case.

@ilongin (Contributor, Author) commented Feb 13, 2025

> I like the simplicity of the implementation, @ilongin! However, prefetching uses a temporary cache when cache=False, so this solution might not be ideal in that case.

@skshetry can you explain why it's not ideal? I'm not sure I 100% understand the implication

@skshetry (Member) commented Feb 13, 2025

> I like the simplicity of the implementation, @ilongin! However, prefetching uses a temporary cache when cache=False, so this solution might not be ideal in that case.

> @skshetry can you explain why it's not ideal? I'm not sure I 100% understand the implication

When prefetch>0 and cache=False, prefetching uses a temporary cache.

if prefetch and not use_cache:
    return temporary_cache(tmp_dir, prefix="prefetch-")

So, all the files are prefetched to a temporary location in the background. What this PR does in that case is save files to a temporary location first, before "exporting" them back out from that cache. We set caching_enabled=True, so all the file operations will also try to use the cache. This is not ideal for performance.

self._set_stream(
    self._catalog, caching_enabled=True, download_cb=DEFAULT_CALLBACK
)

We also remove prefetched items after we run the mapper function, and we remove the cache directory later. As a result, this change also breaks export with the symlink link type.

try:
    catalog.cache.remove(obj)
except Exception as e:  # noqa: BLE001
    print(f"Failed to remove prefetched item {obj.name!r}: {e!s}")

Also, by "all the files are prefetched" above, I mean we prefetch all the File signals, which might also be suboptimal.

for obj in row:
    if isinstance(obj, File) and await obj._prefetch(download_cb):
        after_prefetch()
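
To make the cost concrete, here is a minimal sketch of the resulting data flow when cache=False (with hypothetical helper names such as temporary_prefetch_cache and export_without_cache; this is an illustration, not DataChain's actual implementation): every exported file is written twice, and anything linking into the temporary cache stops working once that cache is removed.

import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def temporary_prefetch_cache(prefix: str = "prefetch-"):
    # Throwaway cache directory, deleted on exit (mirrors the temporary cache above).
    cache_dir = Path(tempfile.mkdtemp(prefix=prefix))
    try:
        yield cache_dir
    finally:
        shutil.rmtree(cache_dir, ignore_errors=True)


def export_without_cache(remote_files, destination: Path) -> None:
    # remote_files: iterable of (name, fetch) pairs, where fetch() returns the file bytes.
    destination.mkdir(parents=True, exist_ok=True)
    with temporary_prefetch_cache() as cache_dir:
        for name, fetch in remote_files:
            cached = cache_dir / name
            cached.write_bytes(fetch())               # hop 1: prefetch into the temp cache
            shutil.copy2(cached, destination / name)  # hop 2: export back out of the cache
            cached.unlink()                           # prefetched item removed afterwards
            # A symlink created here instead of copy2() would point into cache_dir
            # and dangle as soon as the temporary cache is cleaned up.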

@dreadatour (Contributor) left a comment


Code looks good to me; a couple of comments below about naming/docstrings.
Not approving yet, since @skshetry has a reasonable comment above.

@@ -2451,6 +2451,7 @@ def export_files(
         placement: FileExportPlacement = "fullpath",
         use_cache: bool = True,
         link_type: Literal["copy", "symlink"] = "copy",
+        prefetch: Optional[int] = None,
Contributor

Not really sure prefetch is a good param name in the export_files function. "Prefetch" is usually about loading data, not saving. I know it was named to match the from_storage method, but still, should we discuss naming here? IMO something like "parallel" or "concurrent" would be a better option.

Contributor Author

Changed to num_workers, as I think that's the most accurate name.

Comment on lines +2466 to +2468
prefetch: number of workers to use for downloading files in advance.
    This is enabled by default and uses 2 workers.
    To disable prefetching, set it to 0.
Contributor

> number of workers to use for downloading files in advance

Typo here? We are exporting files, not downloading 👀

Contributor Author

Fixed


cloudflare-workers-and-pages bot commented Feb 17, 2025

Deploying datachain-documentation with Cloudflare Pages

Latest commit: 5204fa2
Status: 🚫 Build failed.

View logs

@ilongin ilongin force-pushed the ilongin/882-export-files-in-parallel branch from a92a64e to 9d301a8 on February 17, 2025 14:15
@ilongin ilongin requested a review from dreadatour February 17, 2025 14:16
Development

Successfully merging this pull request may close these issues.

Make export files async and parallel.
3 participants