Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PRODUCTION] Update production Airflow environment #228

Merged
merged 39 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
7de75e3
wip
chowbao May 26, 2023
a4943a6
Merge branch 'master' into futurenet-soroban-support
chowbao Jul 24, 2023
923f6dc
Update history_effects_schema.json
sydneynotthecity Jul 24, 2023
76c2307
Update history_operations_schema.json
sydneynotthecity Jul 24, 2023
e395413
Add futurenet preview 10 soroban data
chowbao Jul 25, 2023
afac4af
Update history_operations_schema.json
sydneynotthecity Jul 26, 2023
ddb1adb
Update history_operations_schema.json
sydneynotthecity Jul 26, 2023
2043cae
Updates
chowbao Jul 27, 2023
96d3ebf
Merge branch 'master' into futurenet-soroban-support
chowbao Jul 27, 2023
55264d7
Fix typo
chowbao Jul 27, 2023
e7f7cc3
Update effects schema
chowbao Jul 27, 2023
0a54f87
Change bad records to 0 for low volume in futurenet
chowbao Jul 27, 2023
fd81d91
Fix typos
chowbao Jul 27, 2023
c942031
Soroban support for operations and effects
sydneynotthecity Jul 24, 2023
8854f7b
Update futurenet schemas
sydneynotthecity Aug 28, 2023
8a4d83f
Update soroban columns
chowbao Aug 31, 2023
45c9ddf
Merge remote-tracking branch 'refs/remotes/origin/futurenet-soroban-s…
chowbao Aug 31, 2023
3d2e7a5
Remove parameters from invoke host functions in operations
chowbao Aug 31, 2023
5d280b7
Add more columns
chowbao Aug 31, 2023
f545090
Merge branch 'master' into soroban-support-testnet
chowbao Sep 7, 2023
d4d1204
Update config settings schema
chowbao Sep 7, 2023
5c8026e
Updates for soroban
chowbao Sep 19, 2023
977e5f9
Add diagnostic events
chowbao Sep 19, 2023
82a91e4
Update start dates
chowbao Sep 20, 2023
aa03620
fix type
chowbao Sep 20, 2023
b2b2805
fix schemas
chowbao Sep 20, 2023
cb8a4e8
update vars
chowbao Sep 20, 2023
8f44d71
Merge branch 'master' into soroban-support-testnet
chowbao Sep 20, 2023
c5c03e4
format
chowbao Sep 20, 2023
c71c87e
Add prod variables
chowbao Sep 21, 2023
ea701dd
Fix variables
chowbao Sep 21, 2023
24cdda3
Update schema
chowbao Sep 21, 2023
52d6f53
Merge branch 'master' into soroban-support-testnet
chowbao Oct 3, 2023
030cef0
Merge branch 'master' into soroban-support-testnet
sydneynotthecity Oct 4, 2023
6bf1289
Merge branch 'master' into soroban-support-testnet
sydneynotthecity Oct 5, 2023
14096bd
Merge branch 'master' into soroban-support-testnet
sydneynotthecity Oct 6, 2023
586c9d5
Merge pull request #218 from stellar/soroban-support-testnet
chowbao Oct 12, 2023
35c15d7
Adjust DAGs for aggregate table changes
sydneynotthecity Oct 13, 2023
8cd1008
Merge pull request #229 from stellar/adjust-aggregate-tables
sydneynotthecity Oct 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions airflow_variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@
"transaction_id",
"account",
"type"
]
],
"contract_data": ["last_modified_ledger"],
"contract_code": ["last_modified_ledger"],
"config_settings": ["last_modified_ledger"],
"expiration": ["last_modified_ledger"]
},
"dbt_auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"dbt_auth_uri": "https://accounts.google.com/o/oauth2/auth",
Expand All @@ -112,7 +116,7 @@
"partnership_assets__asset_activity_fact": false,
"trade_agg": false
},
"dbt_image_name": "stellar/stellar-dbt:0643adc",
"dbt_image_name": "stellar/stellar-dbt:011d897",
"dbt_job_execution_timeout_seconds": 6000,
"dbt_job_retries": 1,
"dbt_keyfile_profile": "",
Expand All @@ -126,7 +130,7 @@
"dbt_token_uri": "https://oauth2.googleapis.com/token",
"gcs_exported_data_bucket_name": "us-central1-hubble-2-d948d67b-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:6211230",
"image_name": "stellar/stellar-etl:e9c803c",
"image_output_path": "/etl/exported_data/",
"image_pull_policy": "IfNotPresent",
"kube_config_location": "",
Expand All @@ -153,7 +157,12 @@
"signers": "account_signers.txt",
"trades": "trades.txt",
"transactions": "transactions.txt",
"trustlines": "trustlines.txt"
"trustlines": "trustlines.txt",
"contract_data": "contract_data.txt",
"contract_code": "contract_code.txt",
"config_settings": "config_settings.txt",
"expiration": "expiration.txt",
"diagnostic_events": "diagnostic_events.txt"
},
"output_path": "/home/airflow/gcs/data/",
"owner": "SDF",
Expand Down Expand Up @@ -213,6 +222,22 @@
"trust_lines": {
"type": "MONTH",
"field": "batch_run_date"
},
"contract_data": {
"field": "batch_run_date",
"type": "MONTH"
},
"contract_code": {
"field": "batch_run_date",
"type": "MONTH"
},
"config_settings": {
"field": "batch_run_date",
"type": "MONTH"
},
"expiration": {
"field": "batch_run_date",
"type": "MONTH"
}
},
"public_dataset": "crypto_stellar_2",
Expand Down Expand Up @@ -258,7 +283,12 @@
"trades": "history_trades",
"transactions": "history_transactions",
"trustlines": "trust_lines",
"enriched_history_operations": "enriched_history_operations"
"enriched_history_operations": "enriched_history_operations",
"contract_data": "contract_data",
"contract_code": "contract_code",
"config_settings": "config_settings",
"expiration": "expiration",
"diagnostic_events": "diagnostic_events"
},
"task_timeout": {
"build_batch_stats": 180,
Expand Down Expand Up @@ -289,6 +319,7 @@
}
},
"partners_bucket": "ext-partner-sftp",
"use_futurenet": "False",
"currency_ohlc": {
"currency": "euro_ohlc",
"table_name": "euro_usd_ohlc",
Expand Down
41 changes: 36 additions & 5 deletions airflow_variables_dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@
"transaction_id",
"account",
"type"
]
],
"contract_data": ["last_modified_ledger"],
"contract_code": ["last_modified_ledger"],
"config_settings": ["last_modified_ledger"],
"expiration": ["last_modified_ledger"]
},
"dbt_auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"dbt_auth_uri": "https://accounts.google.com/o/oauth2/auth",
Expand All @@ -90,7 +94,7 @@
"partnership_assets__account_holders_activity_fact": true,
"partnership_assets__asset_activity_fact": true
},
"dbt_image_name": "stellar/stellar-dbt:0643adc",
"dbt_image_name": "stellar/stellar-dbt:011d897",
"dbt_job_execution_timeout_seconds": 300,
"dbt_job_retries": 1,
"dbt_keyfile_profile": "",
Expand All @@ -104,7 +108,7 @@
"dbt_token_uri": "https://oauth2.googleapis.com/token",
"gcs_exported_data_bucket_name": "us-central1-hubble-1pt5-dev-7db0e004-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:6211230",
"image_name": "chowbao/stellar-etl:testnet-noUsdDb",
"image_output_path": "/etl/exported_data/",
"image_pull_policy": "IfNotPresent",
"kube_config_location": "",
Expand All @@ -130,7 +134,12 @@
"signers": "account_signers.txt",
"trades": "trades.txt",
"transactions": "transactions.txt",
"trustlines": "trustlines.txt"
"trustlines": "trustlines.txt",
"contract_data": "contract_data.txt",
"contract_code": "contract_code.txt",
"config_settings": "config_settings.txt",
"expiration": "expiration.txt",
"diagnostic_events": "diagnostic_events.txt"
},
"output_path": "/home/airflow/gcs/data/",
"owner": "SDF",
Expand Down Expand Up @@ -194,6 +203,22 @@
"mgi": {
"field": "tran_evnt_date",
"type": "MONTH"
},
"contract_data": {
"field": "batch_run_date",
"type": "MONTH"
},
"contract_code": {
"field": "batch_run_date",
"type": "MONTH"
},
"config_settings": {
"field": "batch_run_date",
"type": "MONTH"
},
"expiration": {
"field": "batch_run_date",
"type": "MONTH"
}
},
"partners_data": {
Expand Down Expand Up @@ -246,7 +271,12 @@
"trades": "history_trades",
"transactions": "history_transactions",
"trustlines": "trust_lines",
"enriched_history_operations": "enriched_history_operations"
"enriched_history_operations": "enriched_history_operations",
"contract_data": "contract_data",
"contract_code": "contract_code",
"config_settings": "config_settings",
"expiration": "expiration",
"diagnostic_events": "diagnostic_events"
},
"task_timeout": {
"build_batch_stats": 180,
Expand All @@ -269,6 +299,7 @@
"sandbox_dataset": "crypto_stellar_internal_sandbox",
"volume_config": {},
"volume_name": "etl-data",
"use_futurenet": "False",
"currency_ohlc": {
"currency": "euro_ohlc",
"table_name": "euro_usd_ohlc",
Expand Down
1 change: 1 addition & 0 deletions dags/asset_pricing_pipeline_dag.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import json

from airflow import DAG
from airflow.models.variable import Variable
Expand Down
7 changes: 7 additions & 0 deletions dags/bucket_list_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
public_project = Variable.get("public_project")
public_dataset = Variable.get("public_dataset")
use_testnet = ast.literal_eval(Variable.get("use_testnet"))
use_futurenet = ast.literal_eval(Variable.get("use_futurenet"))

"""
The time task reads in the execution time of the current run, as well as the next
Expand All @@ -61,6 +62,7 @@
"export_accounts",
file_names["accounts"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)
export_bal_task = build_export_task(
Expand All @@ -69,6 +71,7 @@
"export_claimable_balances",
file_names["claimable_balances"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)
export_off_task = build_export_task(
Expand All @@ -77,6 +80,7 @@
"export_offers",
file_names["offers"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)
export_pool_task = build_export_task(
Expand All @@ -85,6 +89,7 @@
"export_pools",
file_names["liquidity_pools"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)
export_sign_task = build_export_task(
Expand All @@ -93,6 +98,7 @@
"export_signers",
file_names["signers"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)
export_trust_task = build_export_task(
Expand All @@ -101,6 +107,7 @@
"export_trustlines",
file_names["trustlines"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)

Expand Down
22 changes: 20 additions & 2 deletions dags/history_archive_with_captive_core_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
dag = DAG(
"history_archive_with_captive_core",
default_args=get_default_dag_args(),
start_date=datetime.datetime(2022, 3, 11, 18, 30),
start_date=datetime.datetime(2023, 9, 20, 15, 0),
catchup=True,
description="This DAG exports trades and operations from the history archive using CaptiveCore. This supports parsing sponsorship and AMMs.",
schedule_interval="*/30 * * * *",
params={
Expand All @@ -44,12 +45,13 @@
public_dataset = Variable.get("public_dataset")
public_dataset_new = Variable.get("public_dataset_new")
use_testnet = ast.literal_eval(Variable.get("use_testnet"))
use_futurenet = ast.literal_eval(Variable.get("use_futurenet"))

"""
The time task reads in the execution time of the current run, as well as the next
execution time. It converts these two times into ledger ranges.
"""
time_task = build_time_task(dag, use_testnet=use_testnet)
time_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet)

"""
The write batch stats task will take a snapshot of the DAG run_id, execution date,
Expand All @@ -60,6 +62,7 @@
write_trade_stats = build_batch_stats(dag, table_names["trades"])
write_effects_stats = build_batch_stats(dag, table_names["effects"])
write_tx_stats = build_batch_stats(dag, table_names["transactions"])
write_diagnostic_events_stats = build_batch_stats(dag, table_names["diagnostic_events"])

"""
The export tasks call export commands on the Stellar ETL using the ledger range from the time task.
Expand All @@ -76,6 +79,7 @@
"export_operations",
file_names["operations"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
resource_cfg="cc",
)
Expand All @@ -85,6 +89,7 @@
"export_trades",
file_names["trades"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
resource_cfg="cc",
)
Expand All @@ -94,6 +99,7 @@
"export_effects",
"effects.txt",
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
resource_cfg="cc",
)
Expand All @@ -103,6 +109,17 @@
"export_transactions",
file_names["transactions"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
resource_cfg="cc",
)
diagnostic_events_export_task = build_export_task(
dag,
"archive",
"export_diagnostic_events",
file_names["diagnostic_events"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
resource_cfg="cc",
)
Expand Down Expand Up @@ -380,3 +397,4 @@
)
tx_export_task >> delete_old_tx_pub_task >> send_txs_to_pub_task >> wait_on_dag
tx_export_task >> delete_old_tx_pub_new_task >> send_txs_to_pub_new_task >> wait_on_dag
(time_task >> write_diagnostic_events_stats >> diagnostic_events_export_task)
8 changes: 6 additions & 2 deletions dags/history_archive_without_captive_core_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
dag = DAG(
"history_archive_without_captive_core",
default_args=get_default_dag_args(),
start_date=datetime.datetime(2022, 3, 11, 18, 30),
start_date=datetime.datetime(2023, 9, 20, 15, 0),
catchup=True,
description="This DAG exports ledgers, transactions, and assets from the history archive to BigQuery. Incremental Loads",
schedule_interval="*/15 * * * *",
params={
Expand All @@ -43,12 +44,13 @@
public_dataset = Variable.get("public_dataset")
public_dataset_new = Variable.get("public_dataset_new")
use_testnet = ast.literal_eval(Variable.get("use_testnet"))
use_futurenet = ast.literal_eval(Variable.get("use_futurenet"))

"""
The time task reads in the execution time of the current run, as well as the next
execution time. It converts these two times into ledger ranges.
"""
time_task = build_time_task(dag, use_testnet=use_testnet)
time_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet)

"""
The write batch stats task will take a snapshot of the DAG run_id, execution date,
Expand All @@ -73,6 +75,7 @@
"export_ledgers",
file_names["ledgers"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)
asset_export_task = build_export_task(
Expand All @@ -81,6 +84,7 @@
"export_assets",
file_names["assets"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)

Expand Down
Loading