From 13d39b3e214eb17a4741a3d9b1235703adf83e17 Mon Sep 17 00:00:00 2001 From: EdwardLi-coder <2023edwardll@gmail.com> Date: Sat, 7 Sep 2024 08:40:11 +0800 Subject: [PATCH 1/4] add resolve --- src/datachain/lib/file.py | 71 ++++++++++++++++++++++++++++++++++++- tests/unit/lib/test_file.py | 58 +++++++++++++++++++++++++++++- 2 files changed, 127 insertions(+), 2 deletions(-) diff --git a/src/datachain/lib/file.py b/src/datachain/lib/file.py index 973a82aea..1a4ec3465 100644 --- a/src/datachain/lib/file.py +++ b/src/datachain/lib/file.py @@ -1,10 +1,11 @@ import io import json +import logging import os import posixpath from abc import ABC, abstractmethod from contextlib import contextmanager -from datetime import datetime +from datetime import datetime, timezone from io import BytesIO from pathlib import Path, PurePosixPath from typing import TYPE_CHECKING, Any, ClassVar, Literal, Optional, Union @@ -25,6 +26,8 @@ if TYPE_CHECKING: from datachain.catalog import Catalog +logger = logging.getLogger("datachain") + # how to create file path when exporting ExportPlacement = Literal["filename", "etag", "fullpath", "checksum"] @@ -313,6 +316,72 @@ def get_fs(self): """Returns `fsspec` filesystem for the file.""" return self._catalog.get_client(self.source).fs + def resolve(self) -> "File": + """ + Resolve a File object by checking its existence and updating its metadata. + + Returns: + File: The resolved File object with updated metadata. + """ + if self._catalog is None: + raise RuntimeError("Cannot resolve file: catalog is not set") + + try: + client = self._catalog.get_client(self.source) + except NotImplementedError as e: + raise RuntimeError( + f"Unsupported protocol for file source: {self.source}" + ) from e + + try: + info = client.fs.info(client.get_full_path(self.path)) + converted_info = client.convert_info(info, self.source) + return type(self)( + path=self.path, + source=self.source, + size=getattr(converted_info, "size", 0), + etag=getattr(converted_info, "etag", ""), + version=getattr(converted_info, "version", None) or "", + is_latest=getattr(converted_info, "is_latest", True), + last_modified=getattr( + converted_info, "last_modified", datetime.now(timezone.utc) + ), + location=self.location, + ) + except (FileNotFoundError, PermissionError, OSError) as e: + logger.warning("File system error when resolving %s: %s", self.path, str(e)) + + return type(self)( + path=self.path, + source=self.source, + size=0, + etag="", + version="", + is_latest=True, + last_modified=TIME_ZERO, + location=self.location, + ) + + +def resolve(file: File) -> File: + """ + Resolve a File object by checking its existence and updating its metadata. + + This function is a wrapper around the File.resolve() method, designed to be + used as a mapper in DataChain operations. + + Args: + file (File): The File object to resolve. + + Returns: + File: The resolved File object with updated metadata. + + Raises: + RuntimeError: If the file's catalog is not set or if + the file source protocol is unsupported. + """ + return file.resolve() + class TextFile(File): """`DataModel` for reading text files.""" diff --git a/tests/unit/lib/test_file.py b/tests/unit/lib/test_file.py index 2a847e769..38d177b04 100644 --- a/tests/unit/lib/test_file.py +++ b/tests/unit/lib/test_file.py @@ -1,12 +1,15 @@ import json +from unittest.mock import Mock import pytest from fsspec.implementations.local import LocalFileSystem from PIL import Image +from datachain import DataChain from datachain.cache import UniqueId from datachain.catalog import Catalog -from datachain.lib.file import File, ImageFile, TextFile +from datachain.lib.file import File, ImageFile, TextFile, resolve +from datachain.utils import TIME_ZERO def create_file(source: str): @@ -319,3 +322,56 @@ def test_read_text(tmp_path, catalog): file = File(path=file_name, source=f"file://{tmp_path}") file._set_stream(catalog, True) assert file.read_text() == data + + +def test_resolve_file(cloud_test_catalog): + ctc = cloud_test_catalog + + dc = DataChain.from_storage(ctc.src_uri, session=ctc.session) + for orig_file in dc.collect("file"): + resolved_file = File(source=orig_file.source, path=orig_file.path) + resolved_file._catalog = ctc.catalog + assert orig_file == resolved_file.resolve() + + +def test_resolve_file_no_exist(cloud_test_catalog): + ctc = cloud_test_catalog + + non_existent_file = File(source=ctc.src_uri, path="non_existent_file.txt") + non_existent_file._catalog = ctc.catalog + resolved_non_existent = non_existent_file.resolve() + assert resolved_non_existent.size == 0 + assert resolved_non_existent.etag == "" + assert resolved_non_existent.last_modified == TIME_ZERO + + +def test_resolve_unsupported_protocol(): + mock_catalog = Mock() + mock_catalog.get_client.side_effect = NotImplementedError("Unsupported protocol") + + file = File(source="unsupported://example.com", path="test.txt") + file._catalog = mock_catalog + + with pytest.raises(RuntimeError) as exc_info: + file.resolve() + + assert ( + str(exc_info.value) + == "Unsupported protocol for file source: unsupported://example.com" + ) + + +def test_file_resolve_no_catalog(): + file = File(path="test.txt", source="s3://mybucket") + with pytest.raises(RuntimeError, match="Cannot resolve file: catalog is not set"): + file.resolve() + + +def test_resolve_function(): + mock_file = Mock(spec=File) + mock_file.resolve.return_value = "resolved_file" + + result = resolve(mock_file) + + assert result == "resolved_file" + mock_file.resolve.assert_called_once() From 8c949172f4ecb1822de3704fd38ef0562cf9315a Mon Sep 17 00:00:00 2001 From: EdwardLi-coder <2023edwardll@gmail.com> Date: Thu, 12 Sep 2024 09:54:29 +0800 Subject: [PATCH 2/4] update to info_to_file --- src/datachain/client/s3.py | 2 +- src/datachain/lib/file.py | 20 +++++++++----------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/datachain/client/s3.py b/src/datachain/client/s3.py index 37de24442..4e55e97b9 100644 --- a/src/datachain/client/s3.py +++ b/src/datachain/client/s3.py @@ -160,7 +160,7 @@ def info_to_file(self, v: dict[str, Any], path: str) -> File: source=self.uri, path=path, size=v["size"], - version=ClientS3.clean_s3_version(v.get("VersionId", "")), + version=ClientS3.clean_s3_version(v.get("VersionId", "")) or "", etag=v.get("ETag", "").strip('"'), is_latest=v.get("IsLatest", True), last_modified=v.get("LastModified", ""), diff --git a/src/datachain/lib/file.py b/src/datachain/lib/file.py index 1a4ec3465..4317dad0f 100644 --- a/src/datachain/lib/file.py +++ b/src/datachain/lib/file.py @@ -5,10 +5,10 @@ import posixpath from abc import ABC, abstractmethod from contextlib import contextmanager -from datetime import datetime, timezone +from datetime import datetime from io import BytesIO from pathlib import Path, PurePosixPath -from typing import TYPE_CHECKING, Any, ClassVar, Literal, Optional, Union +from typing import TYPE_CHECKING, Any, ClassVar, Literal, Optional, Self, Union from urllib.parse import unquote, urlparse from urllib.request import url2pathname @@ -316,7 +316,7 @@ def get_fs(self): """Returns `fsspec` filesystem for the file.""" return self._catalog.get_client(self.source).fs - def resolve(self) -> "File": + def resolve(self) -> "Self": """ Resolve a File object by checking its existence and updating its metadata. @@ -335,17 +335,15 @@ def resolve(self) -> "File": try: info = client.fs.info(client.get_full_path(self.path)) - converted_info = client.convert_info(info, self.source) + converted_info = client.info_to_file(info, self.source) return type(self)( path=self.path, source=self.source, - size=getattr(converted_info, "size", 0), - etag=getattr(converted_info, "etag", ""), - version=getattr(converted_info, "version", None) or "", - is_latest=getattr(converted_info, "is_latest", True), - last_modified=getattr( - converted_info, "last_modified", datetime.now(timezone.utc) - ), + size=converted_info.size, + etag=converted_info.etag, + version=converted_info.version or "", + is_latest=converted_info.is_latest, + last_modified=converted_info.last_modified, location=self.location, ) except (FileNotFoundError, PermissionError, OSError) as e: From 43c2f8d984127d4ad8e3a46f936f69a3391a7968 Mon Sep 17 00:00:00 2001 From: EdwardLi-coder <2023edwardll@gmail.com> Date: Thu, 12 Sep 2024 16:48:43 +0800 Subject: [PATCH 3/4] update typing to typing_extensions --- src/datachain/lib/file.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/datachain/lib/file.py b/src/datachain/lib/file.py index 4317dad0f..b29a4ccd4 100644 --- a/src/datachain/lib/file.py +++ b/src/datachain/lib/file.py @@ -8,7 +8,7 @@ from datetime import datetime from io import BytesIO from pathlib import Path, PurePosixPath -from typing import TYPE_CHECKING, Any, ClassVar, Literal, Optional, Self, Union +from typing import TYPE_CHECKING, Any, ClassVar, Literal, Optional, Union from urllib.parse import unquote, urlparse from urllib.request import url2pathname @@ -16,6 +16,9 @@ from PIL import Image from pydantic import Field, field_validator +if TYPE_CHECKING: + from typing_extensions import Self + from datachain.cache import UniqueId from datachain.client.fileslice import FileSlice from datachain.lib.data_model import DataModel From 3cc64c634a2072d24a6d6224d827c1b4098493af Mon Sep 17 00:00:00 2001 From: EdwardLi-coder <2023edwardll@gmail.com> Date: Thu, 12 Sep 2024 23:12:48 +0800 Subject: [PATCH 4/4] remove or "" --- src/datachain/client/s3.py | 2 +- src/datachain/lib/file.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/datachain/client/s3.py b/src/datachain/client/s3.py index 4e55e97b9..37de24442 100644 --- a/src/datachain/client/s3.py +++ b/src/datachain/client/s3.py @@ -160,7 +160,7 @@ def info_to_file(self, v: dict[str, Any], path: str) -> File: source=self.uri, path=path, size=v["size"], - version=ClientS3.clean_s3_version(v.get("VersionId", "")) or "", + version=ClientS3.clean_s3_version(v.get("VersionId", "")), etag=v.get("ETag", "").strip('"'), is_latest=v.get("IsLatest", True), last_modified=v.get("LastModified", ""), diff --git a/src/datachain/lib/file.py b/src/datachain/lib/file.py index b29a4ccd4..4818bf439 100644 --- a/src/datachain/lib/file.py +++ b/src/datachain/lib/file.py @@ -344,7 +344,7 @@ def resolve(self) -> "Self": source=self.source, size=converted_info.size, etag=converted_info.etag, - version=converted_info.version or "", + version=converted_info.version, is_latest=converted_info.is_latest, last_modified=converted_info.last_modified, location=self.location,