From 1102681a6b066225a89f597e6a914babf6c114d4 Mon Sep 17 00:00:00 2001
From: Amisha Singla
Date: Wed, 11 Dec 2024 16:33:46 -0600
Subject: [PATCH] Add task to write to bq

Add alias
---
 airflow_variables_dev.json             |   1 +
 dags/external_data_dag.py              |  55 +++++++++
 schemas/retool_entity_data_schema.json | 163 +++++++++++++++++++++++++
 3 files changed, 219 insertions(+)
 create mode 100644 schemas/retool_entity_data_schema.json

diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json
index 170de89e..19a4da25 100644
--- a/airflow_variables_dev.json
+++ b/airflow_variables_dev.json
@@ -351,6 +351,7 @@
     "create_sandbox": 2400,
     "current_state": 720,
     "default": 60,
+    "del_ins_retool_entity_data_task": 720,
     "elementary_dbt_data_quality": 1620,
     "elementary_generate_report": 1200,
     "enriched_history_operations": 780,
diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py
index 1d695ce6..6df6615c 100644
--- a/dags/external_data_dag.py
+++ b/dags/external_data_dag.py
@@ -3,6 +3,7 @@
 It is scheduled to export information to BigQuery at regular intervals.
 """
 
+import os
 from ast import literal_eval
 from datetime import datetime, timedelta
 from json import loads
@@ -13,6 +14,13 @@
 from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
 from kubernetes.client import models as k8s
 from stellar_etl_airflow import macros
+from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import (
+    build_del_ins_from_gcs_to_bq_task,
+)
+from stellar_etl_airflow.build_del_ins_operator import (
+    create_del_ins_task,
+    initialize_task_vars,
+)
 from stellar_etl_airflow.default import (
     alert_after_max_retries,
     get_default_dag_args,
@@ -28,6 +36,9 @@
     start_date=datetime(2024, 12, 5, 14, 30),
     description="This DAG exports data from external sources such as retool.",
     schedule_interval="*/10 * * * *",
+    params={
+        "alias": "external",
+    },
     render_template_as_native_obj=True,
     user_defined_filters={
         "fromjson": lambda s: loads(s),
@@ -86,6 +97,12 @@ def stellar_etl_internal_task(
     )
 
 
+run_id = "{{ run_id }}"
+filepath = os.path.join(
+    Variable.get("gcs_exported_object_prefix"), run_id, "retool-exported-entity.txt"
+)
+
+
 retool_export_task = stellar_etl_internal_task(
     dag,
     "export_retool_data",
@@ -99,5 +116,43 @@ def stellar_etl_internal_task(
         Variable.get("gcs_exported_data_bucket_name"),
         "--cloud-provider",
         "gcp",
+        "--output",
+        filepath,
     ],
 )
+
+table_name = "retool_entity_data"
+table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data"
+public_project = "test-hubble-319619"
+public_dataset = "test_crypto_stellar_internal"
+batch_id = macros.get_batch_id()
+batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}"
+export_task_id = "export_retool_data"
+source_object_suffix = "/*-retool-exported-entity.txt"
+source_objects = [
+    "{{ task_instance.xcom_pull(task_ids='"
+    + export_task_id
+    + '\')["output"] }}'
+    + source_object_suffix
+]
+
+task_vars = {
+    "task_id": f"del_ins_{table_name}_task",
+    "project": public_project,
+    "dataset": public_dataset,
+    "table_name": table_name,
+    "export_task_id": "export_retool_data",
+    "source_object_suffix": source_object_suffix,
+    "partition": False,
+    "cluster": False,
+    "batch_id": batch_id,
+    "batch_date": batch_date,
+    "source_objects": source_objects,
+    "table_id": table_id,
+}
+
+retool_insert_to_bq_task = create_del_ins_task(
+    dag, task_vars, build_del_ins_from_gcs_to_bq_task
+)
+
+retool_export_task >> retool_insert_to_bq_task
diff --git a/schemas/retool_entity_data_schema.json b/schemas/retool_entity_data_schema.json
new file mode 100644
index 00000000..2be79a99
--- /dev/null
+++ b/schemas/retool_entity_data_schema.json
@@ -0,0 +1,163 @@
+[
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "batch_id",
+    "type": "STRING"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "batch_run_date",
+    "type": "DATETIME"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "batch_insert_ts",
+    "type": "TIMESTAMP"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "created_at",
+    "type": "TIMESTAMP"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "updated_at",
+    "type": "TIMESTAMP"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "id",
+    "type": "INTEGER"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "account_sponsor",
+    "type": "BOOLEAN"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "REPEATED",
+    "name": "app_geographies",
+    "type": "STRING"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "custodial",
+    "type": "BOOLEAN"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "description",
+    "type": "STRING"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "fee_sponsor",
+    "type": "BOOLEAN"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "home_domain",
+    "type": "STRING"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "home_domains_id",
+    "type": "INTEGER"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "live",
+    "type": "BOOLEAN"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "name",
+    "type": "STRING"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "non_custodial",
+    "type": "BOOLEAN"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "notes",
+    "type": "STRING"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "REPEATED",
+    "name": "ramps",
+    "type": "STRING"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "sdp_enabled",
+    "type": "BOOLEAN"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "soroban_enabled",
+    "type": "BOOLEAN"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "status",
+    "type": "STRING"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "verified",
+    "type": "BOOLEAN"
+  },
+  {
+    "description": "",
+    "fields": [],
+    "mode": "",
+    "name": "website_url",
+    "type": "STRING"
+  }
+]