From 2cae1c62dbd25dcffb30fb6616690ce612944125 Mon Sep 17 00:00:00 2001 From: Chris Heaney Date: Tue, 5 Dec 2023 08:58:12 -0500 Subject: [PATCH] add fetch to account subscriber interfaces --- src/driftpy/accounts/cache/drift_client.py | 3 +++ src/driftpy/accounts/cache/user.py | 3 +++ src/driftpy/accounts/polling/drift_client.py | 3 +++ src/driftpy/accounts/polling/user.py | 13 ++++++++++--- src/driftpy/accounts/types.py | 8 ++++++++ src/driftpy/accounts/ws/account_subscriber.py | 1 - src/driftpy/accounts/ws/drift_client.py | 19 +++++++++++++++++++ 7 files changed, 46 insertions(+), 4 deletions(-) diff --git a/src/driftpy/accounts/cache/drift_client.py b/src/driftpy/accounts/cache/drift_client.py index 0b0f1c9a..e7a47a2d 100644 --- a/src/driftpy/accounts/cache/drift_client.py +++ b/src/driftpy/accounts/cache/drift_client.py @@ -75,6 +75,9 @@ async def update_cache(self): self.cache["oracle_price_data"] = oracle_data + async def fetch(self): + await self.update_cache() + def get_state_account_and_slot(self) -> Optional[DataAndSlot[StateAccount]]: return self.cache["state"] diff --git a/src/driftpy/accounts/cache/user.py b/src/driftpy/accounts/cache/user.py index 36d535e6..e4fac62b 100644 --- a/src/driftpy/accounts/cache/user.py +++ b/src/driftpy/accounts/cache/user.py @@ -28,6 +28,9 @@ async def update_cache(self): user_and_slot = await get_user_account_and_slot(self.program, self.user_pubkey) self.user_and_slot = user_and_slot + async def fetch(self): + await self.update_cache() + def get_user_account_and_slot(self) -> Optional[DataAndSlot[UserAccount]]: return self.user_and_slot diff --git a/src/driftpy/accounts/polling/drift_client.py b/src/driftpy/accounts/polling/drift_client.py index 03ea0cd8..6bef050c 100644 --- a/src/driftpy/accounts/polling/drift_client.py +++ b/src/driftpy/accounts/polling/drift_client.py @@ -67,6 +67,9 @@ async def subscribe(self): while self.accounts_ready() is False: await self.bulk_account_loader.load() + async def fetch(self): + await self.bulk_account_loader.load() + def accounts_ready(self) -> bool: return self.state is not None diff --git a/src/driftpy/accounts/polling/user.py b/src/driftpy/accounts/polling/user.py index 740f2311..d8ad6211 100644 --- a/src/driftpy/accounts/polling/user.py +++ b/src/driftpy/accounts/polling/user.py @@ -3,7 +3,11 @@ from anchorpy import Program from solders.pubkey import Pubkey -from driftpy.accounts import UserAccountSubscriber, DataAndSlot +from driftpy.accounts import ( + UserAccountSubscriber, + DataAndSlot, + get_user_account_and_slot, +) from driftpy.accounts.bulk_account_loader import BulkAccountLoader from driftpy.types import UserAccount @@ -51,13 +55,16 @@ def _account_loader_callback(self, buffer: bytes, slot: int): self.data_and_slot = DataAndSlot(slot, account) async def fetch(self): - await self.bulk_account_loader.load() + data_and_slot = await get_user_account_and_slot( + self.program, self.user_account_pubkey + ) + self._update_data(data_and_slot) def _update_data(self, new_data: Optional[DataAndSlot[UserAccount]]): if new_data is None: return - if self.data_and_slot is None or new_data.slot > self.data_and_slot.slot: + if self.data_and_slot is None or new_data.slot >= self.data_and_slot.slot: self.data_and_slot = new_data def unsubscribe(self): diff --git a/src/driftpy/accounts/types.py b/src/driftpy/accounts/types.py index a3946df0..9c72245d 100644 --- a/src/driftpy/accounts/types.py +++ b/src/driftpy/accounts/types.py @@ -31,6 +31,10 @@ async def subscribe(self): def unsubscribe(self): pass + @abstractmethod + async def fetch(self): + pass + @abstractmethod def get_state_account_and_slot(self) -> Optional[DataAndSlot[StateAccount]]: pass @@ -63,6 +67,10 @@ async def subscribe(self): def unsubscribe(self): pass + @abstractmethod + async def fetch(self): + pass + @abstractmethod def get_user_account_and_slot(self) -> Optional[DataAndSlot[UserAccount]]: pass diff --git a/src/driftpy/accounts/ws/account_subscriber.py b/src/driftpy/accounts/ws/account_subscriber.py index 1c7a9f91..c2d36f03 100644 --- a/src/driftpy/accounts/ws/account_subscriber.py +++ b/src/driftpy/accounts/ws/account_subscriber.py @@ -80,7 +80,6 @@ async def fetch(self): new_data = await get_account_data_and_slot( self.pubkey, self.program, self.commitment, self.decode ) - self._update_data(new_data) def _update_data(self, new_data: Optional[DataAndSlot[T]]): diff --git a/src/driftpy/accounts/ws/drift_client.py b/src/driftpy/accounts/ws/drift_client.py index b681714b..f6061d3b 100644 --- a/src/driftpy/accounts/ws/drift_client.py +++ b/src/driftpy/accounts/ws/drift_client.py @@ -1,3 +1,5 @@ +import asyncio + from anchorpy import Program from solana.rpc.commitment import Commitment @@ -120,6 +122,23 @@ def is_subscribed(self): self.state_subscriber is not None and self.state_subscriber.is_subscribed() ) + async def fetch(self): + if not self.is_subscribed(): + return + + tasks = [self.state_subscriber.fetch()] + + for perp_market_subscriber in self.perp_market_subscribers.values(): + tasks.append(perp_market_subscriber.fetch()) + + for spot_market_subscriber in self.spot_market_subscribers.values(): + tasks.append(spot_market_subscriber.fetch()) + + for oracle_subscriber in self.oracle_subscribers.values(): + tasks.append(oracle_subscriber.fetch()) + + await asyncio.gather(*tasks) + def get_state_account_and_slot(self) -> Optional[DataAndSlot[StateAccount]]: return self.state_subscriber.data_and_slot