Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PRODUCTION] Update production Airflow environment #340

Merged 11 commits on Apr 18, 2024
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ jobs:
- id: "get-credentials"
uses: "google-github-actions/get-gke-credentials@v2"
with:
cluster_name: "us-central1-hubble-1pt5-dev-7db0e004-gke"
location: "us-central1-c"
cluster_name: "us-central1-test-hubble-2-5f1f2dbf-gke"
location: "us-central1"

- name: Pytest
run: pytest dags/
Expand Down Expand Up @@ -98,20 +98,20 @@ jobs:
run: python dags/stellar_etl_airflow/add_files_to_composer.py --bucket $BUCKET
env:
GOOGLE_CLOUD_PROJECT: test-hubble-319619
BUCKET: us-central1-hubble-1pt5-dev-7db0e004-bucket
BUCKET: us-central1-test-hubble-2-5f1f2dbf-bucket

- name: Update Airflow variables
uses: actions-hub/gcloud@master
env:
PROJECT_ID: test-hubble-319619
APPLICATION_CREDENTIALS: "${{ secrets.CREDS_TEST_HUBBLE }}"
COMPOSER_ENVIRONMENT: hubble-1pt5-dev
COMPOSER_ENVIRONMENT: test-hubble-2
LOCATION: us-central1
with:
args: >
components install kubectl && gcloud composer environments run
$COMPOSER_ENVIRONMENT --location $LOCATION variables import
-- gcsfuse/variables.json
-- gcsfuse/actual_mount_path/variables.json
promote-to-prod:
runs-on: ubuntu-latest
Expand Down
77 changes: 4 additions & 73 deletions airflow_variables_dev.json
Original file line number Diff line number Diff line change
@@ -1,74 +1,5 @@
{
"affinity": {
"default": {
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": ["default-pool"]
}
]
}
]
}
}
},
"cc": {
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": ["pool-2"]
}
]
}
]
}
}
},
"wocc": {
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": ["default-pool"]
}
]
}
]
}
}
},
"state": {
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": ["default-pool"]
}
]
}
]
}
}
}
},
"affinity": {},
"api_key_path": "/home/airflow/gcs/data/apiKey.json",
"bq_dataset": "test_crypto_stellar_internal",
"bq_project": "test-hubble-319619",
Expand Down Expand Up @@ -121,17 +52,17 @@
"partnership_assets__account_holders_activity_fact": false,
"partnership_assets__asset_activity_fact": false
},
"dbt_image_name": "stellar/stellar-dbt:b621192",
"dbt_image_name": "stellar/stellar-dbt:8ae3438",
"dbt_job_execution_timeout_seconds": 300,
"dbt_job_retries": 1,
"dbt_mart_dataset": "test_sdf_marts",
"dbt_maximum_bytes_billed": 250000000000,
"dbt_project": "test-hubble-319619",
"dbt_target": "test",
"dbt_threads": 12,
"gcs_exported_data_bucket_name": "us-central1-hubble-1pt5-dev-7db0e004-bucket",
"gcs_exported_data_bucket_name": "us-central1-test-hubble-2-5f1f2dbf-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:75c9a9c",
"image_name": "stellar/stellar-etl:395f1f8",
"image_output_path": "/etl/exported_data/",
"image_pull_policy": "IfNotPresent",
"kube_config_location": "",
Expand Down
4 changes: 2 additions & 2 deletions airflow_variables_prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@
"partnership_assets__asset_activity_fact": false,
"trade_agg": false
},
"dbt_image_name": "stellar/stellar-dbt:b621192",
"dbt_image_name": "stellar/stellar-dbt:8ae3438",
"dbt_job_execution_timeout_seconds": 1800,
"dbt_job_retries": 1,
"dbt_mart_dataset": "sdf_marts",
Expand All @@ -153,7 +153,7 @@
"dbt_threads": 12,
"gcs_exported_data_bucket_name": "us-central1-hubble-2-d948d67b-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:75c9a9c",
"image_name": "stellar/stellar-etl:395f1f8",
"image_output_path": "/etl/exported_data/",
"image_pull_policy": "IfNotPresent",
"kube_config_location": "",
Expand Down
15 changes: 9 additions & 6 deletions dags/dbt_sdf_marts_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@

mgi_task = dbt_task(dag, tag="mgi")
liquidity_providers_task = dbt_task(dag, tag="liquidity_providers")
# liquidity_pools_values_task = dbt_task(dag, tag="liquidity_pools_value")
# liquidity_pools_value_history_task = dbt_task(dag, tag="liquidity_pools_value_history")
liquidity_pools_values_task = dbt_task(dag, tag="liquidity_pools_value")
liquidity_pools_value_history_task = dbt_task(dag, tag="liquidity_pools_value_history")
trade_agg_task = dbt_task(dag, tag="trade_agg")
fee_stats_agg_task = dbt_task(dag, tag="fee_stats")
asset_stats_agg_task = dbt_task(dag, tag="asset_stats")
network_stats_agg_task = dbt_task(dag, tag="network_stats")
partnership_assets_task = dbt_task(dag, tag="partnership_assets")
history_assets = dbt_task(dag, tag="history_assets")
soroban = dbt_task(dag, tag="soroban")
snapshot_state = dbt_task(dag, tag="snapshot_state")

elementary = elementary_task(dag, "dbt_sdf_marts")

Expand All @@ -57,20 +58,21 @@
wait_on_partner_pipeline_dag >> mgi_task

wait_on_dbt_enriched_base_tables >> liquidity_providers_task
# wait_on_dbt_enriched_base_tables >> liquidity_pools_values_task
# wait_on_dbt_enriched_base_tables >> liquidity_pools_value_history_task
wait_on_dbt_enriched_base_tables >> liquidity_pools_values_task
wait_on_dbt_enriched_base_tables >> liquidity_pools_value_history_task
wait_on_dbt_enriched_base_tables >> trade_agg_task
wait_on_dbt_enriched_base_tables >> fee_stats_agg_task
wait_on_dbt_enriched_base_tables >> asset_stats_agg_task
wait_on_dbt_enriched_base_tables >> network_stats_agg_task
wait_on_dbt_enriched_base_tables >> partnership_assets_task
wait_on_dbt_enriched_base_tables >> history_assets
wait_on_dbt_enriched_base_tables >> soroban
wait_on_dbt_enriched_base_tables >> snapshot_state

mgi_task >> elementary
liquidity_providers_task >> elementary
# liquidity_pools_values_task >> elementary
# liquidity_pools_value_history_task >> elementary
liquidity_pools_values_task >> elementary
liquidity_pools_value_history_task >> elementary
trade_agg_task >> elementary
fee_stats_agg_task >> elementary
asset_stats_agg_task >> elementary
Expand All @@ -79,3 +81,4 @@
history_assets >> elementary
soroban >> elementary
liquidity_pool_trade_volume_task >> elementary
snapshot_state >> elementary
3 changes: 1 addition & 2 deletions dags/stellar_etl_airflow/build_export_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,7 @@ def build_export_task(
else:
arguments = f"""
{etl_cmd_string} 2>> stderr.out && echo "{{\\"output\\": \\"{output_file}\\",
\\"failed_transforms\\": `grep failed_transforms stderr.out | cut -d\\",\\" -f2 | cut -d\\":\\" -f2`\\",
\\"succcessful_transforms\\": `grep successful_transforms stderr.out | cut -d\\",\\" -f2 | cut -d\\":\\" -f2`}}" >> /airflow/xcom/return.json
\\"failed_transforms\\": `grep failed_transforms stderr.out | cut -d\\",\\" -f2 | cut -d\\":\\" -f2`}}" >> /airflow/xcom/return.json
"""
return KubernetesPodOperator(
service_account_name=Variable.get("k8s_service_account"),
Expand Down
5 changes: 5 additions & 0 deletions schemas/history_operations_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,11 @@
"mode": "NULLABLE",
"name": "contract_code_hash",
"type": "STRING"
},
{
"mode": "REPEATED",
"name": "ledger_key_hash",
"type": "STRING"
}
],
"mode": "NULLABLE",
Expand Down
Loading