From abc6431bf7fc5c24aa15cfc09e4c3e2d5e3bed39 Mon Sep 17 00:00:00 2001 From: Cayod Date: Tue, 31 Oct 2023 09:05:36 -0300 Subject: [PATCH] update destination object --- dags/stellar_etl_airflow/build_export_to_lake_task.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/dags/stellar_etl_airflow/build_export_to_lake_task.py b/dags/stellar_etl_airflow/build_export_to_lake_task.py index 8a5f6256..89516a97 100644 --- a/dags/stellar_etl_airflow/build_export_to_lake_task.py +++ b/dags/stellar_etl_airflow/build_export_to_lake_task.py @@ -12,6 +12,11 @@ def export_to_lake(dag, export_task_id): bucket_source = Variable.get("gcs_exported_data_bucket_name") bucket_destination = Variable.get("ledger_transaction_data_lake_bucket_name") + destination_data = [ + "{{ task_instance.xcom_pull(task_ids='" + + export_task_id + + '\')["output"][13:] }}' + ] return GCSToGCSOperator( dag=dag, task_id="export_data_to_lake", @@ -22,10 +27,6 @@ def export_to_lake(dag, export_task_id): + '\')["output"] }}' ], destination_bucket=bucket_destination, - destination_object=[ - "{{ task_instance.xcom_pull(task_ids='" - + export_task_id - + '\')["output"][13:] }}' - ], + destination_object=destination_data[0], exact_match=True, )