[Bug] partial Does Not Replace Task Outputs #303

Open
su-four opened this issue Dec 1, 2024 · 2 comments
Labels
bug (Something isn't working), triage-needed

Comments


su-four commented Dec 1, 2024

DAG Factory version

0.20.0

Airflow version

2.10

Python version

3.12

Deployment

Docker-Compose

What happened?

When using the partial feature in dag-factory to define task parameters that depend on the output of a previous task, the output of the upstream task is not replaced with its actual value.

Expected Behavior:
The third.output value should be replaced with the actual output of the third task, allowing the five task to receive the correct input.

Actual Behavior:
The third.output placeholder remains unresolved, causing the five task to fail or operate on an incorrect value.

How to reproduce

For example, consider the following YAML configuration:

five:  
  operator: "airflow.operators.python.PythonOperator"  
  python_callable_name: "five"  
  python_callable_file: "/funcs/test.py"  
  partial:  
    input: third.output  
  dependencies: [third]

Here, the input parameter in partial is expected to be replaced with the actual output of the third task. However, third.output is not resolved and remains as-is, causing the task five to behave incorrectly.
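To make the symptom concrete, here is a minimal, stdlib-only sketch of what appears to be happening. It is not dag-factory's code: the `task_conf` dictionary is only a hypothetical stand-in for the parsed YAML above, and the `five` function is a placeholder for the callable in `/funcs/test.py`. The point is that a value read from YAML is a plain string, so unless something replaces it, the callable receives the literal text.

```python
# Hypothetical stand-in for the parsed YAML of task "five".
# This is NOT dag-factory's internal representation, only an illustration.
task_conf = {
    "operator": "airflow.operators.python.PythonOperator",
    "python_callable_name": "five",
    "python_callable_file": "/funcs/test.py",
    "partial": {"input": "third.output"},
    "dependencies": ["third"],
}


def five(input):
    """Placeholder callable: returns its argument so it can be inspected."""
    return input


# With no resolution step, the callable sees the placeholder text itself,
# not the value produced by the upstream task "third".
received = five(**task_conf["partial"])
print(repr(received))
```

If this matches what dag-factory does internally, it would explain why `five` operates on the text `third.output` rather than on the upstream result.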

su-four added the bug and triage-needed labels Dec 1, 2024
Collaborator

tatiana commented Dec 4, 2024

Hi, @su-four. Thank you for reporting this issue.

Unfortunately, with the details you shared, we could not reproduce the bug.

Currently, DAG factory has an example DAG and configuration file that works with task mapping:
https://github.com/astronomer/dag-factory/blob/main/dev/dags/example_task_group.py
https://github.com/astronomer/dag-factory/blob/main/dev/dags/example_task_group.yml

It uses the functions defined in:
https://github.com/astronomer/dag-factory/blob/main/dev/dags/expand_tasks.py

And it seems to work as expected. Please, could you modify this example to reproduce the bug you're facing, and share with us all the changes necessary to be able to reproduce it?

Author

su-four commented Dec 17, 2024

Clarification on Earlier Report:

Hi, @tatiana and team!

Thank you for your response. I realize that I may have miscommunicated my original issue and possibly given the wrong impression. Let me clarify:

I am not using dynamic task mapping in this scenario. Instead, I am trying to pass the output of one task (e.g., task1) as an input to another task (e.g., task2) directly within the DAG Factory YAML configuration. However, it seems that placeholders like task1.output are not being resolved to the actual output of the upstream task.

Example Configuration:

Here’s a simplified configuration to illustrate my problem:

tasks:
  task1:
    operator: "airflow.operators.python.PythonOperator"
    python_callable_name: "task1"
    python_callable_file: "/funcs/task1.py"
  
  task2:
    operator: "airflow.operators.python.PythonOperator"
    python_callable_name: "task2"
    python_callable_file: "/funcs/task2.py"
    partial:
      input: task1.output
    dependencies: [task1]

Expected Behavior:

task1 generates an output (e.g., result1).
task2 should receive result1 as the value for the input parameter via the partial configuration.

Actual Behavior:

The placeholder task1.output in the partial configuration is not replaced with the actual value from task1's output.
As a result, task2 either fails or operates on an unresolved placeholder.

Additional Example:

I also tried this configuration, and it does not work either:

tasks:
  num_genrator:
    operator: "airflow.operators.python.PythonOperator"
    python_callable_name: "task1"
    python_callable_file: "/funcs/task1.py"
  
  task2:
    operator: "airflow.operators.python.PythonOperator"
    python_callable_name: "task2"
    python_callable_file: "/funcs/task2.py"
    num_list: num_genrator.output
    dependencies: [num_genrator]

In this case, I attempted to pass the output of the num_genrator task directly to the num_list parameter of task2. However, the placeholder num_genrator.output also remained unresolved.
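For clarity, the behavior I expected can be sketched in plain Python. This is only an illustration under stated assumptions, not dag-factory's implementation: `resolve_outputs` and the `outputs` dictionary are hypothetical names, and the dictionary stands in for whatever mechanism would supply upstream results (in Airflow, task outputs are normally resolved at run time, not at parse time).

```python
import re
from typing import Any, Dict

# Hypothetical convention: a string of the exact form "<task_id>.output"
# refers to the result of the upstream task with that id.
_OUTPUT_REF = re.compile(r"(?P<task_id>[A-Za-z_][\w\-]*)\.output")


def resolve_outputs(value: Any, outputs: Dict[str, Any]) -> Any:
    """Recursively replace "<task_id>.output" strings with known outputs.

    Strings that do not match the pattern, or that name an unknown task,
    are returned unchanged.
    """
    if isinstance(value, str):
        match = _OUTPUT_REF.fullmatch(value)
        if match and match.group("task_id") in outputs:
            return outputs[match.group("task_id")]
        return value
    if isinstance(value, dict):
        return {key: resolve_outputs(item, outputs) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve_outputs(item, outputs) for item in value]
    return value


# Stand-in for the upstream result; the real value would come from the task run.
outputs = {"num_genrator": [1, 2, 3]}

# Stand-in for the relevant part of the task2 configuration above.
task2_conf = {
    "num_list": "num_genrator.output",
    "dependencies": ["num_genrator"],
}

resolved = resolve_outputs(task2_conf, outputs)
print(resolved)
```

In this sketch the `dependencies` entry is left alone because it does not end in `.output`, while `num_list` is replaced by the upstream value. Something equivalent to this substitution is what I expected for both `partial` values and direct parameters.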
