From f3b7cfc9925d4a0abec29698a7398c2750ca5a15 Mon Sep 17 00:00:00 2001 From: bolkedebruin Date: Tue, 23 Jan 2024 14:51:37 +0100 Subject: [PATCH] Make Datasets Pathlike (#36947) This makes datasets inherit from os.PathLike so they can directly be used by the Object Storage API. --- airflow/datasets/__init__.py | 6 +++++- tests/datasets/test_dataset.py | 8 ++++++++ tests/io/test_path.py | 9 +++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 0dc635a00b0a7..a4a127e3f7af9 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import os from typing import Any, ClassVar from urllib.parse import urlsplit @@ -23,7 +24,7 @@ @attr.define() -class Dataset: +class Dataset(os.PathLike): """A Dataset is used for marking data dependencies between workflows.""" uri: str = attr.field(validator=[attr.validators.min_len(1), attr.validators.max_len(3000)]) @@ -42,3 +43,6 @@ def _check_uri(self, attr, uri: str): parsed = urlsplit(uri) if parsed.scheme and parsed.scheme.lower() == "airflow": raise ValueError(f"{attr.name!r} scheme `airflow` is reserved") + + def __fspath__(self): + return self.uri diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index f707be07920e3..9e9ca9951315d 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -17,6 +17,8 @@ from __future__ import annotations +import os + import pytest from airflow.datasets import Dataset @@ -46,3 +48,9 @@ def test_uri_without_scheme(): dataset = Dataset(uri="example_dataset") EmptyOperator(task_id="task1", outlets=[dataset]) + + +def test_fspath(): + uri = "s3://example_dataset" + dataset = Dataset(uri=uri) + assert os.fspath(dataset) == uri diff --git a/tests/io/test_path.py b/tests/io/test_path.py index ab143b038e00b..deb8d412cc700 100644 --- 
a/tests/io/test_path.py +++ b/tests/io/test_path.py @@ -26,6 +26,7 @@ from fsspec.implementations.local import LocalFileSystem from fsspec.utils import stringify_path +from airflow.datasets import Dataset from airflow.io import _register_filesystems, get_fs from airflow.io.path import ObjectStoragePath from airflow.io.store import _STORE_CACHE, ObjectStore, attach @@ -309,3 +310,11 @@ def test_backwards_compat(self): finally: # Reset the cache to avoid side effects _register_filesystems.cache_clear() + + def test_dataset(self): + p = "s3" + f = "/tmp/foo" + i = Dataset(uri=f"{p}://{f}", extra={"foo": "bar"}) + o = ObjectStoragePath(i) + assert o.protocol == p + assert o.path == f