Skip to content

Commit

Permalink
Move Asset user facing components to task_sdk (apache#43773)
Browse files Browse the repository at this point in the history
* feat(task_sdk): Move asset from Airflow core to task_sdk

* feat(task_sdk): Move assets.metadata to task_sdk.definitions.asset

* fix(providers/amazon): fix common.compat provider ImportError handling

* fix(providers/google): fix common.compat provider ImportError handling

* fix(providers/openlineage): fix common.compat provider ImportError handling

* fix(provider/common/compat): fix common.compat provider ImportError handling

* feat(task_sdk): expose Model

* docs(nesfragements): update how asset module should be imported

* fix(task_sdk): fix 2_10 compatibility

* feat(common.compat): use version to decide how to import assets instead of exception

* feat(providers/common.compat): use airflow version instead of exception to return compat method

* refactor(providers/common/compat): extract airflow version to __init__

* fix(providers): use version compare to decide whether to import asset

* feat(decorators/asset): move @asset to task_sdk

* refactor(asset): rename _AssetAliasCondition as AssetAliasCondition

* feat(task_sdk): make airflow.sdk.definitions.decoratos a package

* Revert "feat(task_sdk): make airflow.sdk.definitions.decoratos a package"

This reverts commit 324efc0.

* feat(task_sdk): move asset related logic in airflow.sdk.definitions.decorators to airflow.sdk.definitions.asset.*

* refactor(task_sdk): move @asset to airflow.sdk.definitions.asset.decorators

* test(providers/amazon): remove unnecessary compat handling

* test(providers/google): remove unnecessary compat handling

* test(openlineage): remove unnecessary compat handling

* fix(provider/openlineage): fix how asset compat is handled

* feat(task_sdk/asset): expose extract_event_key

* test(providers/google): change Asset import back to common.compat

* docs(newsfragments): fix error naming

* docs(newsfragments): fix typo

* docs(newsfragment): add missing metadata

* fixup! feat(task_sdk): Move asset from Airflow core to task_sdk

* fixup! feat(task_sdk): Move asset from Airflow core to task_sdk
  • Loading branch information
Lee-W authored Nov 20, 2024
1 parent f05ce08 commit a0f3353
Show file tree
Hide file tree
Showing 78 changed files with 902 additions and 733 deletions.
4 changes: 2 additions & 2 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@
"version": (".version", "", False),
# Deprecated lazy imports
"AirflowException": (".exceptions", "AirflowException", True),
"Dataset": (".assets", "Dataset", True),
"Dataset": (".sdk.definitions.asset", "Dataset", True),
}
if TYPE_CHECKING:
# These objects are imported by PEP-562, however, static analyzers and IDE's
# have no idea about typing of these objects.
# Add it under TYPE_CHECKING block should help with it.
from airflow.assets import Asset, Dataset
from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
from airflow.sdk.definitions.asset import Asset, Dataset


def __getattr__(name: str):
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
queued_event_collection_schema,
queued_event_schema,
)
from airflow.assets import Asset
from airflow.assets.manager import asset_manager
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
from airflow.sdk.definitions.asset import Asset
from airflow.utils import timezone
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.db import get_query_count
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@
QueuedEventResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.assets import Asset
from airflow.assets.manager import asset_manager
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
from airflow.sdk.definitions.asset import Asset
from airflow.utils import timezone

assets_router = AirflowRouter(tags=["Asset"])
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_internal/endpoints/rpc_api_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
@functools.lru_cache
def initialize_method_map() -> dict[str, Callable]:
from airflow.api.common.trigger_dag import trigger_dag
from airflow.assets import expand_alias_to_assets
from airflow.assets.manager import AssetManager
from airflow.cli.commands.task_command import _get_ti_db_access
from airflow.dag_processing.manager import DagFileProcessorManager
Expand All @@ -76,6 +75,7 @@ def initialize_method_map() -> dict[str, Callable]:
_update_ti_heartbeat,
_xcom_pull,
)
from airflow.sdk.definitions.asset import expand_alias_to_assets
from airflow.secrets.metastore import MetastoreBackend
from airflow.utils.cli_action_loggers import _default_action_log_internal
from airflow.utils.log.file_task_handler import FileTaskHandler
Expand Down
Loading

0 comments on commit a0f3353

Please sign in to comment.