diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cc1df335..60ace9ed 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,8 +59,8 @@ jobs: - id: "get-credentials" uses: "google-github-actions/get-gke-credentials@v2" with: - cluster_name: "us-central1-hubble-1pt5-dev-7db0e004-gke" - location: "us-central1-c" + cluster_name: "us-central1-test-hubble-2-5f1f2dbf-gke" + location: "us-central1" - name: Pytest run: pytest dags/ @@ -98,20 +98,20 @@ jobs: run: python dags/stellar_etl_airflow/add_files_to_composer.py --bucket $BUCKET env: GOOGLE_CLOUD_PROJECT: test-hubble-319619 - BUCKET: us-central1-hubble-1pt5-dev-7db0e004-bucket + BUCKET: us-central1-test-hubble-2-5f1f2dbf-bucket - name: Update Airflow variables uses: actions-hub/gcloud@master env: PROJECT_ID: test-hubble-319619 APPLICATION_CREDENTIALS: "${{ secrets.CREDS_TEST_HUBBLE }}" - COMPOSER_ENVIRONMENT: hubble-1pt5-dev + COMPOSER_ENVIRONMENT: test-hubble-2 LOCATION: us-central1 with: args: > components install kubectl && gcloud composer environments run $COMPOSER_ENVIRONMENT --location $LOCATION variables import - -- gcsfuse/variables.json + -- gcsfuse/actual_mount_path/variables.json promote-to-prod: runs-on: ubuntu-latest diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 8d9bde81..ffc9bdd3 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -1,74 +1,5 @@ { - "affinity": { - "default": { - "nodeAffinity": { - "requiredDuringSchedulingIgnoredDuringExecution": { - "nodeSelectorTerms": [ - { - "matchExpressions": [ - { - "key": "cloud.google.com/gke-nodepool", - "operator": "In", - "values": ["default-pool"] - } - ] - } - ] - } - } - }, - "cc": { - "nodeAffinity": { - "requiredDuringSchedulingIgnoredDuringExecution": { - "nodeSelectorTerms": [ - { - "matchExpressions": [ - { - "key": "cloud.google.com/gke-nodepool", - "operator": "In", - "values": ["pool-2"] - } - ] - } - ] - } - } - }, - "wocc": { - "nodeAffinity": { - "requiredDuringSchedulingIgnoredDuringExecution": { - "nodeSelectorTerms": [ - { - "matchExpressions": [ - { - "key": "cloud.google.com/gke-nodepool", - "operator": "In", - "values": ["default-pool"] - } - ] - } - ] - } - } - }, - "state": { - "nodeAffinity": { - "requiredDuringSchedulingIgnoredDuringExecution": { - "nodeSelectorTerms": [ - { - "matchExpressions": [ - { - "key": "cloud.google.com/gke-nodepool", - "operator": "In", - "values": ["default-pool"] - } - ] - } - ] - } - } - } - }, + "affinity": {}, "api_key_path": "/home/airflow/gcs/data/apiKey.json", "bq_dataset": "test_crypto_stellar_internal", "bq_project": "test-hubble-319619", @@ -121,7 +52,7 @@ "partnership_assets__account_holders_activity_fact": false, "partnership_assets__asset_activity_fact": false }, - "dbt_image_name": "stellar/stellar-dbt:b621192", + "dbt_image_name": "stellar/stellar-dbt:8ae3438", "dbt_job_execution_timeout_seconds": 300, "dbt_job_retries": 1, "dbt_mart_dataset": "test_sdf_marts", @@ -129,9 +60,9 @@ "dbt_project": "test-hubble-319619", "dbt_target": "test", "dbt_threads": 12, - "gcs_exported_data_bucket_name": "us-central1-hubble-1pt5-dev-7db0e004-bucket", + "gcs_exported_data_bucket_name": "us-central1-test-hubble-2-5f1f2dbf-bucket", "gcs_exported_object_prefix": "dag-exported", - "image_name": "stellar/stellar-etl:75c9a9c", + "image_name": "stellar/stellar-etl:395f1f8", "image_output_path": "/etl/exported_data/", "image_pull_policy": "IfNotPresent", "kube_config_location": "", diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index 105a02c5..7af9edc7 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -143,7 +143,7 @@ "partnership_assets__asset_activity_fact": false, "trade_agg": false }, - "dbt_image_name": "stellar/stellar-dbt:b621192", + "dbt_image_name": "stellar/stellar-dbt:8ae3438", "dbt_job_execution_timeout_seconds": 1800, "dbt_job_retries": 1, "dbt_mart_dataset": "sdf_marts", @@ -153,7 +153,7 @@ "dbt_threads": 12, "gcs_exported_data_bucket_name": "us-central1-hubble-2-d948d67b-bucket", "gcs_exported_object_prefix": "dag-exported", - "image_name": "stellar/stellar-etl:75c9a9c", + "image_name": "stellar/stellar-etl:395f1f8", "image_output_path": "/etl/exported_data/", "image_pull_policy": "IfNotPresent", "kube_config_location": "", diff --git a/dags/dbt_sdf_marts_dag.py b/dags/dbt_sdf_marts_dag.py index 558fdf52..8c29900f 100644 --- a/dags/dbt_sdf_marts_dag.py +++ b/dags/dbt_sdf_marts_dag.py @@ -38,8 +38,8 @@ mgi_task = dbt_task(dag, tag="mgi") liquidity_providers_task = dbt_task(dag, tag="liquidity_providers") -# liquidity_pools_values_task = dbt_task(dag, tag="liquidity_pools_value") -# liquidity_pools_value_history_task = dbt_task(dag, tag="liquidity_pools_value_history") +liquidity_pools_values_task = dbt_task(dag, tag="liquidity_pools_value") +liquidity_pools_value_history_task = dbt_task(dag, tag="liquidity_pools_value_history") trade_agg_task = dbt_task(dag, tag="trade_agg") fee_stats_agg_task = dbt_task(dag, tag="fee_stats") asset_stats_agg_task = dbt_task(dag, tag="asset_stats") @@ -47,6 +47,7 @@ partnership_assets_task = dbt_task(dag, tag="partnership_assets") history_assets = dbt_task(dag, tag="history_assets") soroban = dbt_task(dag, tag="soroban") +snapshot_state = dbt_task(dag, tag="snapshot_state") elementary = elementary_task(dag, "dbt_sdf_marts") @@ -57,8 +58,8 @@ wait_on_partner_pipeline_dag >> mgi_task wait_on_dbt_enriched_base_tables >> liquidity_providers_task -# wait_on_dbt_enriched_base_tables >> liquidity_pools_values_task -# wait_on_dbt_enriched_base_tables >> liquidity_pools_value_history_task +wait_on_dbt_enriched_base_tables >> liquidity_pools_values_task +wait_on_dbt_enriched_base_tables >> liquidity_pools_value_history_task wait_on_dbt_enriched_base_tables >> trade_agg_task wait_on_dbt_enriched_base_tables >> fee_stats_agg_task wait_on_dbt_enriched_base_tables >> asset_stats_agg_task @@ -66,11 +67,12 @@ wait_on_dbt_enriched_base_tables >> partnership_assets_task wait_on_dbt_enriched_base_tables >> history_assets wait_on_dbt_enriched_base_tables >> soroban +wait_on_dbt_enriched_base_tables >> snapshot_state mgi_task >> elementary liquidity_providers_task >> elementary -# liquidity_pools_values_task >> elementary -# liquidity_pools_value_history_task >> elementary +liquidity_pools_values_task >> elementary +liquidity_pools_value_history_task >> elementary trade_agg_task >> elementary fee_stats_agg_task >> elementary asset_stats_agg_task >> elementary @@ -79,3 +81,4 @@ history_assets >> elementary soroban >> elementary liquidity_pool_trade_volume_task >> elementary +snapshot_state >> elementary diff --git a/dags/stellar_etl_airflow/build_export_task.py b/dags/stellar_etl_airflow/build_export_task.py index ded28975..1d15e099 100644 --- a/dags/stellar_etl_airflow/build_export_task.py +++ b/dags/stellar_etl_airflow/build_export_task.py @@ -198,8 +198,7 @@ def build_export_task( else: arguments = f""" {etl_cmd_string} 2>> stderr.out && echo "{{\\"output\\": \\"{output_file}\\", - \\"failed_transforms\\": `grep failed_transforms stderr.out | cut -d\\",\\" -f2 | cut -d\\":\\" -f2`\\", - \\"succcessful_transforms\\": `grep successful_transforms stderr.out | cut -d\\",\\" -f2 | cut -d\\":\\" -f2`}}" >> /airflow/xcom/return.json + \\"failed_transforms\\": `grep failed_transforms stderr.out | cut -d\\",\\" -f2 | cut -d\\":\\" -f2`}}" >> /airflow/xcom/return.json """ return KubernetesPodOperator( service_account_name=Variable.get("k8s_service_account"), diff --git a/schemas/history_operations_schema.json b/schemas/history_operations_schema.json index 3ebe8982..c5cdd219 100644 --- a/schemas/history_operations_schema.json +++ b/schemas/history_operations_schema.json @@ -1074,6 +1074,11 @@ "mode": "NULLABLE", "name": "contract_code_hash", "type": "STRING" + }, + { + "mode": "REPEATED", + "name": "ledger_key_hash", + "type": "STRING" } ], "mode": "NULLABLE",