diff --git a/dags/history_ledger_transaction_with_captive_core_dag.py b/dags/history_ledger_transaction_with_captive_core_dag.py
index 9c556854..a9bc5a2e 100644
--- a/dags/history_ledger_transaction_with_captive_core_dag.py
+++ b/dags/history_ledger_transaction_with_captive_core_dag.py
@@ -11,7 +11,7 @@
 from stellar_etl_airflow import macros
 from stellar_etl_airflow.build_batch_stats import build_batch_stats
 from stellar_etl_airflow.build_export_task import build_export_task
-from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task
+from stellar_etl_airflow.build_export_to_lake_task import export_to_lake
 from stellar_etl_airflow.build_time_task import build_time_task
 from stellar_etl_airflow.default import get_default_dag_args, init_sentry
 
@@ -79,4 +79,10 @@
 )
 
-(time_task >> write_lt_stats >> lt_export_task)
+lt_lake_export_task = export_to_lake(
+    dag,
+    lt_export_task.task_id,
+)
+
+
+(time_task >> write_lt_stats >> lt_export_task >> lt_lake_export_task)
 
diff --git a/dags/stellar_etl_airflow/build_export_to_lake_task.py b/dags/stellar_etl_airflow/build_export_to_lake_task.py
new file mode 100644
index 00000000..8a5f6256
--- /dev/null
+++ b/dags/stellar_etl_airflow/build_export_to_lake_task.py
@@ -0,0 +1,31 @@
+"""
+This file contains functions for creating Airflow tasks that copy exported files from Google Cloud Storage into the data lake bucket.
+"""
+
+from airflow.models import Variable
+from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
+from stellar_etl_airflow.default import alert_after_max_retries
+
+
+def export_to_lake(dag, export_task_id):
+    bucket_source = Variable.get("gcs_exported_data_bucket_name")
+    bucket_destination = Variable.get("ledger_transaction_data_lake_bucket_name")
+    # The source object path is pulled at run time from the upstream export
+    # task's XCom; the destination path drops the first 13 characters of
+    # that path so the object is written without its fixed leading prefix.
+    return GCSToGCSOperator(
+        dag=dag,
+        task_id="export_data_to_lake",
+        source_bucket=bucket_source,
+        source_objects=[
+            "{{ task_instance.xcom_pull(task_ids='"
+            + export_task_id
+            + '\')["output"] }}'
+        ],
+        destination_bucket=bucket_destination,
+        destination_object="{{ task_instance.xcom_pull(task_ids='"
+        + export_task_id
+        + '\')["output"][13:] }}',
+        exact_match=True,
+        on_failure_callback=alert_after_max_retries,
+    )
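
Below is a minimal sketch of how the templated paths built in `export_to_lake` resolve. The upstream task id and the sample object path are assumptions for illustration; only the template construction and the `[13:]` slice come from the change itself.

```python
# Sketch only: the task id and sample object path below are assumed values,
# not values taken from the DAG.
export_task_id = "export_ledger_transaction_task"  # hypothetical upstream task id

# The literal Jinja template string handed to GCSToGCSOperator; Airflow
# renders it at run time against the upstream task's XCom "output" value.
source_template = (
    "{{ task_instance.xcom_pull(task_ids='" + export_task_id + '\')["output"] }}'
)
print(source_template)
# {{ task_instance.xcom_pull(task_ids='export_ledger_transaction_task')["output"] }}

# The destination template applies [13:] to the same XCom value, stripping a
# fixed 13-character prefix. Assuming the export task writes under a prefix
# such as "dag-exported/" (exactly 13 characters, purely illustrative):
sample_output = "dag-exported/ledger_transactions/2024-01-01/batch.txt"
print(sample_output[13:])  # ledger_transactions/2024-01-01/batch.txt
```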