Skip to content

Commit

Permalink
Add dag to create elementary report (#506)
Browse files Browse the repository at this point in the history
* Add dag to create elementary report

* lint

update

update

* Use last invocation

update

* send args

* concat

* reduce days

* update

* update dbt image

* get all invokes

* update dbt image

* udpate image

* increase task sla threshold

* limited data fetch

* increase memory

* Specific config for elementary report

* Increase ephemeral storage

* report for last 7 days

* report for last 7 days

* upgrade elementary

* limit days

* increase ephemeral storage

* decrease execution limit

* Actually use ephemeral storage

* Actually use ephemeral storage

* Actually use ephemeral storage

* bring memory down back

* Fix schedule, add vars in prod

* lint

* update dbt image and remove unnecessary vars
  • Loading branch information
amishas157 authored Nov 12, 2024
1 parent b29125b commit 48c8e63
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 21 deletions.
13 changes: 9 additions & 4 deletions airflow_variables_dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
"partnership_assets__account_holders_activity_fact": false,
"partnership_assets__asset_activity_fact": false
},
"dbt_image_name": "stellar/stellar-dbt:53375b5f9",
"dbt_image_name": "stellar/stellar-dbt:420c216df",
"dbt_internal_source_db": "test-hubble-319619",
"dbt_internal_source_schema": "test_crypto_stellar_internal",
"dbt_job_execution_timeout_seconds": 300,
Expand Down Expand Up @@ -288,21 +288,25 @@
"dbt": {
"requests": {
"cpu": "1",
"ephemeral-storage": "500Mi",
"memory": "600Mi"
}
},
"default": {
"requests": {
"cpu": "0.3",
"ephemeral-storage": "500Mi",
"memory": "600Mi"
}
},
"elementaryreport": {
"requests": {
"cpu": "1",
"ephemeral_storage": "2Gi",
"memory": "4Gi"
}
},
"stellaretl": {
"requests": {
"cpu": "0.3",
"ephemeral-storage": "500Mi",
"memory": "600Mi"
}
}
Expand Down Expand Up @@ -346,6 +350,7 @@
"current_state": 720,
"default": 60,
"elementary_dbt_data_quality": 1620,
"elementary_generate_report": 1200,
"enriched_history_operations": 780,
"enriched_history_operations_with_exclude": 780,
"fee_stats": 840,
Expand Down
13 changes: 9 additions & 4 deletions airflow_variables_prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
"partnership_assets__asset_activity_fact": false,
"trade_agg": false
},
"dbt_image_name": "stellar/stellar-dbt:53375b5f9",
"dbt_image_name": "stellar/stellar-dbt:420c216df",
"dbt_internal_source_db": "hubble-261722",
"dbt_internal_source_schema": "crypto_stellar_internal_2",
"dbt_job_execution_timeout_seconds": 2400,
Expand Down Expand Up @@ -286,21 +286,25 @@
"dbt": {
"requests": {
"cpu": "1",
"ephemeral-storage": "1Gi",
"memory": "1Gi"
}
},
"default": {
"requests": {
"cpu": "0.5",
"ephemeral-storage": "1Gi",
"memory": "1Gi"
}
},
"elementaryreport": {
"requests": {
"cpu": "1",
"ephemeral_storage": "2Gi",
"memory": "4Gi"
}
},
"stellaretl": {
"requests": {
"cpu": "0.5",
"ephemeral-storage": "1Gi",
"memory": "1Gi"
}
}
Expand Down Expand Up @@ -344,6 +348,7 @@
"current_state": 1200,
"default": 60,
"elementary_dbt_data_quality": 2100,
"elementary_generate_report": 1200,
"enriched_history_operations": 1800,
"enriched_history_operations_with_exclude": 1800,
"fee_stats": 360,
Expand Down
4 changes: 3 additions & 1 deletion dags/dbt_data_quality_alerts_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
) as dag:

# Trigger elementary
elementary_alerts = elementary_task(dag, "dbt_data_quality", resource_cfg="dbt")
elementary_alerts = elementary_task(
dag, "dbt_data_quality", "monitor", resource_cfg="dbt"
)

elementary_alerts
47 changes: 47 additions & 0 deletions dags/elementary_report_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow.build_dbt_task import dbt_task
from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
from stellar_etl_airflow.default import (
alert_sla_miss,
get_default_dag_args,
init_sentry,
)

init_sentry()

with DAG(
"elementary_report",
default_args=get_default_dag_args(),
start_date=datetime(2024, 11, 11, 0, 0),
description="This DAG creates elementary report and send it to slack",
schedule="0 3 * * MON", # Runs every Monday
user_defined_filters={
"container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
},
max_active_runs=1,
catchup=False,
) as dag:

# Trigger elementary
elementary_alerts = elementary_task(
dag,
"generate_report",
"send-report",
resource_cfg="elementaryreport",
cmd_args=[
"--days-back",
"7",
"--profiles-dir",
".",
"--executions-limit",
"120",
"--slack-file-name",
f"elementary_report_{datetime.today().date()}.html",
],
)

elementary_alerts
26 changes: 14 additions & 12 deletions dags/stellar_etl_airflow/build_elementary_slack_alert_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ def access_secret(secret_name, namespace):
return secret


def elementary_task(
dag,
task_name,
resource_cfg="default",
):
def elementary_task(dag, task_name, command, cmd_args=[], resource_cfg="default"):
namespace = conf.get("kubernetes", "NAMESPACE")

if namespace == "default":
Expand All @@ -33,25 +29,31 @@ def elementary_task(
config_file_location = None
in_cluster = True

container_resources = k8s.V1ResourceRequirements(
requests={
"cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}",
"memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}",
}
)
requests = {
"cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}",
"memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}",
}
if resource_cfg == "elementaryreport":
requests["ephemeral-storage"] = (
f"{{{{ var.json.resources.{resource_cfg}.requests.ephemeral_storage }}}}"
)
container_resources = k8s.V1ResourceRequirements(requests=requests)

dbt_image = "{{ var.value.dbt_image_name }}"

slack_secret_name = Variable.get("dbt_elementary_secret")
secret = access_secret(slack_secret_name, "default")
args = [
"monitor",
f"{command}",
"--slack-token",
f"{secret}",
"--slack-channel-name",
"{{ var.value.dbt_slack_elementary_channel }}",
]

if len(cmd_args):
args = [*args, *cmd_args]

logging.info(f"sh commands to run in pod: {args}")

return KubernetesPodOperator(
Expand Down

0 comments on commit 48c8e63

Please sign in to comment.