
AIP-72: Port _validate_inlet_outlet_assets_activeness into Task SDK #46020

Conversation

@amoghrajesh (Contributor) commented Jan 24, 2025:

closes: #45969

Why?

Tasks can be defined with inlet and outlet events in their definition. For cases where inactive assets are defined in the inlets, the outlets, or both, _validate_inlet_outlet_assets_activeness is the function that checks for this scenario and fails those tasks. It was added as part of #44600.

This behaviour needs to be ported into the Task SDK to maintain synchronisation with legacy DAGs written prior to Airflow 3.

Approach

This is how it was done previously:

def _validate_inlet_outlet_assets_activeness(self, session: Session) -> None:
    if not self.task or not (self.task.outlets or self.task.inlets):
        return
    all_asset_unique_keys = {
        AssetUniqueKey.from_asset(inlet_or_outlet)
        for inlet_or_outlet in itertools.chain(self.task.inlets, self.task.outlets)
        if isinstance(inlet_or_outlet, Asset)
    }
    inactive_asset_unique_keys = self._get_inactive_asset_unique_keys(all_asset_unique_keys, session)
    if inactive_asset_unique_keys:
        raise AirflowInactiveAssetInInletOrOutletException(inactive_asset_unique_keys)

  • The function gets the unique keys of all the assets defined in the union of the task's inlets and outlets
  • An AssetUniqueKey is constructed for each of them
  • Another helper gets the inactive keys by querying the database:
    def _get_inactive_asset_unique_keys(
  • If any assets are inactive, an exception is raised.
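The filtering step can be sketched in isolation. This is a hypothetical, in-memory stand-in for the helper: in Airflow the active keys come from a database query against the active-assets table, and AssetUniqueKey carries more than is shown here.

```python
from dataclasses import dataclass


# Hypothetical, simplified stand-in for Airflow's AssetUniqueKey.
@dataclass(frozen=True)
class AssetUniqueKey:
    name: str
    uri: str


def get_inactive_asset_unique_keys(
    all_keys: set[AssetUniqueKey], active_keys: set[AssetUniqueKey]
) -> set[AssetUniqueKey]:
    """Return keys with no matching active asset (the real helper runs a DB query)."""
    return all_keys - active_keys
```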

Challenges with doing something similar for the Task SDK:

  1. Inlets and outlets are runtime parameters for tasks, so we can only get them at runtime.
  2. They need to be sent to the Execution API server, which decides whether the validations succeed.

These constraints lead to two possible approaches:

Approach 1: Performing these checks while starting a task, i.e. in the /run endpoint.

This seemed like the most natural approach but there were a few challenges around here.

This is our start() in supervisor:

    @classmethod
    def start(  # type: ignore[override]
        cls,
        *,
        what: TaskInstance,
        dag_rel_path: str | os.PathLike[str],
        bundle_info,
        client: Client,
        target: Callable[[], None] = _subprocess_main,
        logger: FilteringBoundLogger | None = None,
        **kwargs,
    ) -> Self:
        """Fork and start a new subprocess to execute the given task."""
        proc: Self = super().start(id=what.id, client=client, target=target, logger=logger, **kwargs)
        # Tell the task process what it needs to do!
        proc._on_child_started(ti=what, dag_rel_path=dag_rel_path, bundle_info=bundle_info)
        return proc

We do super().start, which starts the child process's entrypoint/target, but it doesn't run anything until a startup message is received. And in _on_child_started, we do this:

    def _on_child_started(self, ti: TaskInstance, dag_rel_path: str | os.PathLike[str], bundle_info):
        """Send startup message to the subprocess."""
        try:
            # We've forked, but the task won't start doing anything until we send it the StartupDetails
            # message. But before we do that, we need to tell the server it's started (so it has the chance to
            # tell us "no, stop!" for any reason)
            ti_context = self.client.task_instances.start(ti.id, self.pid, datetime.now(tz=timezone.utc))
        ...


        msg = StartupDetails.model_construct(
            ti=ti,
            dag_rel_path=os.fspath(dag_rel_path),
            bundle_info=bundle_info,
            requests_fd=self._requests_fd,
            ti_context=ti_context,
        )

        # Send the message to tell the process what it needs to execute
        log.debug("Sending", msg=msg)

        try:
            self.stdin.write(msg.model_dump_json().encode())
            self.stdin.write(b"\n")

We call self.client.task_instances.start to tell the server, and then send the startup message to the task runner when it actually starts.

One option would be to send the task's outlets and inlets (defined as ti.task.inlets and ti.task.outlets) to the Execution API. What makes this hard is that the ti_context comes from the run endpoint, and we need it for the startup details.

Maybe this can become a later optimisation with fewer network calls, but it requires a good amount of refactoring, so parking it for now.

Approach 2: Introduce a new endpoint at the Execution API that can perform runtime checks and is called from the task runner after startup, during the run.

  • Introduced a new endpoint: /{task_instance_id}/runtime-checks. It is a generic endpoint that can perform runtime checks for a TI; right now it is limited to _validate_inlet_outlet_assets_activeness but is easily extendable. Returns 200 if all is OK, 204 if not applicable, and 400 for failed checks.
  • Called from the task runner, through the supervisor and the client, after the task has been fetched and prepared: ti.task = ti.task.prepare_for_execution().
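The status-code contract described above can be sketched as a plain function. The name and the is_inactive predicate are hypothetical; the real endpoint is a route on the Execution API server that queries the database.

```python
def runtime_check_status(inlets: list, outlets: list, is_inactive) -> int:
    """Sketch of the endpoint's status-code semantics (hypothetical helper)."""
    if not inlets and not outlets:
        return 204  # not applicable: nothing to validate
    if any(is_inactive(asset) for asset in [*inlets, *outlets]):
        return 400  # failed checks: at least one inactive asset
    return 200  # all checks passed
```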

Implementation Details:

Execution API Server

  • New API: runtime_checks introduced on the task instance. Right now it is limited to the behaviour associated with _validate_inlet_outlet_assets_activeness, but it can be extended.
  • Returns 200 with {"message": "Runtime checks passed successfully."} if all is OK, 204 if not applicable, and 400 for failed checks.

Client side

Comms

  • A RuntimeCheckOnTask payload is introduced, an extension of the base payload:
class TIRuntimeCheckPayload(BaseModel):
    """Payload for performing Runtime checks on the TaskInstance model as requested by the SDK."""

    inlet: list[AssetProfile] | None = None
    outlet: list[AssetProfile] | None = None
  • The response from the server is translated to a simple OKResponse: True indicates that the runtime checks passed on the server; False otherwise.

HTTP Client

  • A simple POST call to the server API, self.client.post(f"task-instances/{id}/runtime-checks", content=body.model_dump_json()), which translates the response to an OKResponse.
  • For failure status codes other than 400, an exception is raised.
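A minimal sketch of the response translation. The mapping of 204 ("not applicable") to ok=True is an assumption drawn from the description above, and the dataclass stands in for the SDK's actual OKResponse model.

```python
from dataclasses import dataclass


@dataclass
class OKResponse:
    """Simplified stand-in for the SDK's OKResponse."""
    ok: bool


def translate_runtime_check_response(status_code: int) -> OKResponse:
    # Assumption: 200 (passed) and 204 (not applicable) both map to ok=True;
    # 400 (failed checks) maps to ok=False. Other codes are raised by the caller.
    return OKResponse(ok=status_code in (200, 204))
```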

Task Runner

  • Right after the task is prepared for execution, we check whether inlets or outlets are defined on it; if so, we construct the payload for the API call and send it to the supervisor.
  • If the supervisor returns a message indicating the runtime checks didn't pass on the server, the task is failed.
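Constructing the payload from the task's inlets and outlets might look like the following. The Asset and AssetProfile shapes here are simplified stand-ins for the SDK classes; the field names mirror the serialized request visible in the debug logs below (name, uri, asset_type).

```python
from dataclasses import dataclass


@dataclass
class Asset:
    """Simplified stand-in for the SDK's Asset class."""
    name: str
    uri: str


@dataclass
class AssetProfile:
    """Simplified stand-in for the AssetProfile sent to the server."""
    name: str
    uri: str
    asset_type: str


def build_runtime_check_profiles(assets: list[Asset]) -> list[AssetProfile]:
    # Record the concrete class name so the server knows what kind of asset it is.
    return [AssetProfile(a.name, a.uri, type(a).__name__) for a in assets]
```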

Supervisor

        elif isinstance(msg, RuntimeCheckOnTask):
            runtime_check_resp = self.client.task_instances.runtime_checks(id=self.id, msg=msg)
            resp = runtime_check_resp.model_dump_json().encode()
            if not runtime_check_resp.ok:
                log.debug("Runtime checks failed on task %s, marking task as failed..", self.id)
                self.client.task_instances.finish(
                    id=self.id, state=TerminalTIState.FAILED, when=datetime.now(tz=timezone.utc)
                )
  • Calls the runtime-checks API; if the response is not OK, immediately calls the finish endpoint with the failed state.
  • This keeps the flow as lean as possible: there is no need to communicate back with the task runner.

Models/taskinstance.py

  • Modified _validate_inlet_outlet_assets_activeness into a shape that can accommodate the Task SDK:
validate_inlet_outlet_assets_activeness(
    inlets: list[AssetProfile], outlets: list[AssetProfile], session: Session
)

The signature now accepts the inlets and outlets directly.
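A sketch of the reworked validation, with the database lookup injected as a callable so the example stands alone. The exception class is a stand-in for AirflowInactiveAssetInInletOrOutletException, and get_inactive_keys stands in for the session-backed query.

```python
class InactiveAssetError(Exception):
    """Stand-in for AirflowInactiveAssetInInletOrOutletException."""


def validate_inlet_outlet_assets_activeness(inlets, outlets, get_inactive_keys):
    """The asset keys now arrive via the arguments instead of self.task."""
    all_keys = set(inlets) | set(outlets)
    if not all_keys:
        return  # nothing to validate
    inactive = get_inactive_keys(all_keys)
    if inactive:
        raise InactiveAssetError(f"Inactive assets in inlets or outlets: {sorted(inactive)}")
```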

Testing results

DAG defined:

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset

with DAG(
    dag_id="asset_dag",
    start_date=None,
    schedule=None,
    catchup=False,
):

    outlet = EmptyOperator(
        task_id="outlet",
        outlets=[Asset(name="outlet", uri="something-uri"), Asset(name="outlet-new-name", uri="something-uri")],
    )

    inlet = EmptyOperator(
        task_id="inlet",
        outlets=[Asset(name="outlet", uri="something-uri"), Asset(name="outlet-new-name", uri="something-uri")],
    )

    outlet >> inlet

The issue with this DAG is that assets sharing the same URI are defined under different names; hence one of them is inactive in the outlets.
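Why one asset ends up inactive can be illustrated with a small sketch of the assumed uniqueness rule: a URI can only be active under one name at a time, so a renamed asset reusing an existing URI conflicts with the original. This illustrates the semantics only, not Airflow's actual implementation.

```python
def inactive_by_uri_conflict(assets: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Flag (name, uri) pairs whose URI is already active under a different name."""
    active_name_for_uri: dict[str, str] = {}
    inactive = []
    for name, uri in assets:
        if uri in active_name_for_uri and active_name_for_uri[uri] != name:
            inactive.append((name, uri))  # URI already claimed by another name
        else:
            active_name_for_uri[uri] = name
    return inactive
```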

Legacy

image

Logs:

be976385dc7e
 ▶ Log message source details
[2025-01-27, 09:19:41 UTC] {local_task_job_runner.py:120} ▼ Pre task execution logs
[2025-01-27, 09:19:41 UTC] {taskinstance.py:2384} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: asset_dag.outlet manual__2025-01-27T09:19:35.134947+00:00 [queued]>
[2025-01-27, 09:19:41 UTC] {taskinstance.py:2384} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: asset_dag.outlet manual__2025-01-27T09:19:35.134947+00:00 [queued]>
[2025-01-27, 09:19:41 UTC] {taskinstance.py:2625} INFO - Starting attempt 1 of 1
[2025-01-27, 09:19:41 UTC] {taskinstance.py:2648} INFO - Executing <Task(EmptyOperator): outlet> on 2025-01-27 09:19:35.134947+00:00
[2025-01-27, 09:19:41 UTC] {standard_task_runner.py:131} INFO - Started process 557 to run task
[2025-01-27, 09:19:41 UTC] {standard_task_runner.py:147} INFO - Running: ['***', 'tasks', 'run', 'asset_dag', 'outlet', 'manual__2025-01-27T09:19:35.134947+00:00', '--raw', '--subdir', 'DAGS_FOLDER/producer_and_consumer.py', '--cfg-path', '/tmp/tmpocu3qfya']
[2025-01-27, 09:19:41 UTC] {standard_task_runner.py:148} INFO - Subtask outlet
[2025-01-27, 09:19:41 UTC] {task_command.py:474} INFO - Running <TaskInstance: asset_dag.outlet manual__2025-01-27T09:19:35.134947+00:00 [running]> on host be976385dc7e
[2025-01-27, 09:19:41 UTC] {taskinstance.py:3119} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 274, in _run_raw_task
    TaskInstance.validate_inlet_outlet_assets_activeness(inlets, outlets, session=session)
  File "/opt/airflow/airflow/models/taskinstance.py", line 3744, in validate_inlet_outlet_assets_activeness
    raise AirflowInactiveAssetInInletOrOutletException(inactive_asset_unique_keys)
airflow.exceptions.AirflowInactiveAssetInInletOrOutletException: Task has the following inactive assets in its inlets or outlets: Asset(name="outlet-new-name", uri="something-uri")
[2025-01-27, 09:19:41 UTC] {taskinstance.py:1130} INFO - Immediate failure requested. Marking task as FAILED. dag_id=asset_dag, task_id=outlet, run_id=manual__2025-01-27T09:19:35.134947+00:00, logical_date=20250127T091935, start_date=20250127T091941, end_date=20250127T091941
[2025-01-27, 09:19:41 UTC] {taskinstance.py:343} ▶ Post task execution logs

Task SDK

image

Error logs:

5c6cc67013a3
 ▶ Log message source details
{"logger":"airflow.dag_processing.bundles.manager.DagBundlesManager","timestamp":"2025-01-27T09:29:41.642326","event":"DAG bundles loaded: dags-folder","level":"info"}
{"logger":"airflow.models.dagbag.DagBag","timestamp":"2025-01-27T09:29:41.642487","event":"Filling up the DagBag from /files/dags/producer_and_consumer.py","level":"info"}
{"logger":"airflow.models.dagbag.DagBag","timestamp":"2025-01-27T09:29:41.642729","event":"Importing /files/dags/producer_and_consumer.py","level":"debug"}
{"logger":"airflow.models.dagbag.DagBag","timestamp":"2025-01-27T09:29:41.645642","event":"Loaded DAG <DAG: asset_dag>","level":"debug"}
{"file":"producer_and_consumer.py","timestamp":"2025-01-27T09:29:41.645773","logger":"task","event":"DAG file parsed","level":"debug"}
{"json":"{\"inlet\":[],\"outlet\":[{\"name\":\"outlet\",\"uri\":\"something-uri\",\"asset_type\":\"Asset\"},{\"name\":\"outlet-new-name\",\"uri\":\"something-uri\",\"asset_type\":\"Asset\"}],\"type\":\"RuntimeCheckOnTask\"}\n","timestamp":"2025-01-27T09:29:41.646024","logger":"task","event":"Sending request","level":"debug"}
{"timestamp":"2025-01-27T09:29:41.666578","logger":"task","event":"Runtime checks failed for task, marking task as failed..","level":"info"}


@amoghrajesh (Contributor, Author):

Need to work on a few things:

  1. Fix the task runner tests
  2. Add tests for supervisor
  3. Revisit the # type: ignore checks; added some because it felt easier.
  4. Add a PR description with plenty of details.

@amoghrajesh amoghrajesh self-assigned this Jan 28, 2025
@amoghrajesh (Contributor, Author):

Yeah, with the latest changes, the DAG passes:
image

And the error handling is better:
image

CC: @ashb

@amoghrajesh (Contributor, Author):

Well OK, turns out the test has to be updated now!

@amoghrajesh amoghrajesh requested a review from ashb January 28, 2025 09:17
@ashb (Member) left a comment:


Few small comments, LGTM overall though

@amoghrajesh (Contributor, Author):

@ashb handled your comments, let me know if the approval still stands :)

@ashb (Member) commented Jan 28, 2025:

Yup, still stands. When the changes requested are small enough I often "pre-approve" it to save an extra cycle, and then it's up to you as the PR author to decide it can be merged once the changes are made, or if you need to get a re-review.

@amoghrajesh (Contributor, Author):

The CI has had some issues lately; just working on a green run so that this can be merged.

@amoghrajesh (Contributor, Author):

After a hard battle with the CI, we have won! Merging this one

@amoghrajesh amoghrajesh merged commit dc4ce65 into apache:main Jan 30, 2025
63 checks passed
@amoghrajesh amoghrajesh deleted the AIP72_validate_inlet_outlet_assets_activeness_sdk branch January 30, 2025 06:34
got686-yandex pushed a commit to got686-yandex/airflow that referenced this pull request Jan 30, 2025

Successfully merging this pull request may close these issues.

Port _validate_inlet_outlet_assets_activeness into Task SDK