Skip to content

Commit

Permalink
create export to lake task
Browse files Browse the repository at this point in the history
  • Loading branch information
cayod committed Oct 30, 2023
1 parent 2f9bf16 commit f9c68cf
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
10 changes: 8 additions & 2 deletions dags/history_ledger_transaction_with_captive_core_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from stellar_etl_airflow import macros
from stellar_etl_airflow.build_batch_stats import build_batch_stats
from stellar_etl_airflow.build_export_task import build_export_task
from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task
from stellar_etl_airflow.build_export_to_lake_task import export_to_lake
from stellar_etl_airflow.build_time_task import build_time_task
from stellar_etl_airflow.default import get_default_dag_args, init_sentry

Expand Down Expand Up @@ -79,4 +79,10 @@
)


(time_task >> write_lt_stats >> lt_export_task)
lt_lake_export_task = export_to_lake(
dag,
lt_export_task.task_id,
)


(time_task >> write_lt_stats >> lt_export_task >> lt_lake_export_task)
31 changes: 31 additions & 0 deletions dags/stellar_etl_airflow/build_export_to_lake_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
This file contains functions for creating Airflow tasks to load files from Google Cloud Storage into BigQuery.
"""

from airflow.models import Variable
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from sentry_sdk import capture_message, push_scope
from stellar_etl_airflow.build_apply_gcs_changes_to_bq_task import read_local_schema
from stellar_etl_airflow.default import alert_after_max_retries


def export_to_lake(dag, export_task_id):
bucket_source = Variable.get("gcs_exported_data_bucket_name")
bucket_destination = Variable.get("ledger_transaction_data_lake_bucket_name")
return GCSToGCSOperator(
dag=dag,
task_id="export_data_to_lake",
source_bucket=bucket_source,
source_objects=[
"{{ task_instance.xcom_pull(task_ids='"
+ export_task_id
+ '\')["output"] }}'
],
destination_bucket=bucket_destination,
destination_object=[
"{{ task_instance.xcom_pull(task_ids='"
+ export_task_id
+ '\')["output"][13:] }}'
],
exact_match=True,
)

0 comments on commit f9c68cf

Please sign in to comment.