From 752697c6c5a04aadb2ccdf00a425fb2f2a111a73 Mon Sep 17 00:00:00 2001 From: Benjamin Smith Date: Mon, 27 Feb 2023 10:19:35 +0100 Subject: [PATCH] [Batch Rewards] Schema & ETL Process for Sync (#28) This defined the "schema" for what will be uploaded to Dune. We reused and generalized some common code from the order rewards sync. More or less, this was a minor change. A TODO still remains for parsing the bytea[] type of the participants field (since Dune V2 uses lower-case hex strings): we will want to replace every \x prefix coming from Postgres with 0x. --- src/fetch/orderbook.py | 21 +++++- src/main.py | 9 ++- src/models/batch_rewards_schema.py | 38 ++++++++++ src/models/tables.py | 1 + src/sql/orderbook/batch_rewards.sql | 13 +++- src/sync/order_rewards.py | 86 ++++++++++++++++++----- tests/integration/test_fetch_orderbook.py | 2 +- 7 files changed, 146 insertions(+), 24 deletions(-) create mode 100644 src/models/batch_rewards_schema.py diff --git a/src/fetch/orderbook.py b/src/fetch/orderbook.py index d2142ae2..1a70d313 100644 --- a/src/fetch/orderbook.py +++ b/src/fetch/orderbook.py @@ -75,9 +75,9 @@ def get_latest_block(cls) -> int: return min(int(barn["latest"][0]), int(prod["latest"][0])) - REORG_THRESHOLD @classmethod - def get_orderbook_rewards(cls, block_range: BlockRange) -> DataFrame: + def get_order_rewards(cls, block_range: BlockRange) -> DataFrame: """ - Fetches and validates Orderbook Reward DataFrame as concatenation from Prod and Staging DB + Fetches and validates Order Reward DataFrame as concatenation from Prod and Staging DB """ cow_reward_query = ( open_query("orderbook/order_rewards.sql") @@ -90,3 +90,20 @@ def get_orderbook_rewards(cls, block_range: BlockRange) -> DataFrame: # Solvers do not appear in both environments! assert set(prod.solver).isdisjoint(set(barn.solver)), "solver overlap!" 
return pd.concat([prod, barn]) + + @classmethod + def get_batch_rewards(cls, block_range: BlockRange) -> DataFrame: + """ + Fetches and validates Batch Rewards DataFrame as concatenation from Prod and Staging DB + """ + cow_reward_query = ( + open_query("orderbook/batch_rewards.sql") + .replace("{{start_block}}", str(block_range.block_from)) + .replace("{{end_block}}", str(block_range.block_to)) + ) + data_types = {"block_number": "int64"} + barn, prod = cls._query_both_dbs(cow_reward_query, data_types) + + # Solvers do not appear in both environments! + assert set(prod.solver).isdisjoint(set(barn.solver)), "solver overlap!" + return pd.concat([prod, barn]) diff --git a/src/main.py b/src/main.py index d070f301..8155c9b9 100644 --- a/src/main.py +++ b/src/main.py @@ -14,7 +14,7 @@ from src.post.aws import AWSClient from src.sync import sync_app_data from src.sync.config import SyncConfig, AppDataSyncConfig -from src.sync.order_rewards import sync_order_rewards +from src.sync.order_rewards import sync_order_rewards, sync_batch_rewards log = logging.getLogger(__name__) logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(message)s") @@ -72,8 +72,15 @@ def __init__(self) -> None: elif args.sync_table == SyncTable.ORDER_REWARDS: sync_order_rewards( aws, + config=SyncConfig(volume_path), fetcher=OrderbookFetcher(), + dry_run=args.dry_run, + ) + elif args.sync_table == SyncTable.BATCH_REWARDS: + sync_batch_rewards( + aws, config=SyncConfig(volume_path), + fetcher=OrderbookFetcher(), dry_run=args.dry_run, ) else: diff --git a/src/models/batch_rewards_schema.py b/src/models/batch_rewards_schema.py new file mode 100644 index 00000000..d134c7ce --- /dev/null +++ b/src/models/batch_rewards_schema.py @@ -0,0 +1,38 @@ +"""Model for Batch Rewards Data""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from pandas import DataFrame + + +@dataclass +class BatchRewards: + """ + This class provides a transformation 
interface for the Dataframe we fetch from the orderbook + """ + + @classmethod + def from_pdf_to_dune_records(cls, rewards_df: DataFrame) -> list[dict[str, Any]]: + """Converts Pandas DataFrame into the expected stream type for Dune""" + return [ + { + "block_number": int(row["block_number"]), + "tx_hash": row["tx_hash"], + "solver": row["solver"], + "data": { + # All the following values are in WEI. + "reward_eth": str(row["reward_eth"]), + "execution_cost": str(row["execution_cost"]), + "surplus": str(row["surplus"]), + "fee": str(row["fee"]), + "winning_score": str(row["winning_score"]), + "reference_score": str(row["reference_score"]), + # TODO - Not sure yet how to parse this bytea[] + # Will need to experiment with this. + "participating_solvers": row["participating_solvers"], + }, + } + for row in rewards_df.to_dict(orient="records") + ] diff --git a/src/models/tables.py b/src/models/tables.py index b8d6c59f..08068f6c 100644 --- a/src/models/tables.py +++ b/src/models/tables.py @@ -7,6 +7,7 @@ class SyncTable(Enum): APP_DATA = "app_data" ORDER_REWARDS = "order_rewards" + BATCH_REWARDS = "batch_rewards" def __str__(self) -> str: return str(self.value) diff --git a/src/sql/orderbook/batch_rewards.sql b/src/sql/orderbook/batch_rewards.sql index 6b3dcc38..4a748d61 100644 --- a/src/sql/orderbook/batch_rewards.sql +++ b/src/sql/orderbook/batch_rewards.sql @@ -54,4 +54,15 @@ LEFT OUTER JOIN observed_settlements os ON os.auction_id = ss.auction_id ) -SELECT * FROM reward_data +SELECT + block_number, + concat('0x', encode(tx_hash, 'hex')) as tx_hash, + concat('0x', encode(solver, 'hex')) as solver, + execution_cost, + surplus, + fee, + surplus + fee - reference_score as reward_eth, + winning_score, + reference_score, + participants as participating_solvers +FROM reward_data diff --git a/src/sync/order_rewards.py b/src/sync/order_rewards.py index 627407f0..8312fbf8 100644 --- a/src/sync/order_rewards.py +++ b/src/sync/order_rewards.py @@ -1,10 +1,11 @@ """Main Entry 
point for app_hash sync""" +from typing import Any from dune_client.file.interface import FileIO -from pandas import DataFrame from src.fetch.orderbook import OrderbookFetcher from src.logger import set_log +from src.models.batch_rewards_schema import BatchRewards from src.models.block_range import BlockRange from src.models.order_rewards_schema import OrderRewards from src.models.tables import SyncTable @@ -16,10 +17,10 @@ log = set_log(__name__) -SYNC_TABLE = SyncTable.ORDER_REWARDS - -class OrderbookDataHandler(RecordHandler): # pylint:disable=too-few-public-methods +class OrderbookDataHandler( + RecordHandler +): # pylint:disable=too-few-public-methods,too-many-arguments """ This class is responsible for consuming new dune records and missing values from previous runs it attempts to fetch content for them and filters them into "found" and "not found" as necessary @@ -29,21 +30,20 @@ def __init__( self, file_manager: FileIO, block_range: BlockRange, + sync_table: SyncTable, config: SyncConfig, - order_rewards: DataFrame, + data_list: list[dict[str, Any]], ): - super().__init__(block_range, SYNC_TABLE, config) - log.info(f"Handling {len(order_rewards)} new records") + super().__init__(block_range, sync_table, config) + log.info(f"Handling {len(data_list)} new records") self.file_manager = file_manager - self.order_rewards = OrderRewards.from_pdf_to_dune_records(order_rewards) + self.data_list = data_list def num_records(self) -> int: - return len(self.order_rewards) + return len(self.data_list) def write_found_content(self) -> None: - self.file_manager.write_ndjson( - data=self.order_rewards, name=self.content_filename - ) + self.file_manager.write_ndjson(data=self.data_list, name=self.content_filename) def write_sync_data(self) -> None: # Only write these if upload was successful. 
@@ -56,23 +56,71 @@ def write_sync_data(self) -> None: def sync_order_rewards( aws: AWSClient, fetcher: OrderbookFetcher, config: SyncConfig, dry_run: bool ) -> None: - """App Data Sync Logic""" - + """Order Rewards Data Sync Logic""" + sync_table = SyncTable.ORDER_REWARDS block_range = BlockRange( block_from=last_sync_block( aws, - table=SYNC_TABLE, + table=sync_table, genesis_block=15719994, # First Recorded Order Reward block ), block_to=fetcher.get_latest_block(), ) + sync_orderbook_data( + aws, + block_range, + config, + dry_run, + sync_table=sync_table, + data_list=OrderRewards.from_pdf_to_dune_records( + fetcher.get_order_rewards(block_range) + ), + ) + + +def sync_batch_rewards( + aws: AWSClient, fetcher: OrderbookFetcher, config: SyncConfig, dry_run: bool +) -> None: + """Batch Reward Sync Logic""" + sync_table = SyncTable.BATCH_REWARDS + block_range = BlockRange( + block_from=last_sync_block( + aws, + table=sync_table, + genesis_block=16691686, # First Recorded Batch Reward block + # TODO - use correct genesis block here. 
(note this is not actually determined yet) + ), + block_to=fetcher.get_latest_block(), + ) + sync_orderbook_data( + aws, + block_range, + config, + dry_run, + sync_table, + data_list=BatchRewards.from_pdf_to_dune_records( + fetcher.get_batch_rewards(block_range) + ), + ) + + +def sync_orderbook_data( # pylint:disable=too-many-arguments + aws: AWSClient, + block_range: BlockRange, + config: SyncConfig, + dry_run: bool, + sync_table: SyncTable, + data_list: list[dict[str, Any]], +) -> None: + """Generic Orderbook Sync Logic""" record_handler = OrderbookDataHandler( - file_manager=FileIO(config.volume_path / str(SYNC_TABLE)), + file_manager=FileIO(config.volume_path / str(sync_table)), block_range=block_range, config=config, - order_rewards=fetcher.get_orderbook_rewards(block_range), + data_list=data_list, + sync_table=sync_table, ) - UploadHandler(aws, record_handler, table=SYNC_TABLE).write_and_upload_content( + UploadHandler(aws, record_handler, table=sync_table).write_and_upload_content( dry_run ) - log.info("order_rewards sync run completed successfully") + log.info(f"{sync_table} sync run completed successfully") diff --git a/tests/integration/test_fetch_orderbook.py b/tests/integration/test_fetch_orderbook.py index b30174e1..d2dee2c0 100644 --- a/tests/integration/test_fetch_orderbook.py +++ b/tests/integration/test_fetch_orderbook.py @@ -17,7 +17,7 @@ def test_latest_block_increasing(self): def test_get_order_rewards(self): block_number = 16000000 block_range = BlockRange(block_number, block_number + 50) - rewards_df = OrderbookFetcher.get_orderbook_rewards(block_range) + rewards_df = OrderbookFetcher.get_order_rewards(block_range) expected = pd.DataFrame( { "block_number": [16000018, 16000050],