diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 80fc7e4a..cc1df335 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -56,6 +56,12 @@ jobs: with: credentials_json: "${{ secrets.CREDS_TEST_HUBBLE }}" + - id: "get-credentials" + uses: "google-github-actions/get-gke-credentials@v2" + with: + cluster_name: "us-central1-hubble-1pt5-dev-7db0e004-gke" + location: "us-central1-c" + - name: Pytest run: pytest dags/ diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 3dc52faf..b00fe70c 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -121,7 +121,7 @@ "partnership_assets__account_holders_activity_fact": false, "partnership_assets__asset_activity_fact": false }, - "dbt_image_name": "stellar/stellar-dbt:017241c", + "dbt_image_name": "stellar/stellar-dbt:d5d92f8", "dbt_job_execution_timeout_seconds": 300, "dbt_job_retries": 1, "dbt_mart_dataset": "test_sdf_marts", @@ -340,5 +340,8 @@ "internal_source_db": "test-hubble-319619", "internal_source_schema": "test_crypto_stellar_internal", "public_source_db": "test-hubble-319619", - "public_source_schema": "test_crypto_stellar" + "public_source_schema": "test_crypto_stellar", + "slack_elementary_channel": "stellar-elementary-alerts", + "elementary_secret": "slack-token-elementary", + "dbt_elementary_dataset": "test_elementary" } diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index d84110fa..8d350a81 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -143,7 +143,7 @@ "partnership_assets__asset_activity_fact": false, "trade_agg": false }, - "dbt_image_name": "stellar/stellar-dbt:017241c", + "dbt_image_name": "stellar/stellar-dbt:d5d92f8", "dbt_job_execution_timeout_seconds": 1800, "dbt_job_retries": 1, "dbt_mart_dataset": "sdf_marts", @@ -360,5 +360,8 @@ "internal_source_db": "hubble-261722", "internal_source_schema": "crypto_stellar_internal_2", "public_source_db": "crypto-stellar", - "public_source_schema": "crypto_stellar" + "public_source_schema": "crypto_stellar", + "slack_elementary_channel": "alerts-hubble-data-quality", + "elementary_secret": "slack-token-elementary", + "dbt_elementary_dataset": "elementary" } diff --git a/dags/dbt_enriched_base_tables_dag.py b/dags/dbt_enriched_base_tables_dag.py index 5af07529..9cb78efb 100644 --- a/dags/dbt_enriched_base_tables_dag.py +++ b/dags/dbt_enriched_base_tables_dag.py @@ -4,6 +4,7 @@ from kubernetes.client import models as k8s from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.build_dbt_task import dbt_task +from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task from stellar_etl_airflow.default import get_default_dag_args, init_sentry init_sentry() @@ -38,7 +39,13 @@ enriched_history_operations_task = dbt_task(dag, tag="enriched_history_operations") current_state_task = dbt_task(dag, tag="current_state") +elementary = elementary_task(dag, "dbt_enriched_base_tables") + # DAG task graph wait_on_cc >> enriched_history_operations_task wait_on_state_table >> current_state_task + +enriched_history_operations_task >> elementary + +current_state_task >> elementary diff --git a/dags/dbt_sdf_marts_dag.py b/dags/dbt_sdf_marts_dag.py index ab574611..8121ada4 100644 --- a/dags/dbt_sdf_marts_dag.py +++ b/dags/dbt_sdf_marts_dag.py @@ -4,6 +4,7 @@ from kubernetes.client import models as k8s from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.build_dbt_task import dbt_task +from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task from stellar_etl_airflow.default import get_default_dag_args, init_sentry init_sentry() @@ -45,6 +46,8 @@ history_assets = dbt_task(dag, tag="history_assets") soroban = dbt_task(dag, tag="soroban") +elementary = elementary_task(dag, "dbt_sdf_marts") + # DAG task graph wait_on_dbt_enriched_base_tables >> ohlc_task >> liquidity_pool_trade_volume_task @@ -59,3 +62,14 @@ wait_on_dbt_enriched_base_tables >> partnership_assets_task wait_on_dbt_enriched_base_tables >> history_assets wait_on_dbt_enriched_base_tables >> soroban + +mgi_task >> elementary +liquidity_providers_task >> elementary +trade_agg_task >> elementary +fee_stats_agg_task >> elementary +asset_stats_agg_task >> elementary +network_stats_agg_task >> elementary +partnership_assets_task >> elementary +history_assets >> elementary +soroban >> elementary +liquidity_pool_trade_volume_task >> elementary diff --git a/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py b/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py new file mode 100644 index 00000000..60fe08ec --- /dev/null +++ b/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py @@ -0,0 +1,93 @@ +import base64 +import logging + +from airflow.configuration import conf +from airflow.models import Variable +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( + KubernetesPodOperator, +) +from kubernetes import client, config +from kubernetes.client import models as k8s +from stellar_etl_airflow.default import alert_after_max_retries + + +def access_secret(secret_name, namespace): + config.load_kube_config() + v1 = client.CoreV1Api() + secret_data = v1.read_namespaced_secret(secret_name, namespace) + secret = secret_data.data + secret = base64.b64decode(secret["token"]).decode("utf-8") + return secret + + +def elementary_task( + dag, + task_name, + resource_cfg="default", +): + namespace = conf.get("kubernetes", "NAMESPACE") + + if namespace == "default": + config_file_location = Variable.get("kube_config_location") + in_cluster = False + else: + config_file_location = None + in_cluster = True + + container_resources = k8s.V1ResourceRequirements( + requests={ + "cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}", + "memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}", + } + ) + affinity = Variable.get("affinity", deserialize_json=True).get(resource_cfg) + + dbt_image = "{{ var.value.dbt_image_name }}" + + slack_secret_name = Variable.get("elementary_secret") + secret = access_secret(slack_secret_name, "default") + args = [ + "monitor", + "--slack-token", + f"{secret}", + "--slack-channel-name", + "{{ var.value.slack_elementary_channel }}", + ] + + logging.info(f"sh commands to run in pod: {args}") + + return KubernetesPodOperator( + task_id=f"elementary_slack_alert_{task_name}", + name=f"elementary_slack_alert_{task_name}", + namespace=Variable.get("k8s_namespace"), + service_account_name=Variable.get("k8s_service_account"), + env_vars={ + "DBT_USE_COLORS": "0", + "DBT_DATASET": "{{ var.value.dbt_elementary_dataset }}", + "DBT_TARGET": "{{ var.value.dbt_target }}", + "DBT_MAX_BYTES_BILLED": "{{ var.value.dbt_maximum_bytes_billed }}", + "DBT_JOB_TIMEOUT": "{{ var.value.dbt_job_execution_timeout_seconds }}", + "DBT_THREADS": "{{ var.value.dbt_threads }}", + "DBT_JOB_RETRIES": "{{ var.value.dbt_job_retries }}", + "DBT_PROJECT": "{{ var.value.dbt_project }}", + "INTERNAL_SOURCE_DB": "{{ var.value.internal_source_db }}", + "INTERNAL_SOURCE_SCHEMA": "{{ var.value.internal_source_schema }}", + "PUBLIC_SOURCE_DB": "{{ var.value.public_source_db }}", + "PUBLIC_SOURCE_SCHEMA": "{{ var.value.public_source_schema }}", + "EXECUTION_DATE": "{{ ds }}", + }, + image=dbt_image, + cmds=["edr"], + arguments=args, + dag=dag, + do_xcom_push=True, + is_delete_operator_pod=True, + startup_timeout_seconds=720, + in_cluster=in_cluster, + config_file=config_file_location, + affinity=affinity, + container_resources=container_resources, + on_failure_callback=alert_after_max_retries, + image_pull_policy="IfNotPresent", + image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")], + )