diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 201e8386..f9c9bb2b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,11 +3,11 @@ default_language_version: exclude: ^upath/tests/pathlib/test_pathlib.*\.py|^upath/tests/pathlib/_test_support\.py repos: - repo: https://github.com/psf/black - rev: 23.9.1 + rev: 24.1.1 hooks: - id: black - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 + rev: v4.5.0 hooks: - id: check-added-large-files - id: check-case-conflict @@ -25,30 +25,30 @@ repos: - id: sort-simple-yaml - id: trailing-whitespace - repo: https://github.com/codespell-project/codespell - rev: v2.2.5 + rev: v2.2.6 hooks: - id: codespell additional_dependencies: ["tomli"] - repo: https://github.com/asottile/pyupgrade - rev: v3.13.0 + rev: v3.15.0 hooks: - id: pyupgrade args: [--py38-plus] - repo: https://github.com/PyCQA/isort - rev: 5.12.0 + rev: 5.13.2 hooks: - id: isort - repo: https://github.com/pycqa/flake8 - rev: 6.1.0 + rev: 7.0.0 hooks: - id: flake8 additional_dependencies: - - flake8-bugbear==23.1.20 - - flake8-comprehensions==3.10.1 + - flake8-bugbear==24.1.17 + - flake8-comprehensions==3.14.0 - flake8-debugger==4.1.2 - flake8-string-format==0.3.0 - repo: https://github.com/pycqa/bandit - rev: 1.7.5 + rev: 1.7.7 hooks: - id: bandit args: [-c, pyproject.toml] diff --git a/README.md b/README.md index 0ab6c5fd..955f9c74 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ For more examples, see the [example notebook here](notebooks/examples.ipynb) ### Currently supported filesystems (and schemes) -- `file:` Local filessystem +- `file:` Local filesystem - `memory:` Ephemeral filesystem in RAM - `az:`, `adl:`, `abfs:` and `abfss:` Azure Storage (requires `adlfs` to be installed) - `http:` and `https:` HTTP(S)-based filesystem @@ -99,7 +99,7 @@ flowchart TB U(UPath) UP(PosixUPath) UW(WindowsUPath) - UL(LocalPath) + UL(FilePath) US3(S3Path) UH(HttpPath) UO(...Path) @@ -168,13 +168,13 @@ 
register_implementation(my_protocol, MyPath) #### Registration via entry points -```toml +``` # pyproject.toml [project.entry-points."universal_pathlib.implementations"] myproto = "my_module.submodule:MyPath" ``` -```ini +``` # setup.cfg [options.entry_points] universal_pathlib.implementations = diff --git a/noxfile.py b/noxfile.py index 65f41eb2..53a4eb59 100644 --- a/noxfile.py +++ b/noxfile.py @@ -1,4 +1,5 @@ """Automation using nox.""" + import glob import os @@ -46,8 +47,6 @@ def lint(session: nox.Session) -> None: args = *(session.posargs or ("--show-diff-on-failure",)), "--all-files" session.run("pre-commit", "run", *args) - # session.run("python", "-m", "mypy") - # session.run("python", "-m", "pylint", *locations) @nox.session diff --git a/pyproject.toml b/pyproject.toml index ed54d8a6..efd59939 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,7 +79,7 @@ module = "fsspec.*" ignore_missing_imports = true [[tool.mypy.overrides]] -module = "webdav4.fsspec.*" +module = "webdav4.*" ignore_missing_imports = true [tool.pylint.format] diff --git a/upath/__init__.py b/upath/__init__.py index f5ec5279..1cd4a44f 100644 --- a/upath/__init__.py +++ b/upath/__init__.py @@ -1,4 +1,5 @@ """Pathlib API extended to use fsspec backends.""" + import sys try: @@ -6,14 +7,6 @@ except ImportError: __version__ = "not-installed" -if sys.version_info >= (3, 12): - import upath.core312plus as core - - sys.modules["upath.core"] = core - -else: - import upath.core as core - -UPath = core.UPath +from upath.core import UPath __all__ = ["UPath"] diff --git a/upath/_compat.py b/upath/_compat.py new file mode 100644 index 00000000..d80a0f0b --- /dev/null +++ b/upath/_compat.py @@ -0,0 +1,486 @@ +from __future__ import annotations + +import ntpath +import os +import posixpath +import sys +import warnings +from collections.abc import Sequence +from pathlib import Path +from pathlib import PurePath +from typing import TYPE_CHECKING +from typing import Any +from urllib.parse import 
SplitResult + +from fsspec import get_filesystem_class + +if TYPE_CHECKING: + from upath import UPath + +__all__ = [ + "PathlibPathShim", + "str_remove_prefix", + "str_remove_suffix", + "FSSpecAccessorShim", +] + + +if sys.version_info >= (3, 12): # noqa: C901 + + class PathlibPathShim: + """no need to shim pathlib.Path in Python 3.12+""" + + __slots__ = () + __missing_py312_slots__ = () + + def __init__(self, *args): + super().__init__(*args) + +else: + + def _get_missing_py312_pathlib_slots(): + """Return a tuple of slots that are present in Python 3.12's + pathlib.Path but not in the current version of pathlib.Path + """ + py312_slots = ( + "_raw_paths", + "_drv", + "_root", + "_tail_cached", + "_str", + "_str_normcase_cached", + "_parts_normcase_cached", + "_lines_cached", + "_hash", + ) + current_slots = [ + slot for cls in Path.__mro__ for slot in getattr(cls, "__slots__", []) + ] + return tuple([slot for slot in py312_slots if slot not in current_slots]) + + class PathlibPathShim: + """A compatibility shim for python < 3.12 + + Basically vendoring the functionality of pathlib.Path from Python 3.12 + that's not overwritten in upath.core.UPath + + """ + + __slots__ = () + __missing_py312_slots__ = _get_missing_py312_pathlib_slots() + + def __init__(self, *args): + paths = [] + for arg in args: + if isinstance(arg, PurePath) and hasattr(arg, "_raw_paths"): + if arg._flavour is ntpath and self._flavour is posixpath: + # GH-103631: Convert separators for backwards compatibility. 
+ paths.extend(path.replace("\\", "/") for path in arg._raw_paths) + else: + paths.extend(arg._raw_paths) + else: + try: + path = os.fspath(arg) + except TypeError: + path = arg + if not isinstance(path, str): + raise TypeError( + "argument should be a str or an os.PathLike " + "object where __fspath__ returns a str, " + f"not {type(path).__name__!r}" + ) + paths.append(path) + self._raw_paths = paths + + @classmethod + def _parse_path(cls, path): + if not path: + return "", "", [] + sep = cls._flavour.sep + altsep = cls._flavour.altsep + if altsep: + path = path.replace(altsep, sep) + drv, root, rel = cls._flavour.splitroot(path) + if not root and drv.startswith(sep) and not drv.endswith(sep): + drv_parts = drv.split(sep) + if len(drv_parts) == 4 and drv_parts[2] not in "?.": + # e.g. //server/share + root = sep + elif len(drv_parts) == 6: + # e.g. //?/unc/server/share + root = sep + parsed = [sys.intern(str(x)) for x in rel.split(sep) if x and x != "."] + return drv, root, parsed + + def _load_parts(self): + paths = self._raw_paths + if len(paths) == 0: + path = "" + elif len(paths) == 1: + path = paths[0] + else: + path = self._flavour.join(*paths) + drv, root, tail = self._parse_path(path) + self._drv = drv + self._root = root + self._tail_cached = tail + + def _from_parsed_parts(self, drv, root, tail): + path_str = self._format_parsed_parts(drv, root, tail) + path = self.with_segments(path_str) + path._str = path_str or "." + path._drv = drv + path._root = root + path._tail_cached = tail + return path + + @classmethod + def _format_parsed_parts(cls, drv, root, tail): + if drv or root: + return drv + root + cls._flavour.sep.join(tail) + elif tail and cls._flavour.splitdrive(tail[0])[0]: + tail = ["."] + tail + return cls._flavour.sep.join(tail) + + def __str__(self): + try: + return self._str + except AttributeError: + self._str = ( + self._format_parsed_parts(self.drive, self.root, self._tail) or "." 
+ ) + return self._str + + @property + def drive(self): + try: + return self._drv + except AttributeError: + self._load_parts() + return self._drv + + @property + def root(self): + try: + return self._root + except AttributeError: + self._load_parts() + return self._root + + @property + def _tail(self): + try: + return self._tail_cached + except AttributeError: + self._load_parts() + return self._tail_cached + + @property + def anchor(self): + anchor = self.drive + self.root + return anchor + + @property + def name(self): + tail = self._tail + if not tail: + return "" + return tail[-1] + + @property + def suffix(self): + name = self.name + i = name.rfind(".") + if 0 < i < len(name) - 1: + return name[i:] + else: + return "" + + @property + def suffixes(self): + name = self.name + if name.endswith("."): + return [] + name = name.lstrip(".") + return ["." + suffix for suffix in name.split(".")[1:]] + + @property + def stem(self): + name = self.name + i = name.rfind(".") + if 0 < i < len(name) - 1: + return name[:i] + else: + return name + + def with_name(self, name): + if not self.name: + raise ValueError(f"{self!r} has an empty name") + f = self._flavour + if ( + not name + or f.sep in name + or (f.altsep and f.altsep in name) + or name == "." 
+ ): + raise ValueError("Invalid name %r" % (name)) + return self._from_parsed_parts( + self.drive, self.root, self._tail[:-1] + [name] + ) + + def with_stem(self, stem): + return self.with_name(stem + self.suffix) + + def with_suffix(self, suffix): + f = self._flavour + if f.sep in suffix or f.altsep and f.altsep in suffix: + raise ValueError(f"Invalid suffix {suffix!r}") + if suffix and not suffix.startswith(".") or suffix == ".": + raise ValueError("Invalid suffix %r" % (suffix)) + name = self.name + if not name: + raise ValueError(f"{self!r} has an empty name") + old_suffix = self.suffix + if not old_suffix: + name = name + suffix + else: + name = name[: -len(old_suffix)] + suffix + return self._from_parsed_parts( + self.drive, self.root, self._tail[:-1] + [name] + ) + + def relative_to(self, other, /, *_deprecated, walk_up=False): + if _deprecated: + msg = ( + "support for supplying more than one positional argument " + "to pathlib.PurePath.relative_to() is deprecated and " + "scheduled for removal in Python 3.14" + ) + warnings.warn( + f"pathlib.PurePath.relative_to(*args) {msg}", + DeprecationWarning, + stacklevel=2, + ) + other = self.with_segments(other, *_deprecated) + for step, path in enumerate([other] + list(other.parents)): # noqa: B007 + if self.is_relative_to(path): + break + elif not walk_up: + raise ValueError( + f"{str(self)!r} is not in the subpath of {str(other)!r}" + ) + elif path.name == "..": + raise ValueError(f"'..' 
segment in {str(other)!r} cannot be walked") + else: + raise ValueError( + f"{str(self)!r} and {str(other)!r} have different anchors" + ) + parts = [".."] * step + self._tail[len(path._tail) :] + return self.with_segments(*parts) + + def is_relative_to(self, other, /, *_deprecated): + if _deprecated: + msg = ( + "support for supplying more than one argument to " + "pathlib.PurePath.is_relative_to() is deprecated and " + "scheduled for removal in Python 3.14" + ) + warnings.warn( + f"pathlib.PurePath.is_relative_to(*args) {msg}", + DeprecationWarning, + stacklevel=2, + ) + other = self.with_segments(other, *_deprecated) + return other == self or other in self.parents + + @property + def parts(self): + if self.drive or self.root: + return (self.drive + self.root,) + tuple(self._tail) + else: + return tuple(self._tail) + + def joinpath(self, *pathsegments): + return self.with_segments(self, *pathsegments) + + def __truediv__(self, key): + try: + return self.joinpath(key) + except TypeError: + return NotImplemented + + def __rtruediv__(self, key): + try: + return self.with_segments(key, self) + except TypeError: + return NotImplemented + + @property + def parent(self): + drv = self.drive + root = self.root + tail = self._tail + if not tail: + return self + return self._from_parsed_parts(drv, root, tail[:-1]) + + @property + def parents(self): + return _PathParents(self) + + def _make_child_relpath(self, name): + path_str = str(self) + tail = self._tail + if tail: + path_str = f"{path_str}{self._flavour.sep}{name}" + elif path_str != ".": + path_str = f"{path_str}{name}" + else: + path_str = name + path = self.with_segments(path_str) + path._str = path_str + path._drv = self.drive + path._root = self.root + path._tail_cached = tail + [name] + return path + + def lchmod(self, mode): + """ + Like chmod(), except if the path points to a symlink, the symlink's + permissions are changed, rather than its target's. 
+ """ + self.chmod(mode, follow_symlinks=False) + + class _PathParents(Sequence): + __slots__ = ("_path", "_drv", "_root", "_tail") + + def __init__(self, path): + self._path = path + self._drv = path.drive + self._root = path.root + self._tail = path._tail + + def __len__(self): + return len(self._tail) + + def __getitem__(self, idx): + if isinstance(idx, slice): + return tuple(self[i] for i in range(*idx.indices(len(self)))) + + if idx >= len(self) or idx < -len(self): + raise IndexError(idx) + if idx < 0: + idx += len(self) + return self._path._from_parsed_parts( + self._drv, self._root, self._tail[: -idx - 1] + ) + + def __repr__(self): + return f"<{type(self._path).__name__}.parents>" + + +if sys.version_info >= (3, 9): + str_remove_suffix = str.removesuffix + str_remove_prefix = str.removeprefix + +else: + + def str_remove_suffix(s: str, suffix: str) -> str: + if s.endswith(suffix): + return s[: -len(suffix)] + else: + return s + + def str_remove_prefix(s: str, prefix: str) -> str: + if s.startswith(prefix): + return s[len(prefix) :] + else: + return s + + +class FSSpecAccessorShim: + """this is a compatibility shim and will be removed""" + + def __init__(self, parsed_url: SplitResult | None, **kwargs: Any) -> None: + if parsed_url and parsed_url.scheme: + cls = get_filesystem_class(parsed_url.scheme) + url_kwargs = cls._get_kwargs_from_urls(parsed_url.geturl()) + else: + cls = get_filesystem_class(None) + url_kwargs = {} + url_kwargs.update(kwargs) + self._fs = cls(**url_kwargs) + + def __init_subclass__(cls, **kwargs): + warnings.warn( + "All _FSSpecAccessor subclasses have been deprecated. 
" + " Please follow the universal_pathlib==0.2.0 migration guide at" + " https://github.com/fsspec/universal_pathlib for more" + " information.", + DeprecationWarning, + stacklevel=2, + ) + + @classmethod + def from_path(cls, path: UPath) -> FSSpecAccessorShim: + """internal accessor for backwards compatibility""" + url = path._url._replace(scheme=path.protocol) + obj = cls(url, **path.storage_options) + obj.__dict__["_fs"] = path.fs + return obj + + def _format_path(self, path: UPath) -> str: + return path.path + + def open(self, path, mode="r", *args, **kwargs): + return path.fs.open(self._format_path(path), mode, *args, **kwargs) + + def stat(self, path, **kwargs): + return path.fs.stat(self._format_path(path), **kwargs) + + def listdir(self, path, **kwargs): + p_fmt = self._format_path(path) + contents = path.fs.listdir(p_fmt, **kwargs) + if len(contents) == 0 and not path.fs.isdir(p_fmt): + raise NotADirectoryError(str(self)) + elif ( + len(contents) == 1 + and contents[0]["name"] == p_fmt + and contents[0]["type"] == "file" + ): + raise NotADirectoryError(str(self)) + return contents + + def glob(self, _path, path_pattern, **kwargs): + return _path.fs.glob(self._format_path(path_pattern), **kwargs) + + def exists(self, path, **kwargs): + return path.fs.exists(self._format_path(path), **kwargs) + + def info(self, path, **kwargs): + return path.fs.info(self._format_path(path), **kwargs) + + def rm(self, path, recursive, **kwargs): + return path.fs.rm(self._format_path(path), recursive=recursive, **kwargs) + + def mkdir(self, path, create_parents=True, **kwargs): + return path.fs.mkdir( + self._format_path(path), create_parents=create_parents, **kwargs + ) + + def makedirs(self, path, exist_ok=False, **kwargs): + return path.fs.makedirs(self._format_path(path), exist_ok=exist_ok, **kwargs) + + def touch(self, path, **kwargs): + return path.fs.touch(self._format_path(path), **kwargs) + + def mv(self, path, target, recursive=False, maxdepth=None, **kwargs): + if 
hasattr(target, "_accessor"): + target = target._accessor._format_path(target) + return path.fs.mv( + self._format_path(path), + target, + recursive=recursive, + maxdepth=maxdepth, + **kwargs, + ) diff --git a/upath/_flavour.py b/upath/_flavour.py new file mode 100644 index 00000000..3b64e0fb --- /dev/null +++ b/upath/_flavour.py @@ -0,0 +1,301 @@ +from __future__ import annotations + +import ntpath +import os.path +import posixpath +import sys +import warnings +from functools import lru_cache +from functools import wraps +from typing import Any +from typing import Callable +from typing import Iterable +from typing import Union +from urllib.parse import urlsplit + +if sys.version_info >= (3, 12): + from typing import TypeAlias +else: + TypeAlias = Any + +from upath._compat import str_remove_prefix +from upath._compat import str_remove_suffix +from upath._protocol import get_upath_protocol +from upath._protocol import strip_upath_protocol + +PathOrStr: TypeAlias = Union[str, "os.PathLike[str]"] + +__all__ = [ + "FSSpecFlavour", +] + + +def _deprecated(func): + if sys.version_info >= (3, 12): + + @wraps(func) + def wrapper(*args, **kwargs): + warnings.warn( + f"{func.__name__} is deprecated on py3.12", + DeprecationWarning, + stacklevel=2, + ) + return func(*args, **kwargs) + + return wrapper + else: + return func + + +class FSSpecFlavour: + """fsspec flavour for universal_pathlib + + **INTERNAL AND VERY MUCH EXPERIMENTAL** + + Implements the fsspec compatible low-level lexical operations on + PurePathBase-like objects. + + Note: + In case you find yourself in need of subclassing FSSpecFlavour, + please open an issue in the universal_pathlib issue tracker: + https://github.com/fsspec/universal_pathlib/issues + Ideally we can find a way to make your use-case work by adding + more functionality to this class. 
+ + """ + + def __init__( + self, + *, + # URI behavior + join_prepends_protocol: bool = False, + join_like_urljoin: bool = False, + supports_empty_parts: bool = False, + supports_netloc: bool = False, + supports_query_parameters: bool = False, + supports_fragments: bool = False, + posixpath_only: bool = True, + # configurable separators + sep: str = "/", + altsep: str | None = None, + ): + self._owner = None + # separators + self.sep = sep + self.altsep = altsep + # configuration + self.join_prepends_protocol = join_prepends_protocol + self.join_like_urljoin = join_like_urljoin + self.supports_empty_parts = supports_empty_parts + self.supports_netloc = supports_netloc + self.supports_query_parameters = supports_query_parameters + self.supports_fragments = supports_fragments + self.posixpath_only = posixpath_only + + def __set_name__(self, owner, name): + # helper to provide a more informative repr + self._owner = owner.__name__ + + def _asdict(self) -> dict[str, Any]: + """return a dict representation of the flavour's settings""" + dct = vars(self).copy() + dct.pop("_owner") + return dct + + def __repr__(self): + return f"<{__name__}.{type(self).__name__} of {self._owner}>" + + def join(self, __path: PathOrStr, *paths: PathOrStr) -> str: + """Join two or more path components, inserting '/' as needed.""" + + # [py38-py312] _flavour.join is Callable[[list[str]], str] + if isinstance(__path, (list, tuple)) and not paths: + if not __path: + return "" + __path, *paths = __path # type: ignore + + _path0: str = strip_upath_protocol(__path) + _paths: Iterable[str] = map(strip_upath_protocol, paths) + + if self.join_like_urljoin: + pth = str_remove_suffix(str(_path0), "/") + sep = self.sep + for b in _paths: + if b.startswith(sep): + pth = b + elif not pth: + pth += b + else: + pth += sep + b + joined = pth + elif self.posixpath_only: + joined = posixpath.join(_path0, *_paths) + else: + joined = os.path.join(_path0, *_paths) + + if self.join_prepends_protocol and (protocol 
:= get_upath_protocol(__path)): + joined = f"{protocol}://{joined}" + + return joined + + def splitroot(self, __path: PathOrStr) -> tuple[str, str, str]: + """Split a path in the drive, the root and the rest.""" + if self.supports_fragments or self.supports_query_parameters: + url = urlsplit(str(__path)) + drive = url._replace(path="", query="", fragment="").geturl() + path = url._replace(scheme="", netloc="").geturl() + # root = "/" if path.startswith("/") else "" + root = "/" # emulate upath.core.UPath < 3.12 behaviour + return drive, root, str_remove_prefix(path, "/") + + if self.supports_netloc: + path = strip_upath_protocol(__path, allow_unknown=True) + protocol = get_upath_protocol(__path) + if protocol: + drive, root, tail = path.partition("/") + return drive, root or "/", tail + else: + return "", "", path + + elif self.posixpath_only: + path = strip_upath_protocol(__path, allow_unknown=True) + return _get_splitroot(posixpath)(path) + + else: + path = strip_upath_protocol(__path, allow_unknown=True) + drv, root, path = _get_splitroot(os.path)(path) + if os.name == "nt" and not drv: + drv = "C:" + return drv, root, path + + def splitdrive(self, __path: PathOrStr) -> tuple[str, str]: + """Split a path into drive and path.""" + if self.supports_fragments or self.supports_query_parameters: + path = strip_upath_protocol(__path) + url = urlsplit(path) + path = url._replace(scheme="", netloc="").geturl() + drive = url._replace(path="", query="", fragment="").geturl() + return drive, path + + path = strip_upath_protocol(__path) + if self.supports_netloc: + protocol = get_upath_protocol(__path) + if protocol: + drive, root, tail = path.partition("/") + return drive, f"{root}{tail}" + else: + return "", path + elif self.posixpath_only: + return posixpath.splitdrive(path) + else: + drv, path = os.path.splitdrive(path) + if os.name == "nt" and not drv: + drv = "C:" + return drv, path + + def normcase(self, __path: PathOrStr) -> str: + """Normalize case of pathname. 
Has no effect under Posix""" + if self.posixpath_only: + return posixpath.normcase(__path) + else: + return os.path.normcase(__path) + + @_deprecated + def parse_parts(self, parts): + parsed = [] + sep = self.sep + drv = root = "" + it = reversed(parts) + for part in it: + if part: + drv, root, rel = self.splitroot(part) + if not root or root and rel: + for x in reversed(rel.split(sep)): + parsed.append(sys.intern(x)) + + if drv or root: + parsed.append(drv + root) + parsed.reverse() + return drv, root, parsed + + @_deprecated + def join_parsed_parts(self, drv, root, parts, drv2, root2, parts2): + """ + Join the two paths represented by the respective + (drive, root, parts) tuples. Return a new (drive, root, parts) tuple. + """ + if root2: + if not drv2 and drv: + return drv, root2, [drv + root2] + parts2[1:] + elif drv2: + if drv2 == drv or self.casefold(drv2) == self.casefold(drv): + # Same drive => second path is relative to the first + return drv, root, parts + parts2[1:] + else: + # Second path is non-anchored (common case) + return drv, root, parts + parts2 + return drv2, root2, parts2 + + @_deprecated + def casefold(self, s: str) -> str: + """Casefold the string s.""" + if self.posixpath_only or os.name != "nt": + return s + else: + return s.lower() + + +@lru_cache +def _get_splitroot(mod) -> Callable[[PathOrStr], tuple[str, str, str]]: + """return the splitroot function from the given module""" + if hasattr(mod, "splitroot"): + return mod.splitroot + + elif mod is posixpath: + + def splitroot(p): + p = os.fspath(p) + sep = "/" + empty = "" + if p[:1] != sep: + return empty, empty, p + elif p[1:2] != sep or p[2:3] == sep: + return empty, sep, p[1:] + else: + return empty, p[:2], p[2:] + + return splitroot + + elif mod is ntpath: + + def splitroot(p): + p = os.fspath(p) + sep = "\\" + altsep = "/" + colon = ":" + unc_prefix = "\\\\?\\UNC\\" + empty = "" + normp = p.replace(altsep, sep) + if normp[:1] == sep: + if normp[1:2] == sep: + start = 8 if 
normp[:8].upper() == unc_prefix else 2 + index = normp.find(sep, start) + if index == -1: + return p, empty, empty + index2 = normp.find(sep, index + 1) + if index2 == -1: + return p, empty, empty + return p[:index2], p[index2 : index2 + 1], p[index2 + 1 :] + else: + return empty, p[:1], p[1:] + elif normp[1:2] == colon: + if normp[2:3] == sep: + return p[:2], p[2:3], p[3:] + else: + return p[:2], empty, p[2:] + else: + return empty, empty, p + + return splitroot + else: + raise NotImplementedError(f"unsupported module: {mod!r}") diff --git a/upath/_protocol.py b/upath/_protocol.py new file mode 100644 index 00000000..568dae04 --- /dev/null +++ b/upath/_protocol.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import os +import re +from pathlib import PurePath +from typing import Any + +from fsspec.core import strip_protocol as strip_fsspec_protocol +from fsspec.spec import AbstractFileSystem + +__all__ = [ + "get_upath_protocol", + "strip_upath_protocol", +] + +# Regular expression to match fsspec style protocols. +# Matches single slash usage too for compatibility. 
+_PROTOCOL_RE = re.compile( + r"^(?P<protocol>[A-Za-z][A-Za-z0-9+]+):(?P<slashes>//?)(?P<path>.*)" +) + +# Matches data URIs +_DATA_URI_RE = re.compile(r"^data:[^,]*,") + + +def _match_protocol(pth: str) -> str: + if m := _PROTOCOL_RE.match(pth): + return m.group("protocol") + elif _DATA_URI_RE.match(pth): + return "data" + return "" + + +def get_upath_protocol( + pth: str | PurePath | os.PathLike, + *, + protocol: str | None = None, + storage_options: dict[str, Any] | None = None, +) -> str: + """return the filesystem spec protocol""" + if isinstance(pth, str): + pth_protocol = _match_protocol(pth) + elif isinstance(pth, PurePath): + pth_protocol = getattr(pth, "protocol", "") + else: + pth_protocol = _match_protocol(str(pth)) + # if storage_options and not protocol and not pth_protocol: + # protocol = "file" + if protocol and pth_protocol and not pth_protocol.startswith(protocol): + raise ValueError( + f"requested protocol {protocol!r} incompatible with {pth_protocol!r}" + ) + return protocol or pth_protocol or "" + + +def strip_upath_protocol( + pth: str | os.PathLike[str], + *, + allow_unknown: bool = False, +) -> str: + """strip protocol from path""" + if isinstance(pth, PurePath): + pth = str(pth) + elif not isinstance(pth, str): + pth = os.fspath(pth) + if m := _PROTOCOL_RE.match(pth): + if len(m.group("slashes")) == 1: + protocol = m.group("protocol") + path = m.group("path") + pth = f"{protocol}:///{path}" + try: + return strip_fsspec_protocol(pth) + except ValueError as err: + if allow_unknown and str(err).endswith(m.group("protocol")): + # fsspec raised ValueError because the protocol is not registered + return AbstractFileSystem._strip_protocol(pth) + raise + else: + return pth diff --git a/upath/core.py b/upath/core.py index 8ba8078f..4b41eda6 100644 --- a/upath/core.py +++ b/upath/core.py @@ -1,33 +1,55 @@ from __future__ import annotations -import re +import os import sys -from os import PathLike +import warnings +from copy import copy from pathlib import Path -from pathlib 
import PurePath -from pathlib import _PosixFlavour # type: ignore +from types import MappingProxyType from typing import TYPE_CHECKING -from typing import Sequence +from typing import Any +from typing import Mapping from typing import TypeVar from urllib.parse import urlsplit -from urllib.parse import urlunsplit -from fsspec.core import split_protocol -from fsspec.registry import get_filesystem_class -from fsspec.utils import stringify_path +from fsspec import AbstractFileSystem +from fsspec import get_filesystem_class +from upath._compat import FSSpecAccessorShim +from upath._compat import PathlibPathShim +from upath._compat import str_remove_prefix +from upath._compat import str_remove_suffix +from upath._flavour import FSSpecFlavour +from upath._protocol import get_upath_protocol from upath.registry import get_upath_class -if TYPE_CHECKING: - from typing import Any - from typing import Generator - from urllib.parse import SplitResult +__all__ = ["UPath"] - from fsspec.spec import AbstractFileSystem -__all__ = [ - "UPath", -] +def __getattr__(name): + if name == "_UriFlavour": + warnings.warn( + "upath.core._UriFlavour should not be used anymore." + " Please follow the universal_pathlib==0.2.0 migration guide at" + " https://github.com/fsspec/universal_pathlib for more" + " information.", + DeprecationWarning, + stacklevel=2, + ) + return FSSpecFlavour + elif name == "PT": + warnings.warn( + "upath.core.PT should not be used anymore." 
+ " Please follow the universal_pathlib==0.2.0 migration guide at" + " https://github.com/fsspec/universal_pathlib for more" + " information.", + DeprecationWarning, + stacklevel=2, + ) + return TypeVar("PT", bound="UPath") + else: + raise AttributeError(name) + _FSSPEC_HAS_WORKING_GLOB = None @@ -43,522 +65,634 @@ def _check_fsspec_has_working_glob(): return g -class _FSSpecAccessor: - __slots__ = ("_fs",) - - def __init__(self, parsed_url: SplitResult | None, **kwargs: Any) -> None: - if parsed_url and parsed_url.scheme: - cls = get_filesystem_class(parsed_url.scheme) - url_kwargs = cls._get_kwargs_from_urls(urlunsplit(parsed_url)) - else: - cls = get_filesystem_class(None) - url_kwargs = {} - url_kwargs.update(kwargs) - self._fs = cls(**url_kwargs) +def _make_instance(cls, args, kwargs): + """helper for pickling UPath instances""" + return cls(*args, **kwargs) - def _format_path(self, path: UPath) -> str: - return path._path - def open(self, path, mode="r", *args, **kwargs): - return self._fs.open(self._format_path(path), mode, *args, **kwargs) - - def stat(self, path, **kwargs): - return self._fs.stat(self._format_path(path), **kwargs) - - def listdir(self, path, **kwargs): - p_fmt = self._format_path(path) - contents = self._fs.listdir(p_fmt, **kwargs) - if len(contents) == 0 and not self._fs.isdir(p_fmt): - raise NotADirectoryError(str(self)) - elif ( - len(contents) == 1 - and contents[0]["name"] == p_fmt - and contents[0]["type"] == "file" - ): - raise NotADirectoryError(str(self)) - return contents +# accessors are deprecated +_FSSpecAccessor = FSSpecAccessorShim - def glob(self, _path, path_pattern, **kwargs): - return self._fs.glob(self._format_path(path_pattern), **kwargs) - def exists(self, path, **kwargs): - return self._fs.exists(self._format_path(path), **kwargs) - - def info(self, path, **kwargs): - return self._fs.info(self._format_path(path), **kwargs) - - def rm(self, path, recursive, **kwargs): - return self._fs.rm(self._format_path(path), 
class UPath(PathlibPathShim, Path):
    # Per-instance state: the protocol string, the storage_options dict and
    # the lazily created filesystem instance (see the `fs` property).
    __slots__ = (
        "_protocol",
        "_storage_options",
        "_fs_cached",
        *PathlibPathShim.__missing_py312_slots__,
        "__drv",
        "__root",
        "__parts",
    )
    if TYPE_CHECKING:
        _protocol: str
        _storage_options: dict[str, Any]
        _fs_cached: AbstractFileSystem

    # None/True: dispatch to the class registered for the protocol,
    # False: always instantiate the class that was called.
    _protocol_dispatch: bool | None = None
    _flavour = FSSpecFlavour()

    # === upath.UPath constructor =====================================

    def __new__(
        cls, *args, protocol: str | None = None, **storage_options: Any
    ) -> UPath:
        """Create the instance, dispatching to a protocol-specific subclass.

        Raises ValueError when no implementation is registered for the
        detected protocol, and RuntimeError when ``cls`` is not a UPath
        subclass.
        """
        # fill empty arguments
        if not args:
            args = (".",)

        # create a copy if UPath class
        part0, *parts = args
        if not parts and not storage_options and isinstance(part0, cls):
            return copy(part0)

        # deprecate 'scheme'
        if "scheme" in storage_options:
            warnings.warn(
                "use 'protocol' kwarg instead of 'scheme'",
                DeprecationWarning,
                stacklevel=2,
            )
            protocol = storage_options.pop("scheme")

        # determine the protocol
        pth_protocol = get_upath_protocol(
            part0, protocol=protocol, storage_options=storage_options
        )
        # determine which UPath subclass to dispatch to
        if cls._protocol_dispatch or cls._protocol_dispatch is None:
            upath_cls = get_upath_class(protocol=pth_protocol)
            if upath_cls is None:
                raise ValueError(f"Unsupported filesystem: {pth_protocol!r}")
        else:
            # user subclasses can request to disable protocol dispatch
            # by setting MyUPathSubclass._protocol_dispatch to `False`.
            # This will effectively ignore the registered UPath
            # implementations and return an instance of MyUPathSubclass.
            # This can be useful if a subclass wants to extend the UPath
            # api, and it is fine to rely on the default implementation
            # for all supported user protocols.
            upath_cls = cls

        # create a new instance
        if cls is UPath:
            # we called UPath() directly, and want an instance based on the
            # provided or detected protocol (i.e. upath_cls)
            obj: UPath = object.__new__(upath_cls)
            obj._protocol = pth_protocol

        elif issubclass(cls, upath_cls):
            # we called a sub- or sub-sub-class of UPath, i.e. S3Path() and the
            # corresponding upath_cls based on protocol is equal-to or a
            # parent-of the cls.
            obj = object.__new__(cls)
            obj._protocol = pth_protocol

        elif issubclass(cls, UPath):
            # we called a subclass of UPath directly, i.e. S3Path() but the
            # detected protocol would return a non-related UPath subclass, i.e.
            # S3Path("file:///abc"). This behavior is going to raise an error
            # in future versions
            msg_protocol = repr(pth_protocol)
            if not pth_protocol:
                msg_protocol += " (empty string)"
            msg = (
                f"{cls.__name__!s}(...) detected protocol {msg_protocol!s} and"
                f" returns a {upath_cls.__name__} instance that isn't a direct"
                f" subclass of {cls.__name__}. This will raise an exception in"
                " future universal_pathlib versions. To prevent the issue, use"
                " UPath(...) to create instances of unrelated protocols or you"
                f" can instead derive your subclass {cls.__name__!s}(...) from"
                f" {upath_cls.__name__} or alternatively override behavior via"
                f" registering the {cls.__name__} implementation with protocol"
                f" {msg_protocol!s} replacing the default implementation."
            )
            warnings.warn(msg, DeprecationWarning, stacklevel=2)

            obj = object.__new__(upath_cls)
            obj._protocol = pth_protocol

            # the returned object is not an instance of cls here, so it is
            # initialised explicitly in this branch
            upath_cls.__init__(
                obj, *args, protocol=pth_protocol, **storage_options
            )  # type: ignore

        else:
            raise RuntimeError("UPath.__new__ expected cls to be subclass of UPath")

        return obj

    def __init__(
        self, *args, protocol: str | None = None, **storage_options: Any
    ) -> None:
        """Set protocol and storage_options, then pass ``args`` to the parent.

        Raises TypeError when a UPath in ``args`` has a non-empty protocol
        that differs from this instance's protocol.
        """
        # allow subclasses to customize __init__ arg parsing
        base_options = getattr(self, "_storage_options", {})
        args, protocol, storage_options = type(self)._transform_init_args(
            args, protocol or self._protocol, {**base_options, **storage_options}
        )
        if self._protocol != protocol and protocol:
            self._protocol = protocol

        # retrieve storage_options
        if args:
            args0 = args[0]
            if isinstance(args0, UPath):
                # explicit keyword options override those of the first part
                self._storage_options = {**args0.storage_options, **storage_options}
            else:
                self._storage_options = type(self)._parse_storage_options(
                    str(args0), protocol, storage_options
                )
        else:
            self._storage_options = storage_options.copy()

        # check that UPath subclasses in args are compatible
        # --> ensures items in _raw_paths are compatible
        for arg in args:
            if not isinstance(arg, UPath):
                continue
            # protocols: only identical (or empty "") protocols can combine
            if arg.protocol and arg.protocol != self._protocol:
                raise TypeError("can't combine different UPath protocols as parts")
            # storage_options: args may not define other storage_options
            if any(
                self._storage_options.get(key) != value
                for key, value in arg.storage_options.items()
            ):
                # TODO:
                #   Future versions of UPath could verify that storage_options
                #   can be combined between UPath instances. Not sure if this
                #   is really necessary though. A warning might be enough...
                pass

        # fill ._raw_paths
        if hasattr(self, "_raw_paths"):
            # already populated, nothing left to initialise
            return
        super().__init__(*args)

    # === upath.UPath PUBLIC ADDITIONAL API ===========================

    @property
    def protocol(self) -> str:
        """The protocol string stored on this instance."""
        return self._protocol

    @property
    def storage_options(self) -> Mapping[str, Any]:
        """Read-only view of the storage options."""
        return MappingProxyType(self._storage_options)

    @property
    def fs(self) -> AbstractFileSystem:
        """The filesystem instance, created on first access and then cached."""
        try:
            return self._fs_cached
        except AttributeError:
            fs = self._fs_cached = self._fs_factory(
                str(self), self.protocol, self.storage_options
            )
            return fs

    @property
    def path(self) -> str:
        """The parent class' string form, i.e. without the protocol prefix
        that `UPath.__str__` adds."""
        return super().__str__()
# === upath.UPath CUSTOMIZABLE API ================================

@classmethod
def _transform_init_args(
    cls,
    args: tuple[str | os.PathLike, ...],
    protocol: str,
    storage_options: dict[str, Any],
) -> tuple[tuple[str | os.PathLike, ...], str, dict[str, Any]]:
    """Allow customization of init args in subclasses.

    The default implementation returns its inputs unchanged.
    """
    return args, protocol, storage_options

@classmethod
def _parse_storage_options(
    cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
) -> dict[str, Any]:
    """Parse storage_options from the urlpath.

    Explicit ``storage_options`` override values derived from the URL.
    """
    fs_cls: type[AbstractFileSystem] = get_filesystem_class(protocol)
    pth_storage_options = fs_cls._get_kwargs_from_urls(urlpath)
    return {**pth_storage_options, **storage_options}

@classmethod
def _fs_factory(
    cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
) -> AbstractFileSystem:
    """Instantiate the filesystem_spec filesystem class.

    The filesystem is created from the kwargs derived from ``urlpath``
    updated with the explicit ``storage_options``.
    """
    fs_cls = get_filesystem_class(protocol)
    so_dct = fs_cls._get_kwargs_from_urls(urlpath)
    so_dct.update(storage_options)
    # pass the merged mapping: using `storage_options` here would silently
    # drop the kwargs that were just derived from the URL
    return fs_cls(**so_dct)

# === upath.UPath COMPATIBILITY API ===============================

def __init_subclass__(cls, **kwargs):
    """Provide a clean migration path for custom user subclasses.

    Emits DeprecationWarnings for subclasses that define a custom
    ``__new__`` (and disables protocol dispatch for them) or that ship a
    customized legacy accessor class (and routes filesystem creation
    through that accessor).
    """

    # Check if the user subclass has a custom `__new__` method
    has_custom_new_method = cls.__new__ is not UPath.__new__

    if has_custom_new_method and cls._protocol_dispatch is None:
        warnings.warn(
            "Detected a customized `__new__` method in subclass"
            f" {cls.__name__!r}. Protocol dispatch will be disabled"
            " for this subclass. Please follow the"
            " universal_pathlib==0.2.0 migration guide at"
            " https://github.com/fsspec/universal_pathlib for more"
            " information.",
            DeprecationWarning,
            stacklevel=2,
        )
        cls._protocol_dispatch = False

    # Check if the user subclass has defined a custom accessor class
    accessor_cls = getattr(cls, "_default_accessor", None)

    has_custom_legacy_accessor = (
        accessor_cls is not None
        and issubclass(accessor_cls, FSSpecAccessorShim)
        and accessor_cls is not FSSpecAccessorShim
    )
    # only meaningful together with has_custom_legacy_accessor (it is also
    # evaluated when accessor_cls is None)
    has_customized_fs_instantiation = (
        accessor_cls.__init__ is not FSSpecAccessorShim.__init__
        or hasattr(accessor_cls, "_fs")
    )

    if has_custom_legacy_accessor and has_customized_fs_instantiation:
        warnings.warn(
            "Detected a customized `__init__` method or `_fs` attribute"
            f" in the provided `_FSSpecAccessor` subclass of {cls.__name__!r}."
            " It is recommended to instead override the `UPath._fs_factory`"
            " classmethod to customize filesystem instantiation. Please follow"
            " the universal_pathlib==0.2.0 migration guide at"
            " https://github.com/fsspec/universal_pathlib for more"
            " information.",
            DeprecationWarning,
            stacklevel=2,
        )

        # Replacement hooks that build the filesystem through the legacy
        # accessor, so that its customizations keep taking effect.
        def _fs_factory(
            cls_, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
        ) -> AbstractFileSystem:
            url = urlsplit(urlpath)
            if protocol:
                url = url._replace(scheme=protocol)
            inst = cls_._default_accessor(url, **storage_options)
            return inst._fs

        def _parse_storage_options(
            cls_, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
        ) -> dict[str, Any]:
            url = urlsplit(urlpath)
            if protocol:
                url = url._replace(scheme=protocol)
            inst = cls_._default_accessor(url, **storage_options)
            return inst._fs.storage_options

        cls._fs_factory = classmethod(_fs_factory)
        cls._parse_storage_options = classmethod(_parse_storage_options)

@property
def _path(self):
    """Deprecated alias of `path`."""
    warnings.warn(
        "UPath._path is deprecated and should not be used."
        " Please follow the universal_pathlib==0.2.0 migration guide at"
        " https://github.com/fsspec/universal_pathlib for more"
        " information.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.path

@property
def _kwargs(self):
    """Deprecated alias of `storage_options`."""
    warnings.warn(
        "UPath._kwargs is deprecated. Please use"
        " UPath.storage_options instead. Follow the"
        " universal_pathlib==0.2.0 migration guide at"
        " https://github.com/fsspec/universal_pathlib for more"
        " information.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self.storage_options

@property
def _url(self):
    """The result of `urlsplit` applied to `as_posix()`."""
    # TODO:
    #   _url should be deprecated, but for now there is no good way of
    #   accessing query parameters from urlpaths...
    return urlsplit(self.as_posix())

def __getattr__(self, item):
    """Provide the deprecated `_accessor` attribute on demand.

    Raises AttributeError for every other missing attribute.
    """
    if item == "_accessor":
        warnings.warn(
            "UPath._accessor is deprecated. Please use"
            " UPath.fs instead. Follow the"
            " universal_pathlib==0.2.0 migration guide at"
            " https://github.com/fsspec/universal_pathlib for more"
            " information.",
            DeprecationWarning,
            stacklevel=2,
        )
        if hasattr(self, "_default_accessor"):
            accessor_cls = self._default_accessor
        else:
            accessor_cls = FSSpecAccessorShim
        return accessor_cls.from_path(self)
    else:
        raise AttributeError(item)

@classmethod
def _from_parts(cls, parts, **kwargs):
    """Deprecated constructor from a sequence of parts.

    A ``url`` keyword (a split URL) is translated into ``protocol`` and
    ``netloc`` keywords when those components are non-empty.
    """
    warnings.warn(
        "UPath._from_parts is deprecated and should not be used."
        " Please follow the universal_pathlib==0.2.0 migration guide at"
        " https://github.com/fsspec/universal_pathlib for more"
        " information.",
        DeprecationWarning,
        stacklevel=2,
    )
    parsed_url = kwargs.pop("url", None)
    if parsed_url:
        if protocol := parsed_url.scheme:
            kwargs["protocol"] = protocol
        if netloc := parsed_url.netloc:
            kwargs["netloc"] = netloc
    # NOTE(review): `parts` is passed to __new__ as one argument but
    # unpacked for __init__ -- confirm this asymmetry is intended.
    obj = UPath.__new__(cls, parts, **kwargs)
    obj.__init__(*parts, **kwargs)
    return obj

@classmethod
def _parse_args(cls, args):
    """Deprecated: join ``args`` with the flavour and parse the result."""
    warnings.warn(
        "UPath._parse_args is deprecated and should not be used."
        " Please follow the universal_pathlib==0.2.0 migration guide at"
        " https://github.com/fsspec/universal_pathlib for more"
        " information.",
        DeprecationWarning,
        stacklevel=2,
    )
    pth = cls._flavour.join(*args)
    return cls._parse_path(pth)

@classmethod
def _format_parsed_parts(cls, drv, root, tail, **kwargs):
    """Format drive, root and tail; extra kwargs are deprecated."""
    if kwargs:
        warnings.warn(
            "UPath._format_parsed_parts should not be used with"
            " additional kwargs. Please follow the"
            " universal_pathlib==0.2.0 migration guide at"
            " https://github.com/fsspec/universal_pathlib for more"
            " information.",
            DeprecationWarning,
            stacklevel=2,
        )
        if "url" in kwargs and tail[:1] == [f"{drv}{root}"]:
            # This was called from code that expected py38-py311 behavior
            # of _format_parsed_parts, which takes drv, root and parts
            tail = tail[1:]
    return super()._format_parsed_parts(drv, root, tail)

@property
def _drv(self):
    """The drive; loaded via `_load_parts()` if not set yet."""
    # direct access to ._drv should emit a warning,
    # but there is no good way of doing this for now...
    try:
        return self.__drv
    except AttributeError:
        self._load_parts()
        return self.__drv

@_drv.setter
def _drv(self, value):
    self.__drv = value

@property
def _root(self):
    """The root; loaded via `_load_parts()` if not set yet."""
    # direct access to ._root should emit a warning,
    # but there is no good way of doing this for now...
    try:
        return self.__root
    except AttributeError:
        self._load_parts()
        return self.__root

@_root.setter
def _root(self, value):
    self.__root = value
@property
def _parts(self):
    """Legacy list of path parts.

    The value is computed from the parent class' ``parts`` on first access
    and cached as a list, so every access returns a list.
    """
    # UPath._parts is not used anymore, and not available
    # in pathlib.Path for Python 3.12 and later.
    # Direct access to ._parts should emit a deprecation warning,
    # but there is no good way of doing this for now...
    try:
        return self.__parts
    except AttributeError:
        self._load_parts()
        # cache a list (not the tuple from `parts`) so that the first and
        # all later accesses return the same type
        self.__parts = list(super().parts)
        return self.__parts

@_parts.setter
def _parts(self, value):
    self.__parts = value

# === pathlib.PurePath ============================================

def __reduce__(self):
    """Pickle support: rebuild from raw paths, protocol and storage options."""
    args = tuple(self._raw_paths)
    kwargs = {
        "protocol": self._protocol,
        **self._storage_options,
    }
    return _make_instance, (type(self), args, kwargs)

def with_segments(self, *pathsegments):
    """Build a path of the same type from ``pathsegments``, carrying over
    this instance's protocol and storage options."""
    return type(self)(
        *pathsegments,
        protocol=self._protocol,
        **self._storage_options,
    )

@classmethod
def _parse_path(cls, path):
    """Split ``path`` into ``(drv, root, parts)``.

    Flavours with ``supports_empty_parts`` get a custom split: "." entries
    are dropped, and a trailing "." is turned into an empty part. Other
    flavours use the parent implementation.
    """
    if getattr(cls._flavour, "supports_empty_parts", False):
        drv, root, rel = cls._flavour.splitroot(path)
        if not root:
            parsed = []
        else:
            parsed = list(map(sys.intern, rel.split(cls._flavour.sep)))
            if parsed[-1] == ".":
                parsed[-1] = ""
            parsed = [x for x in parsed if x != "."]
        return drv, root, parsed
    return super()._parse_path(path)

def __str__(self):
    """Return ``"<protocol>://<path>"``, or just the path without protocol."""
    if self._protocol:
        return f"{self._protocol}://{self.path}"
    else:
        return self.path

def __fspath__(self):
    """Return ``str(self)`` and warn about the pending behavior change."""
    msg = (
        "in a future version of UPath this will be set to None"
        " unless the filesystem is local (or caches locally)"
    )
    warnings.warn(msg, PendingDeprecationWarning, stacklevel=2)
    return str(self)
def __bytes__(self):
    """Return ``os.fsencode(self)`` and warn about the pending change."""
    msg = (
        "in a future version of UPath this will be set to None"
        " unless the filesystem is local (or caches locally)"
    )
    warnings.warn(msg, PendingDeprecationWarning, stacklevel=2)
    return os.fsencode(self)

def as_uri(self):
    """Return the string form of the path."""
    return str(self)

def is_reserved(self):
    """Always False."""
    return False

def __eq__(self, other):
    """Equal when path, storage options and filesystem class all match."""
    if not isinstance(other, UPath):
        return NotImplemented
    return (
        self.path == other.path
        and self.storage_options == other.storage_options
        and (
            get_filesystem_class(self.protocol)
            == get_filesystem_class(other.protocol)
        )
    )

def __hash__(self):
    """Hash based on the path only.

    `storage_options` is a read-only mapping view and not hashable, and
    `__eq__` compares filesystem classes rather than protocol strings, so
    the path is the only component that is both hashable and guaranteed to
    be identical for equal instances.
    """
    return hash(self.path)

def relative_to(self, other, /, *_deprecated, walk_up=False):
    """Delegate to the parent implementation.

    Raises ValueError if ``other`` is a UPath with different
    storage options.
    """
    if isinstance(other, UPath) and self.storage_options != other.storage_options:
        raise ValueError(
            "paths have different storage_options:"
            f" {self.storage_options!r} != {other.storage_options!r}"
        )
    return super().relative_to(other, *_deprecated, walk_up=walk_up)

def is_relative_to(self, other, /, *_deprecated):
    """False for UPaths with different storage options, otherwise the
    parent implementation decides."""
    if isinstance(other, UPath) and self.storage_options != other.storage_options:
        return False
    return super().is_relative_to(other, *_deprecated)

# === pathlib.Path ================================================

def stat(self, *, follow_symlinks=True):
    """Return ``fs.stat`` for this path (``follow_symlinks`` is not used)."""
    return self.fs.stat(self.path)

def lstat(self):
    """Not implemented."""
    # return self.stat(follow_symlinks=False)
    raise NotImplementedError

def exists(self, *, follow_symlinks=True):
    """Return ``fs.exists`` for this path (``follow_symlinks`` is not used)."""
    return self.fs.exists(self.path)

def is_dir(self):
    """Return ``fs.isdir`` for this path."""
    return self.fs.isdir(self.path)

def is_file(self):
    """Return ``fs.isfile`` for this path."""
    return self.fs.isfile(self.path)

def is_mount(self):
    """Always False."""
    return False

def is_symlink(self):
    """True only if the filesystem info has a truthy "islink" entry."""
    try:
        info = self.fs.info(self.path)
        if "islink" in info:
            return bool(info["islink"])
    except FileNotFoundError:
        return False
    return False

def is_junction(self):
    """Always False."""
    return False

def is_block_device(self):
    """Always False."""
    return False

def is_char_device(self):
    """Always False."""
    return False

def is_fifo(self):
    """Always False."""
    return False

def is_socket(self):
    """Always False."""
    return False

def samefile(self, other_path):
    """Not implemented."""
    raise NotImplementedError
recursive=recursive) + def open(self, mode="r", buffering=-1, encoding=None, errors=None, newline=None): + return self.fs.open(self.path, mode) # fixme - def chmod(self, mode, *, follow_symlinks: bool = True) -> None: - raise NotImplementedError + def iterdir(self): + if getattr(self._flavour, "supports_empty_parts", False) and self.parts[ + -1: + ] == ("",): + base = self.with_segments(self.anchor, *self._tail[:-1]) + else: + base = self + for name in self.fs.listdir(self.path): + # fsspec returns dictionaries + if isinstance(name, dict): + name = name.get("name") + if name in {".", ".."}: + # Yielding a path object for these makes little sense + continue + # only want the path name with iterdir + _, _, name = str_remove_suffix(name, "/").rpartition(self._flavour.sep) + yield base._make_child_relpath(name) - def rename(self, target, recursive=False, maxdepth=None, **kwargs): - """Move file, see `fsspec.AbstractFileSystem.mv`.""" - if not isinstance(target, UPath): - target = self.parent.joinpath(target).resolve() - self._accessor.mv( - self, - target, - recursive=recursive, - maxdepth=maxdepth, - **kwargs, - ) - return target + def _scandir(self): + raise NotImplementedError # todo - def replace(self, target): - raise NotImplementedError + def _make_child_relpath(self, name): + path = super()._make_child_relpath(name) + del path._str # fix _str = str(self) assignment + return path - def symlink_to(self, target, target_is_directory=False): - raise NotImplementedError + def glob(self, pattern: str, *, case_sensitive=None): + path_pattern = self.joinpath(pattern).path + sep = self._flavour.sep + for name in self.fs.glob(path_pattern): + name = str_remove_prefix(str_remove_prefix(name, self.path), sep) + yield self.joinpath(name) - def hardlink_to(self, target): - raise NotImplementedError + def rglob(self, pattern: str, *, case_sensitive=None): + if _FSSPEC_HAS_WORKING_GLOB is None: + _check_fsspec_has_working_glob() - def link_to(self, target): - raise 
NotImplementedError + if _FSSPEC_HAS_WORKING_GLOB: + r_path_pattern = self.joinpath("**", pattern).path + sep = self._flavour.sep + for name in self.fs.glob(r_path_pattern): + name = str_remove_prefix(str_remove_prefix(name, self.path), sep) + yield self.joinpath(name) + + else: + path_pattern = self.joinpath(pattern).path + r_path_pattern = self.joinpath("**", pattern).path + sep = self._flavour.sep + seen = set() + for p in (path_pattern, r_path_pattern): + for name in self.fs.glob(p): + name = str_remove_prefix(str_remove_prefix(name, self.path), sep) + if name in seen: + continue + else: + seen.add(name) + yield self.joinpath(name) @classmethod def cwd(cls): @@ -574,50 +708,54 @@ def home(cls): else: raise NotImplementedError - def expanduser(self): - raise NotImplementedError + def absolute(self): + return self - def group(self): - raise NotImplementedError + def resolve(self, strict: bool = False): + _parts = self.parts - def lchmod(self, mode): - raise NotImplementedError + # Do not attempt to normalize path if no parts are dots + if ".." not in _parts and "." 
not in _parts: + return self - def lstat(self): - raise NotImplementedError + resolved: list[str] = [] + resolvable_parts = _parts[1:] + last_idx = len(resolvable_parts) - 1 + for idx, part in enumerate(resolvable_parts): + if part == "..": + if resolved: + resolved.pop() + if ( + getattr(self._flavour, "supports_empty_parts", False) + and idx == last_idx + ): + resolved.append("") + elif part != ".": + resolved.append(part) + + return self.with_segments(*_parts[:1], *resolved) def owner(self): raise NotImplementedError + def group(self): + raise NotImplementedError + def readlink(self): raise NotImplementedError - def touch(self, *args: int, truncate: bool = True, **kwargs) -> None: - # Keep the calling signature compatible with Path - # (without changing current fsspec behavior for defaults) - if len(args) > 2: - raise TypeError("too many arguments") - else: - for key, val in zip(["mode", "exists_ok"], args): - if key in kwargs: - raise TypeError(f"provided {key!r} as arg and kwarg") - kwargs[key] = val - self._accessor.touch(self, truncate=truncate, **kwargs) - - def mkdir( - self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False - ) -> None: - """ - Create a new directory at this given path. 
- """ + def touch(self, mode=0o666, exist_ok=True): + self.fs.touch(self.path, truncate=not exist_ok) + + def mkdir(self, mode=0o777, parents=False, exist_ok=False): if parents: if not exist_ok and self.exists(): raise FileExistsError(str(self)) - self._accessor.makedirs(self, exist_ok=exist_ok) + self.fs.makedirs(self.path, exist_ok=exist_ok) else: try: - self._accessor.mkdir( - self, + self.fs.mkdir( + self.path, create_parents=False, mode=mode, ) @@ -625,231 +763,45 @@ def mkdir( if not exist_ok or not self.is_dir(): raise FileExistsError(str(self)) - @classmethod - def _from_parts( - cls: type[PT], - args: list[str | PathLike], - url: SplitResult | None = None, - **kwargs: Any, - ) -> PT: - obj = object.__new__(cls) - drv, root, parts = obj._parse_args(args) - obj._drv = drv - if sys.version_info < (3, 9): - obj._closed = False - obj._kwargs = kwargs.copy() - - if not root: - if not parts: - root = "/" - parts = ["/"] - elif parts[0] == "/": - root = parts[1:] - obj._root = root - obj._parts = parts - - # Update to (full) URL - if url: - url = url._replace(path=root + cls._flavour.join(parts[1:])) - obj._url = url - - return obj - - @classmethod - def _from_parsed_parts( - cls: type[PT], - drv: str, - root: str, - parts: list[str], - url: SplitResult | None = None, - **kwargs: Any, - ) -> PT: - obj = object.__new__(cls) - obj._drv = drv - obj._parts = parts - if sys.version_info < (3, 9): - obj._closed = False - obj._kwargs = kwargs.copy() - - if not root: - if not parts: - root = "/" - elif parts[0] == "/": - root = parts.pop(0) - if len(obj._parts) == 0 or obj._parts[0] != root: - obj._parts.insert(0, root) - obj._root = root - - if url: - url = url._replace(path=root + cls._flavour.join(parts[1:])) - obj._url = url - return obj - - def __eq__(self, other): - if not isinstance(other, self.__class__): - return NotImplemented - p0, p1 = self.parts, other.parts - if len(p0) > len(p1): - if p0 and p0[-1] == "": - p0 = p0[:-1] - elif len(p1) > len(p0): - if p1 and 
p1[-1] == "": - p1 = p1[:-1] - return ( - p0 == p1 - and self.protocol == other.protocol - and self.storage_options == other.storage_options - ) - - def __str__(self) -> str: - """Return the string representation of the path, suitable for - passing to system calls.""" - try: - return self._str - except AttributeError: - self._str = self._format_parsed_parts( - self._drv, - self._root, - self._parts, - url=self._url, - **self._kwargs, - ) - return self._str - - def __truediv__(self: PT, key: str | PathLike) -> PT: - # Add `/` root if not present - if len(self._parts) == 0: - key = f"{self._root}{key}" + def chmod(self, mode, *, follow_symlinks=True): + raise NotImplementedError - # Adapted from `PurePath._make_child` - drv, root, parts = self._parse_args((key,)) - drv, root, parts = self._flavour.join_parsed_parts( - self._drv, self._root, self._parts, drv, root, parts - ) + def unlink(self, missing_ok=False): + if not self.exists(): + if not missing_ok: + raise FileNotFoundError(str(self)) + return + self.fs.rm(self.path, recursive=False) - kwargs = self._kwargs.copy() + def rmdir(self, recursive: bool = True): # fixme: non-standard + if not self.is_dir(): + raise NotADirectoryError(str(self)) + if not recursive and next(self.iterdir()): + raise OSError(f"Not recursive and directory not empty: {self}") + self.fs.rm(self.path, recursive=recursive) - # Create a new object - out = self.__class__( - self._format_parsed_parts(drv, root, parts, url=self._url), + def rename( + self, target, *, recursive=False, maxdepth=None, **kwargs + ): # fixme: non-standard + if not isinstance(target, UPath): + target = self.parent.joinpath(target).resolve() + self.fs.mv( + self.path, + target.path, + recursive=recursive, + maxdepth=maxdepth, **kwargs, ) - return out - - def __setstate__(self, state: dict) -> None: - self._kwargs = state["_kwargs"].copy() - - def __reduce__(self): - cls = type(self) - return ( - cls, - ( - cls._format_parsed_parts( - self._drv, self._root, self._parts, 
url=self._url - ), - ), - {"_kwargs": self._kwargs.copy()}, - ) - - def with_suffix(self: PT, suffix: str) -> PT: - """Return a new path with the file suffix changed. If the path - has no suffix, add given suffix. If the given suffix is an empty - string, remove the suffix from the path. - """ - f = self._flavour - if f.sep in suffix or f.altsep and f.altsep in suffix: - raise ValueError(f"Invalid suffix {suffix!r}") - if suffix and not suffix.startswith(".") or suffix == ".": - raise ValueError("Invalid suffix %r" % (suffix)) - name = self.name - if not name: - raise ValueError(f"{self!r} has an empty name") - old_suffix = self.suffix - if not old_suffix: - name = name + suffix - else: - name = name[: -len(old_suffix)] + suffix - return self._from_parsed_parts( - self._drv, - self._root, - self._parts[:-1] + [name], - url=self._url, - **self._kwargs, - ) - - def with_name(self: PT, name: str) -> PT: - """Return a new path with the file name changed.""" - if not self.name: - raise ValueError(f"{self!r} has an empty name") - drv, root, parts = self._flavour.parse_parts((name,)) - if ( - not name - or name[-1] in [self._flavour.sep, self._flavour.altsep] - or drv - or root - or len(parts) != 1 - ): - raise ValueError("Invalid name %r" % (name)) - return self._from_parsed_parts( - self._drv, - self._root, - self._parts[:-1] + [name], - url=self._url, - **self._kwargs, - ) - - @property - def parents(self) -> _UPathParents: - """A sequence of this upath's logical parents.""" - return _UPathParents(self) - - def as_uri(self) -> str: - return str(self) - + return target -class _UPathParents(Sequence[UPath]): - """This object provides sequence-like access to the logical ancestors - of a path. 
Don't try to construct it yourself.""" + def replace(self, target): + raise NotImplementedError # todo - __slots__ = ( - "_pathcls", - "_drv", - "_root", - "_parts", - "_url", - "_kwargs", - ) + def symlink_to(self, target, target_is_directory=False): + raise NotImplementedError - def __init__(self, path): - # We don't store the instance to avoid reference cycles - self._pathcls = type(path) - self._drv = path._drv - self._root = path._root - self._parts = path._parts - self._url = path._url - self._kwargs = path._kwargs - - def __len__(self): - if self._drv or self._root: - return len(self._parts) - 1 - else: - return len(self._parts) - - def __getitem__(self, idx): - if isinstance(idx, slice): - return tuple(self[i] for i in range(*idx.indices(len(self)))) - - if idx >= len(self) or idx < -len(self): - raise IndexError(idx) - if idx < 0: - idx += len(self) - return self._pathcls._from_parsed_parts( - self._drv, - self._root, - self._parts[: -idx - 1], - url=self._url, - **self._kwargs, - ) + def hardlink_to(self, target): + raise NotImplementedError - def __repr__(self): - return f"<{self._pathcls.__name__}.parents>" + def expanduser(self): + raise NotImplementedError diff --git a/upath/core312plus.py b/upath/core312plus.py deleted file mode 100644 index dae68419..00000000 --- a/upath/core312plus.py +++ /dev/null @@ -1,668 +0,0 @@ -from __future__ import annotations - -import os -import posixpath -import re -import sys -import warnings -from copy import copy -from pathlib import Path -from pathlib import PurePath -from types import MappingProxyType -from typing import TYPE_CHECKING -from typing import Any -from typing import Mapping -from typing import TypeAlias -from typing import cast -from urllib.parse import urlsplit - -if sys.version_info >= (3, 11): - from typing import Self -else: - Self = Any - -from fsspec import AbstractFileSystem -from fsspec import filesystem -from fsspec import get_filesystem_class -from fsspec.core import strip_protocol as 
fsspec_strip_protocol - -from upath.registry import get_upath_class - -PathOrStr: TypeAlias = "str | PurePath | os.PathLike" - - -class _FSSpecAccessor: - """this is a compatibility shim and will be removed""" - - -class FSSpecFlavour: - """fsspec flavour for universal_pathlib - - **INTERNAL AND VERY MUCH EXPERIMENTAL** - - Implements the fsspec compatible low-level lexical operations on - PurePathBase-like objects. - - Note: - In case you find yourself in need of subclassing FSSpecFlavour, - please open an issue in the universal_pathlib issue tracker: - https://github.com/fsspec/universal_pathlib/issues - Ideally we can find a way to make your use-case work by adding - more functionality to this class. - - """ - - def __init__( - self, - *, - # URI behavior - join_prepends_protocol: bool = False, - join_like_urljoin: bool = False, - supports_empty_parts: bool = False, - supports_netloc: bool = False, - supports_query_parameters: bool = False, - supports_fragments: bool = False, - posixpath_only: bool = True, - # configurable separators - sep: str = "/", - altsep: str | None = None, - ): - self._owner = None - # separators - self.sep = sep - self.altsep = altsep - # configuration - self.join_prepends_protocol = join_prepends_protocol - self.join_like_urljoin = join_like_urljoin - self.supports_empty_parts = supports_empty_parts - self.supports_netloc = supports_netloc - self.supports_query_parameters = supports_query_parameters - self.supports_fragments = supports_fragments - self.posixpath_only = posixpath_only - - def __set_name__(self, owner, name): - # helper to provide a more informative repr - self._owner = owner.__name__ - - def _asdict(self) -> dict[str, Any]: - """return a dict representation of the flavour's settings""" - dct = vars(self).copy() - dct.pop("_owner") - return dct - - def __repr__(self): - return f"<{__name__}.{type(self).__name__} of {self._owner}>" - - def join(self, __path: PathOrStr, *paths: PathOrStr) -> str: - """Join two or more path 
components, inserting '/' as needed.""" - path = strip_upath_protocol(__path) - paths = map(strip_upath_protocol, paths) - - if self.join_like_urljoin: - path = path.removesuffix("/") - sep = self.sep - for b in paths: - if b.startswith(sep): - path = b - elif not path: - path += b - else: - path += sep + b - joined = path - elif self.posixpath_only: - joined = posixpath.join(path, *paths) - else: - joined = os.path.join(path, *paths) - - if self.join_prepends_protocol and (protocol := _match_protocol(__path)): - joined = f"{protocol}://{joined}" - - return joined - - def splitroot(self, __path: PathOrStr) -> tuple[str, str, str]: - """Split a path in the drive, the root and the rest.""" - if self.supports_fragments or self.supports_query_parameters: - url = urlsplit(__path) - drive = url._replace(path="", query="", fragment="").geturl() - path = url._replace(scheme="", netloc="").geturl() - root = "/" if path.startswith("/") else "" - return drive, root, path.removeprefix("/") - - path = strip_upath_protocol(__path) - if self.supports_netloc: - protocol = _match_protocol(__path) - if protocol: - drive, root, tail = path.partition("/") - return drive, root or "/", tail - else: - return "", "", path - elif self.posixpath_only: - return posixpath.splitroot(path) - else: - drv, root, path = os.path.splitroot(path) - if os.name == "nt" and not drv: - drv = "C:" - return drv, root, path - - def splitdrive(self, __path: PathOrStr) -> tuple[str, str]: - """Split a path into drive and path.""" - if self.supports_fragments or self.supports_query_parameters: - path = strip_upath_protocol(__path) - url = urlsplit(path) - path = url._replace(scheme="", netloc="").geturl() - drive = url._replace(path="", query="", fragment="").geturl() - return drive, path - - path = strip_upath_protocol(__path) - if self.supports_netloc: - protocol = _match_protocol(__path) - if protocol: - drive, root, tail = path.partition("/") - return drive, f"{root}{tail}" - else: - return "", path - elif 
self.posixpath_only: - return posixpath.splitdrive(path) - else: - drv, path = os.path.splitdrive(path) - if os.name == "nt" and not drv: - drv = "C:" - return drv, path - - def normcase(self, __path: PathOrStr) -> str: - """Normalize case of pathname. Has no effect under Posix""" - if self.posixpath_only: - return posixpath.normcase(__path) - else: - return os.path.normcase(__path) - - -_PROTOCOL_RE = re.compile( - r"^(?P[A-Za-z][A-Za-z0-9+]+):(?P//?)(?P.*)" -) - - -def strip_upath_protocol(pth: PathOrStr) -> str: - """strip protocol from path""" - if isinstance(pth, PurePath): - pth = str(pth) - elif not isinstance(pth, str): - pth = os.fspath(pth) - if m := _PROTOCOL_RE.match(pth): - protocol = m.group("protocol") - path = m.group("path") - if len(m.group("slashes")) == 1: - pth = f"{protocol}:///{path}" - return fsspec_strip_protocol(pth) - else: - return pth - - -def _match_protocol(pth: str) -> str: - if m := _PROTOCOL_RE.match(pth): - return m.group("protocol") - return "" - - -def get_upath_protocol( - pth: str | PurePath | os.PathLike, - *, - protocol: str | None = None, - storage_options: dict[str, Any] | None = None, -) -> str: - """return the filesystem spec protocol""" - if isinstance(pth, str): - pth_protocol = _match_protocol(pth) - elif isinstance(pth, UPath): - pth_protocol = pth.protocol - elif isinstance(pth, PurePath): - pth_protocol = "" - else: - pth_protocol = _match_protocol(os.fspath(pth)) - if storage_options and not protocol and not pth_protocol: - protocol = "file" - if protocol and pth_protocol and not pth_protocol.startswith(protocol): - raise ValueError( - f"requested protocol {protocol!r} incompatible with {pth_protocol!r}" - ) - return protocol or pth_protocol or "" - - -def _make_instance(cls, args, kwargs): - """helper for pickling UPath instances""" - return cls(*args, **kwargs) - - -class UPath(Path): - __slots__ = ( - "_protocol", - "_storage_options", - "_fs_cached", - ) - if TYPE_CHECKING: - _protocol: str - _storage_options: 
dict[str, Any] - _fs_cached: AbstractFileSystem - - _flavour = FSSpecFlavour() - - def __new__( - cls, *args, protocol: str | None = None, **storage_options: Any - ) -> UPath: - # fill empty arguments - if not args: - args = (".",) - - # create a copy if UPath class - part0, *parts = args - if not parts and not storage_options and isinstance(part0, cls): - return copy(part0) - - # deprecate 'scheme' - if "scheme" in storage_options: - warnings.warn( - "use 'protocol' kwarg instead of 'scheme'", - DeprecationWarning, - stacklevel=2, - ) - protocol = storage_options.pop("scheme") - - # determine which UPath subclass to dispatch to - pth_protocol = get_upath_protocol( - part0, protocol=protocol, storage_options=storage_options - ) - upath_cls = get_upath_class(protocol=pth_protocol) - if upath_cls is None: - raise ValueError(f"Unsupported filesystem: {pth_protocol!r}") - - # create a new instance - if cls is UPath: - # we called UPath() directly, and want an instance based on the - # provided or detected protocol (i.e. upath_cls) - obj: UPath = cast("UPath", object.__new__(upath_cls)) - obj._protocol = pth_protocol - - elif issubclass(cls, upath_cls): - # we called a sub- or sub-sub-class of UPath, i.e. S3Path() and the - # corresponding upath_cls based on protocol is equal-to or a - # parent-of the cls. - obj = cast("UPath", object.__new__(cls)) # type: ignore[unreachable] - obj._protocol = pth_protocol - - elif issubclass(cls, UPath): - # we called a subclass of UPath directly, i.e. S3Path() but the - # detected protocol would return a non-related UPath subclass, i.e. - # S3Path("file:///abc"). This behavior is going to raise an error - # in future versions - msg_protocol = repr(pth_protocol) - if not pth_protocol: - msg_protocol += " (empty string)" - msg = ( - f"{cls.__name__!s}(...) detected protocol {msg_protocol!s} and" - f" returns a {upath_cls.__name__} instance that isn't a direct" - f" subclass of {cls.__name__}. 
This will raise an exception in" - " future universal_pathlib versions. To prevent the issue, use" - " UPath(...) to create instances of unrelated protocols or you" - f" can instead derive your subclass {cls.__name__!s}(...) from" - f" {upath_cls.__name__} or alternatively override behavior via" - f" registering the {cls.__name__} implementation with protocol" - f" {msg_protocol!s} replacing the default implementation." - ) - warnings.warn(msg, DeprecationWarning, stacklevel=2) - - obj = cast("UPath", object.__new__(upath_cls)) - obj._protocol = pth_protocol - - upath_cls.__init__( - obj, *args, protocol=pth_protocol, **storage_options - ) # type: ignore - - else: - raise RuntimeError("UPath.__new__ expected cls to be subclass of UPath") - - return obj - - def __init__( - self, *args, protocol: str | None = None, **storage_options: Any - ) -> None: - # retrieve storage_options - if args: - args0 = args[0] - if isinstance(args0, UPath): - self._storage_options = {**args0.storage_options, **storage_options} - else: - fs_cls: type[AbstractFileSystem] = get_filesystem_class( - protocol or self._protocol - ) - pth_storage_options = fs_cls._get_kwargs_from_urls(str(args0)) - self._storage_options = {**pth_storage_options, **storage_options} - else: - self._storage_options = storage_options.copy() - - # check that UPath subclasses in args are compatible - # --> ensures items in _raw_paths are compatible - for arg in args: - if not isinstance(arg, UPath): - continue - # protocols: only identical (or empty "") protocols can combine - if arg.protocol and arg.protocol != self._protocol: - raise TypeError("can't combine different UPath protocols as parts") - # storage_options: args may not define other storage_options - if any( - self._storage_options.get(key) != value - for key, value in arg.storage_options.items() - ): - # raise ValueError( - # "can't combine different UPath storage_options as parts" - # ) todo: revisit and define behaviour - pass - - # fill ._raw_paths - 
super().__init__(*args) - - # === upath.UPath only ============================================ - - @property - def protocol(self) -> str: - return self._protocol - - @property - def storage_options(self) -> Mapping[str, Any]: - return MappingProxyType(self._storage_options) - - @property - def fs(self) -> AbstractFileSystem: - try: - return self._fs_cached - except AttributeError: - fs = self._fs_cached = filesystem( - protocol=self.protocol, **self.storage_options - ) - return fs - - @property - def path(self) -> str: - return super().__str__() - - @property - def _kwargs(self): - warnings.warn( - "use UPath.storage_options instead of UPath._kwargs", - DeprecationWarning, - stacklevel=2, - ) - return self.storage_options - - @property - def _url(self): # todo: deprecate - return urlsplit(self.as_posix()) - - # === pathlib.PurePath ============================================ - - def __reduce__(self): - args = tuple(self._raw_paths) - kwargs = { - "protocol": self._protocol, - **self._storage_options, - } - return _make_instance, (type(self), args, kwargs) - - def with_segments(self, *pathsegments): - return type(self)( - *pathsegments, - protocol=self._protocol, - **self._storage_options, - ) - - @classmethod - def _parse_path(cls, path): - if cls._flavour.supports_empty_parts: - drv, root, rel = cls._flavour.splitroot(path) - if not root: - parsed = [] - else: - parsed = list(map(sys.intern, rel.split(cls._flavour.sep))) - if parsed[-1] == ".": - parsed[-1] = "" - parsed = [x for x in parsed if x != "."] - return drv, root, parsed - return super()._parse_path(path) - - def __str__(self): - if self._protocol: - return f"{self._protocol}://{self.path}" - else: - return self.path - - def __fspath__(self): - msg = ( - "in a future version of UPath this will be set to None" - " unless the filesystem is local (or caches locally)" - ) - warnings.warn(msg, PendingDeprecationWarning, stacklevel=2) - return str(self) - - def __bytes__(self): - msg = ( - "in a future 
version of UPath this will be set to None" - " unless the filesystem is local (or caches locally)" - ) - warnings.warn(msg, PendingDeprecationWarning, stacklevel=2) - return os.fsencode(self) - - def as_uri(self): - return str(self) - - def is_reserved(self): - return False - - def relative_to(self, other, /, *_deprecated, walk_up=False): - if isinstance(other, UPath) and self.storage_options != other.storage_options: - raise ValueError( - "paths have different storage_options:" - f" {self.storage_options!r} != {other.storage_options!r}" - ) - return super().relative_to(other, *_deprecated, walk_up=walk_up) - - def is_relative_to(self, other, /, *_deprecated): - if isinstance(other, UPath) and self.storage_options != other.storage_options: - return False - return super().is_relative_to(other, *_deprecated) - - # === pathlib.Path ================================================ - - def stat(self, *, follow_symlinks=True): - return self.fs.stat(self.path) - - def lstat(self): - # return self.stat(follow_symlinks=False) - raise NotImplementedError - - def exists(self, *, follow_symlinks=True): - return self.fs.exists(self.path) - - def is_dir(self): - return self.fs.isdir(self.path) - - def is_file(self): - return self.fs.isfile(self.path) - - def is_mount(self): - return False - - def is_symlink(self): - try: - info = self.fs.info(self.path) - if "islink" in info: - return bool(info["islink"]) - except FileNotFoundError: - return False - return False - - def is_junction(self): - return False - - def is_block_device(self): - return False - - def is_char_device(self): - return False - - def is_fifo(self): - return False - - def is_socket(self): - return False - - def samefile(self, other_path): - raise NotImplementedError - - def open(self, mode="r", buffering=-1, encoding=None, errors=None, newline=None): - return self.fs.open(self.path, mode) # fixme - - def iterdir(self): - if self._flavour.supports_empty_parts and self.parts[-1:] == ("",): - base = 
self.with_segments(self.anchor, *self._tail[:-1]) - else: - base = self - for name in self.fs.listdir(self.path): - # fsspec returns dictionaries - if isinstance(name, dict): - name = name.get("name") - if name in {".", ".."}: - # Yielding a path object for these makes little sense - continue - # only want the path name with iterdir - _, _, name = name.removesuffix("/").rpartition(self._flavour.sep) - yield base._make_child_relpath(name) - - def _scandir(self): - raise NotImplementedError # todo - - def _make_child_relpath(self, name): - path = super()._make_child_relpath(name) - del path._str # fix _str = str(self) assignment - return path - - def glob(self, pattern: str, *, case_sensitive=None): - path_pattern = self.joinpath(pattern).path - sep = self._flavour.sep - for name in self.fs.glob(path_pattern): - name = name.removeprefix(self.path).removeprefix(sep) - yield self.joinpath(name) - - def rglob(self, pattern: str, *, case_sensitive=None): - r_path_pattern = self.joinpath("**", pattern).path - sep = self._flavour.sep - for name in self.fs.glob(r_path_pattern): - name = name.removeprefix(self.path).removeprefix(sep) - yield self.joinpath(name) - - @classmethod - def cwd(cls): - if cls is UPath: - return get_upath_class("").cwd() - else: - raise NotImplementedError - - @classmethod - def home(cls): - if cls is UPath: - return get_upath_class("").home() - else: - raise NotImplementedError - - def absolute(self) -> Self: - return self - - def resolve(self, strict: bool = False) -> Self: - _parts = self.parts - - # Do not attempt to normalize path if no parts are dots - if ".." not in _parts and "." 
not in _parts: - return self - - resolved: list[str] = [] - resolvable_parts = _parts[1:] - last_idx = len(resolvable_parts) - 1 - for idx, part in enumerate(resolvable_parts): - if part == "..": - if resolved: - resolved.pop() - if self._flavour.supports_empty_parts and idx == last_idx: - resolved.append("") - elif part != ".": - resolved.append(part) - - return self.with_segments(*_parts[:1], *resolved) - - def owner(self): - raise NotImplementedError - - def group(self): - raise NotImplementedError - - def readlink(self): - raise NotImplementedError - - def touch(self, mode=0o666, exist_ok=True): - self.fs.touch(self.path, truncate=not exist_ok) - - def mkdir(self, mode=0o777, parents=False, exist_ok=False): - if parents: - if not exist_ok and self.exists(): - raise FileExistsError(str(self)) - self.fs.makedirs(self.path, exist_ok=exist_ok) - else: - try: - self.fs.mkdir( - self.path, - create_parents=False, - mode=mode, - ) - except FileExistsError: - if not exist_ok or not self.is_dir(): - raise FileExistsError(str(self)) - - def chmod(self, mode, *, follow_symlinks=True): - raise NotImplementedError - - def unlink(self, missing_ok=False): - if not self.exists(): - if not missing_ok: - raise FileNotFoundError(str(self)) - return - self.fs.rm(self.path, recursive=False) - - def rmdir(self, recursive: bool = True): # fixme: non-standard - if not self.is_dir(): - raise NotADirectoryError(str(self)) - if not recursive and next(self.iterdir()): - raise OSError(f"Not recursive and directory not empty: {self}") - self.fs.rm(self.path, recursive=recursive) - - def rename( - self, target, *, recursive=False, maxdepth=None, **kwargs - ): # fixme: non-standard - if not isinstance(target, UPath): - target = self.parent.joinpath(target).resolve() - self.fs.mv( - self.path, - target.path, - recursive=recursive, - maxdepth=maxdepth, - **kwargs, - ) - return target - - def replace(self, target): - raise NotImplementedError # todo - - def symlink_to(self, target, 
target_is_directory=False): - raise NotImplementedError - - def hardlink_to(self, target): - raise NotImplementedError - - def expanduser(self): - raise NotImplementedError diff --git a/upath/implementations/cloud.py b/upath/implementations/cloud.py index c9ad05e7..427deb7b 100644 --- a/upath/implementations/cloud.py +++ b/upath/implementations/cloud.py @@ -1,131 +1,59 @@ from __future__ import annotations -import re -import sys -import warnings from typing import Any -import upath.core - - -class _CloudAccessor(upath.core._FSSpecAccessor): - def _format_path(self, path): - """ - netloc has already been set to project via `CloudPath._from_parts` - """ - return f"{path._url.netloc}/{path._path.lstrip('/')}" - - def mkdir(self, path, create_parents=True, **kwargs): - _path = self._format_path(path) - if ( - not create_parents - and not kwargs.get("exist_ok", False) - and self._fs.exists(_path) - ): - raise FileExistsError(_path) - return super().mkdir(path, create_parents=create_parents, **kwargs) - - -class CloudPath(upath.core.UPath): - _default_accessor = _CloudAccessor - - @classmethod - def _from_parts(cls, args, url=None, **kwargs): - if kwargs.get("bucket") and url is not None: - bucket = kwargs.pop("bucket") - url = url._replace(netloc=bucket) - obj = super()._from_parts(args, url, **kwargs) - return obj - - @classmethod - def _from_parsed_parts(cls, drv, root, parts, url=None, **kwargs): - if kwargs.get("bucket") and url is not None: - bucket = kwargs.pop("bucket") - url = url._replace(netloc=bucket) - obj = super()._from_parsed_parts(drv, root, parts, url=url, **kwargs) - return obj - - def _sub_path(self, name): - """ - `gcsfs` and `s3fs` return the full path as `/` with - `listdir` and `glob`. However, in `iterdir` and `glob` we only want the - relative path to `self`. 
- """ - sp = re.escape(self._path) - netloc = self._url.netloc - return re.sub( - f"^({netloc})?/?({sp}|{sp[1:]})/?", - "", - name, - ) - - def joinpath(self, *args): - if self._url.netloc: - return super().joinpath(*args) - - # if no bucket is defined for self - sep = self._flavour.sep - args_list = [] - for arg in args: - if isinstance(arg, list): - warnings.warn( - "lists as arguments to joinpath are deprecated", - DeprecationWarning, - stacklevel=2, - ) - args_list.extend(arg) - else: - args_list.extend(arg.split(sep)) - bucket = args_list.pop(0) - return type(self)( - "/", - *args_list, - **self.storage_options, - bucket=bucket, - scheme=self.protocol, - ) - - @property - def path(self) -> str: - if self._url is None: - raise RuntimeError(str(self)) - return f"{self._url.netloc}{super()._path}" - - -if sys.version_info >= (3, 12): - from upath.core312plus import FSSpecFlavour - - class CloudPath(upath.core312plus.UPath): # noqa - __slots__ = () - _flavour = FSSpecFlavour( - join_prepends_protocol=True, - supports_netloc=True, - ) - - def __init__( - self, *args, protocol: str | None = None, **storage_options: Any - ) -> None: - if "bucket" in storage_options: - bucket = storage_options.pop("bucket") - args = [f"{self._protocol}://{bucket}/", *args] - super().__init__(*args, protocol=protocol, **storage_options) - - def mkdir( - self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False - ) -> None: - if not parents and not exist_ok and self.exists(): - raise FileExistsError(self.path) - super().mkdir(mode=mode, parents=parents, exist_ok=exist_ok) - - def iterdir(self): - if self.is_file(): - raise NotADirectoryError(str(self)) - yield from super().iterdir() - - def relative_to(self, other, /, *_deprecated, walk_up=False): - # use the parent implementation for the ValueError logic - super().relative_to(other, *_deprecated, walk_up=False) - return self +from upath._compat import FSSpecAccessorShim as _FSSpecAccessorShim +from upath._flavour import 
FSSpecFlavour as _FSSpecFlavour +from upath.core import UPath + +__all__ = [ + "CloudPath", + "GCSPath", + "S3Path", + "AzurePath", +] + + +# accessors are deprecated +_CloudAccessor = _FSSpecAccessorShim + + +class CloudPath(UPath): + __slots__ = () + _flavour = _FSSpecFlavour( + join_prepends_protocol=True, + supports_netloc=True, + ) + + def __init__( + self, *args, protocol: str | None = None, **storage_options: Any + ) -> None: + for key in ["bucket", "netloc"]: + bucket = storage_options.pop(key, None) + if bucket: + if args[0].startswith("/"): + args = (f"{self._protocol}://{bucket}{args[0]}", *args[1:]) + else: + args = (f"{self._protocol}://{bucket}/", *args) + break + super().__init__(*args, protocol=protocol, **storage_options) + + def mkdir( + self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False + ) -> None: + if not parents and not exist_ok and self.exists(): + raise FileExistsError(self.path) + super().mkdir(mode=mode, parents=parents, exist_ok=exist_ok) + + def iterdir(self): + if self.is_file(): + raise NotADirectoryError(str(self)) + yield from super().iterdir() + + def relative_to(self, other, /, *_deprecated, walk_up=False): + # use the parent implementation for the ValueError logic + super().relative_to(other, *_deprecated, walk_up=False) + return self class GCSPath(CloudPath): diff --git a/upath/implementations/hdfs.py b/upath/implementations/hdfs.py index 50b1c75c..55e553c8 100644 --- a/upath/implementations/hdfs.py +++ b/upath/implementations/hdfs.py @@ -1,56 +1,23 @@ from __future__ import annotations -import sys +from upath._compat import FSSpecAccessorShim as _FSSpecAccessorShim +from upath.core import UPath -import upath.core +__all__ = ["HDFSPath"] +# accessors are deprecated +_HDFSAccessor = _FSSpecAccessorShim -class _HDFSAccessor(upath.core._FSSpecAccessor): - def __init__(self, parsed_url, *args, **kwargs): - super().__init__(parsed_url, *args, **kwargs) - self._fs.root_marker = "/" - def touch(self, path, 
**kwargs): - kwargs.pop("truncate", None) - super().touch(path, **kwargs) +class HDFSPath(UPath): + __slots__ = () - def mkdir(self, path, create_parents=True, **kwargs): - pth = self._format_path(path) - if create_parents: - return self._fs.makedirs(pth, **kwargs) - else: - if not kwargs.get("exist_ok", False) and self._fs.exists(pth): - raise FileExistsError(pth) - print(kwargs, self._fs.exists(pth), pth) - return self._fs.mkdir(pth, create_parents=create_parents, **kwargs) + def mkdir(self, mode=0o777, parents=False, exist_ok=False): + if not exist_ok and self.exists(): + raise FileExistsError(str(self)) + super().mkdir(mode=mode, parents=parents, exist_ok=exist_ok) - def listdir(self, path, **kwargs): - try: - yield from super().listdir(path, **kwargs) - except OSError as err: - if err.args and err.args[0].startswith( - "GetFileInfo expects base_dir of selector to be a directory" - ): - raise NotADirectoryError(path) - raise - - -class HDFSPath(upath.core.UPath): - _default_accessor = _HDFSAccessor - - -if sys.version_info >= (3, 12): - import upath.core312plus - - class HDFSPath(upath.core312plus.UPath): # noqa - __slots__ = () - - def mkdir(self, mode=0o777, parents=False, exist_ok=False): - if not exist_ok and self.exists(): - raise FileExistsError(str(self)) - super().mkdir(mode=mode, parents=parents, exist_ok=exist_ok) - - def iterdir(self): - if self.is_file(): - raise NotADirectoryError(str(self)) - yield from super().iterdir() + def iterdir(self): + if self.is_file(): + raise NotADirectoryError(str(self)) + yield from super().iterdir() diff --git a/upath/implementations/http.py b/upath/implementations/http.py index 9f647316..0d0dc21f 100644 --- a/upath/implementations/http.py +++ b/upath/implementations/http.py @@ -1,77 +1,79 @@ from __future__ import annotations -import sys from itertools import chain -from urllib.parse import urlunsplit from fsspec.asyn import sync -import upath.core +from upath._compat import FSSpecAccessorShim as _FSSpecAccessorShim 
+from upath._flavour import FSSpecFlavour as _FSSpecFlavour +from upath.core import UPath +__all__ = ["HTTPPath"] -class _HTTPAccessor(upath.core._FSSpecAccessor): - def __init__(self, parsed_url, *args, **kwargs): - super().__init__(parsed_url, *args, **kwargs) +# accessors are deprecated +_HTTPAccessor = _FSSpecAccessorShim - def _format_path(self, path): - return str(path) +class HTTPPath(UPath): + _flavour = _FSSpecFlavour( + join_like_urljoin=True, + supports_empty_parts=True, + supports_netloc=True, + supports_query_parameters=True, + supports_fragments=True, + ) -class HTTPPath(upath.core.UPath): - _default_accessor = _HTTPAccessor + @property + def root(self) -> str: + return super().root or "/" - def is_dir(self): + def __str__(self): + return super(UPath, self).__str__() + + def is_file(self): try: - return self._path_type() == "directory" + next(super().iterdir()) + except (StopIteration, NotADirectoryError): + return True except FileNotFoundError: return False + else: + return False - def is_file(self): + def is_dir(self): try: - return self._path_type() == "file" + next(super().iterdir()) + except (StopIteration, NotADirectoryError): + return False except FileNotFoundError: return False + else: + return True - def _path_type(self): + def iterdir(self): + it = iter(super().iterdir()) try: - next(self.iterdir()) + item0 = next(it) except (StopIteration, NotADirectoryError): - return "file" + raise NotADirectoryError(str(self)) + except FileNotFoundError: + raise FileNotFoundError(str(self)) else: - return "directory" - - def _sub_path(self, name): - """ - `fsspec` returns the full path as `scheme://netloc/` with - `listdir` and `glob`. However, in `iterdir` and `glob` we only want the - relative path to `self`. 
- """ - complete_address = self._format_parsed_parts( - None, None, [self._path], url=self._url, **self._kwargs - ) - - if name.startswith(complete_address): - name = name[len(complete_address) :] # noqa: E203 - name = name.strip("/") - - return name + yield from chain([item0], it) def resolve( - self: HTTPPath, strict: bool = False, follow_redirects: bool = True + self: HTTPPath, + strict: bool = False, + follow_redirects: bool = True, ) -> HTTPPath: """Normalize the path and resolve redirects.""" # Normalise the path resolved_path = super().resolve(strict=strict) if follow_redirects: - # Ensure we have a url - parsed_url = resolved_path._url - if parsed_url is None: - return resolved_path - else: - url = parsed_url.geturl() # Get the fsspec fs - fs = resolved_path._accessor._fs + fs = self.fs + url = str(self) # Ensure we have a session session = sync(fs.loop, fs.set_session) # Use HEAD requests if the server allows it, falling back to GETs @@ -87,110 +89,3 @@ def resolve( break return resolved_path - - @property - def drive(self): - return f"{self._url.scheme}://{self._url.netloc}" - - @property - def anchor(self) -> str: - return self.drive + self.root - - @property - def parts(self) -> tuple[str, ...]: - parts = super().parts - if not parts: - return () - p0, *partsN = parts - if p0 == "/": - p0 = self.anchor - if not partsN and self._url and self._url.path == "/": - partsN = [""] - return (p0, *partsN) - - @property - def path(self) -> str: - # http filesystems use the full url as path - if self._url is None: - raise RuntimeError(str(self)) - return urlunsplit(self._url) - - -if sys.version_info >= (3, 12): # noqa - from upath.core312plus import FSSpecFlavour - - class HTTPPath(upath.core312plus.UPath): # noqa - _flavour = FSSpecFlavour( - join_like_urljoin=True, - supports_empty_parts=True, - supports_netloc=True, - supports_query_parameters=True, - supports_fragments=True, - ) - - @property - def root(self) -> str: - return super().root or "/" - - def 
__str__(self): - return super(upath.core312plus.UPath, self).__str__() - - def is_file(self): - try: - next(super().iterdir()) - except (StopIteration, NotADirectoryError): - return True - except FileNotFoundError: - return False - else: - return False - - def is_dir(self): - try: - next(super().iterdir()) - except (StopIteration, NotADirectoryError): - return False - except FileNotFoundError: - return False - else: - return True - - def iterdir(self): - it = iter(super().iterdir()) - try: - item0 = next(it) - except (StopIteration, NotADirectoryError): - raise NotADirectoryError(str(self)) - except FileNotFoundError: - raise FileNotFoundError(str(self)) - else: - yield from chain([item0], it) - - def resolve( - self: HTTPPath, - strict: bool = False, - follow_redirects: bool = True, - ) -> HTTPPath: - """Normalize the path and resolve redirects.""" - # Normalise the path - resolved_path = super().resolve(strict=strict) - - if follow_redirects: - # Get the fsspec fs - fs = self.fs - url = str(self) - # Ensure we have a session - session = sync(fs.loop, fs.set_session) - # Use HEAD requests if the server allows it, falling back to GETs - for method in (session.head, session.get): - r = sync(fs.loop, method, url, allow_redirects=True) - try: - r.raise_for_status() - except Exception as exc: - if method == session.get: - raise FileNotFoundError(self) from exc - else: - resolved_path = HTTPPath(str(r.url)) - break - - return resolved_path diff --git a/upath/implementations/local.py b/upath/implementations/local.py index e51d3871..dd7dcce2 100644 --- a/upath/implementations/local.py +++ b/upath/implementations/local.py @@ -7,11 +7,14 @@ from pathlib import PosixPath from pathlib import WindowsPath from typing import Any -from typing import Iterable +from typing import Collection +from typing import MutableMapping from urllib.parse import SplitResult -from fsspec.implementations.local import LocalFileSystem +from fsspec import __version__ as fsspec_version +from 
packaging.version import Version +from upath._flavour import FSSpecFlavour as _FSSpecFlavour from upath.core import UPath __all__ = [ @@ -21,169 +24,158 @@ "WindowsUPath", ] +_LISTDIR_WORKS_ON_FILES = Version(fsspec_version) >= Version("2024.2.0") + class LocalPath(UPath): __slots__ = () + _flavour = _FSSpecFlavour( + posixpath_only=False, + ) + @property + def path(self): + sep = self._flavour.sep + if self.drive: + return f"/{super().path}".replace(sep, "/") + return super().path.replace(sep, "/") -class FilePath(LocalPath): - __slots__ = () - + @property + def _url(self): + return SplitResult(self.protocol, "", self.path, "", "") -_PY310_IGNORE = {"__slots__", "__module__", "_from_parts", "__new__"} +class FilePath(LocalPath): + __slots__ = () -def _iterate_class_attrs( - path_cls: type[Path], - ignore: set[str] = frozenset(), -) -> Iterable[tuple[str, Any]]: + def iterdir(self): + if _LISTDIR_WORKS_ON_FILES and self.is_file(): + raise NotADirectoryError(f"{self}") + return super().iterdir() + + +_pathlib_py312_ignore = { + "__slots__", + "__module__", + "__new__", + "__init__", + "_from_parts", + "_from_parsed_parts", + "with_segments", +} + + +def _set_class_attributes( + type_dict: MutableMapping[str, Any], + src: type[Path], + *, + ignore: Collection[str] = frozenset(_pathlib_py312_ignore), +) -> None: + """helper function to assign all methods/attrs from src to a class dict""" visited = set() - for cls in path_cls.__mro__: + for cls in src.__mro__: if cls is object: continue for attr, func_or_value in cls.__dict__.items(): - if attr in ignore: - continue - if attr in visited: - continue if ismemberdescriptor(func_or_value): continue + if attr in ignore or attr in visited: + continue + else: + visited.add(attr) + + type_dict[attr] = func_or_value + - yield attr, func_or_value - visited.add(attr) +def _upath_init(inst: PosixUPath | WindowsUPath) -> None: + """helper to initialize the PosixPath/WindowsPath instance with UPath attrs""" + inst._protocol = "" + 
inst._storage_options = {} + if sys.version_info < (3, 10): + inst._init() class PosixUPath(PosixPath, LocalPath): __slots__ = () - if os.name == "nt": - __new__ = PosixPath.__new__ # type: ignore - # assign all PosixPath methods/attrs to prevent multi inheritance issues - for attr, func_or_attr in _iterate_class_attrs(PosixPath, ignore=_PY310_IGNORE): - locals()[attr] = func_or_attr - del attr, func_or_attr - - @property - def fs(self): - return LocalFileSystem() + _set_class_attributes(locals(), src=PosixPath) + + if sys.version_info < (3, 12): + + def __new__( + cls, *args, protocol: str | None = None, **storage_options: Any + ) -> UPath: + if os.name == "nt": + raise NotImplementedError( + f"cannot instantiate {cls.__name__} on your system" + ) + obj = super().__new__(cls, *args) + obj._protocol = "" + return obj + + def __init__( + self, *args, protocol: str | None = None, **storage_options: Any + ) -> None: + super(Path, self).__init__() + self._drv, self._root, self._parts = type(self)._parse_args(args) + _upath_init(self) + + @classmethod + def _from_parts(cls, *args, **kwargs): + obj = super(Path, cls)._from_parts(*args, **kwargs) + _upath_init(obj) + return obj + + @classmethod + def _from_parsed_parts(cls, drv, root, parts): + obj = super(Path, cls)._from_parsed_parts(drv, root, parts) + _upath_init(obj) + return obj - @property - def path(self) -> str: - return str(self) - - @classmethod - def _from_parts(cls, args, *, url=None, **kw): - obj = super(UPath, cls)._from_parts(args) - obj._kwargs = {} - obj._url = SplitResult("", "", str(obj), "", "") - return obj - - @classmethod - def _from_parsed_parts( - cls, - drv, - root, - parts, - url=None, - **kwargs: Any, - ): - obj = super(UPath, cls)._from_parsed_parts( # type: ignore[misc] - drv, root, parts - ) - obj._kwargs = {} - obj._url = SplitResult("", "", str(obj), "", "") - return obj + @property + def path(self) -> str: + return PosixPath.__str__(self) class WindowsUPath(WindowsPath, LocalPath): 
__slots__ = () - if os.name != "nt": - __new__ = WindowsPath.__new__ # type: ignore - # assign all WindowsPath methods/attrs to prevent multi inheritance issues - for attr, func_or_attr in _iterate_class_attrs(WindowsPath, ignore=_PY310_IGNORE): - locals()[attr] = func_or_attr - del attr, func_or_attr - - @property - def fs(self): - return LocalFileSystem() - - @property - def path(self) -> str: - return str(self) - - @classmethod - def _from_parts(cls, args, *, url=None, **kw): - obj = super(UPath, cls)._from_parts(args) - obj._kwargs = {} - obj._url = SplitResult("", "", str(obj), "", "") - return obj - - @classmethod - def _from_parsed_parts( - cls, - drv, - root, - parts, - url=None, - **kwargs: Any, - ): - obj = super(UPath, cls)._from_parsed_parts( # type: ignore[misc] - drv, root, parts - ) - obj._kwargs = {} - obj._url = SplitResult("", "", str(obj), "", "") - return obj - - -if sys.version_info >= (3, 12): # noqa: C901 - from upath.core312plus import FSSpecFlavour - - class LocalPath(UPath): - __slots__ = () - _flavour = FSSpecFlavour( - posixpath_only=False, - ) - - @property - def path(self): - sep = self._flavour.sep - if self.drive: - return f"/{super().path}".replace(sep, "/") - return super().path.replace(sep, "/") + _set_class_attributes(locals(), src=WindowsPath) + + if sys.version_info < (3, 12): + + def __new__( + cls, *args, protocol: str | None = None, **storage_options: Any + ) -> UPath: + if os.name != "nt": + raise NotImplementedError( + f"cannot instantiate {cls.__name__} on your system" + ) + obj = super().__new__(cls, *args) + obj._protocol = "" + return obj + + def __init__( + self, *args, protocol: str | None = None, **storage_options: Any + ) -> None: + super(Path, self).__init__() + self._drv, self._root, self._parts = self._parse_args(args) + _upath_init(self) + + @classmethod + def _from_parts(cls, *args, **kwargs): + obj = super(Path, cls)._from_parts(*args, **kwargs) + _upath_init(obj) + return obj + + @classmethod + def 
_from_parsed_parts(cls, drv, root, parts): + obj = super(Path, cls)._from_parsed_parts(drv, root, parts) + _upath_init(obj) + return obj @property - def _url(self): - return SplitResult(self.protocol, "", self.path, "", "") - - class FilePath(LocalPath): # noqa - __slots__ = () - - _PY312_IGNORE = {"__slots__", "__module__", "__new__", "__init__", "with_segments"} - - class PosixUPath(PosixPath, LocalPath): # noqa - __slots__ = () - - if os.name == "nt": - __new__ = PosixPath.__new__ - - # assign all PosixPath methods/attrs to prevent multi inheritance issues - for attr, func_or_attr in _iterate_class_attrs(PosixPath, ignore=_PY312_IGNORE): - locals()[attr] = func_or_attr - del attr, func_or_attr - - class WindowsUPath(WindowsPath, LocalPath): # noqa - __slots__ = () - - if os.name != "nt": - __new__ = WindowsPath.__new__ - - # assign all WindowsPath methods/attrs to prevent multi inheritance issues - for attr, func_or_attr in _iterate_class_attrs( - WindowsPath, ignore=_PY312_IGNORE - ): - locals()[attr] = func_or_attr - del attr, func_or_attr + def path(self) -> str: + return WindowsPath.__str__(self) diff --git a/upath/implementations/memory.py b/upath/implementations/memory.py index 09e564a7..7169cd42 100644 --- a/upath/implementations/memory.py +++ b/upath/implementations/memory.py @@ -1,79 +1,27 @@ from __future__ import annotations -import sys -from typing import Any -from urllib.parse import SplitResult +from upath._compat import FSSpecAccessorShim as _FSSpecAccessorShim +from upath.core import UPath -import upath.core +__all__ = ["MemoryPath"] +# accessors are deprecated +_MemoryAccessor = _FSSpecAccessorShim -class _MemoryAccessor(upath.core._FSSpecAccessor): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._fs.root_marker = "" - - -class MemoryPath(upath.core.UPath): - _default_accessor = _MemoryAccessor +class MemoryPath(UPath): def iterdir(self): - """Iterate over the files in this directory. 
Does not yield any - result for the special paths '.' and '..'. - """ - for name in self._accessor.listdir(self): - # fsspec returns dictionaries - if isinstance(name, dict): - name = name.get("name") - if name in {".", ".."}: - # Yielding a path object for these makes little sense - continue - # only want the path name with iterdir - name = name.rstrip("/") - name = self._sub_path(name) - yield self._make_child_relpath(name) + if not self.is_dir(): + raise NotADirectoryError(str(self)) + yield from super().iterdir() - @classmethod - def _from_parts(cls, args, url=None, **kwargs): - if url and url.netloc: - if args: - if args[0].startswith("/"): - args[0] = args[0][1:] - args[0:1] = [f"/{url.netloc}/{args[0]}"] - else: - args[:] = f"/{url.netloc}" - url = url._replace(netloc="") - return super()._from_parts(args, url=url, **kwargs) + @property + def path(self): + path = super().path + return "/" if path == "." else path - @classmethod - def _format_parsed_parts( - cls, - drv: str, - root: str, - parts: list[str], - url: SplitResult | None = None, - **kwargs: Any, - ) -> str: - s = super()._format_parsed_parts(drv, root, parts, url=url, **kwargs) + def __str__(self): + s = super().__str__() if s.startswith("memory:///"): s = s.replace("memory:///", "memory://", 1) return s - - -if sys.version_info >= (3, 12): - - class MemoryPath(upath.core.UPath): # noqa - def iterdir(self): - if not self.is_dir(): - raise NotADirectoryError(str(self)) - yield from super().iterdir() - - @property - def path(self): - path = super().path - return "/" if path == "." 
else path - - def __str__(self): - s = super().__str__() - if s.startswith("memory:///"): - s = s.replace("memory:///", "memory://", 1) - return s diff --git a/upath/implementations/webdav.py b/upath/implementations/webdav.py index ff1f1225..941d6f7b 100644 --- a/upath/implementations/webdav.py +++ b/upath/implementations/webdav.py @@ -1,114 +1,67 @@ from __future__ import annotations -import sys from typing import Any -from urllib.parse import ParseResult from urllib.parse import urlsplit from urllib.parse import urlunsplit -import upath.core +from fsspec.registry import known_implementations +from fsspec.registry import register_implementation +from upath._compat import FSSpecAccessorShim as _FSSpecAccessorShim +from upath._compat import str_remove_prefix +from upath._compat import str_remove_suffix +from upath.core import UPath -class _WebdavAccessor(upath.core._FSSpecAccessor): - def __init__(self, parsed_url: ParseResult, **kwargs): - from webdav4.fsspec import WebdavFileSystem +__all__ = [ + "WebdavPath", +] - parsed_url = parsed_url._replace(scheme=parsed_url.scheme[7:], path="") - base_url = urlunsplit(parsed_url) - self._fs = WebdavFileSystem(base_url=base_url, **kwargs) +# webdav was only registered in fsspec>=2022.5.0 +if "webdav" not in known_implementations: + import webdav4.fsspec - def listdir(self, path, **kwargs): - base_url = urlunsplit(path._url._replace(path="")) - for file_info in self._fs.listdir( - self._format_path(path).lstrip("/"), **kwargs - ): - yield { - **file_info, - "name": f"{base_url}/{file_info['name']}", - } + register_implementation("webdav", webdav4.fsspec.WebdavFileSystem) - def glob(self, path, path_pattern, **kwargs): - base_url = urlunsplit(path._url._replace(path="")) - for file_path in self._fs.glob( - self._format_path(path_pattern).lstrip("/"), **kwargs - ): - yield f"{base_url}/{file_path}" +# accessors are deprecated +_WebdavAccessor = _FSSpecAccessorShim -class WebdavPath(upath.core.UPath): - _default_accessor = 
_WebdavAccessor - def _sub_path(self, name): - """fsspec returns path as `scheme://netloc/` with listdir - and glob, so we potentially need to sub the whole string - """ - sp = self.path - complete_address = self._format_parsed_parts( - None, None, [sp], url=self._url, **self._kwargs - ) +class WebdavPath(UPath): + __slots__ = () - if name.startswith(complete_address): - name = name[len(complete_address) :] # noqa: E203 - name = name.strip("/") - - return name - - @property - def protocol(self) -> str: - if self._url is None: - raise RuntimeError(str(self)) - return self._url.scheme.split("+")[0] - - @property - def storage_options(self) -> dict[str, Any]: - if self._url is None: - raise RuntimeError(str(self)) - sopts = super().storage_options - http_protocol = self._url.scheme.split("+")[1] - assert http_protocol in {"http", "https"} - base_url = urlunsplit(self._url._replace(scheme=http_protocol, path="")) - sopts["base_url"] = base_url - return sopts - - -if sys.version_info >= (3, 12): - import upath.core312plus - - class WebdavPath(upath.core312plus.UPath): # noqa - __slots__ = () - - def __init__( - self, *args, protocol: str | None = None, **storage_options: Any - ) -> None: - base_options = getattr(self, "_storage_options", {}) # when unpickling - if args: - args0, *argsN = args - url = urlsplit(str(args0)) - args0 = urlunsplit(url._replace(scheme="", netloc="")) or "/" - if "base_url" not in storage_options: - if self._protocol == "webdav+http": - storage_options["base_url"] = urlunsplit( - url._replace(scheme="http", path="") - ) - elif self._protocol == "webdav+https": - storage_options["base_url"] = urlunsplit( - url._replace(scheme="https", path="") - ) - else: - args0, argsN = "/", () - storage_options = {**base_options, **storage_options} + def __init__( + self, *args, protocol: str | None = None, **storage_options: Any + ) -> None: + base_options = getattr(self, "_storage_options", {}) # when unpickling + if args: + args0, *argsN = args + url = 
urlsplit(str(args0)) + args0 = urlunsplit(url._replace(scheme="", netloc="")) or "/" if "base_url" not in storage_options: - raise ValueError( - f"must provide `base_url` storage option for args: {args!r}" - ) - self._protocol = "webdav" - super().__init__(args0, *argsN, protocol="webdav", **storage_options) + if self._protocol == "webdav+http": + storage_options["base_url"] = urlunsplit( + url._replace(scheme="http", path="") + ) + elif self._protocol == "webdav+https": + storage_options["base_url"] = urlunsplit( + url._replace(scheme="https", path="") + ) + else: + args0, argsN = "/", [] + storage_options = {**base_options, **storage_options} + if "base_url" not in storage_options: + raise ValueError( + f"must provide `base_url` storage option for args: {args!r}" + ) + self._protocol = "webdav" + super().__init__(args0, *argsN, protocol="webdav", **storage_options) - @property - def path(self) -> str: - # webdav paths don't start at "/" - return super().path.removeprefix("/") + @property + def path(self) -> str: + # webdav paths don't start at "/" + return str_remove_prefix(super().path, "/") - def __str__(self): - base_url = self.storage_options["base_url"].removesuffix("/") - return super().__str__().replace("webdav://", f"webdav+{base_url}", 1) + def __str__(self): + base_url = str_remove_suffix(self.storage_options["base_url"], "/") + return super().__str__().replace("webdav://", f"webdav+{base_url}", 1) diff --git a/upath/registry.py b/upath/registry.py index 1bf6f67f..f93feeb1 100644 --- a/upath/registry.py +++ b/upath/registry.py @@ -27,6 +27,7 @@ myproto = my_module.submodule:MyPath ``` """ + from __future__ import annotations import os @@ -37,6 +38,7 @@ from functools import lru_cache from importlib import import_module from importlib.metadata import entry_points +from typing import TYPE_CHECKING from typing import Iterator from typing import MutableMapping @@ -63,8 +65,8 @@ class _Registry(MutableMapping[str, "type[upath.UPath]"]): "abfss": 
"upath.implementations.cloud.AzurePath", "adl": "upath.implementations.cloud.AzurePath", "az": "upath.implementations.cloud.AzurePath", - "file": "upath.implementations.local.LocalPath", - "local": "upath.implementations.local.LocalPath", + "file": "upath.implementations.local.FilePath", + "local": "upath.implementations.local.FilePath", "gcs": "upath.implementations.cloud.GCSPath", "gs": "upath.implementations.cloud.GCSPath", "hdfs": "upath.implementations.hdfs.HDFSPath", @@ -78,6 +80,9 @@ class _Registry(MutableMapping[str, "type[upath.UPath]"]): "webdav+https": "upath.implementations.webdav.WebdavPath", } + if TYPE_CHECKING: + _m: MutableMapping[str, str | type[upath.UPath]] + def __init__(self) -> None: if sys.version_info >= (3, 10): eps = entry_points(group=_ENTRY_POINT_GROUP) @@ -90,7 +95,7 @@ def __contains__(self, item: object) -> bool: return item in set().union(self._m, self._entries) def __getitem__(self, item: str) -> type[upath.UPath]: - fqn = self._m.get(item) + fqn: str | type[upath.UPath] | None = self._m.get(item) if fqn is None: if item in self._entries: fqn = self._m[item] = self._entries[item].load() diff --git a/upath/tests/cases.py b/upath/tests/cases.py index bcd43824..bed42126 100644 --- a/upath/tests/cases.py +++ b/upath/tests/cases.py @@ -50,9 +50,11 @@ def test_expanduser(self): "*", pytest.param( "**/*.txt", - marks=pytest.mark.xfail(reason="requires fsspec>=2023.9.0") - if Version(fsspec_version) < Version("2023.9.0") - else (), + marks=( + pytest.mark.xfail(reason="requires fsspec>=2023.9.0") + if Version(fsspec_version) < Version("2023.9.0") + else () + ), ), ), ) @@ -191,7 +193,7 @@ def test_mkdir_parents_true_exists_ok_false(self): with pytest.raises(FileExistsError): new_dir.mkdir(parents=True, exist_ok=False) - @pytest.mark.xfail(sys.version_info >= (3, 12), reason="only valid on python<=3.11") + @pytest.mark.skip(reason="_accessor is unsupported in universal_pathlib>0.1.4") def test_makedirs_exist_ok_true(self): new_dir = 
self.path.joinpath("parent", "child", "dir_may_not_exist") new_dir._accessor.makedirs(new_dir, exist_ok=True) @@ -199,7 +201,7 @@ def test_makedirs_exist_ok_true(self): new_dir.joinpath(".file").touch() new_dir._accessor.makedirs(new_dir, exist_ok=True) - @pytest.mark.xfail(sys.version_info >= (3, 12), reason="only valid on python<=3.11") + @pytest.mark.skip(reason="_accessor is unsupported in universal_pathlib>0.1.4") def test_makedirs_exist_ok_false(self): new_dir = self.path.joinpath("parent", "child", "dir_may_exist") new_dir._accessor.makedirs(new_dir, exist_ok=False) @@ -462,3 +464,12 @@ def test_read_with_fsspec(self): fs = filesystem(protocol, **storage_options) with fs.open(path) as f: assert f.read() == b"hello world" + + def test_access_to_private_api(self): + # DO NOT access these private attributes in your code + p = UPath(str(self.path), **self.path.storage_options) + assert isinstance(p._drv, str) + p = UPath(str(self.path), **self.path.storage_options) + assert isinstance(p._root, str) + p = UPath(str(self.path), **self.path.storage_options) + assert isinstance(p._parts, (list, tuple)) diff --git a/upath/tests/conftest.py b/upath/tests/conftest.py index 04d2b27a..a2f85b0f 100644 --- a/upath/tests/conftest.py +++ b/upath/tests/conftest.py @@ -327,9 +327,12 @@ def webdav_fixture(local_testdir, webdav_server): fs_provider.lock_manager.storage.clear() +AZURITE_PORT = int(os.environ.get("UPATH_AZURITE_PORT", "10000")) + + @pytest.fixture(scope="session") def azurite_credentials(): - url = "http://localhost:10000" + url = f"http://localhost:{AZURITE_PORT}" account_name = "devstoreaccount1" key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" # noqa: E501 endpoint = f"{url}/{account_name}" @@ -348,10 +351,10 @@ def docker_azurite(azurite_credentials): image = "mcr.microsoft.com/azure-storage/azurite" container_name = "azure_test" cmd = ( - f"docker run --rm -d -p 10000:10000 --name {container_name} {image}" # 
noqa: E501 + f"docker run --rm -d -p {AZURITE_PORT}:10000 --name {container_name} {image}" # noqa: E501 " azurite-blob --loose --blobHost 0.0.0.0" # noqa: E501 ) - url = "http://localhost:10000" + url = f"http://localhost:{AZURITE_PORT}" stop_docker(container_name) subprocess.run(shlex.split(cmd), check=True) diff --git a/upath/tests/implementations/test_hdfs.py b/upath/tests/implementations/test_hdfs.py index c2b75cf0..8867cea4 100644 --- a/upath/tests/implementations/test_hdfs.py +++ b/upath/tests/implementations/test_hdfs.py @@ -1,5 +1,6 @@ """see upath/tests/conftest.py for fixtures """ + import pytest # noqa: F401 from upath import UPath diff --git a/upath/tests/implementations/test_http.py b/upath/tests/implementations/test_http.py index 00a2a02d..c9b2797f 100644 --- a/upath/tests/implementations/test_http.py +++ b/upath/tests/implementations/test_http.py @@ -48,15 +48,19 @@ def test_mkdir(self): "*.txt", pytest.param( "*", - marks=pytest.mark.xfail(reason="requires fsspec<=2023.10.0") - if Version(fsspec_version) > Version("2023.10.0") - else (), + marks=( + pytest.mark.xfail(reason="requires fsspec<=2023.10.0") + if Version(fsspec_version) > Version("2023.10.0") + else () + ), ), pytest.param( "**/*.txt", - marks=pytest.mark.xfail(reason="requires fsspec>=2023.9.0") - if Version(fsspec_version) < Version("2023.9.0") - else (), + marks=( + pytest.mark.xfail(reason="requires fsspec>=2023.9.0") + if Version(fsspec_version) < Version("2023.9.0") + else () + ), ), ), ) diff --git a/upath/tests/implementations/test_memory.py b/upath/tests/implementations/test_memory.py index 8e84dc9d..7a0b9aea 100644 --- a/upath/tests/implementations/test_memory.py +++ b/upath/tests/implementations/test_memory.py @@ -1,5 +1,3 @@ -import sys - import pytest from upath import UPath @@ -28,14 +26,7 @@ def test_is_MemoryPath(self): ("memory:/a", "memory://a"), ("memory:/a/b", "memory://a/b"), ("memory://", "memory://"), - pytest.param( - "memory://a", - "memory://a", - 
marks=pytest.mark.xfail( - sys.version_info < (3, 12), - reason="currently broken due to urllib parsing", - ), - ), + ("memory://a", "memory://a"), ("memory://a/b", "memory://a/b"), ("memory:///", "memory://"), ("memory:///a", "memory://a"), diff --git a/upath/tests/implementations/test_s3.py b/upath/tests/implementations/test_s3.py index ae5d10b2..9b57f013 100644 --- a/upath/tests/implementations/test_s3.py +++ b/upath/tests/implementations/test_s3.py @@ -1,5 +1,6 @@ """see upath/tests/conftest.py for fixtures """ + import fsspec import pytest # noqa: F401 diff --git a/upath/tests/implementations/test_webdav.py b/upath/tests/implementations/test_webdav.py index 85572abe..23693e2e 100644 --- a/upath/tests/implementations/test_webdav.py +++ b/upath/tests/implementations/test_webdav.py @@ -3,7 +3,6 @@ from upath import UPath from ..cases import BaseTests -from ..utils import xfail_if_version class TestUPathWebdav(BaseTests): @@ -22,6 +21,9 @@ def test_storage_options(self): assert storage_options == self.path.fs.storage_options assert base_url == self.path.fs.client.base_url - @xfail_if_version("fsspec", lt="2022.5.0", reason="requires fsspec>=2022.5.0") def test_read_with_fsspec(self): + # this test used to fail with fsspec<2022.5.0 because webdav was not + # registered in fsspec. But when UPath(webdav_fixture) is called, to + # run the BaseTests, the upath.implementations.webdav module is + # imported, which registers the webdav implementation in fsspec. super().test_read_with_fsspec() diff --git a/upath/tests/pathlib/test_pathlib_312.py b/upath/tests/pathlib/test_pathlib_312.py index 71cab6b8..1a706e9f 100644 --- a/upath/tests/pathlib/test_pathlib_312.py +++ b/upath/tests/pathlib/test_pathlib_312.py @@ -9,6 +9,7 @@ import stat import tempfile import unittest +from contextlib import nullcontext from unittest import mock from ._test_support import import_helper @@ -16,6 +17,7 @@ from ._test_support import is_emscripten, is_wasi from . 
import _test_support as os_helper from ._test_support import TESTFN, FakePath +from ..utils import temporary_register try: import grp, pwd @@ -23,7 +25,7 @@ grp = pwd = None import upath -from upath.core312plus import UPath +from upath.core import UPath from upath.implementations.local import PosixUPath, WindowsUPath import pytest @@ -76,7 +78,8 @@ def test_constructor_common(self): self.assertEqual(P(P('a'), 'b'), P('a/b')) self.assertEqual(P(P('a'), P('b')), P('a/b')) self.assertEqual(P(P('a'), P('b'), P('c')), P(FakePath("a/b/c"))) - self.assertEqual(P(P('./a:b')), P('./a:b')) + if os.name != "nt": + self.assertEqual(P(P('./a:b')), P('./a:b')) def test_bytes(self): P = self.cls @@ -125,18 +128,25 @@ def test_str_subclass_common(self): def test_with_segments_common(self): class P(_BasePurePathSubclass, self.cls): pass - p = P('foo', 'bar', session_id=42) - self.assertEqual(42, (p / 'foo').session_id) - self.assertEqual(42, ('foo' / p).session_id) - self.assertEqual(42, p.joinpath('foo').session_id) - self.assertEqual(42, p.with_name('foo').session_id) - self.assertEqual(42, p.with_stem('foo').session_id) - self.assertEqual(42, p.with_suffix('.foo').session_id) - self.assertEqual(42, p.with_segments('foo').session_id) - self.assertEqual(42, p.relative_to('foo').session_id) - self.assertEqual(42, p.parent.session_id) - for parent in p.parents: - self.assertEqual(42, parent.session_id) + + if self.cls is UPath: + cm = temporary_register("", P) + else: + cm = nullcontext() + + with cm: + p = P('foo', 'bar', session_id=42) + self.assertEqual(42, (p / 'foo').session_id) + self.assertEqual(42, ('foo' / p).session_id) + self.assertEqual(42, p.joinpath('foo').session_id) + self.assertEqual(42, p.with_name('foo').session_id) + self.assertEqual(42, p.with_stem('foo').session_id) + self.assertEqual(42, p.with_suffix('.foo').session_id) + self.assertEqual(42, p.with_segments('foo').session_id) + self.assertEqual(42, p.relative_to('foo').session_id) + self.assertEqual(42, 
p.parent.session_id) + for parent in p.parents: + self.assertEqual(42, parent.session_id) def _get_drive_root_parts(self, parts): path = self.cls(*parts) @@ -1682,23 +1692,25 @@ def test_home(self): def test_with_segments(self): class P(_BasePurePathSubclass, self.cls): pass - p = P(BASE, session_id=42) - self.assertEqual(42, p.absolute().session_id) - self.assertEqual(42, p.resolve().session_id) - if not is_wasi: # WASI has no user accounts. - self.assertEqual(42, p.with_segments('~').expanduser().session_id) - self.assertEqual(42, (p / 'fileA').rename(p / 'fileB').session_id) - self.assertEqual(42, (p / 'fileB').replace(p / 'fileA').session_id) - if os_helper.can_symlink(): - self.assertEqual(42, (p / 'linkA').readlink().session_id) - for path in p.iterdir(): - self.assertEqual(42, path.session_id) - for path in p.glob('*'): - self.assertEqual(42, path.session_id) - for path in p.rglob('*'): - self.assertEqual(42, path.session_id) - for dirpath, dirnames, filenames in p.walk(): - self.assertEqual(42, dirpath.session_id) + + with temporary_register("", P): + p = P(BASE, session_id=42) + self.assertEqual(42, p.absolute().session_id) + self.assertEqual(42, p.resolve().session_id) + if not is_wasi: # WASI has no user accounts. 
+ self.assertEqual(42, p.with_segments('~').expanduser().session_id) + self.assertEqual(42, (p / 'fileA').rename(p / 'fileB').session_id) + self.assertEqual(42, (p / 'fileB').replace(p / 'fileA').session_id) + if os_helper.can_symlink(): + self.assertEqual(42, (p / 'linkA').readlink().session_id) + for path in p.iterdir(): + self.assertEqual(42, path.session_id) + for path in p.glob('*'): + self.assertEqual(42, path.session_id) + for path in p.rglob('*'): + self.assertEqual(42, path.session_id) + for dirpath, dirnames, filenames in p.walk(): + self.assertEqual(42, dirpath.session_id) def test_samefile(self): fileA_path = os.path.join(BASE, 'fileA') @@ -2957,8 +2969,9 @@ def test_glob_empty_pattern(self): with self.assertRaisesRegex(ValueError, 'Unacceptable pattern'): list(p.glob('')) - @pytest.mark.xfail(reason="subclassing UPath directly for Posix and Windows paths requires protocol registration") def test_with_segments(self): + if self.cls is UPath: + pytest.skip(reason="") super().test_with_segments() @only_posix @@ -3265,14 +3278,12 @@ def check(): class PathSubclassTest(_BasePathTest, unittest.TestCase): - class cls(UPath): - cwd = UPath.cwd - home = UPath.home + class cls(WindowsUPath if os.name == 'nt' else PosixUPath): + pass # repr() roundtripping is not supported in custom subclass. 
test_repr_roundtrips = None - @pytest.mark.xfail(reason="subsubclassing UPath directly for Posix and Windows paths requires protocol registration") def test_with_segments(self): super().test_with_segments() diff --git a/upath/tests/test_core.py b/upath/tests/test_core.py index 10b625e6..9baf6e6d 100644 --- a/upath/tests/test_core.py +++ b/upath/tests/test_core.py @@ -15,6 +15,7 @@ from .cases import BaseTests from .utils import only_on_windows from .utils import skip_on_windows +from .utils import xfail_if_version @skip_on_windows @@ -68,6 +69,12 @@ def test_home(self): assert isinstance(pth, pathlib.Path) assert isinstance(pth, UPath) + @xfail_if_version("fsspec", reason="", ge="2024.2.0") + def test_iterdir_no_dir(self): + # the mock filesystem is basically just LocalFileSystem, + # so this test would need to have an iterdir fix. + super().test_iterdir_no_dir() + def test_multiple_backend_paths(local_testdir): path = "s3://bucket/" @@ -117,7 +124,7 @@ def test_instance_check_local_uri(local_testdir): assert isinstance(upath, UPath) -@pytest.mark.xfail(sys.version_info >= (3, 12), reason="requires python<3.12") +@pytest.mark.xfail(reason="unsupported on universal_pathlib>0.1.4") def test_new_method(local_testdir): path = UPath.__new__(pathlib.Path, local_testdir) assert str(path) == str(pathlib.Path(local_testdir)) diff --git a/upath/tests/third_party/test_migration_py312.py b/upath/tests/third_party/test_migration_py312.py new file mode 100644 index 00000000..de2477e2 --- /dev/null +++ b/upath/tests/third_party/test_migration_py312.py @@ -0,0 +1,101 @@ +import os +from os import getenv + +import pytest + +from upath import UPath +from upath.registry import get_upath_class +from upath.registry import register_implementation + + +@pytest.fixture(scope="function") +def clean_registry(): + from upath.registry import _registry + + try: + yield + finally: + _registry._m.maps.clear() + get_upath_class.cache_clear() + + +@pytest.fixture(scope="function") +def 
github_subclass_old_style(clean_registry): + # GitHubPath code from: + # https://github.com/juftin/textual-universal-directorytree/blob/110770f2ee40ab5afff7eade635caad644d80848/textual_universal_directorytree/alternate_paths.py#L15-L27 + + from upath.core import _FSSpecAccessor + + class _GitHubAccessor(_FSSpecAccessor): + def __init__(self, *args, **kwargs): + token = getenv("GITHUB_TOKEN") + if token is not None: + kwargs.update({"username": "Bearer", "token": token}) + super().__init__(*args, **kwargs) + + class GitHubPath(UPath): + _default_accessor = _GitHubAccessor + + def __new__(cls, *args, **kwargs): + file_path = cls.handle_github_url(*args[0:1], storage_options=kwargs) + return super().__new__(cls, file_path, *args[1:], **kwargs) + + @property + def path(self): + return super().path.strip("/") + + @property + def name(self): + if self.path == "": + org = self._accessor._fs.org + repo = self._accessor._fs.repo + sha = self._accessor._fs.storage_options["sha"] + github_name = f"{org}:{repo}@{sha}" + return github_name + else: + return super().name + + @classmethod + def handle_github_url(cls, url, storage_options): + import requests # type: ignore[import] + + url = str(url) + gitub_prefix = "github://" + if gitub_prefix in url and "@" not in url: + _, user_password = url.split("github://") + if "org" in storage_options and "repo" in storage_options: + org = storage_options["org"] + repo = storage_options["repo"] + _, *args = user_password.rpartition(":")[2].split("/") + else: + org, repo_str = user_password.split(":") + repo, *args = repo_str.split("/") + elif gitub_prefix in url and "@" in url: + return url + else: + raise ValueError(f"Invalid GitHub URL: {url}") + token = getenv("GITHUB_TOKEN") + auth = {"auth": ("Bearer", token)} if token is not None else {} + resp = requests.get( + f"https://api.github.com/repos/{org}/{repo}", + headers={"Accept": "application/vnd.github.v3+json"}, + **auth, # type: ignore[arg-type] + ) + resp.raise_for_status() + 
default_branch = resp.json()["default_branch"] + arg_str = "/".join(args) + github_uri = ( + f"{gitub_prefix}{org}:{repo}@{default_branch}/{arg_str}".rstrip("/") + ) + return github_uri + + register_implementation("github", GitHubPath, clobber=True) + + +@pytest.mark.skipif("GITHUB_TOKEN" not in os.environ, reason="No GITHUB_TOKEN found") +def test_migration_for_github_subclass(github_subclass_old_style): + + readme = UPath("github://fsspec:universal_pathlib@main/README.md").read_text() + assert "universal_pathlib" in readme + rst_files = list(UPath("github://fsspec:universal_pathlib@main/").glob("*.rst")) + assert len(rst_files) == 2 diff --git a/upath/tests/utils.py b/upath/tests/utils.py index d25df0c1..463ed0a8 100644 --- a/upath/tests/utils.py +++ b/upath/tests/utils.py @@ -1,5 +1,6 @@ import operator import sys +from contextlib import contextmanager import pytest from fsspec.utils import get_package_version_without_import @@ -44,3 +45,18 @@ def xfail_if_no_ssl_connection(func): return pytest.mark.xfail(reason="No SSL connection")(func) else: return func + + +@contextmanager +def temporary_register(protocol, cls): + """helper to temporarily register a protocol for testing purposes""" + from upath.registry import _registry + from upath.registry import get_upath_class + + m = _registry._m.maps[0] + try: + m[protocol] = cls + yield + finally: + m.clear() + get_upath_class.cache_clear()