Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
[Batch Rewards] Schema & ETL Process for Sync (#28)
Browse files Browse the repository at this point in the history
This defined the "schema" for what will be uploaded to Dune. We reused/generalize some common code for the order rewards sync. More or less, this was a minor change. Still remaining todo for parsing the bytea[] type with participants field (since duneV2 uses lower case hex strings). We will want to replace all \x coming from postgres with 0x.
  • Loading branch information
bh2smith authored Feb 27, 2023
1 parent 47128c7 commit 752697c
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 24 deletions.
21 changes: 19 additions & 2 deletions src/fetch/orderbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ def get_latest_block(cls) -> int:
return min(int(barn["latest"][0]), int(prod["latest"][0])) - REORG_THRESHOLD

@classmethod
def get_orderbook_rewards(cls, block_range: BlockRange) -> DataFrame:
def get_order_rewards(cls, block_range: BlockRange) -> DataFrame:
"""
Fetches and validates Orderbook Reward DataFrame as concatenation from Prod and Staging DB
Fetches and validates Order Reward DataFrame as concatenation from Prod and Staging DB
"""
cow_reward_query = (
open_query("orderbook/order_rewards.sql")
Expand All @@ -90,3 +90,20 @@ def get_orderbook_rewards(cls, block_range: BlockRange) -> DataFrame:
# Solvers do not appear in both environments!
assert set(prod.solver).isdisjoint(set(barn.solver)), "solver overlap!"
return pd.concat([prod, barn])

@classmethod
def get_batch_rewards(cls, block_range: BlockRange) -> DataFrame:
"""
Fetches and validates Batch Rewards DataFrame as concatenation from Prod and Staging DB
"""
cow_reward_query = (
open_query("orderbook/batch_rewards.sql")
.replace("{{start_block}}", str(block_range.block_from))
.replace("{{end_block}}", str(block_range.block_to))
)
data_types = {"block_number": "int64"}
barn, prod = cls._query_both_dbs(cow_reward_query, data_types)

# Solvers do not appear in both environments!
assert set(prod.solver).isdisjoint(set(barn.solver)), "solver overlap!"
return pd.concat([prod, barn])
9 changes: 8 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from src.post.aws import AWSClient
from src.sync import sync_app_data
from src.sync.config import SyncConfig, AppDataSyncConfig
from src.sync.order_rewards import sync_order_rewards
from src.sync.order_rewards import sync_order_rewards, sync_batch_rewards

log = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(message)s")
Expand Down Expand Up @@ -72,8 +72,15 @@ def __init__(self) -> None:
elif args.sync_table == SyncTable.ORDER_REWARDS:
sync_order_rewards(
aws,
config=SyncConfig(volume_path),
fetcher=OrderbookFetcher(),
dry_run=args.dry_run,
)
elif args.sync_table == SyncTable.BATCH_REWARDS:
sync_batch_rewards(
aws,
config=SyncConfig(volume_path),
fetcher=OrderbookFetcher(),
dry_run=args.dry_run,
)
else:
Expand Down
38 changes: 38 additions & 0 deletions src/models/batch_rewards_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Model for Batch Rewards Data"""
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from pandas import DataFrame


@dataclass
class BatchRewards:
"""
This class provides a transformation interface for the Dataframe we fetch from the orderbook
"""

@classmethod
def from_pdf_to_dune_records(cls, rewards_df: DataFrame) -> list[dict[str, Any]]:
"""Converts Pandas DataFrame into the expected stream type for Dune"""
return [
{
"block_number": int(row["block_number"]),
"tx_hash": row["tx_hash"],
"solver": row["solver"],
"data": {
# All the following values are in WEI.
"reward_eth": str(row["reward_eth"]),
"execution_cost": str(row["execution_cost"]),
"surplus": str(row["surplus"]),
"fee": str(row["fee"]),
"winning_score": str(row["winning_score"]),
"reference_score": str(row["reference_score"]),
# TODO - Not sure yet how to parse this bytea[]
# Will need to experiment with this.
"participating_solvers": row["participating_solvers"],
},
}
for row in rewards_df.to_dict(orient="records")
]
1 change: 1 addition & 0 deletions src/models/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class SyncTable(Enum):

APP_DATA = "app_data"
ORDER_REWARDS = "order_rewards"
BATCH_REWARDS = "batch_rewards"

def __str__(self) -> str:
return str(self.value)
Expand Down
13 changes: 12 additions & 1 deletion src/sql/orderbook/batch_rewards.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,15 @@ LEFT OUTER JOIN observed_settlements os
ON os.auction_id = ss.auction_id
)

SELECT * FROM reward_data
SELECT
block_number,
concat('0x', encode(tx_hash, 'hex')) as tx_hash,
concat('0x', encode(solver, 'hex')) as solver,
execution_cost,
surplus,
fee,
surplus + fee - reference_score as reward_eth,
winning_score,
reference_score,
participants as participating_solvers
FROM reward_data
86 changes: 67 additions & 19 deletions src/sync/order_rewards.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Main Entry point for app_hash sync"""
from typing import Any

from dune_client.file.interface import FileIO
from pandas import DataFrame

from src.fetch.orderbook import OrderbookFetcher
from src.logger import set_log
from src.models.batch_rewards_schema import BatchRewards
from src.models.block_range import BlockRange
from src.models.order_rewards_schema import OrderRewards
from src.models.tables import SyncTable
Expand All @@ -16,10 +17,10 @@

log = set_log(__name__)

SYNC_TABLE = SyncTable.ORDER_REWARDS


class OrderbookDataHandler(RecordHandler): # pylint:disable=too-few-public-methods
class OrderbookDataHandler(
RecordHandler
): # pylint:disable=too-few-public-methods,too-many-arguments
"""
This class is responsible for consuming new dune records and missing values from previous runs
it attempts to fetch content for them and filters them into "found" and "not found" as necessary
Expand All @@ -29,21 +30,20 @@ def __init__(
self,
file_manager: FileIO,
block_range: BlockRange,
sync_table: SyncTable,
config: SyncConfig,
order_rewards: DataFrame,
data_list: list[dict[str, Any]],
):
super().__init__(block_range, SYNC_TABLE, config)
log.info(f"Handling {len(order_rewards)} new records")
super().__init__(block_range, sync_table, config)
log.info(f"Handling {len(data_list)} new records")
self.file_manager = file_manager
self.order_rewards = OrderRewards.from_pdf_to_dune_records(order_rewards)
self.data_list = data_list

def num_records(self) -> int:
return len(self.order_rewards)
return len(self.data_list)

def write_found_content(self) -> None:
self.file_manager.write_ndjson(
data=self.order_rewards, name=self.content_filename
)
self.file_manager.write_ndjson(data=self.data_list, name=self.content_filename)

def write_sync_data(self) -> None:
# Only write these if upload was successful.
Expand All @@ -56,23 +56,71 @@ def write_sync_data(self) -> None:
def sync_order_rewards(
aws: AWSClient, fetcher: OrderbookFetcher, config: SyncConfig, dry_run: bool
) -> None:
"""App Data Sync Logic"""

"""Order Rewards Data Sync Logic"""
sync_table = SyncTable.ORDER_REWARDS
block_range = BlockRange(
block_from=last_sync_block(
aws,
table=SYNC_TABLE,
table=sync_table,
genesis_block=15719994, # First Recorded Order Reward block
),
block_to=fetcher.get_latest_block(),
)
sync_orderbook_data(
aws,
block_range,
config,
dry_run,
sync_table=sync_table,
data_list=OrderRewards.from_pdf_to_dune_records(
fetcher.get_order_rewards(block_range)
),
)


def sync_batch_rewards(
aws: AWSClient, fetcher: OrderbookFetcher, config: SyncConfig, dry_run: bool
) -> None:
"""Batch Reward Sync Logic"""
sync_table = SyncTable.BATCH_REWARDS
block_range = BlockRange(
block_from=last_sync_block(
aws,
table=sync_table,
genesis_block=16691686, # First Recorded Batch Reward block
# TODO - use correct genesis block here. (note this is not actually determined yet)
),
block_to=fetcher.get_latest_block(),
)
sync_orderbook_data(
aws,
block_range,
config,
dry_run,
sync_table,
data_list=BatchRewards.from_pdf_to_dune_records(
fetcher.get_batch_rewards(block_range)
),
)


def sync_orderbook_data( # pylint:disable=too-many-arguments
aws: AWSClient,
block_range: BlockRange,
config: SyncConfig,
dry_run: bool,
sync_table: SyncTable,
data_list: list[dict[str, Any]],
) -> None:
"""Generic Orderbook Sync Logic"""
record_handler = OrderbookDataHandler(
file_manager=FileIO(config.volume_path / str(SYNC_TABLE)),
file_manager=FileIO(config.volume_path / str(sync_table)),
block_range=block_range,
config=config,
order_rewards=fetcher.get_orderbook_rewards(block_range),
data_list=data_list,
sync_table=sync_table,
)
UploadHandler(aws, record_handler, table=SYNC_TABLE).write_and_upload_content(
UploadHandler(aws, record_handler, table=sync_table).write_and_upload_content(
dry_run
)
log.info("order_rewards sync run completed successfully")
log.info(f"{sync_table} sync run completed successfully")
2 changes: 1 addition & 1 deletion tests/integration/test_fetch_orderbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_latest_block_increasing(self):
def test_get_order_rewards(self):
block_number = 16000000
block_range = BlockRange(block_number, block_number + 50)
rewards_df = OrderbookFetcher.get_orderbook_rewards(block_range)
rewards_df = OrderbookFetcher.get_order_rewards(block_range)
expected = pd.DataFrame(
{
"block_number": [16000018, 16000050],
Expand Down

0 comments on commit 752697c

Please sign in to comment.