diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index ffc9bdd3..72781c0d 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -62,7 +62,7 @@ "dbt_threads": 12, "gcs_exported_data_bucket_name": "us-central1-test-hubble-2-5f1f2dbf-bucket", "gcs_exported_object_prefix": "dag-exported", - "image_name": "stellar/stellar-etl:395f1f8", + "image_name": "stellar/stellar-etl:a334cb5", "image_output_path": "/etl/exported_data/", "image_pull_policy": "IfNotPresent", "kube_config_location": "", @@ -182,7 +182,7 @@ "table": "raw_mgi_stellar_transactions" } }, - "partners_bucket": "partners_data", + "partners_bucket": "ext-partner-sftp", "public_dataset": "test_crypto_stellar", "public_project": "test-hubble-319619", "resources": { @@ -190,7 +190,7 @@ "requests": { "cpu": "0.3", "memory": "900Mi", - "ephemeral-storage": "10Mi" + "ephemeral-storage": "1Gi" } }, "cc": { @@ -274,5 +274,7 @@ "public_source_schema": "test_crypto_stellar", "slack_elementary_channel": "stellar-elementary-alerts", "elementary_secret": "slack-token-elementary", - "dbt_elementary_dataset": "test_elementary" + "dbt_elementary_dataset": "test_elementary", + "use_captive_core": "False", + "txmeta_datastore_url": "gcs://exporter-test/ledgers/testnet" } diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index 7af9edc7..ebdbea93 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -153,7 +153,7 @@ "dbt_threads": 12, "gcs_exported_data_bucket_name": "us-central1-hubble-2-d948d67b-bucket", "gcs_exported_object_prefix": "dag-exported", - "image_name": "stellar/stellar-etl:395f1f8", + "image_name": "stellar/stellar-etl:a334cb5", "image_output_path": "/etl/exported_data/", "image_pull_policy": "IfNotPresent", "kube_config_location": "", @@ -269,9 +269,9 @@ "resources": { "default": { "requests": { - "cpu": "1.2", + "cpu": "3.5", "memory": "5Gi", - "ephemeral-storage": "10Mi" + "ephemeral-storage": "10Gi" } }, "cc": { @@ -363,5 +363,7 @@ "public_source_schema": "crypto_stellar", "slack_elementary_channel": "alerts-hubble-data-quality", "elementary_secret": "slack-token-elementary", - "dbt_elementary_dataset": "elementary" + "dbt_elementary_dataset": "elementary", + "use_captive_core": "False", + "txmeta_datastore_url": "gcs://ledger-exporter/ledgers/pubnet" } diff --git a/dags/dbt_enriched_base_tables_dag.py b/dags/dbt_enriched_base_tables_dag.py index 9cb78efb..81e2df1b 100644 --- a/dags/dbt_enriched_base_tables_dag.py +++ b/dags/dbt_enriched_base_tables_dag.py @@ -24,14 +24,8 @@ ) # Wait on ingestion DAGs -# NOTE: history_archive_without_captive_core is currently disabled in -# favor of history_archive_with_captive_core_combined; -# Update with ledger exporter project integration -# wait_on_cc = build_cross_deps( -# dag, "wait_on_ledgers_txs", "history_archive_without_captive_core" -# ) -wait_on_cc = build_cross_deps( - dag, "wait_on_ledgers_txs", "history_archive_with_captive_core_combined_export" +wait_on_history_table = build_cross_deps( + dag, "wait_on_ledgers_txs", "history_table_export" ) wait_on_state_table = build_cross_deps(dag, "wait_on_state_table", "state_table_export") @@ -42,7 +36,7 @@ elementary = elementary_task(dag, "dbt_enriched_base_tables") # DAG task graph -wait_on_cc >> enriched_history_operations_task +wait_on_history_table >> enriched_history_operations_task wait_on_state_table >> current_state_task diff --git a/dags/history_archive_with_captive_core_combined_export_dag.py 
b/dags/history_archive_with_captive_core_combined_export_dag.py deleted file mode 100644 index cd8f7625..00000000 --- a/dags/history_archive_with_captive_core_combined_export_dag.py +++ /dev/null @@ -1,311 +0,0 @@ -""" -The history_archive_export DAG exports operations and trades from the history archives. -It is scheduled to export information to BigQuery at regular intervals. -""" -from ast import literal_eval -from datetime import datetime -from json import loads - -from airflow import DAG -from airflow.models.variable import Variable -from kubernetes.client import models as k8s -from stellar_etl_airflow import macros -from stellar_etl_airflow.build_batch_stats import build_batch_stats -from stellar_etl_airflow.build_bq_insert_job_task import build_bq_insert_job -from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps -from stellar_etl_airflow.build_delete_data_task import build_delete_data_task -from stellar_etl_airflow.build_export_task import build_export_task -from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task -from stellar_etl_airflow.build_time_task import build_time_task -from stellar_etl_airflow.default import get_default_dag_args, init_sentry - -init_sentry() - -dag = DAG( - "history_archive_with_captive_core_combined_export", - default_args=get_default_dag_args(), - start_date=datetime(2024, 1, 19, 19, 0), - catchup=True, - description="This DAG exports all history_* base tables using CaptiveCore. The DAG is a temporary fix, and not suited for public use.", - schedule_interval="*/30 * * * *", - params={ - "alias": "cc", - }, - render_template_as_native_obj=True, - user_defined_filters={ - "fromjson": lambda s: loads(s), - "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), - "literal_eval": lambda e: literal_eval(e), - }, - user_defined_macros={ - "subtract_data_interval": macros.subtract_data_interval, - "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, - }, -) - -table_names = Variable.get("table_ids", deserialize_json=True) -internal_project = "{{ var.value.bq_project }}" -internal_dataset = "{{ var.value.bq_dataset }}" -public_project = "{{ var.value.public_project }}" -public_dataset = "{{ var.value.public_dataset }}" -use_testnet = literal_eval(Variable.get("use_testnet")) -use_futurenet = literal_eval(Variable.get("use_futurenet")) - -""" -The time task reads in the execution time of the current run, as well as the next -execution time. It converts these two times into ledger ranges. -""" -time_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet) - -""" -The write batch stats task will take a snapshot of the DAG run_id, execution date, -start and end ledgers so that reconciliation and data validation are easier. The -record is written to an internal dataset for data eng use only. -""" -write_op_stats = build_batch_stats(dag, table_names["operations"]) -write_trade_stats = build_batch_stats(dag, table_names["trades"]) -write_effects_stats = build_batch_stats(dag, table_names["effects"]) -write_tx_stats = build_batch_stats(dag, table_names["transactions"]) -write_diagnostic_events_stats = build_batch_stats(dag, table_names["diagnostic_events"]) - -""" -The export tasks call export commands on the Stellar ETL using the ledger range from the time task. -The results of the command are stored in a file. There is one task for each of the data types that -can be exported from the history archives. 
- -The DAG sleeps for 30 seconds after the export_task writes to the file to give the poststart.sh -script time to copy the file over to the correct directory. If there is no sleep, the load task -starts prematurely and will not load data. -""" -all_history_export_task = build_export_task( - dag, - "archive", - "export_all_history", - "all_history", - use_testnet=use_testnet, - use_futurenet=use_futurenet, - use_gcs=True, - resource_cfg="cc", -) - -""" -The delete partition task checks to see if the given partition/batch id exists in -Bigquery. If it does, the records are deleted prior to reinserting the batch. -""" -delete_old_op_task = build_delete_data_task( - dag, internal_project, internal_dataset, table_names["operations"] -) -delete_old_op_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["operations"], "pub" -) -delete_old_trade_task = build_delete_data_task( - dag, internal_project, internal_dataset, table_names["trades"] -) -delete_old_trade_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["trades"], "pub" -) -delete_enrich_op_task = build_delete_data_task( - dag, internal_project, internal_dataset, "enriched_history_operations" -) -delete_enrich_op_pub_task = build_delete_data_task( - dag, public_project, public_dataset, "enriched_history_operations", "pub" -) -delete_enrich_ma_op_task = build_delete_data_task( - dag, internal_project, internal_dataset, "enriched_meaningful_history_operations" -) -delete_old_effects_task = build_delete_data_task( - dag, internal_project, internal_dataset, table_names["effects"] -) -delete_old_effects_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["effects"], "pub" -) -delete_old_tx_task = build_delete_data_task( - dag, internal_project, internal_dataset, table_names["transactions"] -) -delete_old_tx_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["transactions"], "pub" -) - -""" -The send tasks receive the location of the file in Google Cloud storage through Airflow's XCOM system. -Then, the task merges the unique entries in the file into the corresponding table in BigQuery. 
-""" -send_ops_to_bq_task = build_gcs_to_bq_task( - dag, - all_history_export_task.task_id, - internal_project, - internal_dataset, - table_names["operations"], - "exported_operations.txt", - partition=True, - cluster=True, -) -send_trades_to_bq_task = build_gcs_to_bq_task( - dag, - all_history_export_task.task_id, - internal_project, - internal_dataset, - table_names["trades"], - "exported_trades.txt", - partition=True, - cluster=True, -) -send_effects_to_bq_task = build_gcs_to_bq_task( - dag, - all_history_export_task.task_id, - internal_project, - internal_dataset, - table_names["effects"], - "exported_effects.txt", - partition=True, - cluster=True, -) -send_txs_to_bq_task = build_gcs_to_bq_task( - dag, - all_history_export_task.task_id, - internal_project, - internal_dataset, - table_names["transactions"], - "exported_transactions.txt", - partition=True, - cluster=True, -) - - -""" -Load final public dataset, crypto-stellar -""" -send_ops_to_pub_task = build_gcs_to_bq_task( - dag, - all_history_export_task.task_id, - public_project, - public_dataset, - table_names["operations"], - "exported_operations.txt", - partition=True, - cluster=True, - dataset_type="pub", -) -send_trades_to_pub_task = build_gcs_to_bq_task( - dag, - all_history_export_task.task_id, - public_project, - public_dataset, - table_names["trades"], - "exported_trades.txt", - partition=True, - cluster=True, - dataset_type="pub", -) -send_effects_to_pub_task = build_gcs_to_bq_task( - dag, - all_history_export_task.task_id, - public_project, - public_dataset, - table_names["effects"], - "exported_effects.txt", - partition=True, - cluster=True, - dataset_type="pub", -) -send_txs_to_pub_task = build_gcs_to_bq_task( - dag, - all_history_export_task.task_id, - public_project, - public_dataset, - table_names["transactions"], - "exported_transactions.txt", - partition=True, - cluster=True, - dataset_type="pub", -) - -""" -Batch loading of derived table, `enriched_history_operations` which denormalizes ledgers, transactions and operations data. -Must wait on history_archive_without_captive_core_dag to finish before beginning the job. -The internal dataset also creates a filtered table, `enriched_meaningful_history_operations` which filters down to only relevant asset ops. 
-""" -wait_on_dag = build_cross_deps( - dag, "wait_on_ledgers_txs", "history_archive_without_captive_core" -) -insert_enriched_hist_task = build_bq_insert_job( - dag, - internal_project, - internal_dataset, - "enriched_history_operations", - partition=True, - cluster=True, -) -insert_enriched_hist_pub_task = build_bq_insert_job( - dag, - public_project, - public_dataset, - "enriched_history_operations", - partition=True, - cluster=True, - dataset_type="pub", -) -insert_enriched_ma_hist_task = build_bq_insert_job( - dag, - internal_project, - internal_dataset, - "enriched_meaningful_history_operations", - partition=True, - cluster=True, -) - -( - time_task - >> write_op_stats - >> all_history_export_task - >> delete_old_op_task - >> send_ops_to_bq_task - >> wait_on_dag - >> delete_enrich_op_task -) -( - delete_enrich_op_task - >> insert_enriched_hist_task - >> delete_enrich_ma_op_task - >> insert_enriched_ma_hist_task -) -( - all_history_export_task - >> delete_old_op_pub_task - >> send_ops_to_pub_task - >> wait_on_dag - >> delete_enrich_op_pub_task - >> insert_enriched_hist_pub_task -) -( - time_task - >> write_trade_stats - >> all_history_export_task - >> delete_old_trade_task - >> send_trades_to_bq_task -) -all_history_export_task >> delete_old_trade_pub_task >> send_trades_to_pub_task -( - time_task - >> write_effects_stats - >> all_history_export_task - >> delete_old_effects_task - >> send_effects_to_bq_task -) -all_history_export_task >> delete_old_effects_pub_task >> send_effects_to_pub_task -( - time_task - >> write_tx_stats - >> all_history_export_task - >> delete_old_tx_task - >> send_txs_to_bq_task - >> wait_on_dag -) -all_history_export_task >> delete_old_tx_pub_task >> send_txs_to_pub_task >> wait_on_dag -(time_task >> write_diagnostic_events_stats >> all_history_export_task) -( - [ - insert_enriched_hist_pub_task, - insert_enriched_hist_task, - ] -) diff --git a/dags/history_archive_without_captive_core_dag.py b/dags/history_archive_without_captive_core_dag.py deleted file mode 100644 index 0db07100..00000000 --- a/dags/history_archive_without_captive_core_dag.py +++ /dev/null @@ -1,211 +0,0 @@ -""" -The history_archive_export DAG exports ledgers and transactions from the history archives. -It is scheduled to export information to BigQuery at regular intervals. -""" -from ast import literal_eval -from datetime import datetime -from json import loads - -from airflow import DAG -from airflow.models import Variable -from kubernetes.client import models as k8s -from stellar_etl_airflow import macros -from stellar_etl_airflow.build_batch_stats import build_batch_stats -from stellar_etl_airflow.build_bq_insert_job_task import build_bq_insert_job -from stellar_etl_airflow.build_delete_data_task import build_delete_data_task -from stellar_etl_airflow.build_export_task import build_export_task -from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task -from stellar_etl_airflow.build_time_task import build_time_task -from stellar_etl_airflow.default import get_default_dag_args, init_sentry - -init_sentry() - -dag = DAG( - "history_archive_without_captive_core", - default_args=get_default_dag_args(), - start_date=datetime(2023, 12, 18, 17, 0), - catchup=True, - description="This DAG exports ledgers, transactions, and assets from the history archive to BigQuery. 
Incremental Loads", - schedule_interval="*/15 * * * *", - params={ - "alias": "archive", - }, - render_template_as_native_obj=True, - user_defined_filters={ - "fromjson": lambda s: loads(s), - "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), - "literal_eval": lambda e: literal_eval(e), - }, - user_defined_macros={ - "subtract_data_interval": macros.subtract_data_interval, - "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, - }, -) - -table_names = Variable.get("table_ids", deserialize_json=True) -internal_project = "{{ var.value.bq_project }}" -internal_dataset = "{{ var.value.bq_dataset }}" -public_project = "{{ var.value.public_project }}" -public_dataset = "{{ var.value.public_dataset }}" -use_testnet = literal_eval(Variable.get("use_testnet")) -use_futurenet = literal_eval(Variable.get("use_futurenet")) - -""" -The time task reads in the execution time of the current run, as well as the next -execution time. It converts these two times into ledger ranges. -""" -time_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet) - -""" -The write batch stats task will take a snapshot of the DAG run_id, execution date, -start and end ledgers so that reconciliation and data validation are easier. The -record is written to an internal dataset for data eng use only. -""" -write_ledger_stats = build_batch_stats(dag, table_names["ledgers"]) -write_asset_stats = build_batch_stats(dag, table_names["assets"]) - -""" -The export tasks call export commands on the Stellar ETL using the ledger range from the time task. -The results of the command are stored in a file. There is one task for each of the data types that -can be exported from the history archives. - -The DAG sleeps for 30 seconds after the export_task writes to the file to give the poststart.sh -script time to copy the file over to the correct directory. If there is no sleep, the load task -starts prematurely and will not load data. -""" -ledger_export_task = build_export_task( - dag, - "archive", - "export_ledgers", - "{{ var.json.output_file_names.ledgers }}", - use_testnet=use_testnet, - use_futurenet=use_futurenet, - use_gcs=True, - resource_cfg="wocc", -) -asset_export_task = build_export_task( - dag, - "archive", - "export_assets", - "{{ var.json.output_file_names.assets }}", - use_testnet=use_testnet, - use_futurenet=use_futurenet, - use_gcs=True, - resource_cfg="wocc", -) - -""" -The delete partition task checks to see if the given partition/batch id exists in -Bigquery. If it does, the records are deleted prior to reinserting the batch. -""" -delete_old_ledger_task = build_delete_data_task( - dag, internal_project, internal_dataset, table_names["ledgers"] -) -delete_old_ledger_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["ledgers"], "pub" -) -delete_old_asset_task = build_delete_data_task( - dag, internal_project, internal_dataset, table_names["assets"] -) -delete_old_asset_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["assets"], "pub" -) - -""" -The send tasks receive the location of the file in Google Cloud storage through Airflow's XCOM system. -Then, the task merges the unique entries in the file into the corresponding table in BigQuery. 
-""" -send_ledgers_to_bq_task = build_gcs_to_bq_task( - dag, - ledger_export_task.task_id, - internal_project, - internal_dataset, - table_names["ledgers"], - "", - partition=True, - cluster=True, -) -send_assets_to_bq_task = build_gcs_to_bq_task( - dag, - asset_export_task.task_id, - internal_project, - internal_dataset, - table_names["assets"], - "", - partition=True, - cluster=True, -) - -""" -The send tasks receive the location of the file in Google Cloud storage through Airflow's XCOM system. -Then, the task merges the unique entries in the file into the corresponding table in BigQuery. -""" -send_ledgers_to_pub_task = build_gcs_to_bq_task( - dag, - ledger_export_task.task_id, - public_project, - public_dataset, - table_names["ledgers"], - "", - partition=True, - cluster=True, - dataset_type="pub", -) -send_assets_to_pub_task = build_gcs_to_bq_task( - dag, - asset_export_task.task_id, - public_project, - public_dataset, - table_names["assets"], - "", - partition=True, - cluster=True, - dataset_type="pub", -) - -""" -The tasks below use a job in BigQuery to deduplicate the table history_assets_stg. -The job refreshes the table history_assets with only new records. -""" -dedup_assets_bq_task = build_bq_insert_job( - dag, - internal_project, - internal_dataset, - table_names["assets"], - partition=True, - cluster=True, - create=True, -) -dedup_assets_pub_task = build_bq_insert_job( - dag, - public_project, - public_dataset, - table_names["assets"], - partition=True, - cluster=True, - create=True, - dataset_type="pub", -) - -( - time_task - >> write_ledger_stats - >> ledger_export_task - >> delete_old_ledger_task - >> send_ledgers_to_bq_task -) -ledger_export_task >> delete_old_ledger_pub_task >> send_ledgers_to_pub_task -( - time_task - >> write_asset_stats - >> asset_export_task - >> delete_old_asset_task - >> send_assets_to_bq_task - >> dedup_assets_bq_task -) -( - asset_export_task - >> delete_old_asset_pub_task - >> send_assets_to_pub_task - >> dedup_assets_pub_task -) diff --git a/dags/history_archive_with_captive_core_dag.py b/dags/history_tables_dag.py similarity index 71% rename from dags/history_archive_with_captive_core_dag.py rename to dags/history_tables_dag.py index 739bb7ca..2e40cd3a 100644 --- a/dags/history_archive_with_captive_core_dag.py +++ b/dags/history_tables_dag.py @@ -22,12 +22,12 @@ init_sentry() dag = DAG( - "history_archive_with_captive_core", + "history_table_export", default_args=get_default_dag_args(), - start_date=datetime(2023, 12, 18, 17, 0), + start_date=datetime(2024, 4, 22, 19, 0), catchup=True, description="This DAG exports trades and operations from the history archive using CaptiveCore. 
This supports parsing sponsorship and AMMs.", - schedule_interval="*/30 * * * *", + schedule_interval="*/10 * * * *", params={ "alias": "cc", }, @@ -50,6 +50,8 @@ public_dataset = "{{ var.value.public_dataset }}" use_testnet = literal_eval(Variable.get("use_testnet")) use_futurenet = literal_eval(Variable.get("use_futurenet")) +use_captive_core = literal_eval(Variable.get("use_captive_core")) +txmeta_datastore_url = "{{ var.value.txmeta_datastore_url }}" """ The time task reads in the execution time of the current run, as well as the next @@ -67,6 +69,8 @@ write_effects_stats = build_batch_stats(dag, table_names["effects"]) write_tx_stats = build_batch_stats(dag, table_names["transactions"]) write_diagnostic_events_stats = build_batch_stats(dag, table_names["diagnostic_events"]) +write_ledger_stats = build_batch_stats(dag, table_names["ledgers"]) +write_asset_stats = build_batch_stats(dag, table_names["assets"]) """ The export tasks call export commands on the Stellar ETL using the ledger range from the time task. @@ -85,7 +89,8 @@ use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, - resource_cfg="cc", + use_captive_core=use_captive_core, + txmeta_datastore_url=txmeta_datastore_url, ) trade_export_task = build_export_task( dag, @@ -95,7 +100,8 @@ use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, - resource_cfg="cc", + use_captive_core=use_captive_core, + txmeta_datastore_url=txmeta_datastore_url, ) effects_export_task = build_export_task( dag, @@ -105,7 +111,8 @@ use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, - resource_cfg="cc", + use_captive_core=use_captive_core, + txmeta_datastore_url=txmeta_datastore_url, ) tx_export_task = build_export_task( dag, @@ -115,7 +122,8 @@ use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, - resource_cfg="cc", + use_captive_core=use_captive_core, + txmeta_datastore_url=txmeta_datastore_url, ) diagnostic_events_export_task = build_export_task( dag, @@ -125,7 +133,30 @@ use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, - resource_cfg="cc", + use_captive_core=use_captive_core, + txmeta_datastore_url=txmeta_datastore_url, +) +ledger_export_task = build_export_task( + dag, + "archive", + "export_ledgers", + "{{ var.json.output_file_names.ledgers }}", + use_testnet=use_testnet, + use_futurenet=use_futurenet, + use_gcs=True, + use_captive_core=use_captive_core, + txmeta_datastore_url=txmeta_datastore_url, +) +asset_export_task = build_export_task( + dag, + "archive", + "export_assets", + "{{ var.json.output_file_names.assets }}", + use_testnet=use_testnet, + use_futurenet=use_futurenet, + use_gcs=True, + use_captive_core=use_captive_core, + txmeta_datastore_url=txmeta_datastore_url, ) """ @@ -165,6 +196,18 @@ delete_old_tx_pub_task = build_delete_data_task( dag, public_project, public_dataset, table_names["transactions"], "pub" ) +delete_old_ledger_task = build_delete_data_task( + dag, internal_project, internal_dataset, table_names["ledgers"] +) +delete_old_ledger_pub_task = build_delete_data_task( + dag, public_project, public_dataset, table_names["ledgers"], "pub" +) +delete_old_asset_task = build_delete_data_task( + dag, internal_project, internal_dataset, table_names["assets"] +) +delete_old_asset_pub_task = build_delete_data_task( + dag, public_project, public_dataset, table_names["assets"], "pub" +) """ The send tasks receive the location of the file in Google Cloud storage through Airflow's XCOM system. 
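Taken together, the new keyword arguments threaded through the export tasks above come straight from the use_captive_core and txmeta_datastore_url variables added to airflow_variables_dev.json and airflow_variables_prod.json at the top of this diff. A minimal consolidated sketch of one of the new task definitions, reusing names that history_tables_dag.py already defines (dag, use_testnet, use_futurenet):

```python
from ast import literal_eval

from airflow.models.variable import Variable
from stellar_etl_airflow.build_export_task import build_export_task

# "use_captive_core" is stored as the string "False"/"True" in
# airflow_variables_{dev,prod}.json, so literal_eval() turns it into a bool.
use_captive_core = literal_eval(Variable.get("use_captive_core"))

# The tx-meta datastore URL (gcs://ledger-exporter/ledgers/pubnet in prod) is
# kept as a Jinja template and rendered when the task actually runs.
txmeta_datastore_url = "{{ var.value.txmeta_datastore_url }}"

ledger_export_task = build_export_task(
    dag,  # the history_table_export DAG defined earlier in this file
    "archive",
    "export_ledgers",
    "{{ var.json.output_file_names.ledgers }}",
    use_testnet=use_testnet,
    use_futurenet=use_futurenet,
    use_gcs=True,
    use_captive_core=use_captive_core,
    txmeta_datastore_url=txmeta_datastore_url,
)
```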
@@ -210,6 +253,26 @@ partition=True, cluster=True, ) +send_ledgers_to_bq_task = build_gcs_to_bq_task( + dag, + ledger_export_task.task_id, + internal_project, + internal_dataset, + table_names["ledgers"], + "", + partition=True, + cluster=True, +) +send_assets_to_bq_task = build_gcs_to_bq_task( + dag, + asset_export_task.task_id, + internal_project, + internal_dataset, + table_names["assets"], + "", + partition=True, + cluster=True, +) """ @@ -259,15 +322,29 @@ cluster=True, dataset_type="pub", ) - -""" -Batch loading of derived table, `enriched_history_operations` which denormalizes ledgers, transactions and operations data. -Must wait on history_archive_without_captive_core_dag to finish before beginning the job. -The internal dataset also creates a filtered table, `enriched_meaningful_history_operations` which filters down to only relevant asset ops. -""" -wait_on_dag = build_cross_deps( - dag, "wait_on_ledgers_txs", "history_archive_without_captive_core" +send_ledgers_to_pub_task = build_gcs_to_bq_task( + dag, + ledger_export_task.task_id, + public_project, + public_dataset, + table_names["ledgers"], + "", + partition=True, + cluster=True, + dataset_type="pub", ) +send_assets_to_pub_task = build_gcs_to_bq_task( + dag, + asset_export_task.task_id, + public_project, + public_dataset, + table_names["assets"], + "", + partition=True, + cluster=True, + dataset_type="pub", +) + insert_enriched_hist_task = build_bq_insert_job( dag, internal_project, @@ -300,7 +377,6 @@ >> op_export_task >> delete_old_op_task >> send_ops_to_bq_task - >> wait_on_dag >> delete_enrich_op_task ) ( @@ -313,7 +389,6 @@ op_export_task >> delete_old_op_pub_task >> send_ops_to_pub_task - >> wait_on_dag >> delete_enrich_op_pub_task >> insert_enriched_hist_pub_task ) @@ -339,9 +414,14 @@ >> tx_export_task >> delete_old_tx_task >> send_txs_to_bq_task - >> wait_on_dag + >> delete_enrich_op_task +) +( + tx_export_task + >> delete_old_tx_pub_task + >> send_txs_to_pub_task + >> delete_enrich_op_pub_task ) -tx_export_task >> delete_old_tx_pub_task >> send_txs_to_pub_task >> wait_on_dag (time_task >> write_diagnostic_events_stats >> diagnostic_events_export_task) ( [ @@ -349,3 +429,51 @@ insert_enriched_hist_task, ] ) +dedup_assets_bq_task = build_bq_insert_job( + dag, + internal_project, + internal_dataset, + table_names["assets"], + partition=True, + cluster=True, + create=True, +) +dedup_assets_pub_task = build_bq_insert_job( + dag, + public_project, + public_dataset, + table_names["assets"], + partition=True, + cluster=True, + create=True, + dataset_type="pub", +) + +( + time_task + >> write_ledger_stats + >> ledger_export_task + >> delete_old_ledger_task + >> send_ledgers_to_bq_task + >> delete_enrich_op_task +) +( + ledger_export_task + >> delete_old_ledger_pub_task + >> send_ledgers_to_pub_task + >> delete_enrich_op_pub_task +) +( + time_task + >> write_asset_stats + >> asset_export_task + >> delete_old_asset_task + >> send_assets_to_bq_task + >> dedup_assets_bq_task +) +( + asset_export_task + >> delete_old_asset_pub_task + >> send_assets_to_pub_task + >> dedup_assets_pub_task +) diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index bb39462b..bad4c389 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -22,9 +22,9 @@ dag = DAG( "state_table_export", default_args=get_default_dag_args(), - start_date=datetime(2023, 12, 17, 6, 0), + start_date=datetime(2024, 3, 22, 0, 0), description="This DAG runs a bounded stellar-core instance, which allows it to export accounts, offers, liquidity pools, 
and trustlines to BigQuery.", - schedule_interval="*/30 * * * *", + schedule_interval="*/10 * * * *", params={ "alias": "state", }, @@ -48,6 +48,8 @@ public_dataset = "{{ var.value.public_dataset }}" use_testnet = literal_eval(Variable.get("use_testnet")) use_futurenet = literal_eval(Variable.get("use_futurenet")) +use_captive_core = literal_eval(Variable.get("use_captive_core")) +txmeta_datastore_url = "{{ var.value.txmeta_datastore_url }}" date_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet) changes_task = build_export_task( @@ -58,7 +60,8 @@ use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, - resource_cfg="state", + use_captive_core=use_captive_core, + txmeta_datastore_url=txmeta_datastore_url, ) """ diff --git a/dags/stellar_etl_airflow/build_dbt_task.py b/dags/stellar_etl_airflow/build_dbt_task.py index 84036597..eff8617b 100644 --- a/dags/stellar_etl_airflow/build_dbt_task.py +++ b/dags/stellar_etl_airflow/build_dbt_task.py @@ -127,7 +127,6 @@ def dbt_task( image=dbt_image, arguments=args, dag=dag, - do_xcom_push=True, is_delete_operator_pod=True, startup_timeout_seconds=720, in_cluster=in_cluster, @@ -208,7 +207,6 @@ def build_dbt_task( cmds=command, arguments=args, dag=dag, - do_xcom_push=True, is_delete_operator_pod=True, startup_timeout_seconds=720, in_cluster=in_cluster, diff --git a/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py b/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py index 60fe08ec..f33481fc 100644 --- a/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py +++ b/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py @@ -80,7 +80,6 @@ def elementary_task( cmds=["edr"], arguments=args, dag=dag, - do_xcom_push=True, is_delete_operator_pod=True, startup_timeout_seconds=720, in_cluster=in_cluster, diff --git a/dags/stellar_etl_airflow/build_export_task.py b/dags/stellar_etl_airflow/build_export_task.py index 1d15e099..9a6431fb 100644 --- a/dags/stellar_etl_airflow/build_export_task.py +++ b/dags/stellar_etl_airflow/build_export_task.py @@ -47,6 +47,8 @@ def generate_etl_cmd( use_gcs=False, use_testnet=False, use_futurenet=False, + use_captive_core=False, + txmeta_datastore_url="", ): """ Runs the provided stellar-etl command with arguments that are appropriate for the command type. @@ -137,6 +139,7 @@ def generate_etl_cmd( cmd.extend( ["--cloud-storage-bucket", Variable.get("gcs_exported_data_bucket_name")] ) + # TODO: cloud-provider should be a parameter instead of hardcoded to gcp cmd.extend(["--cloud-provider", "gcp"]) batch_id = macros.get_batch_id() batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" @@ -147,6 +150,10 @@ def generate_etl_cmd( cmd.append("--testnet") elif use_futurenet: cmd.append("--futurenet") + if use_captive_core: + cmd.append("--captive-core") + if txmeta_datastore_url: + cmd.extend(["--datastore-url", txmeta_datastore_url]) cmd.append("--strict-export") if command == "export_all_history": @@ -164,6 +171,8 @@ def build_export_task( use_testnet=False, use_futurenet=False, resource_cfg="default", + use_captive_core=False, + txmeta_datastore_url="", ): """ Creates a task that calls the provided export function with the correct arguments in the stellar-etl Docker image. 
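The conditional flag handling added to generate_etl_cmd above is easy to misread in diff form, so here it is restated as a small standalone helper. The helper name and the base command list are made up for illustration; only the flag logic mirrors the diff. When use_captive_core is left False but txmeta_datastore_url is set, the exporter is presumably pointed at pre-generated tx-meta in the datastore bucket instead of running captive core itself.

```python
def append_network_and_core_flags(
    cmd,
    use_testnet=False,
    use_futurenet=False,
    use_captive_core=False,
    txmeta_datastore_url="",
):
    """Mirror of the conditional flag handling added to generate_etl_cmd."""
    if use_testnet:
        cmd.append("--testnet")
    elif use_futurenet:
        cmd.append("--futurenet")
    if use_captive_core:
        cmd.append("--captive-core")
    if txmeta_datastore_url:
        cmd.extend(["--datastore-url", txmeta_datastore_url])
    cmd.append("--strict-export")
    return cmd


# With use_captive_core left at False and the dev datastore URL from this PR,
# a testnet-style invocation gains --testnet, --datastore-url and --strict-export.
print(
    append_network_and_core_flags(
        ["stellar-etl", "export_ledgers"],  # base command shape is assumed here
        use_testnet=True,
        txmeta_datastore_url="gcs://exporter-test/ledgers/testnet",
    )
)
```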
@@ -179,7 +188,14 @@
     """
     etl_cmd, output_file = generate_etl_cmd(
-        command, filename, cmd_type, use_gcs, use_testnet, use_futurenet
+        command,
+        filename,
+        cmd_type,
+        use_gcs,
+        use_testnet,
+        use_futurenet,
+        use_captive_core,
+        txmeta_datastore_url,
     )
     etl_cmd_string = " ".join(etl_cmd)
     namespace = conf.get("kubernetes", "NAMESPACE")
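For context, build_export_task eventually wraps etl_cmd_string in a Kubernetes pod that runs the stellar-etl image. That construction is outside this diff, so the following is only a rough sketch based on the operator arguments visible in the dbt and elementary helpers above; the import path, the bash wrapper in cmds, and the namespace/in_cluster/config_file_location names are assumptions rather than the repo's exact code.

```python
# The import path depends on the installed cncf.kubernetes provider version.
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

export_task = KubernetesPodOperator(
    task_id="export_ledgers_task",
    name="export_ledgers_task",
    namespace=namespace,                 # conf.get("kubernetes", "NAMESPACE") from above
    image="{{ var.value.image_name }}",  # stellar/stellar-etl:a334cb5 after this PR
    cmds=["bash", "-c"],                 # assumed wrapper around the joined command
    arguments=[etl_cmd_string],          # " ".join(etl_cmd) built by generate_etl_cmd
    dag=dag,
    is_delete_operator_pod=True,
    startup_timeout_seconds=720,
    in_cluster=in_cluster,
    config_file=config_file_location,    # assumed variable names, as in the dbt helper
)
```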