Skip to content

Commit

Permalink
Added SLA parameter to the relavant task building functions
Browse files Browse the repository at this point in the history
  • Loading branch information
edualvess committed May 9, 2024
1 parent 9911c3b commit 8982c76
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 1 deletion.
3 changes: 3 additions & 0 deletions dags/stellar_etl_airflow/build_dbt_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ def dbt_task(
on_failure_callback=alert_after_max_retries,
image_pull_policy="IfNotPresent",
image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)[task_name]
),
)


Expand Down
6 changes: 6 additions & 0 deletions dags/stellar_etl_airflow/build_elementary_slack_alert_task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
import logging
from datetime import timedelta

from airflow.configuration import conf
from airflow.models import Variable
Expand Down Expand Up @@ -89,4 +90,9 @@ def elementary_task(
on_failure_callback=alert_after_max_retries,
image_pull_policy="IfNotPresent",
image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)[
f"elementary_{task_name}"
]
),
)
4 changes: 3 additions & 1 deletion dags/stellar_etl_airflow/build_export_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,5 +239,7 @@ def build_export_task(
affinity=affinity,
on_failure_callback=alert_after_max_retries,
image_pull_policy=Variable.get("image_pull_policy"),
sla=timedelta(minutes=4),
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)["export_task"]
),
)
5 changes: 5 additions & 0 deletions dags/stellar_etl_airflow/build_time_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,9 @@ def build_time_task(
container_resources=resources_requests,
on_failure_callback=alert_after_max_retries,
image_pull_policy=Variable.get("image_pull_policy"),
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)[
"get_ledger_range_from_times"
]
),
)

0 comments on commit 8982c76

Please sign in to comment.