From a2ec9465d26137420ef9a7fb0140a42a5b49855a Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 11 Dec 2024 19:55:13 -0600 Subject: [PATCH] Add metadata fields --- airflow_variables_dev.json | 2 +- dags/external_data_dag.py | 34 +++++++++++++++++++--------------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 10461424..22f97fb1 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -316,7 +316,7 @@ "schema_filepath": "/home/airflow/gcs/dags/schemas/", "sentry_dsn": "https://9e0a056541c3445083329b072f2df690@o14203.ingest.us.sentry.io/6190849", "sentry_environment": "development", - "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:40f406c53", + "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:e3b9a2ea7", "table_ids": { "accounts": "accounts", "assets": "history_assets", diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 947fcc1b..5026961d 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -111,6 +111,23 @@ def stellar_etl_internal_task( "retool-exported-entity.txt", ) +retool_table_name = "retool_entity_data" +retool_table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data" +retool_public_project = "test-hubble-319619" +retool_public_dataset = "test_crypto_stellar_internal" +retool_batch_id = macros.get_batch_id() +retool_batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" +retool_export_task_id = "export_retool_data" +retool_source_object_suffix = "" +retool_source_objects = [ + "{{ task_instance.xcom_pull(task_ids='" + + retool_export_task_id + + '\')["output"] }}' + + retool_source_object_suffix +] +batch_insert_ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") + + retool_export_task = stellar_etl_internal_task( dag, "export_retool_data", @@ -126,25 +143,12 @@ def stellar_etl_internal_task( "gcp", "--output", retool_filepath, + "-u", + f"'batch_id={retool_batch_id},batch_run_date={retool_batch_date},batch_insert_ts={batch_insert_ts}'", ], output_file=retool_filepath, ) -retool_table_name = "retool_entity_data" -retool_table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data" -retool_public_project = "test-hubble-319619" -retool_public_dataset = "test_crypto_stellar_internal" -retool_batch_id = macros.get_batch_id() -retool_batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" -retool_export_task_id = "export_retool_data" -retool_source_object_suffix = "" -retool_source_objects = [ - "{{ task_instance.xcom_pull(task_ids='" - + retool_export_task_id - + '\')["output"] }}' - + retool_source_object_suffix -] - retool_task_vars = { "task_id": f"del_ins_{retool_table_name}_task", "project": retool_public_project,