diff --git a/bots/event_trading_framework/base_event_trading_framework.py b/bots/event_trading_framework/base_event_trading_framework.py index 5b0e23b..31a3b54 100644 --- a/bots/event_trading_framework/base_event_trading_framework.py +++ b/bots/event_trading_framework/base_event_trading_framework.py @@ -3,6 +3,7 @@ from typing import Union from rubi import OrderBook, OrderEvent +from rubi.data.helpers.query_types import SubgraphResponse from event_trading_framework.helpers import FreshEventQueue, FIFOEventQueue @@ -26,6 +27,9 @@ def __init__(self, event_queue: Queue): # Initialize message queues self.orderbook_event_queue = FreshEventQueue(message_handler=self.on_orderbook) self.order_event_queue = FIFOEventQueue(message_handler=self.on_order) + self.subgrounds_order_query_event_queue = FreshEventQueue( + message_handler=self.on_subgrounds_order_query + ) @abstractmethod def on_startup(self): @@ -46,6 +50,7 @@ def start(self): # start message queue handlers self.orderbook_event_queue.start() self.order_event_queue.start() + self.subgrounds_order_query_event_queue.start() self.running = True while self.running: @@ -55,6 +60,8 @@ def start(self): self.orderbook_event_queue.add_message(message=message) elif isinstance(message, OrderEvent): self.order_event_queue.add_message(message=message) + elif isinstance(message, SubgraphResponse): + self.subgrounds_order_query_event_queue.add_message(message=message) else: raise Exception("Unexpected message fetched from queue") @@ -93,3 +100,12 @@ def on_order(self, order: OrderEvent): :type order: OrderEvent """ raise NotImplementedError() + + @abstractmethod + def on_subgrounds_order_query(self, response: SubgraphResponse): + """This method should be implemented by subclasses to handle subgrounds order queries. + + :param response: The subgraph response to be handled. + :type response: SubgraphResponse + """ + raise NotImplementedError() diff --git a/bots/example_bots/gridbot.py b/bots/example_bots/gridbot.py index a5e811e..e8a9c18 100644 --- a/bots/example_bots/gridbot.py +++ b/bots/example_bots/gridbot.py @@ -14,6 +14,7 @@ RubiconMarketApproval, TransactionStatus, ) +from rubi.data.helpers.query_types import SubgraphResponse from web3.types import TxParams from event_trading_framework import BaseEventTradingFramework @@ -89,15 +90,6 @@ def on_startup(self): # Orderbook poller self.client.start_orderbook_poller(pair_name=self.pair_name) - base_asset_wallet_balance = self.client.get_balance(self.grid.base_asset) - quote_asset_wallet_balance = self.client.get_balance(self.grid.quote_asset) - - self.grid.update_inventory( - open_orders=self.client.open_limit_orders, - base_asset_wallet_balance=base_asset_wallet_balance, - quote_asset_wallet_balance=quote_asset_wallet_balance, - ) - # set allowed_to_place_new_orders to True self.allowed_to_place_new_orders = True @@ -133,6 +125,21 @@ def on_order(self, order: OrderEvent): order_side=order.order_side, price=order.price, size=order.size ) + def on_subgrounds_order_query(self, response: SubgraphResponse): + logger.debug(f"NEW SUBGROUNDS ORDER QUERY, timestamp: {time.time_ns()}") + logger.debug(response) + + base_asset_wallet_balance = self.client.get_balance(self.grid.base_asset) + quote_asset_wallet_balance = self.client.get_balance(self.grid.quote_asset) + + self.grid.update_inventory( + open_orders=self.client.open_limit_orders, + base_asset_wallet_balance=base_asset_wallet_balance, + quote_asset_wallet_balance=quote_asset_wallet_balance, + ) + + logger.info(f"inventory: {self.grid.inventory}") + ###################################################################### # place transaction methods ###################################################################### diff --git a/bots/example_bots/helpers/grid.py b/bots/example_bots/helpers/grid.py index 4bb52bb..7fedfcc 100644 --- a/bots/example_bots/helpers/grid.py +++ b/bots/example_bots/helpers/grid.py @@ -51,7 +51,7 @@ def __init__( self.pair_name = f"{base_asset}/{quote_asset}" # Grid Inventory - self._inventory = { + self.inventory = { base_asset: Decimal(starting_base_asset_amount), quote_asset: Decimal(starting_quote_asset_amount), } @@ -67,8 +67,8 @@ def __init__( self.min_level_size_in_base = Decimal(min_level_size_in_base) self.grid_size = ( - self._inventory[self.base_asset] - + self._inventory[self.quote_asset] / self.fair_price + self.inventory[self.base_asset] + + self.inventory[self.quote_asset] / self.fair_price ) # Grid @@ -88,19 +88,36 @@ def __init__( # inventory functions ###################################################################### + def update_fair_price(self, fair_price: Decimal): + self.fair_price = fair_price + + # Update grid size based on new fair price + self.grid_size = ( + self.inventory[self.base_asset] + + self.inventory[self.quote_asset] / self.fair_price + ) + + # Update grid based on new fair price + self.desired_grid: List[GridLevel] = self._construct_grid() + self.num_grid_levels = len(self.desired_grid) + self.middle_index = math.ceil(self.num_grid_levels / 2) + + # Calculate new current index + self.current_grid_index: int = self._calculate_grid_index() + def update_inventory( self, open_orders: Dict[int, LimitOrder], base_asset_wallet_balance: Decimal, quote_asset_wallet_balance: Decimal, ): - self._inventory[self.base_asset] = ( + self.inventory[self.base_asset] = ( self._amount_in_market( side=OrderSide.SELL, open_limit_orders=list(open_orders.values()) ) + base_asset_wallet_balance ) - self._inventory[self.quote_asset] = ( + self.inventory[self.quote_asset] = ( self._amount_in_market( side=OrderSide.BUY, open_limit_orders=list(open_orders.values()) ) @@ -109,8 +126,8 @@ def update_inventory( self.current_grid_index = self._calculate_grid_index() def add_trade(self, order_side: OrderSide, price: Decimal, size: Decimal) -> None: - self._inventory[self.base_asset] += size * order_side.sign() - self._inventory[self.quote_asset] -= size * price * order_side.sign() + self.inventory[self.base_asset] += size * order_side.sign() + self.inventory[self.quote_asset] -= size * price * order_side.sign() match order_side: case OrderSide.BUY: @@ -119,16 +136,16 @@ def add_trade(self, order_side: OrderSide, price: Decimal, size: Decimal) -> Non self._last_sold_price = price self.grid_size = ( - self._inventory[self.base_asset] - + self._inventory[self.quote_asset] / self.fair_price + self.inventory[self.base_asset] + + self.inventory[self.quote_asset] / self.fair_price ) self.current_grid_index = self._calculate_grid_index() def get_base_asset_amount(self): - return self._inventory[self.base_asset] + return self.inventory[self.base_asset] def get_quote_asset_amount(self): - return self._inventory[self.quote_asset] + return self.inventory[self.quote_asset] ###################################################################### # grid functions @@ -141,8 +158,8 @@ def get_orders( best_bid_price=best_bid_price, best_ask_price=best_ask_price ) - bid_amount_available = self._inventory[self.quote_asset] - ask_amount_available = self._inventory[self.base_asset] + bid_amount_available = self.inventory[self.quote_asset] + ask_amount_available = self.inventory[self.base_asset] bids_to_place = [] for bid in desired_bids: @@ -203,7 +220,7 @@ def _get_desired_orders( return desired_bids, desired_asks def _calculate_grid_index(self) -> int: - quote_as_percent_of_size = self._inventory[self.quote_asset] / ( + quote_as_percent_of_size = self.inventory[self.quote_asset] / ( self.grid_size * self.fair_price ) diff --git a/bots/main.py b/bots/main.py index 1563e9f..dfcddb3 100644 --- a/bots/main.py +++ b/bots/main.py @@ -13,7 +13,7 @@ if __name__ == "__main__": # setup logging logging.basicConfig( - format="%(asctime)s %(levelname)-8s %(name)s: %(message)s", + format="%(asctime)s %(levelname)-8s %(name)s:%(lineno)d - %(message)s", level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S", ) diff --git a/rubi/rubi/client.py b/rubi/rubi/client.py index 15bdebf..872ab64 100644 --- a/rubi/rubi/client.py +++ b/rubi/rubi/client.py @@ -1,5 +1,6 @@ import logging from _decimal import Decimal +from concurrent.futures import ThreadPoolExecutor from multiprocessing import Queue from threading import Thread from time import sleep @@ -9,6 +10,7 @@ from eth_typing import ChecksumAddress from web3.types import EventData, Nonce, TxParams +from rubi.data.helpers.query_types import SubgraphResponse from rubi import LimitOrder from rubi.contracts import ( ERC20, @@ -395,6 +397,7 @@ def start_event_poller( event_type: Type[BaseEvent], filters: Optional[Dict[str, Any]] = None, event_handler: Optional[Callable] = None, + from_block: int = 0, poll_time: int = 2, ) -> None: """Starts a background event poller that continuously listens for events of the specified event type @@ -411,10 +414,11 @@ def start_event_poller( :param event_handler: Optional event handler function to process the retrieved events, defaults to the self._default_event_handler (optional, default is None). :type event_handler: Optional[Callable], optional + :param from_block: The block to create the filter from. Defaults to the latest block. + :type from_block: int :param poll_time: Polling interval in seconds, defaults to 2 seconds. :type poll_time: int, optional :raises Exception: If the message queue is not configured. - :raises PairDoesNotExistException: If the pair does not exist in the client. """ if self.message_queue is None: raise Exception( @@ -457,6 +461,7 @@ def start_event_poller( event_handler=self._default_event_handler if event_handler is None else event_handler, + from_block=from_block, poll_time=poll_time, ) @@ -496,6 +501,26 @@ def _default_event_handler( self.message_queue.put(event) + def stop_event_poller( + self, + pair_name: str, + event_type: Type[BaseEvent], + ) -> None: + """Stop a running event poller for a specific event type. + + :param pair_name: The name of the pair we are monitoring events of. + :type pair_name: str + :param event_type: The type of event we are polling for. + :type event_type: Type[BaseEvent] + """ + + event_type.get_event_contract( + market=self.network.rubicon_market, router=self.network.rubicon_router + ).stop_event_poller( + pair_name=pair_name, + event_type=event_type, + ) + ###################################################################### # order methods ###################################################################### @@ -892,7 +917,7 @@ def get_offers( order_by: str = "timestamp", order_direction: str = "desc", as_dataframe: bool = True, - ) -> Optional[pd.DataFrame] | List[LimitOrder]: + ) -> Optional[pd.DataFrame] | SubgraphResponse: # TODO: add support for multiple pair_names if len(pair_names) == 1: base_asset, quote_asset = pair_names[0].split("/") @@ -933,38 +958,43 @@ def get_offers( as_dataframe=as_dataframe, ) case _: - bids = self.market_data.get_offers( - maker=maker, - from_address=from_address, - buy_gem=self.network.tokens[base_asset].address, - pay_gem=self.network.tokens[quote_asset].address, - side=OrderSide.BUY.value.lower(), - open=open, - start_time=start_time, - end_time=end_time, - start_block=start_block, - end_block=end_block, - first=first, - order_by=order_by, - order_direction=order_direction, - as_dataframe=as_dataframe, - ) - asks = self.market_data.get_offers( - maker=maker, - from_address=from_address, - buy_gem=self.network.tokens[quote_asset].address, - pay_gem=self.network.tokens[base_asset].address, - side=OrderSide.SELL.value.lower(), - open=open, - start_time=start_time, - end_time=end_time, - start_block=start_block, - end_block=end_block, - first=first, - order_by=order_by, - order_direction=order_direction, - as_dataframe=as_dataframe, - ) + with ThreadPoolExecutor() as executor: + bids_future = executor.submit( + self.market_data.get_offers, + maker=maker, + from_address=from_address, + buy_gem=self.network.tokens[base_asset].address, + pay_gem=self.network.tokens[quote_asset].address, + side=OrderSide.BUY.value.lower(), + open=open, + start_time=start_time, + end_time=end_time, + start_block=start_block, + end_block=end_block, + first=first, + order_by=order_by, + order_direction=order_direction, + as_dataframe=as_dataframe, + ) + asks_future = executor.submit( + self.market_data.get_offers, + maker=maker, + from_address=from_address, + buy_gem=self.network.tokens[quote_asset].address, + pay_gem=self.network.tokens[base_asset].address, + side=OrderSide.SELL.value.lower(), + open=open, + start_time=start_time, + end_time=end_time, + start_block=start_block, + end_block=end_block, + first=first, + order_by=order_by, + order_direction=order_direction, + as_dataframe=as_dataframe, + ) + bids = bids_future.result() + asks = asks_future.result() if as_dataframe: result = pd.concat([bids, asks]).reset_index(drop=True) @@ -989,10 +1019,10 @@ def get_offers( as_dataframe=as_dataframe, ) - if isinstance(result, List): + if isinstance(result, SubgraphResponse): limit_orders: List[LimitOrder] = [] - for offer in result: + for offer in result.body: base_asset, quote_asset = self._get_base_and_quote_asset( raw=offer, pair_names=pair_names ) @@ -1004,7 +1034,7 @@ def get_offers( base_asset=base_asset, quote_asset=quote_asset, offer=offer ) ) - return limit_orders + return SubgraphResponse(block_number=result.block_number, body=limit_orders) return result diff --git a/rubi/rubi/contracts/base_contract.py b/rubi/rubi/contracts/base_contract.py index 222a710..a132f84 100644 --- a/rubi/rubi/contracts/base_contract.py +++ b/rubi/rubi/contracts/base_contract.py @@ -49,6 +49,8 @@ def __init__( self.error_decoder[error_hex_code] = item["name"] + self.running_event_pollers: Dict[str, int] = {} + @classmethod def from_address_and_abi( cls, @@ -137,6 +139,7 @@ def start_event_poller( event_type: Type[BaseEvent], argument_filters: Optional[Dict[str, Any]] = None, event_handler: Optional[Callable] = None, + from_block: int = 0, poll_time: int = 2, ) -> None: """Start a thread which runs an event poller for a specific event type. @@ -149,12 +152,16 @@ def start_event_poller( :type argument_filters: Optional[Dict[str, Any]] :param event_handler: Optional event handler function. Defaults to using the events default handler. :type event_handler: Optional[Callable] + :param from_block: The block to create the filter from. Defaults to the latest block. + :type from_block: int :param poll_time: The time interval between each poll in seconds. Defaults to 2 seconds. :type poll_time: int """ event_filter = event_type.create_event_filter( - contract=self.contract, argument_filters=argument_filters + contract=self.contract, + argument_filters=argument_filters, + from_block=from_block, ) handler = ( event_handler if event_handler is not None else event_type.default_handler @@ -169,23 +176,26 @@ def start_event_poller( argument_filters, event_filter, handler, + from_block, poll_time, ), daemon=True, ) thread.start() - @staticmethod def _start_default_event_poller( + self, pair_name: str, event_type: Type[BaseEvent], contract: Contract, argument_filters: Optional[Dict[str, Any]], event_filter: LogFilter, event_handler: Callable, + from_block: int, poll_time: int, ) -> None: - """Start the default event poller loop. This thread will stop if the pair is removed from the client. + """Start the default event poller loop. This thread will stop a new event poller with a different from_block is + started. :param pair_name: The name of the event pair. :type pair_name: str @@ -195,34 +205,66 @@ def _start_default_event_poller( :type event_filter: LogFilter :param event_handler: The event handler function. :type event_handler: Callable + :param from_block: The block to create the filter from. Defaults to the latest block. + :type from_block: int :param poll_time: The time interval between poll iterations in seconds. :type poll_time: int """ - polling = True + event_poller_name = pair_name + event_type.__name__ + + if ( + self.running_event_pollers.get(event_poller_name, False) + and from_block == self.running_event_pollers[event_poller_name] + ): + logging.debug( + f"Event poller for {event_type.__name__} on {pair_name} with this config already started." + ) + return + + self.running_event_pollers[event_poller_name] = from_block + logging.debug( + f"Event poller for {event_type.__name__} on {pair_name} started from block {from_block}." + ) - while polling: + while self.running_event_pollers[event_poller_name] == from_block: try: for event_data in event_filter.get_new_entries(): - event_handler(pair_name, event_type, event_data) + try: + event_handler(pair_name, event_type, event_data) + except Exception as e: + logger.error(f"Error {e} handling event data: {event_data}") except Exception as e: logger.error(e) # The filter has been deleted by the node and needs to be recreated if "filter not found" in str(e): event_filter = event_type.create_event_filter( - contract=contract, argument_filters=argument_filters + contract=contract, + argument_filters=argument_filters, + from_block=from_block, ) logger.info(f"event filter for: {event_type} has been recreated") - # TODO: this is a hack to detect if a PairDoesNotExistException is raised and polling should stop. - # Currently an additional except PairDoesNotExistException as e: cannot be added as this causes a - # circular import. Think about restructuring the directories to avoid this (e.g one root level types - # directory). - if "add pair to the client" in str(e): - polling = False - sleep(poll_time) + logging.debug(f"Event poller for {event_type.__name__} on {pair_name} stopped.") + + def stop_event_poller( + self, + pair_name: str, + event_type: Type[BaseEvent], + ) -> None: + """Stop a running event poller for a specific event type. + + :param pair_name: The name of the pair we are monitoring events of. + :type pair_name: str + :param event_type: The type of event to poll for. + :type event_type: Type[BaseEvent] + """ + + event_poller_name = pair_name + event_type.__name__ + self.running_event_pollers[event_poller_name] = -1 + ###################################################################### # helper methods ###################################################################### diff --git a/rubi/rubi/contracts/contract_types/events.py b/rubi/rubi/contracts/contract_types/events.py index d1d099b..cb0b9e8 100644 --- a/rubi/rubi/contracts/contract_types/events.py +++ b/rubi/rubi/contracts/contract_types/events.py @@ -82,7 +82,9 @@ def get_event_contract( @staticmethod @abstractmethod def create_event_filter( - contract: Contract, argument_filters: Optional[Dict[str, Any]] = None + contract: Contract, + argument_filters: Optional[Dict[str, Any]] = None, + from_block: int = 0, ) -> LogFilter: """Abstract method to create an event filter for the given contract with optional argument filters. Must be overridden in each event subclass. @@ -92,6 +94,8 @@ def create_event_filter( :param argument_filters: Optional filters. Only events that match these filters will be returned when the filter is queried. :type argument_filters: Optional[Dict[str, Any]] + :param from_block: Optional block to get events from, defaults to fetching from the latest block. + :type from_block: int :return: The created event filter. :rtype: LogFilter """ @@ -210,11 +214,14 @@ def __init__( @staticmethod def create_event_filter( - contract: Contract, argument_filters: Optional[Dict[str, Any]] = None + contract: Contract, + argument_filters: Optional[Dict[str, Any]] = None, + from_block: int = 0, ) -> LogFilter: """implementation of BaseEvent create_event_filter""" return contract.events.emitOffer.create_filter( - argument_filters=argument_filters, fromBlock="latest" + argument_filters=argument_filters, + fromBlock=from_block if from_block != 0 else "latest", ) @staticmethod @@ -266,11 +273,14 @@ def __init__( @staticmethod def create_event_filter( - contract: Contract, argument_filters: Optional[Dict[str, Any]] = None + contract: Contract, + argument_filters: Optional[Dict[str, Any]] = None, + from_block: int = 0, ) -> LogFilter: """implementation of BaseEvent create_event_filter""" return contract.events.emitTake.create_filter( - argument_filters=argument_filters, fromBlock="latest" + argument_filters=argument_filters, + fromBlock=from_block if from_block != 0 else "latest", ) @staticmethod @@ -322,11 +332,14 @@ def __init__( @staticmethod def create_event_filter( - contract: Contract, argument_filters: Optional[Dict[str, Any]] = None + contract: Contract, + argument_filters: Optional[Dict[str, Any]] = None, + from_block: int = 0, ) -> LogFilter: """implementation of BaseEvent create_event_filter""" return contract.events.emitCancel.create_filter( - argument_filters=argument_filters, fromBlock="latest" + argument_filters=argument_filters, + fromBlock=from_block if from_block != 0 else "latest", ) @staticmethod @@ -370,11 +383,14 @@ def __init__( @staticmethod def create_event_filter( - contract: Contract, argument_filters: Optional[Dict[str, Any]] = None + contract: Contract, + argument_filters: Optional[Dict[str, Any]] = None, + from_block: int = 0, ) -> LogFilter: """implementation of BaseEvent create_event_filter""" return contract.events.emitFee.create_filter( - argument_filters=argument_filters, fromBlock="latest" + argument_filters=argument_filters, + fromBlock=from_block if from_block != 0 else "latest", ) @staticmethod @@ -404,11 +420,14 @@ def __init__(self, maker: ChecksumAddress, **args): @staticmethod def create_event_filter( - contract: Contract, argument_filters: Optional[Dict[str, Any]] = None + contract: Contract, + argument_filters: Optional[Dict[str, Any]] = None, + from_block: int = 0, ) -> LogFilter: """implementation of BaseEvent create_event_filter""" return contract.events.emitDelete.create_filter( - argument_filters=argument_filters, fromBlock="latest" + argument_filters=argument_filters, + fromBlock=from_block if from_block != 0 else "latest", ) @staticmethod @@ -476,11 +495,14 @@ def get_event_contract( @staticmethod def create_event_filter( - contract: Contract, argument_filters: Optional[Dict[str, Any]] = None + contract: Contract, + argument_filters: Optional[Dict[str, Any]] = None, + from_block: int = 0, ) -> LogFilter: """implementation of BaseEvent create_event_filter""" return contract.events.emitSwap.create_filter( - argument_filters=argument_filters, fromBlock="latest" + argument_filters=argument_filters, + fromBlock=from_block if from_block != 0 else "latest", ) @staticmethod @@ -534,7 +556,9 @@ def get_event_contract( @staticmethod def create_event_filter( - contract: Contract, argument_filters: Optional[Dict[str, Any]] = None + contract: Contract, + argument_filters: Optional[Dict[str, Any]] = None, + from_block: int = 0, ) -> LogFilter: """implementation of BaseEvent create_event_filter""" raise Exception("This method doesn't make sense on this class") @@ -583,7 +607,9 @@ def get_event_contract( @staticmethod def create_event_filter( - contract: Contract, argument_filters: Optional[Dict[str, Any]] = None + contract: Contract, + argument_filters: Optional[Dict[str, Any]] = None, + from_block: int = 0, ) -> LogFilter: """implementation of BaseEvent create_event_filter""" raise Exception("This method doesn't make sense on this class") diff --git a/rubi/rubi/data/helpers/query_types.py b/rubi/rubi/data/helpers/query_types.py index ea5ba64..58aba31 100644 --- a/rubi/rubi/data/helpers/query_types.py +++ b/rubi/rubi/data/helpers/query_types.py @@ -1,6 +1,22 @@ -from typing import Any, List +from typing import Any, List, Dict from eth_typing import ChecksumAddress +from subgrounds import FieldPath + + +class SubgraphResponse: + def __init__(self, block_number: int, body: List[Any]): + self.block_number = block_number + self.body = body + + def __add__(self, other: "SubgraphResponse") -> "SubgraphResponse": + block_number = ( + self.block_number + if self.block_number < other.block_number + else other.block_number + ) + + return SubgraphResponse(block_number=block_number, body=self.body + other.body) class SubgraphOffer: @@ -29,36 +45,45 @@ def __init__( self.open = open @staticmethod - def get_fields(offer_query: Any) -> List: + def get_fields(field_paths: Dict[str, FieldPath], formatted: bool = True) -> List: """Helper method to build a list of fields for the offers subgraph entity.""" - return [ - offer_query.id, - offer_query.timestamp, - offer_query.index, - offer_query.maker.id, - offer_query.from_address.id, - offer_query.pay_gem, - offer_query.buy_gem, - offer_query.pay_amt, - offer_query.buy_amt, - offer_query.paid_amt, - offer_query.bought_amt, - offer_query.price, - offer_query.open, - offer_query.removed_timestamp, - offer_query.removed_block, - offer_query.transaction.id, - offer_query.transaction.block_number, - offer_query.transaction.block_index, - offer_query.pay_amt_decimals, - offer_query.buy_amt_decimals, - offer_query.paid_amt_decimals, - offer_query.bought_amt_decimals, - offer_query.pay_gem_symbol, - offer_query.buy_gem_symbol, - offer_query.datetime, + fields = [ + field_paths["offer"].id, + field_paths["offer"].timestamp, + field_paths["offer"].index, + field_paths["offer"].maker.id, + field_paths["offer"].from_address.id, + field_paths["offer"].pay_gem, + field_paths["offer"].buy_gem, + field_paths["offer"].pay_amt, + field_paths["offer"].buy_amt, + field_paths["offer"].paid_amt, + field_paths["offer"].bought_amt, + field_paths["offer"].price, + field_paths["offer"].open, + field_paths["offer"].removed_timestamp, + field_paths["offer"].removed_block, + field_paths["offer"].transaction.id, + field_paths["offer"].transaction.block_number, + field_paths["offer"].transaction.block_index, + field_paths["offer"].datetime, + field_paths["block"].number, ] + if formatted: + fields.extend( + [ + field_paths["offer"].pay_amt_decimals, + field_paths["offer"].buy_amt_decimals, + field_paths["offer"].paid_amt_decimals, + field_paths["offer"].bought_amt_decimals, + field_paths["offer"].pay_gem_symbol, + field_paths["offer"].buy_gem_symbol, + ] + ) + + return fields + class SubgraphTrade: """Helper object for querying subgraph Trades""" @@ -66,6 +91,7 @@ class SubgraphTrade: @staticmethod def get_fields(trade_query: Any) -> List: """Helper method to build a list of fields for the offers subgraph entity.""" + return [ trade_query.id, trade_query.timestamp, diff --git a/rubi/rubi/data/market.py b/rubi/rubi/data/market.py index e5c67d6..d7cecd7 100644 --- a/rubi/rubi/data/market.py +++ b/rubi/rubi/data/market.py @@ -5,10 +5,11 @@ import pandas as pd from eth_typing import ChecksumAddress -from subgrounds import Subgrounds, Subgraph, SyntheticField +from subgrounds import Subgrounds, Subgraph, SyntheticField, FieldPath from subgrounds.pagination import ShallowStrategy from web3 import Web3 +from rubi.data.helpers.query_types import SubgraphResponse from rubi.contracts import ERC20 from rubi.data.helpers import QueryValidation from rubi.data.helpers import SubgraphOffer, SubgraphTrade @@ -266,7 +267,7 @@ def get_offers( order_by: str = "timestamp", order_direction: str = "desc", as_dataframe: bool = True, - ) -> Optional[pd.DataFrame] | List[SubgraphOffer]: + ) -> Optional[pd.DataFrame] | SubgraphResponse: """Returns a dataframe of offers placed on the market contract, with the option to pass in filters. :param maker: the address of the maker of the offer @@ -294,10 +295,10 @@ def get_offers( :param as_dataframe: If the response should be a dataframe (default: True) :type as_dataframe: bool :return: a dataframe of offers placed on the market contract or a list of subgraph offer objects - :rtype: Optional[pd.DataFrame] | List[SubgraphOffer] + :rtype: Optional[pd.DataFrame] | SubgraphResponse """ - offer_query = self._build_offers_query( + field_paths = self._build_offers_field_paths( order_by=order_by, order_direction=order_direction, first=first, @@ -312,7 +313,7 @@ def get_offers( end_block=end_block, ) - query_fields = SubgraphOffer.get_fields(offer_query=offer_query) + query_fields = SubgraphOffer.get_fields(field_paths=field_paths) if as_dataframe: response = self._query_offers_as_dataframe(query_fields=query_fields) # TODO: we could also pass this data to the offers_query method and handle it there, could help with price @@ -401,7 +402,7 @@ def get_trades( # helper methods ###################################################################### - def _build_offers_query( + def _build_offers_field_paths( self, order_by: str, order_direction: str, @@ -415,7 +416,7 @@ def _build_offers_query( end_time: Optional[int] = None, start_block: Optional[int] = None, # TODO: add in start_block and end_block end_block: Optional[int] = None, # TODO: add in start_block and end_block - ): + ) -> Dict["str", FieldPath]: """Helper method build an offers query.""" QueryValidation.validate_offer_query( @@ -445,14 +446,16 @@ def _build_offers_query( ] where = [condition for condition in where if condition is not None] - offers_query = self.subgraph.Query.offers( # noqa + offers_field_path = self.subgraph.Query.offers( # noqa orderBy=order_by, orderDirection=order_direction, first=first, where=where if where else {}, ) - return offers_query + block_field_path = self.subgraph.Query._meta().block() # noqa + + return {"offer": offers_field_path, "block": block_field_path} def _build_trades_query( self, @@ -530,7 +533,7 @@ def _query_offers_as_dataframe(self, query_fields: List) -> Optional[pd.DataFram # TODO: apply any data type conversions to the dataframe - possibly converting unformatted values to integers return df - def _query_offers(self, query_fields: List) -> List[SubgraphOffer]: + def _query_offers(self, query_fields: List) -> SubgraphResponse: """Helper method to query the offers subgraph entity.""" response = self.subgrounds.query_json( @@ -540,6 +543,9 @@ def _query_offers(self, query_fields: List) -> List[SubgraphOffer]: ) if response: + block_number = response[0]["_meta"]["block"]["number"] + del response[0]["_meta"] + raw_offers = list(response[0].values())[0] offers: List[SubgraphOffer] = [] @@ -557,9 +563,9 @@ def _query_offers(self, query_fields: List) -> List[SubgraphOffer]: open=raw_offer["open"], ) ) - return offers + return SubgraphResponse(block_number=block_number, body=offers) - return [] + raise Exception("Query failed for some reason") def _query_trades_as_dataframe( self, diff --git a/rubi/rubi/order_tracking_client.py b/rubi/rubi/order_tracking_client.py index e0232aa..fbddf20 100644 --- a/rubi/rubi/order_tracking_client.py +++ b/rubi/rubi/order_tracking_client.py @@ -1,6 +1,8 @@ import logging +import time from _decimal import Decimal from multiprocessing import Queue +from threading import Lock, Thread from typing import Optional, Dict, List, Any, Type, Union from eth_typing import ChecksumAddress @@ -52,17 +54,21 @@ def __init__( network=network, message_queue=message_queue, wallet=wallet, key=key ) - limit_orders_from_subgraph = self.get_offers( - maker=self.wallet, open=True, as_dataframe=False, pair_names=pair_names - ) - self.open_limit_orders: Dict[int, LimitOrder] = {} - if limit_orders_from_subgraph: - for limit_order in limit_orders_from_subgraph: - self.open_limit_orders[limit_order.order_id] = limit_order - self.pairs_with_registered_event_listeners: List[str] = [] - self._register_listeners(pair_names=pair_names) + self.running = True + self._subgrounds_update_lock = Lock() + + subgrounds_poller = Thread( + target=self._register_subgrounds_poller, + kwargs={"pair_names": pair_names, "poll_time": 300}, # poll every 5 minutes + daemon=True, + ) + subgrounds_poller.start() + + # Make sure that we have gotten the subgrounds response for open orders before finishing instantiation + first_message = message_queue.get(block=True) + message_queue.put(first_message) @classmethod def from_http_node_url( @@ -186,36 +192,39 @@ def _default_event_handler( def _update_active_limit_orders(self, events: List[Any]) -> None: """Update active limit order based on incoming events""" - for event in events: - if not isinstance(event, OrderEvent): - continue - - event: OrderEvent - if event.limit_order_owner != self.wallet: - continue - - match event.order_type: - case OrderType.LIMIT: - if self.open_limit_orders.get(event.limit_order_id): - # If we are already tracking the order then don't add it again - continue - - self.open_limit_orders[ - event.limit_order_id - ] = LimitOrder.from_order_event(order_event=event) - case OrderType.LIMIT_TAKEN: - taken_order = self.open_limit_orders[event.limit_order_id] - - if taken_order.remaining_size - event.size <= Decimal("0"): + if not self._subgrounds_update_lock.locked(): + for event in events: + if not isinstance(event, OrderEvent): + continue + + event: OrderEvent + if event.limit_order_owner != self.wallet: + continue + + match event.order_type: + case OrderType.LIMIT: + if self.open_limit_orders.get(event.limit_order_id): + # If we are already tracking the order then don't add it again + continue + + self.open_limit_orders[ + event.limit_order_id + ] = LimitOrder.from_order_event(order_event=event) + case OrderType.LIMIT_TAKEN: + taken_order = self.open_limit_orders[event.limit_order_id] + + if taken_order.remaining_size - event.size <= Decimal("0"): + self._delete_active_limit_order( + order_id=event.limit_order_id + ) + else: + self.open_limit_orders[ + event.limit_order_id + ].update_with_take(order_event=event) + case OrderType.CANCEL: + self._delete_active_limit_order(order_id=event.limit_order_id) + case OrderType.LIMIT_DELETED: self._delete_active_limit_order(order_id=event.limit_order_id) - else: - self.open_limit_orders[event.limit_order_id].update_with_take( - order_event=event - ) - case OrderType.CANCEL: - self._delete_active_limit_order(order_id=event.limit_order_id) - case OrderType.LIMIT_DELETED: - self._delete_active_limit_order(order_id=event.limit_order_id) def _delete_active_limit_order(self, order_id: int) -> None: """Delete active limit order""" @@ -226,7 +235,37 @@ def _delete_active_limit_order(self, order_id: int) -> None: f"Limit order {order_id} already removed from active limit orders." ) - def _register_listeners(self, pair_names: List[str]) -> None: + def _register_subgrounds_poller( + self, pair_names: List[str], poll_time: int | float + ) -> None: + """Register subgrounds poller and event listeners""" + while self.running: + with self._subgrounds_update_lock: + self._stop_listeners() + + subgrounds_response = self.get_offers( + maker=self.wallet, + open=True, + as_dataframe=False, + pair_names=pair_names, + ) + + if subgrounds_response.body: + for limit_order in subgrounds_response.body: + self.open_limit_orders[limit_order.order_id] = limit_order + else: + self.open_limit_orders = {} + + self.message_queue.put(subgrounds_response) + + self._register_listeners( + pair_names=pair_names, + from_block=subgrounds_response.block_number, + ) + + time.sleep(poll_time) + + def _register_listeners(self, pair_names: List[str], from_block: int) -> None: """Register event listeners""" new_pair_names = [ pair_name @@ -236,24 +275,47 @@ def _register_listeners(self, pair_names: List[str]) -> None: self.pairs_with_registered_event_listeners.extend(new_pair_names) - for pair_name in new_pair_names: + for pair_name in self.pairs_with_registered_event_listeners: self.start_event_poller( pair_name=pair_name, event_type=EmitOfferEvent, filters={"maker": self.wallet}, + from_block=from_block, ) self.start_event_poller( pair_name=pair_name, event_type=EmitTakeEvent, filters={"maker": self.wallet}, + from_block=from_block, ) self.start_event_poller( pair_name=pair_name, event_type=EmitCancelEvent, filters={"maker": self.wallet}, + from_block=from_block, ) self.start_event_poller( pair_name=pair_name, event_type=EmitDeleteEvent, filters={"maker": self.wallet}, + from_block=from_block, + ) + + def _stop_listeners(self): + for pair_name in self.pairs_with_registered_event_listeners: + self.stop_event_poller( + pair_name=pair_name, + event_type=EmitOfferEvent, + ) + self.stop_event_poller( + pair_name=pair_name, + event_type=EmitTakeEvent, + ) + self.stop_event_poller( + pair_name=pair_name, + event_type=EmitCancelEvent, + ) + self.stop_event_poller( + pair_name=pair_name, + event_type=EmitDeleteEvent, ) diff --git a/rubi/rubi/rubicon_types/orders.py b/rubi/rubi/rubicon_types/orders.py index e8f7454..f94f47c 100644 --- a/rubi/rubi/rubicon_types/orders.py +++ b/rubi/rubi/rubicon_types/orders.py @@ -85,6 +85,10 @@ def __init__( self.order_side = order_side self.order_type = order_type + def __repr__(self): + items = ("{}={!r}".format(k, self.__dict__[k]) for k in self.__dict__) + return "{}({})".format(type(self).__name__, ", ".join(items)) + class NewMarketOrder(BaseOrder): """Class representing a new market order. @@ -150,10 +154,6 @@ def __init__( self.size = size self.price = price - def __repr__(self): - items = ("{}={!r}".format(k, self.__dict__[k]) for k in self.__dict__) - return "{}({})".format(type(self).__name__, ", ".join(items)) - class UpdateLimitOrder(BaseOrder): """Class representing an update to an existing limit order @@ -288,10 +288,6 @@ def from_subgraph_offer( def update_with_take(self, order_event: "OrderEvent"): self.filled_size += order_event.size - def __repr__(self): - items = ("{}={!r}".format(k, self.__dict__[k]) for k in self.__dict__) - return "{}({})".format(type(self).__name__, ", ".join(items)) - class NewCancelOrder(BaseOrder): """Class representing a limit order cancellation