Skip to content

Commit

Permalink
Apply review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed Feb 14, 2024
1 parent 5ae9682 commit 7a83e83
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 14 deletions.
18 changes: 10 additions & 8 deletions astronomer/providers/core/example_dags/example_astro.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
schedule=None,
catchup=False,
tags=["example", "async", "core"],
) as dag:
t1 = ExternalDeploymentSensor(
):
ExternalDeploymentSensor(
task_id="test1",
external_dag_id="example_wait_to_test_example_astro_task",
)

t2 = ExternalDeploymentSensor(
ExternalDeploymentSensor(
task_id="test2",
external_dag_id="example_wait_to_test_example_astro_task",
external_task_id="wait_for_2_min",
Expand All @@ -31,31 +31,33 @@
schedule=None,
catchup=False,
tags=["example", "async", "core"],
) as dag1:
):

@task
def wait_for_2_min() -> None:
"""Wait for 2 min."""
time.sleep(120)

wait_for_2_min()


with DAG(
dag_id="trigger_astro_test_and_example",
start_date=datetime(2022, 1, 1),
schedule=None,
catchup=False,
tags=["example", "async", "core"],
) as dag2:
a = TriggerDagRunOperator(
):
run_wait_dag = TriggerDagRunOperator(
task_id="run_wait_dag",
trigger_dag_id="example_external_task_async_waits_for_me",
wait_for_completion=False,
)

b = TriggerDagRunOperator(
run_astro_dag = TriggerDagRunOperator(
task_id="run_astro_dag",
trigger_dag_id="example_astro_task",
wait_for_completion=False,
)

a >> b
run_wait_dag >> run_astro_dag
7 changes: 4 additions & 3 deletions astronomer/providers/core/hooks/astro.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
"password": "Astro Cloud API Token",
},
"placeholders": {
"password": "ey...xz.ey...fq.tw...ap",
"host": "https://clmkpsyfc010391acjie00t1l.astronomer.run/d5lc9c9x",
"password": "JWT API Token",
},
}

def get_conn(self) -> Any:
def get_conn(self) -> tuple[str, str]:
"""Retrieves the Astro Cloud connection details."""
conn = BaseHook.get_connection(self.astro_cloud_conn_id)
base_url = conn.host or os.environ.get("AIRFLOW__WEBSERVER_BASE_URL")
base_url = conn.host or os.environ.get("AIRFLOW__WEBSERVER__BASE_URL")
if base_url is None:
raise AirflowException(f"Airflow host is missing in connection {self.astro_cloud_conn_id}")
token = conn.password
Expand Down
6 changes: 3 additions & 3 deletions astronomer/providers/core/sensors/astro.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ def __init__(

def poke(self, context: Context) -> bool | PokeReturnValue:
"""
Method to check if the external deployment is successful.
Check the status of a DAG/task in another deployment.
Queries Astro Cloud for the status of the specified DAG or task instance.
Queries Airflow's REST API for the status of the specified DAG or task instance.
Returns True if successful, False otherwise.
:param context: The task execution context.
"""
hook = AstroHook(self.astro_cloud_conn_id)
dag_runs: list[dict[str, Any]] = hook.get_dag_runs(self.external_dag_id)
if dag_runs is None or len(dag_runs) == 0:
self.log.info(f"No dag-runs found for dag-id {self.external_dag_id}")
self.log.info("No DAG runs found for DAG %s", self.external_dag_id)
return True
self._dag_run_id = cast(str, dag_runs[0]["dag_run_id"])
if self.external_task_id is not None:
Expand Down

0 comments on commit 7a83e83

Please sign in to comment.