From 7de75e3799742bfbe8ab91930b69e9d4436eea73 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Fri, 26 May 2023 14:24:34 -0400 Subject: [PATCH 01/28] wip --- dags/bucket_list_dag.py | 7 +++++++ dags/history_archive_with_captive_core_dag.py | 7 ++++++- dags/history_archive_without_captive_core_dag.py | 5 ++++- dags/state_table_dag.py | 6 ++++-- dags/stellar_etl_airflow/build_dbt_task.py | 1 + dags/stellar_etl_airflow/build_export_task.py | 13 +++++++++---- dags/stellar_etl_airflow/build_time_task.py | 4 +++- 7 files changed, 34 insertions(+), 9 deletions(-) diff --git a/dags/bucket_list_dag.py b/dags/bucket_list_dag.py index 6307554f..2ccfc08a 100644 --- a/dags/bucket_list_dag.py +++ b/dags/bucket_list_dag.py @@ -44,6 +44,7 @@ public_project = Variable.get("public_project") public_dataset = Variable.get("public_dataset") use_testnet = ast.literal_eval(Variable.get("use_testnet")) +use_futurenet = ast.literal_eval(Variable.get("use_futurenet")) """ The time task reads in the execution time of the current run, as well as the next @@ -61,6 +62,7 @@ "export_accounts", file_names["accounts"], use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, ) export_bal_task = build_export_task( @@ -69,6 +71,7 @@ "export_claimable_balances", file_names["claimable_balances"], use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, ) export_off_task = build_export_task( @@ -77,6 +80,7 @@ "export_offers", file_names["offers"], use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, ) export_pool_task = build_export_task( @@ -85,6 +89,7 @@ "export_pools", file_names["liquidity_pools"], use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, ) export_sign_task = build_export_task( @@ -93,6 +98,7 @@ "export_signers", file_names["signers"], use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, ) export_trust_task = build_export_task( @@ -101,6 +107,7 @@ "export_trustlines", file_names["trustlines"], use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, ) diff --git a/dags/history_archive_with_captive_core_dag.py b/dags/history_archive_with_captive_core_dag.py index c3b242b6..60fbcdd4 100644 --- a/dags/history_archive_with_captive_core_dag.py +++ b/dags/history_archive_with_captive_core_dag.py @@ -43,12 +43,13 @@ public_project = Variable.get("public_project") public_dataset = Variable.get("public_dataset") use_testnet = ast.literal_eval(Variable.get("use_testnet")) +use_futurenet = ast.literal_eval(Variable.get("use_futurenet")) """ The time task reads in the execution time of the current run, as well as the next execution time. It converts these two times into ledger ranges. """ -time_task = build_time_task(dag, use_testnet=use_testnet) +time_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet) """ The write batch stats task will take a snapshot of the DAG run_id, execution date, @@ -75,6 +76,7 @@ "export_operations", file_names["operations"], use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, resource_cfg="cc", ) @@ -84,6 +86,7 @@ "export_trades", file_names["trades"], use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, resource_cfg="cc", ) @@ -93,6 +96,7 @@ "export_effects", "effects.txt", use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, resource_cfg="cc", ) @@ -102,6 +106,7 @@ "export_transactions", file_names["transactions"], use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, resource_cfg="cc", ) diff --git a/dags/history_archive_without_captive_core_dag.py b/dags/history_archive_without_captive_core_dag.py index 4e0c16d1..acfd3bb2 100644 --- a/dags/history_archive_without_captive_core_dag.py +++ b/dags/history_archive_without_captive_core_dag.py @@ -42,12 +42,13 @@ public_project = Variable.get("public_project") public_dataset = Variable.get("public_dataset") use_testnet = ast.literal_eval(Variable.get("use_testnet")) +use_futurenet = ast.literal_eval(Variable.get("use_futurenet")) """ The time task reads in the execution time of the current run, as well as the next execution time. It converts these two times into ledger ranges. """ -time_task = build_time_task(dag, use_testnet=use_testnet) +time_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet) """ The write batch stats task will take a snapshot of the DAG run_id, execution date, @@ -72,6 +73,7 @@ "export_ledgers", file_names["ledgers"], use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, ) asset_export_task = build_export_task( @@ -80,6 +82,7 @@ "export_assets", file_names["assets"], use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, ) diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index 47af00f6..2b8731f4 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -26,7 +26,7 @@ dag = DAG( "state_table_export", default_args=get_default_dag_args(), - start_date=datetime.datetime(2022, 3, 11, 19, 00), + start_date=datetime.datetime(2023, 5, 11, 19, 00), description="This DAG runs a bounded stellar-core instance, which allows it to export accounts, offers, liquidity pools, and trustlines to BigQuery.", schedule_interval="*/30 * * * *", params={ @@ -46,14 +46,16 @@ public_project = Variable.get("public_project") public_dataset = Variable.get("public_dataset") use_testnet = ast.literal_eval(Variable.get("use_testnet")) +use_futurenet = ast.literal_eval(Variable.get("use_futurenet")) -date_task = build_time_task(dag, use_testnet=use_testnet) +date_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet) changes_task = build_export_task( dag, "bounded-core", "export_ledger_entry_changes", file_names["changes"], use_testnet=use_testnet, + use_futurenet=use_futurenet, use_gcs=True, resource_cfg="state", ) diff --git a/dags/stellar_etl_airflow/build_dbt_task.py b/dags/stellar_etl_airflow/build_dbt_task.py index ed44a4ba..cb97c16d 100644 --- a/dags/stellar_etl_airflow/build_dbt_task.py +++ b/dags/stellar_etl_airflow/build_dbt_task.py @@ -129,4 +129,5 @@ def build_dbt_task(dag, model_name, resource_cfg="default"): container_resources=k8s.V1ResourceRequirements(requests=resources_requests), on_failure_callback=alert_after_max_retries, image_pull_policy="Always", # TODO: Update to ifNotPresent when image pull issue is fixed + image_pull_secrets=[k8s.V1LocalObjectReference("regcred")], ) diff --git a/dags/stellar_etl_airflow/build_export_task.py b/dags/stellar_etl_airflow/build_export_task.py index 8b1b3daf..360361fb 100644 --- a/dags/stellar_etl_airflow/build_export_task.py +++ b/dags/stellar_etl_airflow/build_export_task.py @@ -15,13 +15,15 @@ from stellar_etl_airflow.default import alert_after_max_retries -def get_path_variables(use_testnet=False): +def get_path_variables(use_testnet=False, use_futurenet=False): """ Returns the image output path, core executable path, and core config path. """ config = "/etl/docker/stellar-core.cfg" if use_testnet: config = "/etl/docker/stellar-core_testnet.cfg" + elif use_futurenet: + config = "/etl/docker/stellar-core_futurenet.cfg" return "/usr/bin/stellar-core", config @@ -39,7 +41,7 @@ def select_correct_filename(cmd_type, base_name, batched_name): def generate_etl_cmd( - command, base_filename, cmd_type, use_gcs=False, use_testnet=False + command, base_filename, cmd_type, use_gcs=False, use_testnet=False, use_futurenet=False ): """ Runs the provided stellar-etl command with arguments that are appropriate for the command type. @@ -71,7 +73,7 @@ def generate_etl_cmd( if cmd_type in ("archive", "bounded-core"): end_ledger = '{{ [ti.xcom_pull(task_ids="get_ledger_range_from_times")["end"]-1, ti.xcom_pull(task_ids="get_ledger_range_from_times")["start"]] | max}}' - core_exec, core_cfg = get_path_variables(use_testnet) + core_exec, core_cfg = get_path_variables(use_testnet, use_futurenet) batch_filename = "-".join([start_ledger, end_ledger, base_filename]) run_id = "{{ run_id }}" @@ -134,6 +136,8 @@ def generate_etl_cmd( cmd.extend(["-u", metadata]) if use_testnet: cmd.append("--testnet") + elif use_futurenet: + cmd.append("--futurenet") cmd.append("--strict-export") return cmd, os.path.join(filepath, correct_filename) @@ -145,6 +149,7 @@ def build_export_task( filename, use_gcs=False, use_testnet=False, + use_futurenet=False, resource_cfg="default", ): """ @@ -161,7 +166,7 @@ def build_export_task( """ etl_cmd, output_file = generate_etl_cmd( - command, filename, cmd_type, use_gcs, use_testnet + command, filename, cmd_type, use_gcs, use_testnet, use_futurenet ) etl_cmd_string = " ".join(etl_cmd) config_file_location = Variable.get("kube_config_location") diff --git a/dags/stellar_etl_airflow/build_time_task.py b/dags/stellar_etl_airflow/build_time_task.py index c85816d4..6d662ba6 100644 --- a/dags/stellar_etl_airflow/build_time_task.py +++ b/dags/stellar_etl_airflow/build_time_task.py @@ -13,7 +13,7 @@ def build_time_task( - dag, use_testnet=False, use_next_exec_time=True, resource_cfg="default" + dag, use_testnet=False, use_next_exec_time=True, resource_cfg="default", use_futurenet=False ): """ Creates a task to run the get_ledger_range_from_times command from the stellar-etl Docker image. The start time is the previous @@ -47,6 +47,8 @@ def build_time_task( logging.info(f"Constructing command with args: {args}") if use_testnet: args.append("--testnet") + elif use_futurenet: + args.append("--futurenet") config_file_location = Variable.get("kube_config_location") in_cluster = False if config_file_location else True resources_requests = ( From 923f6dc086ae11b627d36bf2cd0dcf51deae8404 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Mon, 24 Jul 2023 14:28:04 -0500 Subject: [PATCH 02/28] Update history_effects_schema.json --- schemas/history_effects_schema.json | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/schemas/history_effects_schema.json b/schemas/history_effects_schema.json index 7d720f57..fed3ec61 100644 --- a/schemas/history_effects_schema.json +++ b/schemas/history_effects_schema.json @@ -760,6 +760,16 @@ "mode": "NULLABLE", "name": "bought_asset_issuer", "type": "STRING" + }, + { + "mode": "REPEATED", + "name": "entries", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "ledgers_to_expire", + "type": "INTEGER" } ], "mode": "NULLABLE", From 76c23078c8556cf909708bad8541463b8e9b4c89 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Mon, 24 Jul 2023 14:28:49 -0500 Subject: [PATCH 03/28] Update history_operations_schema.json --- schemas/history_operations_schema.json | 69 ++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/schemas/history_operations_schema.json b/schemas/history_operations_schema.json index d2883ba7..b61e20b4 100644 --- a/schemas/history_operations_schema.json +++ b/schemas/history_operations_schema.json @@ -968,6 +968,75 @@ "mode": "NULLABLE", "name": "shares", "type": "FLOAT" + }, + { + "fields": [ + { + "mode": "NULLABLE", + "name": "amount", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "asset_type", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "from", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "to", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "type", + "type": "STRING" + } + ], + "mode": "REPEATED", + "name": "asset_balance_changes", + "type": "RECORD" + }, + { + "mode": "NULLABLE", + "name": "function", + "type": "STRING" + }, + { + "fields": [ + { + "mode": "NULLABLE", + "name": "type", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "value", + "type": "STRING" + } + ], + "mode": "REPEATED", + "name": "parameters", + "type": "RECORD" + }, + { + "mode": "NULLABLE", + "name": "address", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "type", + "type": "STRING" + }, + { + "mode": "REPEATED", + "name": "salt", + "type": "INTEGER" } ], "mode": "NULLABLE", From e395413897a7200c80298b0705de07f81d01578b Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Tue, 25 Jul 2023 00:57:17 -0400 Subject: [PATCH 04/28] Add futurenet preview 10 soroban data --- dags/state_table_dag.py | 147 ++++++++++ schemas/contract_code_schema.json | 53 ++++ schemas/contract_config_settings_schema.json | 282 +++++++++++++++++++ schemas/contract_data_schema.json | 83 ++++++ 4 files changed, 565 insertions(+) create mode 100644 schemas/contract_code_schema.json create mode 100644 schemas/contract_config_settings_schema.json create mode 100644 schemas/contract_data_schema.json diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index 5138d2bf..499e550b 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -72,6 +72,9 @@ write_pool_stats = build_batch_stats(dag, table_names["liquidity_pools"]) write_sign_stats = build_batch_stats(dag, table_names["signers"]) write_trust_stats = build_batch_stats(dag, table_names["trustlines"]) +write_contract_data_stats = build_batch_stats(dag, table_names["contract_data"]) +write_contract_code_stats = build_batch_stats(dag, table_names["contract_code"]) +write_config_settings = build_batch_stats(dag, table_names["config_settings"]) """ The delete partition task checks to see if the given partition/batch id exists in @@ -131,6 +134,33 @@ delete_trust_pub_new_task = build_delete_data_task( dag, public_project, public_dataset_new, table_names["trustlines"] ) +delete_contract_data_task = build_delete_data_task( + dag, internal_project, internal_dataset, table_names["contract_data"] +) +delete_contract_data_pub_task = build_delete_data_task( + dag, public_project, public_dataset, table_names["contract_data"] +) +delete_contract_data_pub_new_task = build_delete_data_task( + dag, public_project, public_dataset_new, table_names["contract_data"] +) +delete_contract_code_task = build_delete_data_task( + dag, internal_project, internal_dataset, table_names["contract_code"] +) +delete_contract_code_pub_task = build_delete_data_task( + dag, public_project, public_dataset, table_names["contract_code"] +) +delete_contract_code_pub_new_task = build_delete_data_task( + dag, public_project, public_dataset_new, table_names["contract_code"] +) +delete_config_settings_task = build_delete_data_task( + dag, internal_project, internal_dataset, table_names["config_settings"] +) +delete_config_settings_pub_task = build_delete_data_task( + dag, public_project, public_dataset, table_names["config_settings"] +) +delete_config_settings_pub_new_task = build_delete_data_task( + dag, public_project, public_dataset_new, table_names["config_settings"] +) """ The apply tasks receive the location of the file in Google Cloud storage through Airflow's XCOM system. @@ -197,6 +227,36 @@ partition=True, cluster=True, ) +send_contract_data_to_bq_task = build_gcs_to_bq_task( + dag, + changes_task.task_id, + internal_project, + internal_dataset, + table_names["contract_data"], + "/*-contract_data.txt", + partition=True, + cluster=True, +) +send_contract_code_to_bq_task = build_gcs_to_bq_task( + dag, + changes_task.task_id, + internal_project, + internal_dataset, + table_names["contract_code"], + "/*-contract_code.txt", + partition=True, + cluster=True, +) +send_config_settings_to_bq_task = build_gcs_to_bq_task( + dag, + changes_task.task_id, + internal_project, + internal_dataset, + table_names["config_settings"], + "/*-config_settings.txt", + partition=True, + cluster=True, +) """ The apply tasks receive the location of the file in Google Cloud storage through Airflow's XCOM system. @@ -263,6 +323,36 @@ partition=True, cluster=True, ) +send_contract_data_to_pub_task = build_gcs_to_bq_task( + dag, + changes_task.task_id, + public_project, + public_dataset, + table_names["contract_data"], + "/*-contract_data.txt", + partition=True, + cluster=True, +) +send_contract_code_to_pub_task = build_gcs_to_bq_task( + dag, + changes_task.task_id, + public_project, + public_dataset, + table_names["contract_code"], + "/*-contract_code.txt", + partition=True, + cluster=True, +) +send_config_settings_to_pub_task = build_gcs_to_bq_task( + dag, + changes_task.task_id, + public_project, + public_dataset, + table_names["config_settings"], + "/*-config_settings.txt", + partition=True, + cluster=True, +) """ Send to new public dataset @@ -327,6 +417,36 @@ partition=True, cluster=True, ) +send_contract_data_to_pub_new_task = build_gcs_to_bq_task( + dag, + changes_task.task_id, + public_project, + public_dataset_new, + table_names["contract_data"], + "/*-contract_data.txt", + partition=True, + cluster=True, +) +send_contract_code_to_pub_new_task = build_gcs_to_bq_task( + dag, + changes_task.task_id, + public_project, + public_dataset_new, + table_names["contract_code"], + "/*-contract_code.txt", + partition=True, + cluster=True, +) +send_config_settings_to_pub_new_task = build_gcs_to_bq_task( + dag, + changes_task.task_id, + public_project, + public_dataset_new, + table_names["config_settings"], + "/*-config_settings.txt", + partition=True, + cluster=True, +) date_task >> changes_task >> write_acc_stats >> delete_acc_task >> send_acc_to_bq_task write_acc_stats >> delete_acc_pub_task >> send_acc_to_pub_task @@ -364,3 +484,30 @@ ) write_trust_stats >> delete_trust_pub_task >> send_trust_to_pub_task write_trust_stats >> delete_trust_pub_new_task >> send_trust_to_pub_new_task +( + date_task + >> changes_task + >> write_contract_data_stats + >> delete_contract_data_task + >> send_contract_data_to_bq_task +) +write_contract_data_stats >> delete_contract_data_pub_task >> send_contract_data_to_pub_task +write_contract_data_stats >> delete_contract_data_pub_new_task >> send_contract_data_to_pub_new_task +( + date_task + >> changes_task + >> write_contract_code_stats + >> delete_contract_code_task + >> send_contract_code_to_bq_task +) +write_contract_code_stats >> delete_contract_code_pub_task >> send_contract_code_to_pub_task +write_contract_code_stats >> delete_contract_code_pub_new_task >> send_contract_code_to_pub_new_task +( + date_task + >> changes_task + >> write_config_settings_stats + >> delete_config_settings_task + >> send_config_settings_to_bq_task +) +write_config_settings_stats >> delete_config_settings_pub_task >> send_config_settings_to_pub_task +write_config_settings_stats >> delete_config_settings_pub_new_task >> send_config_settings_to_pub_new_task \ No newline at end of file diff --git a/schemas/contract_code_schema.json b/schemas/contract_code_schema.json new file mode 100644 index 00000000..aea2d3a1 --- /dev/null +++ b/schemas/contract_code_schema.json @@ -0,0 +1,53 @@ +[ + { + "mode": "NULLABLE", + "name": "contract_code_hash", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_code_ext_v", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "contract_code_expiration_ledger_seq", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "contract_code_entry_body_type", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "last_modified_ledger", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_entry_change", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "deleted", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "batch_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "batch_run_date", + "type": "DATETIME" + }, + { + "mode": "NULLABLE", + "name": "batch_insert_ts", + "type": "TIMESTAMP" + } + ] + \ No newline at end of file diff --git a/schemas/contract_config_settings_schema.json b/schemas/contract_config_settings_schema.json new file mode 100644 index 00000000..a1aa42a0 --- /dev/null +++ b/schemas/contract_config_settings_schema.json @@ -0,0 +1,282 @@ +[ + { + "mode": "NULLABLE", + "name": "config_setting_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "contract_max_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_instructions", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_instructions", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_rate_per_instructions_increment", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_memory_limit", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_read_ledger_entries", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_read_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_write_ledger_entries", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_write_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_read_ledger_entries", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_read_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_write_ledger_entries", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_write_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_read_ledger_entry", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_write_ledger_entry", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_read_1kb", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_write_1kb", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "bucket_list_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "bucket_list_fee_rate_low", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "bucket_list_fee_rate_high", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "bucket_list_growth_factor", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_historical_1kb", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_extended_meta_data_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_extended_meta_data_1kb", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_propagate_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_propagate_data_1kb", + "type": "INTEGER" + }, + { + "fields": [ + { + "mode": "NULLABLE", + "name": "ExtV", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "ConstTerm", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "LinearTerm", + "type": "STRING" + } + ], + "mode": "REPEATED", + "name": "contract_cost_params_cpu_insns", + "type": "RECORD" + }, + { + "fields": [ + { + "mode": "NULLABLE", + "name": "ExtV", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "ConstTerm", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "LinearTerm", + "type": "STRING" + } + ], + "mode": "REPEATED", + "name": "contract_cost_params_mem_bytes", + "type": "RECORD" + }, + { + "mode": "NULLABLE", + "name": "contract_data_key_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "contract_data_entry_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "max_entry_expiration", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "min_temp_entry_expiration", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "min_persistent_entry_expiration", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "auto_bump_ledgers", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "persistent_rent_rate_denominator", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "temp_rent_rate_denominator", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "max_entries_to_expire", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "bucket_list_size_window_sample_size", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "eviction_scan_size", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_tx_count", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "bucket_list_size_window", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "last_modified_ledger", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_entry_change", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "deleted", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "batch_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "batch_run_date", + "type": "DATETIME" + }, + { + "mode": "NULLABLE", + "name": "batch_insert_ts", + "type": "TIMESTAMP" + } + ] + \ No newline at end of file diff --git a/schemas/contract_data_schema.json b/schemas/contract_data_schema.json new file mode 100644 index 00000000..795a6026 --- /dev/null +++ b/schemas/contract_data_schema.json @@ -0,0 +1,83 @@ +[ + { + "mode": "NULLABLE", + "name": "contract_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_key", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_durability", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_data_flags", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "contract_data_val", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_expiration_ledger_seq", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "asset_code", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "asset_issuer", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "balance_holder", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "balance", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "last_modified_ledger", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_entry_change", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "deleted", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "batch_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "batch_run_date", + "type": "DATETIME" + }, + { + "mode": "NULLABLE", + "name": "batch_insert_ts", + "type": "TIMESTAMP" + } + ] + \ No newline at end of file From afac4af4eb728548f1bdd7237fcd804e088689c6 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Tue, 25 Jul 2023 21:52:05 -0500 Subject: [PATCH 05/28] Update history_operations_schema.json --- schemas/history_operations_schema.json | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/schemas/history_operations_schema.json b/schemas/history_operations_schema.json index b61e20b4..f257f101 100644 --- a/schemas/history_operations_schema.json +++ b/schemas/history_operations_schema.json @@ -1037,6 +1037,11 @@ "mode": "REPEATED", "name": "salt", "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledgers_to_expire", + "type": "INTEGER" } ], "mode": "NULLABLE", From ddb1adb0004cad3928e7f6eef886bd9d2b71bcff Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Tue, 25 Jul 2023 22:52:30 -0500 Subject: [PATCH 06/28] Update history_operations_schema.json --- schemas/history_operations_schema.json | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/schemas/history_operations_schema.json b/schemas/history_operations_schema.json index f257f101..84fc1563 100644 --- a/schemas/history_operations_schema.json +++ b/schemas/history_operations_schema.json @@ -976,6 +976,16 @@ "name": "amount", "type": "FLOAT" }, + { + "mode": "NULLABLE", + "name": "asset_code", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "asset_issuer", + "type": "STRING" + }, { "mode": "NULLABLE", "name": "asset_type", From 2043cae28d3c6beb1ca17fb3be5e54d1f83ba3a8 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 26 Jul 2023 21:08:26 -0400 Subject: [PATCH 07/28] Updates --- dags/state_table_dag.py | 4 ++-- ...onfig_settings_schema.json => config_settings_schema.json} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename schemas/{contract_config_settings_schema.json => config_settings_schema.json} (100%) diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index 499e550b..de27be11 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -26,7 +26,7 @@ dag = DAG( "state_table_export", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 5, 11, 19, 00), + start_date=datetime.datetime(2023, 7, 25, 19, 00), description="This DAG runs a bounded stellar-core instance, which allows it to export accounts, offers, liquidity pools, and trustlines to BigQuery.", schedule_interval="*/30 * * * *", params={ @@ -510,4 +510,4 @@ >> send_config_settings_to_bq_task ) write_config_settings_stats >> delete_config_settings_pub_task >> send_config_settings_to_pub_task -write_config_settings_stats >> delete_config_settings_pub_new_task >> send_config_settings_to_pub_new_task \ No newline at end of file +write_config_settings_stats >> delete_config_settings_pub_new_task >> send_config_settings_to_pub_new_task diff --git a/schemas/contract_config_settings_schema.json b/schemas/config_settings_schema.json similarity index 100% rename from schemas/contract_config_settings_schema.json rename to schemas/config_settings_schema.json From 55264d700923649047b846cd9d17437ff9f60ad2 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 26 Jul 2023 21:28:59 -0400 Subject: [PATCH 08/28] Fix typo --- dags/state_table_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index de27be11..36e47dfd 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -74,7 +74,7 @@ write_trust_stats = build_batch_stats(dag, table_names["trustlines"]) write_contract_data_stats = build_batch_stats(dag, table_names["contract_data"]) write_contract_code_stats = build_batch_stats(dag, table_names["contract_code"]) -write_config_settings = build_batch_stats(dag, table_names["config_settings"]) +write_config_settings_stats = build_batch_stats(dag, table_names["config_settings"]) """ The delete partition task checks to see if the given partition/batch id exists in From e7f7cc30709b3b07fe2cbce47a6dfa1edd23bb10 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 27 Jul 2023 11:09:08 -0400 Subject: [PATCH 09/28] Update effects schema --- dags/state_table_dag.py | 6 +++--- schemas/history_effects_schema.json | 5 +++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index 36e47dfd..2920ce42 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -253,7 +253,7 @@ internal_project, internal_dataset, table_names["config_settings"], - "/*-config_settings.txt", + "/*-config_setting.txt", partition=True, cluster=True, ) @@ -349,7 +349,7 @@ public_project, public_dataset, table_names["config_settings"], - "/*-config_settings.txt", + "/*-config_setting.txt", partition=True, cluster=True, ) @@ -443,7 +443,7 @@ public_project, public_dataset_new, table_names["config_settings"], - "/*-config_settings.txt", + "/*-config_setting.txt", partition=True, cluster=True, ) diff --git a/schemas/history_effects_schema.json b/schemas/history_effects_schema.json index fed3ec61..99b486a1 100644 --- a/schemas/history_effects_schema.json +++ b/schemas/history_effects_schema.json @@ -770,6 +770,11 @@ "mode": "NULLABLE", "name": "ledgers_to_expire", "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "contract", + "type": "STRING" } ], "mode": "NULLABLE", From 0a54f87ee65c946d2f63cac66b71b0d5d411e79b Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 27 Jul 2023 14:34:02 -0400 Subject: [PATCH 10/28] Change bad records to 0 for low volume in futurenet --- dags/stellar_etl_airflow/build_gcs_to_bq_task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/stellar_etl_airflow/build_gcs_to_bq_task.py b/dags/stellar_etl_airflow/build_gcs_to_bq_task.py index 277c3b25..6d1a119a 100644 --- a/dags/stellar_etl_airflow/build_gcs_to_bq_task.py +++ b/dags/stellar_etl_airflow/build_gcs_to_bq_task.py @@ -130,7 +130,7 @@ def build_gcs_to_bq_task( destination_project_dataset_table=f"{project_name}.{dataset_name}.{data_type}{staging_table_suffix}", write_disposition="WRITE_APPEND", create_disposition="CREATE_IF_NEEDED", - max_bad_records=10, + max_bad_records=0, time_partitioning=time_partition, cluster_fields=cluster_fields, export_task_id=export_task_id, @@ -164,7 +164,7 @@ def build_gcs_to_bq_task( destination_project_dataset_table=f"{project_name}.{dataset_name}.{data_type}{staging_table_suffix}", write_disposition="WRITE_APPEND", create_disposition="CREATE_IF_NEEDED", - max_bad_records=10, + max_bad_records=0, time_partitioning=time_partition, cluster_fields=cluster_fields, on_failure_callback=alert_after_max_retries, From fd81d917e04561ad8a8619d0b13fb4f145d63f19 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 27 Jul 2023 14:46:33 -0400 Subject: [PATCH 11/28] Fix typos --- dags/state_table_dag.py | 6 +++--- schemas/contract_code_schema.json | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index 2920ce42..36e47dfd 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -253,7 +253,7 @@ internal_project, internal_dataset, table_names["config_settings"], - "/*-config_setting.txt", + "/*-config_settings.txt", partition=True, cluster=True, ) @@ -349,7 +349,7 @@ public_project, public_dataset, table_names["config_settings"], - "/*-config_setting.txt", + "/*-config_settings.txt", partition=True, cluster=True, ) @@ -443,7 +443,7 @@ public_project, public_dataset_new, table_names["config_settings"], - "/*-config_setting.txt", + "/*-config_settings.txt", partition=True, cluster=True, ) diff --git a/schemas/contract_code_schema.json b/schemas/contract_code_schema.json index aea2d3a1..00091c4f 100644 --- a/schemas/contract_code_schema.json +++ b/schemas/contract_code_schema.json @@ -17,7 +17,7 @@ { "mode": "NULLABLE", "name": "contract_code_entry_body_type", - "type": "INTEGER" + "type": "STRING" }, { "mode": "NULLABLE", @@ -50,4 +50,4 @@ "type": "TIMESTAMP" } ] - \ No newline at end of file + From c942031814b536f6d0f30282c66e4e44daea7034 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Mon, 24 Jul 2023 14:24:21 -0500 Subject: [PATCH 12/28] Soroban support for operations and effects --- ...istory_archive_without_captive_core_dag.py | 44 ------------------- 1 file changed, 44 deletions(-) diff --git a/dags/history_archive_without_captive_core_dag.py b/dags/history_archive_without_captive_core_dag.py index a3b0ad63..acfd3bb2 100644 --- a/dags/history_archive_without_captive_core_dag.py +++ b/dags/history_archive_without_captive_core_dag.py @@ -41,7 +41,6 @@ internal_dataset = Variable.get("bq_dataset") public_project = Variable.get("public_project") public_dataset = Variable.get("public_dataset") -public_dataset_new = Variable.get("public_dataset_new") use_testnet = ast.literal_eval(Variable.get("use_testnet")) use_futurenet = ast.literal_eval(Variable.get("use_futurenet")) @@ -97,18 +96,12 @@ delete_old_ledger_pub_task = build_delete_data_task( dag, public_project, public_dataset, table_names["ledgers"] ) -delete_old_ledger_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["ledgers"] -) delete_old_asset_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["assets"] ) delete_old_asset_pub_task = build_delete_data_task( dag, public_project, public_dataset, table_names["assets"] ) -delete_old_asset_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["assets"] -) """ The send tasks receive the location of the file in Google Cloud storage through Airflow's XCOM system. @@ -160,27 +153,6 @@ cluster=True, ) -send_ledgers_to_pub_new_task = build_gcs_to_bq_task( - dag, - ledger_export_task.task_id, - public_project, - public_dataset_new, - table_names["ledgers"], - "", - partition=True, - cluster=True, -) -send_assets_to_pub_new_task = build_gcs_to_bq_task( - dag, - asset_export_task.task_id, - public_project, - public_dataset_new, - table_names["assets"], - "", - partition=True, - cluster=True, -) - """ The tasks below use a job in BigQuery to deduplicate the table history_assets_stg. The job refreshes the table history_assets with only new records. @@ -203,15 +175,6 @@ cluster=True, create=True, ) -dedup_assets_pub_new_task = build_bq_insert_job( - dag, - public_project, - public_dataset_new, - table_names["assets"], - partition=True, - cluster=True, - create=True, -) ( time_task @@ -221,7 +184,6 @@ >> send_ledgers_to_bq_task ) ledger_export_task >> delete_old_ledger_pub_task >> send_ledgers_to_pub_task -ledger_export_task >> delete_old_ledger_pub_new_task >> send_ledgers_to_pub_new_task ( time_task >> write_asset_stats @@ -236,9 +198,3 @@ >> send_assets_to_pub_task >> dedup_assets_pub_task ) -( - asset_export_task - >> delete_old_asset_pub_new_task - >> send_assets_to_pub_new_task - >> dedup_assets_pub_new_task -) From 8854f7b1f67b13d48196920c99ff0983c41745f9 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Mon, 28 Aug 2023 09:17:54 -0500 Subject: [PATCH 13/28] Update futurenet schemas --- .../build_gcs_to_bq_task.py | 2 + schemas/contract_data_schema.json | 173 +++++++++--------- schemas/history_operations_schema.json | 15 ++ schemas/history_transactions_schema.json | 30 +++ 4 files changed, 138 insertions(+), 82 deletions(-) diff --git a/dags/stellar_etl_airflow/build_gcs_to_bq_task.py b/dags/stellar_etl_airflow/build_gcs_to_bq_task.py index 6d1a119a..93421518 100644 --- a/dags/stellar_etl_airflow/build_gcs_to_bq_task.py +++ b/dags/stellar_etl_airflow/build_gcs_to_bq_task.py @@ -130,6 +130,7 @@ def build_gcs_to_bq_task( destination_project_dataset_table=f"{project_name}.{dataset_name}.{data_type}{staging_table_suffix}", write_disposition="WRITE_APPEND", create_disposition="CREATE_IF_NEEDED", + schema_update_option="ALLOW_FIELD_ADDITION", max_bad_records=0, time_partitioning=time_partition, cluster_fields=cluster_fields, @@ -153,6 +154,7 @@ def build_gcs_to_bq_task( ), bucket=bucket_name, schema_fields=schema_fields, + schema_update_options=["ALLOW_FIELD_ADDITION"], autodetect=False, source_format="NEWLINE_DELIMITED_JSON", source_objects=[ diff --git a/schemas/contract_data_schema.json b/schemas/contract_data_schema.json index 795a6026..ee6bdd26 100644 --- a/schemas/contract_data_schema.json +++ b/schemas/contract_data_schema.json @@ -1,83 +1,92 @@ [ - { - "mode": "NULLABLE", - "name": "contract_id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "contract_key", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "contract_durability", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "contract_data_flags", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "contract_data_val", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "contract_expiration_ledger_seq", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "asset_code", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "asset_issuer", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "balance_holder", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "balance", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "last_modified_ledger", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "ledger_entry_change", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "deleted", - "type": "BOOLEAN" - }, - { - "mode": "NULLABLE", - "name": "batch_id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "batch_run_date", - "type": "DATETIME" - }, - { - "mode": "NULLABLE", - "name": "batch_insert_ts", - "type": "TIMESTAMP" - } - ] - \ No newline at end of file + { + "mode": "NULLABLE", + "name": "contract_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_key", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_durability", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_data_flags", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "contract_data_val", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_expiration_ledger_seq", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "asset_code", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "asset_issuer", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "balance_holder", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "balance", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "last_modified_ledger", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_entry_change", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "deleted", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "batch_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "batch_run_date", + "type": "DATETIME" + }, + { + "mode": "NULLABLE", + "name": "batch_insert_ts", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "contract_key_type", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "address_id", + "type": "STRING" + } +] diff --git a/schemas/history_operations_schema.json b/schemas/history_operations_schema.json index 84fc1563..1812728b 100644 --- a/schemas/history_operations_schema.json +++ b/schemas/history_operations_schema.json @@ -1052,6 +1052,16 @@ "mode": "NULLABLE", "name": "ledgers_to_expire", "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "contract_code_hash", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_id", + "type": "STRING" } ], "mode": "NULLABLE", @@ -1102,5 +1112,10 @@ "mode": "NULLABLE", "name": "batch_insert_ts", "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "ledger_closed_at", + "type": "TIMESTAMP" } ] diff --git a/schemas/history_transactions_schema.json b/schemas/history_transactions_schema.json index 4fbb60bd..21473571 100644 --- a/schemas/history_transactions_schema.json +++ b/schemas/history_transactions_schema.json @@ -148,5 +148,35 @@ "mode": "REPEATED", "name": "extra_signers", "type": "string" + }, + { + "mode": "NULLABLE", + "name": "refundable_fee", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "soroban_resources_instructions", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "soroban_resources_read_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "soroban_resources_write_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "soroban_resource_extended_meta_data_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_closed_at", + "type": "TIMESTAMP" } ] From 8a4d83f11ee73e22220c9a8f268a20e4d18d6347 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 30 Aug 2023 22:22:17 -0400 Subject: [PATCH 14/28] Update soroban columns --- dags/state_table_dag.py | 3 ++- schemas/contract_data_schema.json | 18 ++++++-------- schemas/history_operations_schema.json | 25 +++++++++++++++++++ schemas/history_transactions_schema.json | 31 ++++++++++++++++++++++++ 4 files changed, 65 insertions(+), 12 deletions(-) diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index 36e47dfd..b34900fc 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -26,7 +26,7 @@ dag = DAG( "state_table_export", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 7, 25, 19, 00), + start_date=datetime.datetime(2023, 7, 21, 20, 30), description="This DAG runs a bounded stellar-core instance, which allows it to export accounts, offers, liquidity pools, and trustlines to BigQuery.", schedule_interval="*/30 * * * *", params={ @@ -37,6 +37,7 @@ "subtract_data_interval": macros.subtract_data_interval, "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, }, + catchup=True ) file_names = Variable.get("output_file_names", deserialize_json=True) diff --git a/schemas/contract_data_schema.json b/schemas/contract_data_schema.json index 795a6026..158a5a24 100644 --- a/schemas/contract_data_schema.json +++ b/schemas/contract_data_schema.json @@ -4,11 +4,6 @@ "name": "contract_id", "type": "STRING" }, - { - "mode": "NULLABLE", - "name": "contract_key", - "type": "STRING" - }, { "mode": "NULLABLE", "name": "contract_durability", @@ -19,11 +14,6 @@ "name": "contract_data_flags", "type": "INTEGER" }, - { - "mode": "NULLABLE", - "name": "contract_data_val", - "type": "STRING" - }, { "mode": "NULLABLE", "name": "contract_expiration_ledger_seq", @@ -78,6 +68,12 @@ "mode": "NULLABLE", "name": "batch_insert_ts", "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "contract_key_type", + "type": "STRING" } + ] - \ No newline at end of file + diff --git a/schemas/history_operations_schema.json b/schemas/history_operations_schema.json index 84fc1563..59f0f740 100644 --- a/schemas/history_operations_schema.json +++ b/schemas/history_operations_schema.json @@ -1048,6 +1048,26 @@ "name": "salt", "type": "INTEGER" }, + { + "mode": "NULLABLE", + "name": "ledgers_to_expire", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "contract_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "contract_code_hash", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledgers_to_expire", + "type": "INTEGER" + }, { "mode": "NULLABLE", "name": "ledgers_to_expire", @@ -1102,5 +1122,10 @@ "mode": "NULLABLE", "name": "batch_insert_ts", "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "ledger_closed_at", + "type": "TIMESTAMP" } ] diff --git a/schemas/history_transactions_schema.json b/schemas/history_transactions_schema.json index 4fbb60bd..01c4e86e 100644 --- a/schemas/history_transactions_schema.json +++ b/schemas/history_transactions_schema.json @@ -148,5 +148,36 @@ "mode": "REPEATED", "name": "extra_signers", "type": "string" + }, + { + "mode": "REPEATED", + "name": "refundable_fee", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "soroban_resources_instructions", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "soroban_resources_read_bytes", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "soroban_resources_write_bytes", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "soroban_resource_extended_meta_data_size_bytes", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "ledger_closed_at", + "type": "TIMESTAMP" } + ] From 3d2e7a5c905a4eddca43611c2206a61c38524d3f Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 30 Aug 2023 22:35:12 -0400 Subject: [PATCH 15/28] Remove parameters from invoke host functions in operations --- schemas/history_operations_schema.json | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/schemas/history_operations_schema.json b/schemas/history_operations_schema.json index 46ab97e7..4ca62d52 100644 --- a/schemas/history_operations_schema.json +++ b/schemas/history_operations_schema.json @@ -1016,23 +1016,6 @@ "name": "function", "type": "STRING" }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "type", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "value", - "type": "STRING" - } - ], - "mode": "REPEATED", - "name": "parameters", - "type": "RECORD" - }, { "mode": "NULLABLE", "name": "address", From 5d280b7766db09d322e747749e8fee6f72563d7d Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 31 Aug 2023 11:57:56 -0400 Subject: [PATCH 16/28] Add more columns --- schemas/history_effects_schema.json | 7 ++++++- schemas/history_operations_schema.json | 7 +------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/schemas/history_effects_schema.json b/schemas/history_effects_schema.json index 99b486a1..254f5e2c 100644 --- a/schemas/history_effects_schema.json +++ b/schemas/history_effects_schema.json @@ -772,9 +772,14 @@ "type": "INTEGER" }, { - "mode": "REPEATED", + "mode": "NULLABLE", "name": "contract", "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_event_type", + "type": "STRING" } ], "mode": "NULLABLE", diff --git a/schemas/history_operations_schema.json b/schemas/history_operations_schema.json index 4ca62d52..de9542fd 100644 --- a/schemas/history_operations_schema.json +++ b/schemas/history_operations_schema.json @@ -1023,14 +1023,9 @@ }, { "mode": "NULLABLE", - "name": "type", + "name": "from_asset", "type": "STRING" }, - { - "mode": "REPEATED", - "name": "salt", - "type": "INTEGER" - }, { "mode": "NULLABLE", "name": "ledgers_to_expire", From d4d12046a994b569fa13e064bafdd78b6ba760a3 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 7 Sep 2023 11:36:06 -0400 Subject: [PATCH 17/28] Update config settings schema --- schemas/config_settings_schema.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/schemas/config_settings_schema.json b/schemas/config_settings_schema.json index a1aa42a0..1445eb12 100644 --- a/schemas/config_settings_schema.json +++ b/schemas/config_settings_schema.json @@ -136,7 +136,7 @@ }, { "mode": "NULLABLE", - "name": "fee_propagate_data_1kb", + "name": "fee_tx_size_1kb", "type": "INTEGER" }, { @@ -279,4 +279,4 @@ "type": "TIMESTAMP" } ] - \ No newline at end of file + From 5c8026ea6f7aa476d42b3f6b98f41e23d991b707 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Tue, 19 Sep 2023 11:12:04 -0400 Subject: [PATCH 18/28] Updates for soroban --- dags/history_archive_with_captive_core_dag.py | 3 +- ...istory_archive_without_captive_core_dag.py | 7 ++- dags/state_table_dag.py | 4 +- schemas/contract_data_schema.json | 5 +++ schemas/expiration_schema.json | 43 +++++++++++++++++++ schemas/history_operations_schema.json | 13 +++--- 6 files changed, 64 insertions(+), 11 deletions(-) create mode 100644 schemas/expiration_schema.json diff --git a/dags/history_archive_with_captive_core_dag.py b/dags/history_archive_with_captive_core_dag.py index d9113260..66844bb2 100644 --- a/dags/history_archive_with_captive_core_dag.py +++ b/dags/history_archive_with_captive_core_dag.py @@ -23,7 +23,8 @@ dag = DAG( "history_archive_with_captive_core", default_args=get_default_dag_args(), - start_date=datetime.datetime(2022, 3, 11, 18, 30), + start_date=datetime.datetime(2023, 9, 15, 19, 30), + catchup=True, description="This DAG exports trades and operations from the history archive using CaptiveCore. This supports parsing sponsorship and AMMs.", schedule_interval="*/30 * * * *", params={ diff --git a/dags/history_archive_without_captive_core_dag.py b/dags/history_archive_without_captive_core_dag.py index e4138687..31ba2b4b 100644 --- a/dags/history_archive_without_captive_core_dag.py +++ b/dags/history_archive_without_captive_core_dag.py @@ -22,7 +22,8 @@ dag = DAG( "history_archive_without_captive_core", default_args=get_default_dag_args(), - start_date=datetime.datetime(2022, 3, 11, 18, 30), + start_date=datetime.datetime(2023, 9, 15, 19, 30), + catchup=True, description="This DAG exports ledgers, transactions, and assets from the history archive to BigQuery. Incremental Loads", schedule_interval="*/15 * * * *", params={ @@ -41,6 +42,7 @@ internal_dataset = Variable.get("bq_dataset") public_project = Variable.get("public_project") public_dataset = Variable.get("public_dataset") +public_dataset_new = Variable.get("public_dataset_new") use_testnet = ast.literal_eval(Variable.get("use_testnet")) use_futurenet = ast.literal_eval(Variable.get("use_futurenet")) @@ -96,6 +98,9 @@ delete_old_ledger_pub_task = build_delete_data_task( dag, public_project, public_dataset, table_names["ledgers"] ) +delete_old_ledger_pub_new_task = build_delete_data_task( + dag, public_project, public_dataset_new, table_names["ledgers"] +) delete_old_asset_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["assets"] ) diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index de79ea34..02ff731d 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -26,7 +26,7 @@ dag = DAG( "state_table_export", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 7, 21, 20, 30), + start_date=datetime.datetime(2023, 9, 15, 19, 30), description="This DAG runs a bounded stellar-core instance, which allows it to export accounts, offers, liquidity pools, and trustlines to BigQuery.", schedule_interval="*/30 * * * *", params={ @@ -131,7 +131,7 @@ dag, public_project, public_dataset_new, table_names["config_settings"] ) delete_expiration_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["config_settings"] + dag, public_project, public_dataset_new, table_names["expiration"] ) """ diff --git a/schemas/contract_data_schema.json b/schemas/contract_data_schema.json index e23e193b..46a3cdda 100644 --- a/schemas/contract_data_schema.json +++ b/schemas/contract_data_schema.json @@ -29,6 +29,11 @@ "name": "asset_issuer", "type": "STRING" }, + { + "mode": "NULLABLE", + "name": "asset_type", + "type": "STRING" + }, { "mode": "NULLABLE", "name": "balance_holder", diff --git a/schemas/expiration_schema.json b/schemas/expiration_schema.json new file mode 100644 index 00000000..d9e75dc2 --- /dev/null +++ b/schemas/expiration_schema.json @@ -0,0 +1,43 @@ +[ + { + "mode": "NULLABLE", + "name": "key_hash", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "expiration_ledger_seq", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "last_modified_ledger", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_entry_change", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "deleted", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "batch_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "batch_run_date", + "type": "DATETIME" + }, + { + "mode": "NULLABLE", + "name": "batch_insert_ts", + "type": "TIMESTAMP" + } + ] + diff --git a/schemas/history_operations_schema.json b/schemas/history_operations_schema.json index 287282f3..bc42d32a 100644 --- a/schemas/history_operations_schema.json +++ b/schemas/history_operations_schema.json @@ -1021,11 +1021,6 @@ "name": "address", "type": "STRING" }, - { - "mode": "NULLABLE", - "name": "asset", - "type": "STRING" - }, { "mode": "NULLABLE", "name": "type", @@ -1033,13 +1028,17 @@ }, { "mode": "NULLABLE", - "name": "contract_id", + "name": "ledgers_to_expire", "type": "INTEGER" + }, { + "mode": "NULLABLE", + "name": "contract_id", + "type": "STRING" }, { "mode": "NULLABLE", "name": "contract_code_hash", - "type": "INTEGER" + "type": "STRING" } ], "mode": "NULLABLE", From 977e5f97b7ef2c1ff63a58e72054257b71bb494c Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Tue, 19 Sep 2023 16:05:07 -0400 Subject: [PATCH 19/28] Add diagnostic events --- dags/history_archive_with_captive_core_dag.py | 16 ++++++++++ dags/state_table_dag.py | 32 +++++++++---------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/dags/history_archive_with_captive_core_dag.py b/dags/history_archive_with_captive_core_dag.py index 66844bb2..49c380ef 100644 --- a/dags/history_archive_with_captive_core_dag.py +++ b/dags/history_archive_with_captive_core_dag.py @@ -62,6 +62,7 @@ write_trade_stats = build_batch_stats(dag, table_names["trades"]) write_effects_stats = build_batch_stats(dag, table_names["effects"]) write_tx_stats = build_batch_stats(dag, table_names["transactions"]) +write_diagnostic_events_stats = build_batch_stats(dag, table_names["diagnostic_events"]) """ The export tasks call export commands on the Stellar ETL using the ledger range from the time task. @@ -112,6 +113,16 @@ use_gcs=True, resource_cfg="cc", ) +diagnostic_events_export_task = build_export_task( + dag, + "archive", + "export_diagnostic_events", + file_names["diagnostic_events"], + use_testnet=use_testnet, + use_futurenet=use_futurenet, + use_gcs=True, + resource_cfg="cc", +) """ The delete partition task checks to see if the given partition/batch id exists in @@ -386,3 +397,8 @@ ) tx_export_task >> delete_old_tx_pub_task >> send_txs_to_pub_task >> wait_on_dag tx_export_task >> delete_old_tx_pub_new_task >> send_txs_to_pub_new_task >> wait_on_dag +( + time_task + >> write_diagnostic_events_stats + >> diagnostic_events_export_task +) diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index 02ff731d..591d31e7 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -121,16 +121,16 @@ delete_trust_pub_new_task = build_delete_data_task( dag, public_project, public_dataset_new, table_names["trustlines"] ) -delete_contract_data_pub_new_task = build_delete_data_task( +delete_contract_data_task = build_delete_data_task( dag, public_project, public_dataset_new, table_names["contract_data"] ) -delete_contract_code_pub_new_task = build_delete_data_task( +delete_contract_code_task = build_delete_data_task( dag, public_project, public_dataset_new, table_names["contract_code"] ) -delete_config_settings_pub_new_task = build_delete_data_task( +delete_config_settings_task = build_delete_data_task( dag, public_project, public_dataset_new, table_names["config_settings"] ) -delete_expiration_pub_new_task = build_delete_data_task( +delete_expiration_task = build_delete_data_task( dag, public_project, public_dataset_new, table_names["expiration"] ) @@ -279,7 +279,7 @@ partition=True, cluster=True, ) -send_contract_data_to_pub_new_task = build_gcs_to_bq_task( +send_contract_data_to_pub_task = build_gcs_to_bq_task( dag, changes_task.task_id, public_project, @@ -289,7 +289,7 @@ partition=True, cluster=True, ) -send_contract_code_to_pub_new_task = build_gcs_to_bq_task( +send_contract_code_to_pub_task = build_gcs_to_bq_task( dag, changes_task.task_id, public_project, @@ -299,7 +299,7 @@ partition=True, cluster=True, ) -send_config_settings_to_pub_new_task = build_gcs_to_bq_task( +send_config_settings_to_pub_task = build_gcs_to_bq_task( dag, changes_task.task_id, public_project, @@ -309,7 +309,7 @@ partition=True, cluster=True, ) -send_expiration_to_pub_new_task = build_gcs_to_bq_task( +send_expiration_to_pub_task = build_gcs_to_bq_task( dag, changes_task.task_id, public_project, @@ -355,27 +355,27 @@ date_task >> changes_task >> write_contract_data_stats - >> delete_contract_data_pub_new_task - >> send_contract_data_to_pub_new_task + >> delete_contract_data_task + >> send_contract_data_to_pub_task ) ( date_task >> changes_task >> write_contract_code_stats - >> delete_contract_code_pub_new_task - >> send_contract_code_to_pub_new_task + >> delete_contract_code_task + >> send_contract_code_to_pub_task ) ( date_task >> changes_task >> write_config_settings_stats - >> delete_config_settings_pub_new_task - >> send_config_settings_to_pub_new_task + >> delete_config_settings_task + >> send_config_settings_to_pub_task ) ( date_task >> changes_task >> write_expiration_stats - >> delete_expiration_pub_new_task - >> send_expiration_to_pub_new_task + >> delete_expiration_task + >> send_expiration_to_pub_task ) From 82a91e46cf0e1c2acefe87e213852cea26940eb6 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 20 Sep 2023 10:44:17 -0400 Subject: [PATCH 20/28] Update start dates --- dags/history_archive_with_captive_core_dag.py | 2 +- dags/history_archive_without_captive_core_dag.py | 2 +- dags/state_table_dag.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dags/history_archive_with_captive_core_dag.py b/dags/history_archive_with_captive_core_dag.py index 49c380ef..075f1de4 100644 --- a/dags/history_archive_with_captive_core_dag.py +++ b/dags/history_archive_with_captive_core_dag.py @@ -23,7 +23,7 @@ dag = DAG( "history_archive_with_captive_core", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 9, 15, 19, 30), + start_date=datetime.datetime(2023, 9, 20, 15, 0), catchup=True, description="This DAG exports trades and operations from the history archive using CaptiveCore. This supports parsing sponsorship and AMMs.", schedule_interval="*/30 * * * *", diff --git a/dags/history_archive_without_captive_core_dag.py b/dags/history_archive_without_captive_core_dag.py index 31ba2b4b..4dfef794 100644 --- a/dags/history_archive_without_captive_core_dag.py +++ b/dags/history_archive_without_captive_core_dag.py @@ -22,7 +22,7 @@ dag = DAG( "history_archive_without_captive_core", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 9, 15, 19, 30), + start_date=datetime.datetime(2023, 9, 20, 15, 0), catchup=True, description="This DAG exports ledgers, transactions, and assets from the history archive to BigQuery. Incremental Loads", schedule_interval="*/15 * * * *", diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index 591d31e7..1201d3a9 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -26,7 +26,7 @@ dag = DAG( "state_table_export", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 9, 15, 19, 30), + start_date=datetime.datetime(2023, 9, 20, 15, 0), description="This DAG runs a bounded stellar-core instance, which allows it to export accounts, offers, liquidity pools, and trustlines to BigQuery.", schedule_interval="*/30 * * * *", params={ From aa03620003573dae4b3a88d83a4b1cc6f20a3aa0 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 20 Sep 2023 11:17:34 -0400 Subject: [PATCH 21/28] fix type --- schemas/history_operations_schema.json | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/schemas/history_operations_schema.json b/schemas/history_operations_schema.json index bc42d32a..d02e8777 100644 --- a/schemas/history_operations_schema.json +++ b/schemas/history_operations_schema.json @@ -1011,6 +1011,23 @@ "name": "asset_balance_changes", "type": "RECORD" }, + { + "fields": [ + { + "mode": "NULLABLE", + "name": "type", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "value", + "type": "STRING" + } + ], + "mode": "REPEATED", + "name": "parameters", + "type": "RECORD" + }, { "mode": "NULLABLE", "name": "function", From b2b28053d889f96f87f35b7a073b01bb56e4eb52 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 20 Sep 2023 15:06:31 -0400 Subject: [PATCH 22/28] fix schemas --- schemas/config_settings_schema.json | 20 ++++++++++---------- schemas/contract_code_schema.json | 10 ---------- schemas/contract_data_schema.json | 16 +++------------- 3 files changed, 13 insertions(+), 33 deletions(-) diff --git a/schemas/config_settings_schema.json b/schemas/config_settings_schema.json index 1445eb12..56e1fadd 100644 --- a/schemas/config_settings_schema.json +++ b/schemas/config_settings_schema.json @@ -86,27 +86,22 @@ }, { "mode": "NULLABLE", - "name": "fee_write_1kb", + "name": "bucket_list_target_size_bytes", "type": "INTEGER" }, { "mode": "NULLABLE", - "name": "bucket_list_size_bytes", + "name": "write_fee_1kb_bucket_list_low", "type": "INTEGER" }, { "mode": "NULLABLE", - "name": "bucket_list_fee_rate_low", + "name": "write_fee_1kb_bucket_list_high", "type": "INTEGER" }, { "mode": "NULLABLE", - "name": "bucket_list_fee_rate_high", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "bucket_list_growth_factor", + "name": "bucket_list_write_fee_growth_factor", "type": "INTEGER" }, { @@ -126,7 +121,7 @@ }, { "mode": "NULLABLE", - "name": "ledger_max_propagate_size_bytes", + "name": "ledger_max_txs_size_bytes", "type": "INTEGER" }, { @@ -238,6 +233,11 @@ "name": "eviction_scan_size", "type": "INTEGER" }, + { + "mode": "NULLABLE", + "name": "starting_eviction_scan_level", + "type": "INTEGER" + }, { "mode": "NULLABLE", "name": "ledger_max_tx_count", diff --git a/schemas/contract_code_schema.json b/schemas/contract_code_schema.json index 00091c4f..ddf498d2 100644 --- a/schemas/contract_code_schema.json +++ b/schemas/contract_code_schema.json @@ -9,16 +9,6 @@ "name": "contract_code_ext_v", "type": "INTEGER" }, - { - "mode": "NULLABLE", - "name": "contract_code_expiration_ledger_seq", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "contract_code_entry_body_type", - "type": "STRING" - }, { "mode": "NULLABLE", "name": "last_modified_ledger", diff --git a/schemas/contract_data_schema.json b/schemas/contract_data_schema.json index 46a3cdda..686b63f9 100644 --- a/schemas/contract_data_schema.json +++ b/schemas/contract_data_schema.json @@ -6,18 +6,13 @@ }, { "mode": "NULLABLE", - "name": "contract_durability", + "name": "contract_key_type", "type": "STRING" }, { "mode": "NULLABLE", - "name": "contract_data_flags", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "contract_expiration_ledger_seq", - "type": "INTEGER" + "name": "contract_durability", + "type": "STRING" }, { "mode": "NULLABLE", @@ -73,10 +68,5 @@ "mode": "NULLABLE", "name": "batch_insert_ts", "type": "TIMESTAMP" - }, - { - "mode": "NULLABLE", - "name": "contract_key_type", - "type": "STRING" } ] From cb8a4e8ca709538f4e34aba429099609a96dbaa9 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 20 Sep 2023 18:26:28 -0400 Subject: [PATCH 23/28] update vars --- airflow_variables_dev.json | 40 +++++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index fa88d88c..966942f6 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -78,6 +78,18 @@ "transaction_id", "account", "type" + ], + "contract_data": [ + "last_modified_ledger" + ], + "contract_code": [ + "last_modified_ledger" + ], + "config_settings": [ + "last_modified_ledger" + ], + "expiration": [ + "last_modified_ledger" ] }, "dbt_auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", @@ -104,7 +116,7 @@ "dbt_token_uri": "https://oauth2.googleapis.com/token", "gcs_exported_data_bucket_name": "us-central1-hubble-1pt5-dev-7db0e004-bucket", "gcs_exported_object_prefix": "dag-exported", - "image_name": "stellar/stellar-etl:6211230", + "image_name": "chowbao/stellar-etl:testnet-noUsdDb", "image_output_path": "/etl/exported_data/", "image_pull_policy": "IfNotPresent", "kube_config_location": "", @@ -130,7 +142,12 @@ "signers": "account_signers.txt", "trades": "trades.txt", "transactions": "transactions.txt", - "trustlines": "trustlines.txt" + "trustlines": "trustlines.txt", + "contract_data": "contract_data.txt", + "contract_code": "contract_code.txt", + "config_settings": "config_settings.txt", + "expiration": "expiration.txt", + "diagnostic_events": "diagnostic_events.txt" }, "output_path": "/home/airflow/gcs/data/", "owner": "SDF", @@ -194,6 +211,22 @@ "mgi": { "field": "tran_evnt_date", "type": "MONTH" + }, + "contract_data": { + "field": "batch_run_date", + "type": "MONTH" + }, + "contract_code": { + "field": "batch_run_date", + "type": "MONTH" + }, + "config_settings": { + "field": "batch_run_date", + "type": "MONTH" + }, + "expiration": { + "field": "batch_run_date", + "type": "MONTH" } }, "partners_data": { @@ -268,5 +301,6 @@ "use_testnet": "True", "sandbox_dataset": "crypto_stellar_internal_sandbox", "volume_config": {}, - "volume_name": "etl-data" + "volume_name": "etl-data", + "use_futurenet": "False" } From c5c03e460db88decd8761461e6f9693f07eac313 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 20 Sep 2023 18:36:55 -0400 Subject: [PATCH 24/28] format --- airflow_variables_dev.json | 16 +- dags/history_archive_with_captive_core_dag.py | 6 +- ...istory_archive_without_captive_core_dag.py | 2 +- dags/state_table_dag.py | 4 +- dags/stellar_etl_airflow/build_export_task.py | 7 +- dags/stellar_etl_airflow/build_time_task.py | 6 +- schemas/config_settings_schema.json | 561 +++++++++--------- schemas/contract_code_schema.json | 83 ++- schemas/contract_data_schema.json | 142 ++--- schemas/expiration_schema.json | 83 ++- schemas/history_operations_schema.json | 3 +- 11 files changed, 454 insertions(+), 459 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 848f36f9..49105c4f 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -79,18 +79,10 @@ "account", "type" ], - "contract_data": [ - "last_modified_ledger" - ], - "contract_code": [ - "last_modified_ledger" - ], - "config_settings": [ - "last_modified_ledger" - ], - "expiration": [ - "last_modified_ledger" - ] + "contract_data": ["last_modified_ledger"], + "contract_code": ["last_modified_ledger"], + "config_settings": ["last_modified_ledger"], + "expiration": ["last_modified_ledger"] }, "dbt_auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "dbt_auth_uri": "https://accounts.google.com/o/oauth2/auth", diff --git a/dags/history_archive_with_captive_core_dag.py b/dags/history_archive_with_captive_core_dag.py index 075f1de4..f337dbe1 100644 --- a/dags/history_archive_with_captive_core_dag.py +++ b/dags/history_archive_with_captive_core_dag.py @@ -397,8 +397,4 @@ ) tx_export_task >> delete_old_tx_pub_task >> send_txs_to_pub_task >> wait_on_dag tx_export_task >> delete_old_tx_pub_new_task >> send_txs_to_pub_new_task >> wait_on_dag -( - time_task - >> write_diagnostic_events_stats - >> diagnostic_events_export_task -) +(time_task >> write_diagnostic_events_stats >> diagnostic_events_export_task) diff --git a/dags/history_archive_without_captive_core_dag.py b/dags/history_archive_without_captive_core_dag.py index 4dfef794..68d15eee 100644 --- a/dags/history_archive_without_captive_core_dag.py +++ b/dags/history_archive_without_captive_core_dag.py @@ -44,7 +44,7 @@ public_dataset = Variable.get("public_dataset") public_dataset_new = Variable.get("public_dataset_new") use_testnet = ast.literal_eval(Variable.get("use_testnet")) -use_futurenet = ast.literal_eval(Variable.get("use_futurenet")) +use_futurenet = ast.literal_eval(Variable.get("use_futurenet")) """ The time task reads in the execution time of the current run, as well as the next diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index 1201d3a9..62f83ea5 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -26,7 +26,7 @@ dag = DAG( "state_table_export", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 9, 20, 15, 0), + start_date=datetime.datetime(2023, 9, 20, 15, 0), description="This DAG runs a bounded stellar-core instance, which allows it to export accounts, offers, liquidity pools, and trustlines to BigQuery.", schedule_interval="*/30 * * * *", params={ @@ -37,7 +37,7 @@ "subtract_data_interval": macros.subtract_data_interval, "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, }, - catchup=True + catchup=True, ) file_names = Variable.get("output_file_names", deserialize_json=True) diff --git a/dags/stellar_etl_airflow/build_export_task.py b/dags/stellar_etl_airflow/build_export_task.py index 360361fb..ec0a02c0 100644 --- a/dags/stellar_etl_airflow/build_export_task.py +++ b/dags/stellar_etl_airflow/build_export_task.py @@ -41,7 +41,12 @@ def select_correct_filename(cmd_type, base_name, batched_name): def generate_etl_cmd( - command, base_filename, cmd_type, use_gcs=False, use_testnet=False, use_futurenet=False + command, + base_filename, + cmd_type, + use_gcs=False, + use_testnet=False, + use_futurenet=False, ): """ Runs the provided stellar-etl command with arguments that are appropriate for the command type. diff --git a/dags/stellar_etl_airflow/build_time_task.py b/dags/stellar_etl_airflow/build_time_task.py index 6d662ba6..87f31242 100644 --- a/dags/stellar_etl_airflow/build_time_task.py +++ b/dags/stellar_etl_airflow/build_time_task.py @@ -13,7 +13,11 @@ def build_time_task( - dag, use_testnet=False, use_next_exec_time=True, resource_cfg="default", use_futurenet=False + dag, + use_testnet=False, + use_next_exec_time=True, + resource_cfg="default", + use_futurenet=False, ): """ Creates a task to run the get_ledger_range_from_times command from the stellar-etl Docker image. The start time is the previous diff --git a/schemas/config_settings_schema.json b/schemas/config_settings_schema.json index 56e1fadd..5c70a979 100644 --- a/schemas/config_settings_schema.json +++ b/schemas/config_settings_schema.json @@ -1,282 +1,281 @@ [ - { - "mode": "NULLABLE", - "name": "config_setting_id", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "contract_max_size_bytes", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "ledger_max_instructions", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "tx_max_instructions", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "fee_rate_per_instructions_increment", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "tx_memory_limit", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "ledger_max_read_ledger_entries", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "ledger_max_read_bytes", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "ledger_max_write_ledger_entries", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "ledger_max_write_bytes", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "tx_max_read_ledger_entries", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "tx_max_read_bytes", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "tx_max_write_ledger_entries", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "tx_max_write_bytes", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "fee_read_ledger_entry", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "fee_write_ledger_entry", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "fee_read_1kb", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "bucket_list_target_size_bytes", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "write_fee_1kb_bucket_list_low", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "write_fee_1kb_bucket_list_high", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "bucket_list_write_fee_growth_factor", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "fee_historical_1kb", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "tx_max_extended_meta_data_size_bytes", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "fee_extended_meta_data_1kb", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "ledger_max_txs_size_bytes", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "tx_max_size_bytes", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "fee_tx_size_1kb", - "type": "INTEGER" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "ExtV", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "ConstTerm", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "LinearTerm", - "type": "STRING" - } - ], - "mode": "REPEATED", - "name": "contract_cost_params_cpu_insns", - "type": "RECORD" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "ExtV", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "ConstTerm", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "LinearTerm", - "type": "STRING" - } - ], - "mode": "REPEATED", - "name": "contract_cost_params_mem_bytes", - "type": "RECORD" - }, - { - "mode": "NULLABLE", - "name": "contract_data_key_size_bytes", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "contract_data_entry_size_bytes", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "max_entry_expiration", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "min_temp_entry_expiration", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "min_persistent_entry_expiration", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "auto_bump_ledgers", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "persistent_rent_rate_denominator", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "temp_rent_rate_denominator", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "max_entries_to_expire", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "bucket_list_size_window_sample_size", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "eviction_scan_size", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "starting_eviction_scan_level", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "ledger_max_tx_count", - "type": "INTEGER" - }, - { - "mode": "REPEATED", - "name": "bucket_list_size_window", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "last_modified_ledger", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "ledger_entry_change", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "deleted", - "type": "BOOLEAN" - }, - { - "mode": "NULLABLE", - "name": "batch_id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "batch_run_date", - "type": "DATETIME" - }, - { - "mode": "NULLABLE", - "name": "batch_insert_ts", - "type": "TIMESTAMP" - } - ] - + { + "mode": "NULLABLE", + "name": "config_setting_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "contract_max_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_instructions", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_instructions", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_rate_per_instructions_increment", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_memory_limit", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_read_ledger_entries", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_read_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_write_ledger_entries", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_write_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_read_ledger_entries", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_read_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_write_ledger_entries", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_write_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_read_ledger_entry", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_write_ledger_entry", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_read_1kb", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "bucket_list_target_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "write_fee_1kb_bucket_list_low", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "write_fee_1kb_bucket_list_high", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "bucket_list_write_fee_growth_factor", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_historical_1kb", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_extended_meta_data_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_extended_meta_data_1kb", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_txs_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "tx_max_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "fee_tx_size_1kb", + "type": "INTEGER" + }, + { + "fields": [ + { + "mode": "NULLABLE", + "name": "ExtV", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "ConstTerm", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "LinearTerm", + "type": "STRING" + } + ], + "mode": "REPEATED", + "name": "contract_cost_params_cpu_insns", + "type": "RECORD" + }, + { + "fields": [ + { + "mode": "NULLABLE", + "name": "ExtV", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "ConstTerm", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "LinearTerm", + "type": "STRING" + } + ], + "mode": "REPEATED", + "name": "contract_cost_params_mem_bytes", + "type": "RECORD" + }, + { + "mode": "NULLABLE", + "name": "contract_data_key_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "contract_data_entry_size_bytes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "max_entry_expiration", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "min_temp_entry_expiration", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "min_persistent_entry_expiration", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "auto_bump_ledgers", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "persistent_rent_rate_denominator", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "temp_rent_rate_denominator", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "max_entries_to_expire", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "bucket_list_size_window_sample_size", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "eviction_scan_size", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "starting_eviction_scan_level", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_max_tx_count", + "type": "INTEGER" + }, + { + "mode": "REPEATED", + "name": "bucket_list_size_window", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "last_modified_ledger", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_entry_change", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "deleted", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "batch_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "batch_run_date", + "type": "DATETIME" + }, + { + "mode": "NULLABLE", + "name": "batch_insert_ts", + "type": "TIMESTAMP" + } +] diff --git a/schemas/contract_code_schema.json b/schemas/contract_code_schema.json index ddf498d2..7c141cc9 100644 --- a/schemas/contract_code_schema.json +++ b/schemas/contract_code_schema.json @@ -1,43 +1,42 @@ [ - { - "mode": "NULLABLE", - "name": "contract_code_hash", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "contract_code_ext_v", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "last_modified_ledger", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "ledger_entry_change", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "deleted", - "type": "BOOLEAN" - }, - { - "mode": "NULLABLE", - "name": "batch_id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "batch_run_date", - "type": "DATETIME" - }, - { - "mode": "NULLABLE", - "name": "batch_insert_ts", - "type": "TIMESTAMP" - } - ] - + { + "mode": "NULLABLE", + "name": "contract_code_hash", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_code_ext_v", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "last_modified_ledger", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_entry_change", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "deleted", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "batch_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "batch_run_date", + "type": "DATETIME" + }, + { + "mode": "NULLABLE", + "name": "batch_insert_ts", + "type": "TIMESTAMP" + } +] diff --git a/schemas/contract_data_schema.json b/schemas/contract_data_schema.json index 686b63f9..351e0f9c 100644 --- a/schemas/contract_data_schema.json +++ b/schemas/contract_data_schema.json @@ -1,72 +1,72 @@ [ - { - "mode": "NULLABLE", - "name": "contract_id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "contract_key_type", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "contract_durability", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "asset_code", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "asset_issuer", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "asset_type", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "balance_holder", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "balance", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "last_modified_ledger", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "ledger_entry_change", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "deleted", - "type": "BOOLEAN" - }, - { - "mode": "NULLABLE", - "name": "batch_id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "batch_run_date", - "type": "DATETIME" - }, - { - "mode": "NULLABLE", - "name": "batch_insert_ts", - "type": "TIMESTAMP" - } - ] + { + "mode": "NULLABLE", + "name": "contract_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_key_type", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "contract_durability", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "asset_code", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "asset_issuer", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "asset_type", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "balance_holder", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "balance", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "last_modified_ledger", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_entry_change", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "deleted", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "batch_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "batch_run_date", + "type": "DATETIME" + }, + { + "mode": "NULLABLE", + "name": "batch_insert_ts", + "type": "TIMESTAMP" + } +] diff --git a/schemas/expiration_schema.json b/schemas/expiration_schema.json index d9e75dc2..e1389a97 100644 --- a/schemas/expiration_schema.json +++ b/schemas/expiration_schema.json @@ -1,43 +1,42 @@ [ - { - "mode": "NULLABLE", - "name": "key_hash", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "expiration_ledger_seq", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "last_modified_ledger", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "ledger_entry_change", - "type": "INTEGER" - }, - { - "mode": "NULLABLE", - "name": "deleted", - "type": "BOOLEAN" - }, - { - "mode": "NULLABLE", - "name": "batch_id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "batch_run_date", - "type": "DATETIME" - }, - { - "mode": "NULLABLE", - "name": "batch_insert_ts", - "type": "TIMESTAMP" - } - ] - + { + "mode": "NULLABLE", + "name": "key_hash", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "expiration_ledger_seq", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "last_modified_ledger", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "ledger_entry_change", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "deleted", + "type": "BOOLEAN" + }, + { + "mode": "NULLABLE", + "name": "batch_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "batch_run_date", + "type": "DATETIME" + }, + { + "mode": "NULLABLE", + "name": "batch_insert_ts", + "type": "TIMESTAMP" + } +] diff --git a/schemas/history_operations_schema.json b/schemas/history_operations_schema.json index d02e8777..bd1678fb 100644 --- a/schemas/history_operations_schema.json +++ b/schemas/history_operations_schema.json @@ -1047,7 +1047,8 @@ "mode": "NULLABLE", "name": "ledgers_to_expire", "type": "INTEGER" - }, { + }, + { "mode": "NULLABLE", "name": "contract_id", "type": "STRING" From c71c87e74037781d1f13a6a447b6365cbc25da88 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 20 Sep 2023 20:57:11 -0400 Subject: [PATCH 25/28] Add prod variables --- airflow_variables.json | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/airflow_variables.json b/airflow_variables.json index c43b79b7..a37a5cd6 100644 --- a/airflow_variables.json +++ b/airflow_variables.json @@ -91,7 +91,11 @@ "transaction_id", "account", "type" - ] + ], + "contract_data": ["last_modified_ledger"], + "contract_code": ["last_modified_ledger"], + "config_settings": ["last_modified_ledger"], + "expiration": ["last_modified_ledger"] }, "dbt_auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "dbt_auth_uri": "https://accounts.google.com/o/oauth2/auth", @@ -153,7 +157,12 @@ "signers": "account_signers.txt", "trades": "trades.txt", "transactions": "transactions.txt", - "trustlines": "trustlines.txt" + "trustlines": "trustlines.txt", + "contract_data": "contract_data.txt", + "contract_code": "contract_code.txt", + "config_settings": "config_settings.txt", + "expiration": "expiration.txt", + "diagnostic_events": "diagnostic_events.txt" }, "output_path": "/home/airflow/gcs/data/", "owner": "SDF", @@ -213,6 +222,22 @@ "trust_lines": { "type": "MONTH", "field": "batch_run_date" + }, + "contract_data": { + "field": "batch_run_date", + "type": "MONTH" + }, + "contract_code": { + "field": "batch_run_date", + "type": "MONTH" + }, + "config_settings": { + "field": "batch_run_date", + "type": "MONTH" + }, + "expiration": { + "field": "batch_run_date", + "type": "MONTH" } }, "public_dataset": "crypto_stellar_2", @@ -289,6 +314,7 @@ } }, "partners_bucket": "ext-partner-sftp", + "use_futurenet": "False", "currency_ohlc": { "currency": "euro_ohlc", "table_name": "euro_usd_ohlc", From ea701ddeeec99e76e4b57afb3cbc294a67170d27 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 20 Sep 2023 21:16:14 -0400 Subject: [PATCH 26/28] Fix variables --- airflow_variables.json | 7 ++++++- airflow_variables_dev.json | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/airflow_variables.json b/airflow_variables.json index a37a5cd6..cda380d0 100644 --- a/airflow_variables.json +++ b/airflow_variables.json @@ -283,7 +283,12 @@ "trades": "history_trades", "transactions": "history_transactions", "trustlines": "trust_lines", - "enriched_history_operations": "enriched_history_operations" + "enriched_history_operations": "enriched_history_operations", + "contract_data": "contract_data", + "contract_code": "contract_code", + "config_settings": "config_settings", + "expiration": "expiration", + "diagnostic_events": "diagnostic_events" }, "task_timeout": { "build_batch_stats": 180, diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 49105c4f..ab33dbac 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -271,7 +271,12 @@ "trades": "history_trades", "transactions": "history_transactions", "trustlines": "trust_lines", - "enriched_history_operations": "enriched_history_operations" + "enriched_history_operations": "enriched_history_operations", + "contract_data": "contract_data", + "contract_code": "contract_code", + "config_settings": "config_settings", + "expiration": "expiration", + "diagnostic_events": "diagnostic_events" }, "task_timeout": { "build_batch_stats": 180, From 24cdda387ab44d4c90d33b8a89883bfeaa189356 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 20 Sep 2023 22:39:48 -0400 Subject: [PATCH 27/28] Update schema --- schemas/config_settings_schema.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/schemas/config_settings_schema.json b/schemas/config_settings_schema.json index 5c70a979..cb3eb2e9 100644 --- a/schemas/config_settings_schema.json +++ b/schemas/config_settings_schema.json @@ -111,12 +111,12 @@ }, { "mode": "NULLABLE", - "name": "tx_max_extended_meta_data_size_bytes", + "name": "tx_max_contract_events_size_bytes", "type": "INTEGER" }, { "mode": "NULLABLE", - "name": "fee_extended_meta_data_1kb", + "name": "fee_contract_events_1kb", "type": "INTEGER" }, { From 35c15d7eb31f0124242dc6b2a30242e1162062e5 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Fri, 13 Oct 2023 15:18:29 -0500 Subject: [PATCH 28/28] Adjust DAGs for aggregate table changes --- airflow_variables.json | 4 ++-- airflow_variables_dev.json | 2 +- dags/asset_pricing_pipeline_dag.py | 1 + dags/marts_tables_dag.py | 19 ++++++++++--------- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/airflow_variables.json b/airflow_variables.json index d2e9a992..fbca13cd 100644 --- a/airflow_variables.json +++ b/airflow_variables.json @@ -116,7 +116,7 @@ "partnership_assets__asset_activity_fact": false, "trade_agg": false }, - "dbt_image_name": "stellar/stellar-dbt:0643adc", + "dbt_image_name": "stellar/stellar-dbt:011d897", "dbt_job_execution_timeout_seconds": 6000, "dbt_job_retries": 1, "dbt_keyfile_profile": "", @@ -130,7 +130,7 @@ "dbt_token_uri": "https://oauth2.googleapis.com/token", "gcs_exported_data_bucket_name": "us-central1-hubble-2-d948d67b-bucket", "gcs_exported_object_prefix": "dag-exported", - "image_name": "stellar/stellar-etl:6211230", + "image_name": "stellar/stellar-etl:e9c803c", "image_output_path": "/etl/exported_data/", "image_pull_policy": "IfNotPresent", "kube_config_location": "", diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 8da2f0ad..fbdc4f87 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -94,7 +94,7 @@ "partnership_assets__account_holders_activity_fact": true, "partnership_assets__asset_activity_fact": true }, - "dbt_image_name": "stellar/stellar-dbt:0643adc", + "dbt_image_name": "stellar/stellar-dbt:011d897", "dbt_job_execution_timeout_seconds": 300, "dbt_job_retries": 1, "dbt_keyfile_profile": "", diff --git a/dags/asset_pricing_pipeline_dag.py b/dags/asset_pricing_pipeline_dag.py index b154a6fe..14120229 100644 --- a/dags/asset_pricing_pipeline_dag.py +++ b/dags/asset_pricing_pipeline_dag.py @@ -1,4 +1,5 @@ import datetime +import json from airflow import DAG from airflow.models.variable import Variable diff --git a/dags/marts_tables_dag.py b/dags/marts_tables_dag.py index 98626d38..5c447d50 100644 --- a/dags/marts_tables_dag.py +++ b/dags/marts_tables_dag.py @@ -10,16 +10,16 @@ dag = DAG( "marts_tables", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 4, 4, 0, 0), + start_date=datetime.datetime(2015, 9, 30), description="This DAG runs dbt to create the tables for the models in marts/ but not any marts subdirectories.", schedule_interval="0 11 * * *", # Daily 11 AM UTC params={}, - catchup=False, + catchup=True, + max_active_runs=1, ) # tasks for staging tables for marts stg_history_transactions = build_dbt_task(dag, "stg_history_transactions") -stg_history_ledgers = build_dbt_task(dag, "stg_history_ledgers") stg_history_assets = build_dbt_task(dag, "stg_history_assets") stg_history_trades = build_dbt_task(dag, "stg_history_trades") @@ -31,11 +31,12 @@ # tasks for intermediate asset stats tables int_meaningful_asset_prices = build_dbt_task(dag, "int_meaningful_asset_prices") +int_asset_stats_agg = build_dbt_task(dag, "int_asset_stats_agg") stg_excluded_accounts = build_dbt_task(dag, "stg_excluded_accounts") stg_xlm_to_usd = build_dbt_task(dag, "stg_xlm_to_usd") # tasks for marts tables -agg_network_stats = build_dbt_task(dag, "agg_network_stats") +network_stats_agg = build_dbt_task(dag, "network_stats_agg") asset_stats_agg = build_dbt_task(dag, "asset_stats_agg") fee_stats_agg = build_dbt_task(dag, "fee_stats_agg") history_assets = build_dbt_task(dag, "history_assets") @@ -44,14 +45,14 @@ # DAG task graph # graph for marts tables -agg_network_stats +network_stats_agg liquidity_providers -int_meaningful_asset_prices >> asset_stats_agg -stg_excluded_accounts >> asset_stats_agg -stg_xlm_to_usd >> asset_stats_agg +int_meaningful_asset_prices >> int_asset_stats_agg +stg_excluded_accounts >> int_asset_stats_agg +stg_xlm_to_usd >> int_asset_stats_agg +int_asset_stats_agg >> asset_stats_agg stg_history_transactions >> fee_stats_agg -stg_history_ledgers >> fee_stats_agg stg_history_assets >> history_assets