-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ExternalDeploymentSensor #1472
Conversation
df7d0c8
to
cad8ba6
Compare
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #1472 +/- ##
==========================================
+ Coverage 98.28% 98.33% +0.05%
==========================================
Files 91 94 +3
Lines 4485 4637 +152
==========================================
+ Hits 4408 4560 +152
Misses 77 77 ☔ View full report in Codecov by Sentry. |
5e2e341
to
b460f77
Compare
PR to fix CI: #1473 |
ea363e7
to
5ae9682
Compare
""" | ||
base_url, _ = self.get_conn() | ||
path = f"/api/v1/dags/{external_dag_id}/dagRuns" | ||
params: dict[str, int | str | list[str]] = {"limit": 1, "state": ["running", "queued"]} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unsure why limit is set to 1 here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to fetch last dagrun based on execution date
state = dag_runs[0].get("state") | ||
if state == "success": | ||
return True | ||
return False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do I understand correctly this fetches the latest DAG run and checks the status?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it check the status of last dagrun based on execution date
7a83e83
to
d578ccb
Compare
9ffdcf9
to
0c41551
Compare
0c41551
to
ac900b1
Compare
Sensor expects external_dag_id as a required param and external_task_id as an optional param if external_task_id is provided then it monitors a task instance of task external_task_id otherwise monitors the overall status of external_dag_id.
Once the sensor execution starts it fetches the dag run for external_dag_id with status running or queued if found then waits for it to succeed or fail in the trigger component. if not found then return immediately and mark it a success.
TODO;Test caseDocs/exampleprobably monitor task group too: This can't be done at moment because task group is just UI feature and not exposed externallyAssumption
When this sensor starts running it assumes that the dag it is monitoring is either in a running or queued state