Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make order tracking more robust. #96

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions bots/event_trading_framework/base_event_trading_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Union

from rubi import OrderBook, OrderEvent
from rubi.data.helpers.query_types import SubgraphResponse

from event_trading_framework.helpers import FreshEventQueue, FIFOEventQueue

Expand All @@ -26,6 +27,9 @@ def __init__(self, event_queue: Queue):
# Initialize message queues
self.orderbook_event_queue = FreshEventQueue(message_handler=self.on_orderbook)
self.order_event_queue = FIFOEventQueue(message_handler=self.on_order)
self.subgrounds_order_query_event_queue = FreshEventQueue(
message_handler=self.on_subgrounds_order_query
)

@abstractmethod
def on_startup(self):
Expand All @@ -46,6 +50,7 @@ def start(self):
# start message queue handlers
self.orderbook_event_queue.start()
self.order_event_queue.start()
self.subgrounds_order_query_event_queue.start()

self.running = True
while self.running:
Expand All @@ -55,6 +60,8 @@ def start(self):
self.orderbook_event_queue.add_message(message=message)
elif isinstance(message, OrderEvent):
self.order_event_queue.add_message(message=message)
elif isinstance(message, SubgraphResponse):
self.subgrounds_order_query_event_queue.add_message(message=message)
else:
raise Exception("Unexpected message fetched from queue")

Expand Down Expand Up @@ -93,3 +100,12 @@ def on_order(self, order: OrderEvent):
:type order: OrderEvent
"""
raise NotImplementedError()

@abstractmethod
def on_subgrounds_order_query(self, response: SubgraphResponse):
"""This method should be implemented by subclasses to handle subgrounds order queries.

:param response: The subgraph response to be handled.
:type response: SubgraphResponse
"""
raise NotImplementedError()
25 changes: 16 additions & 9 deletions bots/example_bots/gridbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
RubiconMarketApproval,
TransactionStatus,
)
from rubi.data.helpers.query_types import SubgraphResponse
from web3.types import TxParams

from event_trading_framework import BaseEventTradingFramework
Expand Down Expand Up @@ -89,15 +90,6 @@ def on_startup(self):
# Orderbook poller
self.client.start_orderbook_poller(pair_name=self.pair_name)

base_asset_wallet_balance = self.client.get_balance(self.grid.base_asset)
quote_asset_wallet_balance = self.client.get_balance(self.grid.quote_asset)

self.grid.update_inventory(
open_orders=self.client.open_limit_orders,
base_asset_wallet_balance=base_asset_wallet_balance,
quote_asset_wallet_balance=quote_asset_wallet_balance,
)

# set allowed_to_place_new_orders to True
self.allowed_to_place_new_orders = True

Expand Down Expand Up @@ -133,6 +125,21 @@ def on_order(self, order: OrderEvent):
order_side=order.order_side, price=order.price, size=order.size
)

def on_subgrounds_order_query(self, response: SubgraphResponse):
logger.debug(f"NEW SUBGROUNDS ORDER QUERY, timestamp: {time.time_ns()}")
logger.debug(response)

base_asset_wallet_balance = self.client.get_balance(self.grid.base_asset)
quote_asset_wallet_balance = self.client.get_balance(self.grid.quote_asset)

self.grid.update_inventory(
open_orders=self.client.open_limit_orders,
base_asset_wallet_balance=base_asset_wallet_balance,
quote_asset_wallet_balance=quote_asset_wallet_balance,
)

logger.info(f"inventory: {self.grid.inventory}")

######################################################################
# place transaction methods
######################################################################
Expand Down
45 changes: 31 additions & 14 deletions bots/example_bots/helpers/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(
self.pair_name = f"{base_asset}/{quote_asset}"

# Grid Inventory
self._inventory = {
self.inventory = {
base_asset: Decimal(starting_base_asset_amount),
quote_asset: Decimal(starting_quote_asset_amount),
}
Expand All @@ -67,8 +67,8 @@ def __init__(
self.min_level_size_in_base = Decimal(min_level_size_in_base)

self.grid_size = (
self._inventory[self.base_asset]
+ self._inventory[self.quote_asset] / self.fair_price
self.inventory[self.base_asset]
+ self.inventory[self.quote_asset] / self.fair_price
)

# Grid
Expand All @@ -88,19 +88,36 @@ def __init__(
# inventory functions
######################################################################

def update_fair_price(self, fair_price: Decimal):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it may be helpful to have the ability to return back whatever the difference between the existing grid and the new grid is. however, without tracking what existing offers there are on the market this may not be all that helpful within the Grid class itself.

self.fair_price = fair_price

# Update grid size based on new fair price
self.grid_size = (
self.inventory[self.base_asset]
+ self.inventory[self.quote_asset] / self.fair_price
)

# Update grid based on new fair price
self.desired_grid: List[GridLevel] = self._construct_grid()
self.num_grid_levels = len(self.desired_grid)
self.middle_index = math.ceil(self.num_grid_levels / 2)

# Calculate new current index
self.current_grid_index: int = self._calculate_grid_index()

def update_inventory(
self,
open_orders: Dict[int, LimitOrder],
base_asset_wallet_balance: Decimal,
quote_asset_wallet_balance: Decimal,
):
self._inventory[self.base_asset] = (
self.inventory[self.base_asset] = (
self._amount_in_market(
side=OrderSide.SELL, open_limit_orders=list(open_orders.values())
)
+ base_asset_wallet_balance
)
self._inventory[self.quote_asset] = (
self.inventory[self.quote_asset] = (
self._amount_in_market(
side=OrderSide.BUY, open_limit_orders=list(open_orders.values())
)
Expand All @@ -109,8 +126,8 @@ def update_inventory(
self.current_grid_index = self._calculate_grid_index()

def add_trade(self, order_side: OrderSide, price: Decimal, size: Decimal) -> None:
self._inventory[self.base_asset] += size * order_side.sign()
self._inventory[self.quote_asset] -= size * price * order_side.sign()
self.inventory[self.base_asset] += size * order_side.sign()
self.inventory[self.quote_asset] -= size * price * order_side.sign()

match order_side:
case OrderSide.BUY:
Expand All @@ -119,16 +136,16 @@ def add_trade(self, order_side: OrderSide, price: Decimal, size: Decimal) -> Non
self._last_sold_price = price

self.grid_size = (
self._inventory[self.base_asset]
+ self._inventory[self.quote_asset] / self.fair_price
self.inventory[self.base_asset]
+ self.inventory[self.quote_asset] / self.fair_price
)
self.current_grid_index = self._calculate_grid_index()

def get_base_asset_amount(self):
return self._inventory[self.base_asset]
return self.inventory[self.base_asset]

def get_quote_asset_amount(self):
return self._inventory[self.quote_asset]
return self.inventory[self.quote_asset]

######################################################################
# grid functions
Expand All @@ -141,8 +158,8 @@ def get_orders(
best_bid_price=best_bid_price, best_ask_price=best_ask_price
)

bid_amount_available = self._inventory[self.quote_asset]
ask_amount_available = self._inventory[self.base_asset]
bid_amount_available = self.inventory[self.quote_asset]
ask_amount_available = self.inventory[self.base_asset]

bids_to_place = []
for bid in desired_bids:
Expand Down Expand Up @@ -203,7 +220,7 @@ def _get_desired_orders(
return desired_bids, desired_asks

def _calculate_grid_index(self) -> int:
quote_as_percent_of_size = self._inventory[self.quote_asset] / (
quote_as_percent_of_size = self.inventory[self.quote_asset] / (
self.grid_size * self.fair_price
)

Expand Down
2 changes: 1 addition & 1 deletion bots/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
if __name__ == "__main__":
# setup logging
logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
format="%(asctime)s %(levelname)-8s %(name)s:%(lineno)d - %(message)s",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
)
Expand Down
104 changes: 67 additions & 37 deletions rubi/rubi/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from _decimal import Decimal
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Queue
from threading import Thread
from time import sleep
Expand All @@ -9,6 +10,7 @@
from eth_typing import ChecksumAddress
from web3.types import EventData, Nonce, TxParams

from rubi.data.helpers.query_types import SubgraphResponse
from rubi import LimitOrder
from rubi.contracts import (
ERC20,
Expand Down Expand Up @@ -395,6 +397,7 @@ def start_event_poller(
event_type: Type[BaseEvent],
filters: Optional[Dict[str, Any]] = None,
event_handler: Optional[Callable] = None,
from_block: int = 0,
poll_time: int = 2,
) -> None:
"""Starts a background event poller that continuously listens for events of the specified event type
Expand All @@ -411,10 +414,11 @@ def start_event_poller(
:param event_handler: Optional event handler function to process the retrieved events, defaults to the
self._default_event_handler (optional, default is None).
:type event_handler: Optional[Callable], optional
:param from_block: The block to create the filter from. Defaults to the latest block.
:type from_block: int
:param poll_time: Polling interval in seconds, defaults to 2 seconds.
:type poll_time: int, optional
:raises Exception: If the message queue is not configured.
:raises PairDoesNotExistException: If the pair does not exist in the client.
"""
if self.message_queue is None:
raise Exception(
Expand Down Expand Up @@ -457,6 +461,7 @@ def start_event_poller(
event_handler=self._default_event_handler
if event_handler is None
else event_handler,
from_block=from_block,
poll_time=poll_time,
)

Expand Down Expand Up @@ -496,6 +501,26 @@ def _default_event_handler(

self.message_queue.put(event)

def stop_event_poller(
self,
pair_name: str,
event_type: Type[BaseEvent],
) -> None:
"""Stop a running event poller for a specific event type.

:param pair_name: The name of the pair we are monitoring events of.
:type pair_name: str
:param event_type: The type of event we are polling for.
:type event_type: Type[BaseEvent]
"""

event_type.get_event_contract(
market=self.network.rubicon_market, router=self.network.rubicon_router
).stop_event_poller(
pair_name=pair_name,
event_type=event_type,
)

######################################################################
# order methods
######################################################################
Expand Down Expand Up @@ -892,7 +917,7 @@ def get_offers(
order_by: str = "timestamp",
order_direction: str = "desc",
as_dataframe: bool = True,
) -> Optional[pd.DataFrame] | List[LimitOrder]:
) -> Optional[pd.DataFrame] | SubgraphResponse:
# TODO: add support for multiple pair_names
if len(pair_names) == 1:
base_asset, quote_asset = pair_names[0].split("/")
Expand Down Expand Up @@ -933,38 +958,43 @@ def get_offers(
as_dataframe=as_dataframe,
)
case _:
bids = self.market_data.get_offers(
maker=maker,
from_address=from_address,
buy_gem=self.network.tokens[base_asset].address,
pay_gem=self.network.tokens[quote_asset].address,
side=OrderSide.BUY.value.lower(),
open=open,
start_time=start_time,
end_time=end_time,
start_block=start_block,
end_block=end_block,
first=first,
order_by=order_by,
order_direction=order_direction,
as_dataframe=as_dataframe,
)
asks = self.market_data.get_offers(
maker=maker,
from_address=from_address,
buy_gem=self.network.tokens[quote_asset].address,
pay_gem=self.network.tokens[base_asset].address,
side=OrderSide.SELL.value.lower(),
open=open,
start_time=start_time,
end_time=end_time,
start_block=start_block,
end_block=end_block,
first=first,
order_by=order_by,
order_direction=order_direction,
as_dataframe=as_dataframe,
)
with ThreadPoolExecutor() as executor:
bids_future = executor.submit(
self.market_data.get_offers,
maker=maker,
from_address=from_address,
buy_gem=self.network.tokens[base_asset].address,
pay_gem=self.network.tokens[quote_asset].address,
side=OrderSide.BUY.value.lower(),
open=open,
start_time=start_time,
end_time=end_time,
start_block=start_block,
end_block=end_block,
first=first,
order_by=order_by,
order_direction=order_direction,
as_dataframe=as_dataframe,
)
asks_future = executor.submit(
self.market_data.get_offers,
maker=maker,
from_address=from_address,
buy_gem=self.network.tokens[quote_asset].address,
pay_gem=self.network.tokens[base_asset].address,
side=OrderSide.SELL.value.lower(),
open=open,
start_time=start_time,
end_time=end_time,
start_block=start_block,
end_block=end_block,
first=first,
order_by=order_by,
order_direction=order_direction,
as_dataframe=as_dataframe,
)
bids = bids_future.result()
asks = asks_future.result()

if as_dataframe:
result = pd.concat([bids, asks]).reset_index(drop=True)
Expand All @@ -989,10 +1019,10 @@ def get_offers(
as_dataframe=as_dataframe,
)

if isinstance(result, List):
if isinstance(result, SubgraphResponse):
limit_orders: List[LimitOrder] = []

for offer in result:
for offer in result.body:
base_asset, quote_asset = self._get_base_and_quote_asset(
raw=offer, pair_names=pair_names
)
Expand All @@ -1004,7 +1034,7 @@ def get_offers(
base_asset=base_asset, quote_asset=quote_asset, offer=offer
)
)
return limit_orders
return SubgraphResponse(block_number=result.block_number, body=limit_orders)

return result

Expand Down
Loading