Skip to content

Commit

Permalink
feat: add cached balances examples & minor tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
FJ-Riveros committed Nov 19, 2024
1 parent 636565a commit bcfceb3
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 7 deletions.
27 changes: 26 additions & 1 deletion campaign/campaign.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
from typing import List
from constants.example_integrations import (
ACTIVE_ENA_START_BLOCK_EXAMPLE,
BEEFY_ARBITRUM_START_BLOCK_EXAMPLE,
)
from integrations.beefy_cached_balance_example_integration import (
BeefyCachedBalanceIntegration,
)
from integrations.claimed_ena_example_integration import ClaimedEnaIntegration
from utils import pendle
from web3 import Web3

Expand All @@ -25,7 +33,24 @@
},
end_block=40000000,
),
# Examples
# Example integration using cached user balances for improved performance,
# reads from previous balance snapshots
ClaimedEnaIntegration(
integration_id=IntegrationID.CLAIMED_ENA_EXAMPLE,
start_block=ACTIVE_ENA_START_BLOCK_EXAMPLE,
summary_cols=[SummaryColumn.CLAIMED_ENA_PTS_EXAMPLE],
reward_multiplier=1,
),
# Cached balances integration example, based on API calls
BeefyCachedBalanceIntegration(
integration_id=IntegrationID.BEEFY_CACHED_BALANCE_EXAMPLE,
start_block=BEEFY_ARBITRUM_START_BLOCK_EXAMPLE,
summary_cols=[SummaryColumn.BEEFY_CACHED_BALANCE_EXAMPLE],
chain=Chain.ARBITRUM,
reward_multiplier=1,
),
# Simple Integration class examples (outdated),
# don't use these anymore
PendleLPTIntegration(
integration_id=IntegrationID.PENDLE_USDE_LPT,
start_block=PENDLE_USDE_JULY_DEPLOYMENT_BLOCK,
Expand Down
16 changes: 16 additions & 0 deletions constants/example_integrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import json
from web3 import Web3
from utils.web3_utils import w3

PAGINATION_SIZE = 2000
ACTIVE_ENA_START_BLOCK_EXAMPLE = 21202656
ENA_ADDRESS = Web3.to_checksum_address("0x57e114B691Db790C35207b2e685D4A43181e6061")
with open("abi/ERC20_abi.json") as f:
ERC20_ABI = json.load(f)

ENA_CONTRACT = w3.eth.contract(
address=ENA_ADDRESS,
abi=ERC20_ABI,
)

BEEFY_ARBITRUM_START_BLOCK_EXAMPLE = 219870802
7 changes: 7 additions & 0 deletions constants/summary_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ class SummaryColumn(Enum):

CURVE_LLAMALEND_SHARDS = ("curve_llamalend_shards", SummaryColumnType.ETHENA_PTS)

CLAIMED_ENA_PTS_EXAMPLE = ("claimed_ena_example", SummaryColumnType.ETHENA_PTS)

BEEFY_CACHED_BALANCE_EXAMPLE = (
"beefy_cached_balance_example",
SummaryColumnType.ETHENA_PTS,
)

def __init__(self, column_name: str, col_type: SummaryColumnType):
self.column_name = column_name
self.col_type = col_type
Expand Down
112 changes: 112 additions & 0 deletions integrations/beefy_cached_balance_example_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import logging

from constants.example_integrations import (
BEEFY_ARBITRUM_START_BLOCK_EXAMPLE,
)
from integrations.cached_balances_integration import CachedBalancesIntegration
from web3 import Web3
from constants.chains import Chain
from typing import Dict, List, Set, cast
from eth_typing import ChecksumAddress

from constants.beefy import BEEFY_LRT_API_URL
from constants.summary_columns import SummaryColumn
from integrations.integration_ids import IntegrationID
from utils.request_utils import requests_retry_session
from utils.slack import slack_message

CHAIN_TO_API_URL_PREFIX = {
Chain.ARBITRUM: f"{BEEFY_LRT_API_URL}/partner/ethena/arbitrum",
Chain.FRAXTAL: f"{BEEFY_LRT_API_URL}/partner/ethena/fraxtal",
Chain.MANTLE: f"{BEEFY_LRT_API_URL}/partner/ethena/mantle",
Chain.OPTIMISM: f"{BEEFY_LRT_API_URL}/partner/ethena/optimism",
}


class BeefyCachedBalanceIntegration(CachedBalancesIntegration):
def __init__(
self,
integration_id: IntegrationID,
start_block: int,
chain: Chain,
summary_cols: List[SummaryColumn],
reward_multiplier: int = 1,
):
super().__init__(
integration_id=integration_id,
start_block=start_block,
chain=chain,
summary_cols=summary_cols,
reward_multiplier=reward_multiplier,
balance_multiplier=1,
excluded_addresses=None,
end_block=None,
)

def get_beefy_users(self) -> Set[ChecksumAddress]:
"""
Get all participants of the protocol, ever.
"""
logging.info("[Beefy integration] Getting participants...")
try:
base_url = CHAIN_TO_API_URL_PREFIX[self.chain]
url = f"{base_url}/users"

response = requests_retry_session().get(url)
data = cast(List[str], response.json())
return set(Web3.to_checksum_address(user) for user in data)
except Exception as e:
msg = f"Error getting participants for beefy: {e}"
logging.error(msg)
slack_message(msg)
return set()

def get_data_for_block(
self, block: int, users: Set[ChecksumAddress]
) -> Dict[ChecksumAddress, float]:
logging.info(f"Getting data for beefy at block {block}...")

if block < self.start_block:
return {}
data: Dict[ChecksumAddress, float] = {}
# Just get the first 10 users as a quick example
for user in list(users)[:10]:
try:
base_url = CHAIN_TO_API_URL_PREFIX[self.chain]
url = f"{base_url}/user/{user}/balance/{block}"
response = requests_retry_session(retries=1, backoff_factor=0).get(url)
user_data = response.json()

if user_data is None or "effective_balance" not in user_data:
data[user] = 0.0
data[user] = round(float(user_data["effective_balance"]), 4)
except Exception as e:
msg = f"Error getting beefy data for {user} at block {block}: {e}"
logging.error(msg)
slack_message(msg)
data[user] = 0.0
return data

def get_block_balances(
self, cached_data: Dict[int, Dict[ChecksumAddress, float]], blocks: List[int]
) -> Dict[int, Dict[ChecksumAddress, float]]:
logging.info("Getting block data for beefy...")
block_data: Dict[int, Dict[ChecksumAddress, float]] = {}
beefy_users = self.get_beefy_users()
for block in blocks:
if block < self.start_block:
block_data[block] = {}
continue
block_data[block] = self.get_data_for_block(block, beefy_users)
return block_data


if __name__ == "__main__":
example_integration = BeefyCachedBalanceIntegration(
integration_id=IntegrationID.BEEFY_CACHED_BALANCE_EXAMPLE,
start_block=BEEFY_ARBITRUM_START_BLOCK_EXAMPLE,
chain=Chain.ARBITRUM,
summary_cols=[SummaryColumn.BEEFY_CACHED_BALANCE_EXAMPLE],
)
# Since this integration is based on API calls, we don't need to use the cached data
print(example_integration.get_block_balances(cached_data={}, blocks=[276231389]))
2 changes: 1 addition & 1 deletion integrations/cached_balances_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ def __init__(

def get_block_balances(
self, cached_data: Dict[int, Dict[ChecksumAddress, float]], blocks: List[int]
) -> Dict[int, Dict[str, float]]:
) -> Dict[int, Dict[ChecksumAddress, float]]:
raise NotImplementedError
109 changes: 109 additions & 0 deletions integrations/claimed_ena_example_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from copy import deepcopy
import logging

from typing import Dict, List, Optional

from constants.summary_columns import SummaryColumn
from eth_typing import ChecksumAddress

from constants.example_integrations import (
ACTIVE_ENA_START_BLOCK_EXAMPLE,
ENA_CONTRACT,
PAGINATION_SIZE,
)

from constants.chains import Chain
from integrations.integration_ids import IntegrationID
from integrations.cached_balances_integration import CachedBalancesIntegration
from utils.web3_utils import fetch_events_logs_with_retry


class ClaimedEnaIntegration(CachedBalancesIntegration):
def __init__(
self,
integration_id: IntegrationID,
start_block: int,
chain: Chain = Chain.ETHEREUM,
summary_cols: Optional[List[SummaryColumn]] = None,
reward_multiplier: int = 1,
):
super().__init__(
integration_id,
start_block,
chain,
summary_cols,
reward_multiplier,
)

def get_block_balances(
self, cached_data: Dict[int, Dict[ChecksumAddress, float]], blocks: List[int]
) -> Dict[int, Dict[ChecksumAddress, float]]:
logging.info("Getting block data for claimed ENA")
new_block_data: Dict[int, Dict[ChecksumAddress, float]] = {}
if not blocks:
logging.error("No blocks provided to claimed ENA get_block_balances")
return new_block_data
sorted_blocks = sorted(blocks)
cache_copy: Dict[int, Dict[ChecksumAddress, float]] = deepcopy(cached_data)
for block in sorted_blocks:
# find the closest prev block in the data
# list keys parsed as ints and in descending order
sorted_existing_blocks = sorted(
cache_copy,
reverse=True,
)
# loop through the sorted blocks and find the closest previous block
prev_block = self.start_block
start = prev_block
bals = {}
for existing_block in sorted_existing_blocks:
if existing_block < block:
prev_block = existing_block
start = existing_block + 1
bals = deepcopy(cache_copy[prev_block])
break
# parse transfer events since and update bals
while start <= block:
to_block = min(start + PAGINATION_SIZE, block)
# print(f"Fetching transfers from {start} to {to_block}")
transfers = fetch_events_logs_with_retry(
"Token transfers claimed ENA",
ENA_CONTRACT.events.Transfer(),
start,
to_block,
)
for transfer in transfers:
recipient = transfer["args"]["to"]
if recipient not in bals:
bals[recipient] = 0
bals[recipient] += round(transfer["args"]["value"] / 10**18, 4)
start = to_block + 1
new_block_data[block] = bals
cache_copy[block] = bals
return new_block_data


if __name__ == "__main__":
example_integration = ClaimedEnaIntegration(
integration_id=IntegrationID.CLAIMED_ENA_EXAMPLE,
start_block=ACTIVE_ENA_START_BLOCK_EXAMPLE,
summary_cols=[SummaryColumn.CLAIMED_ENA_PTS_EXAMPLE],
reward_multiplier=20,
)

# Without cached data
without_cached_data_output = example_integration.get_block_balances(
cached_data={}, blocks=[21209856, 21217056]
)

print("=" * 120)
print("Run without cached data", without_cached_data_output)
print("=" * 120, "\n" * 5)

# With cached data, using the previous output so there is no need
# to fetch the previous blocks again
with_cached_data_output = example_integration.get_block_balances(
cached_data=without_cached_data_output, blocks=[21224256]
)
print("Run with cached data", with_cached_data_output)
print("=" * 120)
2 changes: 1 addition & 1 deletion integrations/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@ def get_participants(
# either get_participants OR get_block_balances must be implemented
def get_block_balances(
self, cached_data: Dict[int, Dict[ChecksumAddress, float]], blocks: List[int]
) -> Dict[int, Dict[str, float]]:
) -> Dict[int, Dict[ChecksumAddress, float]]:
raise NotImplementedError
8 changes: 8 additions & 0 deletions integrations/integration_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,14 @@ class IntegrationID(Enum):
# Fluid
FLUID = ("Fluid_susde", "Fluid sUSDe", Token.SUSDE)

# Claimed ENA
CLAIMED_ENA_EXAMPLE = ("claimed_ena_example", "Claimed ENA Example", Token.ENA)
BEEFY_CACHED_BALANCE_EXAMPLE = (
"beefy_cached_balance_example",
"Beefy Cached Balance Example",
Token.USDE,
)

def __init__(self, column_name: str, description: str, token: Token = Token.USDE):
self.column_name = column_name
self.description = description
Expand Down
4 changes: 2 additions & 2 deletions integrations/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(
# TODO: Implement this function
def get_block_balances(
self, cached_data: Dict[int, Dict[ChecksumAddress, float]], blocks: List[int]
) -> Dict[int, Dict[str, float]]:
) -> Dict[int, Dict[ChecksumAddress, float]]:
"""Get user balances for specified blocks, using cached data when available.
Args:
Expand All @@ -49,7 +49,7 @@ def get_block_balances(
blocks (List[int]): List of block numbers to get balances for.
Returns:
Dict[int, Dict[str, float]]: Dictionary mapping block numbers to user balances,
Dict[int, Dict[ChecksumAddress, float]]: Dictionary mapping block numbers to user balances,
where each inner dictionary maps user addresses to their token balance
at that block.
"""
Expand Down
23 changes: 23 additions & 0 deletions utils/request_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def requests_retry_session(
retries=5,
backoff_factor=0.3,
status_forcelist=(400, 404, 500, 502, 504),
session=None,
):
session = session or requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
3 changes: 1 addition & 2 deletions utils/web3_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
import os
import time
import traceback
from typing import Union, Literal

from dotenv import load_dotenv
from eth_abi.abi import decode

from web3 import Web3
from web3.types import HexStr, HexBytes, BlockIdentifier
from web3.types import BlockIdentifier

from utils.slack import slack_message
from constants.chains import Chain
Expand Down

0 comments on commit bcfceb3

Please sign in to comment.