diff --git a/src/models/order_rewards_schema.py b/src/models/order_rewards_schema.py new file mode 100644 index 00000000..59cc000e --- /dev/null +++ b/src/models/order_rewards_schema.py @@ -0,0 +1,31 @@ +"""Model for Order Rewards Data""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from pandas import DataFrame + + +@dataclass +class OrderRewards: + """ + This class provides a transformation interface for the Dataframe we fetch from the orderbook + """ + + @classmethod + def from_pdf_to_dune_records(cls, rewards_df: DataFrame) -> list[dict[str, Any]]: + """Converts Pandas DataFrame into the expected stream type for Dune""" + return [ + { + "order_uid": row["order_uid"], + "tx_hash": row["tx_hash"], + "solver": row["solver"], + "data": { + "surplus_fee": str(row["surplus_fee"]), + "amount": float(row["amount"]), + "safe_liquidity": row["safe_liquidity"], + }, + } + for row in rewards_df.to_dict(orient="records") + ] diff --git a/src/sql/orderbook/order_rewards.sql b/src/sql/orderbook/order_rewards.sql index 0d8a9140..e97d7f29 100644 --- a/src/sql/orderbook/order_rewards.sql +++ b/src/sql/orderbook/order_rewards.sql @@ -23,7 +23,7 @@ with trade_hashes as (SELECT solver, select concat('0x', encode(trade_hashes.order_uid, 'hex')) as order_uid, concat('0x', encode(solver, 'hex')) as solver, concat('0x', encode(tx_hash, 'hex')) as tx_hash, - surplus_fee::text, + coalesce(surplus_fee, 0)::text as surplus_fee, coalesce(reward, 0.0) as amount, -- An order is a liquidity order if and only if reward is null. -- A liquidity order is safe if and only if its fee_amount is > 0 diff --git a/src/sync/order_rewards.py b/src/sync/order_rewards.py index c038cba8..627407f0 100644 --- a/src/sync/order_rewards.py +++ b/src/sync/order_rewards.py @@ -1,12 +1,12 @@ """Main Entry point for app_hash sync""" -import csv -import os.path +from dune_client.file.interface import FileIO from pandas import DataFrame from src.fetch.orderbook import OrderbookFetcher from src.logger import set_log from src.models.block_range import BlockRange +from src.models.order_rewards_schema import OrderRewards from src.models.tables import SyncTable from src.post.aws import AWSClient from src.sync.common import last_sync_block @@ -26,37 +26,31 @@ class OrderbookDataHandler(RecordHandler): # pylint:disable=too-few-public-meth """ def __init__( - self, block_range: BlockRange, config: SyncConfig, order_rewards: DataFrame + self, + file_manager: FileIO, + block_range: BlockRange, + config: SyncConfig, + order_rewards: DataFrame, ): super().__init__(block_range, SYNC_TABLE, config) log.info(f"Handling {len(order_rewards)} new records") - self.order_rewards = order_rewards + self.file_manager = file_manager + self.order_rewards = OrderRewards.from_pdf_to_dune_records(order_rewards) def num_records(self) -> int: return len(self.order_rewards) def write_found_content(self) -> None: - path = self.file_path - if not os.path.exists(path): - log.info(f"creating write path {path}") - os.makedirs(path) - - self.order_rewards.to_json( - os.path.join(self.file_path, self.content_filename), - orient="records", - lines=True, + self.file_manager.write_ndjson( + data=self.order_rewards, name=self.content_filename ) def write_sync_data(self) -> None: # Only write these if upload was successful. - config = self.config - column = config.sync_column - with open( - os.path.join(self.file_path, config.sync_file), "w", encoding="utf-8" - ) as sync_file: - writer = csv.DictWriter(sync_file, fieldnames=[column], lineterminator="\n") - writer.writeheader() - writer.writerows([{column: str(self.block_range.block_to)}]) + self.file_manager.write_csv( + data=[{self.config.sync_column: str(self.block_range.block_to)}], + name=self.config.sync_file, + ) def sync_order_rewards( @@ -73,7 +67,10 @@ def sync_order_rewards( block_to=fetcher.get_latest_block(), ) record_handler = OrderbookDataHandler( - block_range, config, order_rewards=fetcher.get_orderbook_rewards(block_range) + file_manager=FileIO(config.volume_path / str(SYNC_TABLE)), + block_range=block_range, + config=config, + order_rewards=fetcher.get_orderbook_rewards(block_range), ) UploadHandler(aws, record_handler, table=SYNC_TABLE).write_and_upload_content( dry_run diff --git a/tests/integration/test_fetch_orderbook.py b/tests/integration/test_fetch_orderbook.py index dddfbef3..d57f7809 100644 --- a/tests/integration/test_fetch_orderbook.py +++ b/tests/integration/test_fetch_orderbook.py @@ -1,4 +1,3 @@ -import json import unittest import pandas as pd @@ -33,7 +32,7 @@ def test_get_order_rewards(self): "0xb6f7df8a1114129f7b61f2863b3f81b3620e95f73e5b769a62bb7a87ab6983f4", "0x2ce77009e78c291cdf39eb6f8ddf7e2c3401b4f962ef1240bdac47e632f8eb7f", ], - "surplus_fee": [None, None], + "surplus_fee": ["0", "0"], "amount": [40.70410, 39.00522], "safe_liquidity": [None, None], } diff --git a/tests/unit/test_order_rewards_schema.py b/tests/unit/test_order_rewards_schema.py new file mode 100644 index 00000000..b1544e34 --- /dev/null +++ b/tests/unit/test_order_rewards_schema.py @@ -0,0 +1,59 @@ +import unittest + +import pandas as pd + +from src.models.order_rewards_schema import OrderRewards + + +class TestModelOrderRewards(unittest.TestCase): + def test_order_rewards_transformation(self): + rewards_df = pd.DataFrame( + { + "order_uid": ["0x01", "0x02", "0x03"], + "solver": ["0x51", "0x52", "0x53"], + "tx_hash": ["0x71", "0x72", "0x73"], + "surplus_fee": [12345678910111213, 0, 0], + "amount": [40.70410, 39.00522, 0], + "safe_liquidity": [None, True, False], + } + ) + + self.assertEqual( + [ + { + "order_uid": "0x01", + "solver": "0x51", + "tx_hash": "0x71", + "data": { + "surplus_fee": "12345678910111213", + "amount": 40.70410, + "safe_liquidity": None, + }, + }, + { + "order_uid": "0x02", + "solver": "0x52", + "tx_hash": "0x72", + "data": { + "surplus_fee": "0", + "amount": 39.00522, + "safe_liquidity": True, + }, + }, + { + "order_uid": "0x03", + "solver": "0x53", + "tx_hash": "0x73", + "data": { + "surplus_fee": "0", + "amount": 0.0, + "safe_liquidity": False, + }, + }, + ], + OrderRewards.from_pdf_to_dune_records(rewards_df), + ) + + +if __name__ == "__main__": + unittest.main()