From 3f3995cd5d9fc413ce3d205fbe8405fba293cee3 Mon Sep 17 00:00:00 2001 From: Laysa de Sousa Bitencourt Date: Thu, 25 Apr 2024 20:30:01 -0300 Subject: [PATCH] successful_transforms_folders --- dags/stellar_etl_airflow/test_sources.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/dags/stellar_etl_airflow/test_sources.py b/dags/stellar_etl_airflow/test_sources.py index 99c8971e..295a8acd 100644 --- a/dags/stellar_etl_airflow/test_sources.py +++ b/dags/stellar_etl_airflow/test_sources.py @@ -79,7 +79,10 @@ def store_files(blobs, successful_transforms_folders): if re.search(rf"{key}", blob.name): matching_files.append(os.path.basename(blob.name)) - successful_transforms_folders[key] = matching_files + if successful_transforms_folders[key] is None: + successful_transforms_folders[key] = matching_files + else: + successful_transforms_folders[key].extend(matching_files) return successful_transforms_folders @@ -155,9 +158,14 @@ def get_from_stateTables(**context): blobs, successful_transforms_folders ) - print(successful_transforms_folders) + print(f"successful_transforms_folders AAAARE: {successful_transforms_folders}") + gcs_hook = GCSHook(google_cloud_storage_conn_id="google_cloud_storage_default") - # gcs_hook = GCSHook(google_cloud_storage_conn_id="google_cloud_storage_default") + # for dag_run in execution_dates: + # execution_date_str = dag_run.execution_date.strftime( + # "%Y-%m-%d %H:%M:%S%z" + # ).replace(" ", "T") + # execution_date_str = execution_date_str[:-2] + ":" + execution_date_str[-2:] # for key in successful_transforms_folders.keys(): # if successful_transforms_folders[key] is not None: