Skip to content

Commit

Permalink
indexing side effects (#11492)
Browse files Browse the repository at this point in the history
  • Loading branch information
alecsavvy authored Mar 4, 2025
1 parent 1ec83cc commit 3656b57
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 26 deletions.
25 changes: 1 addition & 24 deletions packages/discovery-provider/src/api_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# pylint: disable=R0401
from src.utils import helpers, web3_provider
from src.utils.config import shared_config
from src.utils.helpers import generate_signature
from src.utils.redis_connection import get_redis
from src.utils.redis_constants import most_recent_indexed_block_redis_key

Expand Down Expand Up @@ -108,30 +109,6 @@ def response_dict_with_metadata(response_dictionary, sign_response):
return response_dictionary


# Generate signature and timestamp using data
def generate_signature(data):
# convert sorted dictionary to string with no white spaces
to_sign_str = json.dumps(
data,
sort_keys=True,
ensure_ascii=False,
separators=(",", ":"),
cls=DateTimeEncoder,
)

# generate hash for if data contains unicode chars
to_sign_hash = Web3.keccak(text=to_sign_str).hex()

# generate SignableMessage for sign_message()
encoded_to_sign = encode_defunct(hexstr=to_sign_hash)

# sign to get signature
signed_message = w3.eth.account.sign_message(
encoded_to_sign, private_key=shared_config["delegate"]["private_key"]
)
return signed_message.signature.hex()


# Accepts raw data with timestamp key and relevant fields, converts data to hash, and recovers the wallet
def recover_wallet(data, signature):
json_dump = json.dumps(
Expand Down
2 changes: 1 addition & 1 deletion packages/discovery-provider/src/gated_content/signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

from typing_extensions import NotRequired

from src.api_helpers import generate_signature
from src.gated_content.types import GatedContentType
from src.utils.helpers import generate_signature


class GatedContentSignatureArgs(TypedDict):
Expand Down
15 changes: 14 additions & 1 deletion packages/discovery-provider/src/tasks/index_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from redis import Redis
from sqlalchemy import desc
from web3 import Web3

from src.challenges.challenge_event_bus import ChallengeEventBus
from src.models.core.core_indexed_blocks import CoreIndexedBlocks
Expand All @@ -24,6 +25,7 @@
index_core_entity_manager,
)
from src.tasks.index_core_plays import index_core_plays
from src.tasks.index_core_side_effects import run_side_effects
from src.utils.config import shared_config
from src.utils.core import (
core_health_check_cache_key,
Expand Down Expand Up @@ -126,6 +128,7 @@ def index_core(self):
redis: Redis = index_core.redis
db: SessionManager = index_core.db
challenge_bus: ChallengeEventBus = index_core.challenge_event_bus
web3: Web3 = index_core.web3

update_lock = redis.lock(index_core_lock_key, blocking_timeout=25, timeout=600)
have_lock = False
Expand Down Expand Up @@ -238,12 +241,22 @@ def index_core(self):
indexed_em_block = index_core_entity_manager(
logger=logger,
update_task=self,
web3=self.web3,
web3=web3,
session=session,
indexing_entity_manager=indexing_entity_manager,
block=block,
)

if indexing_entity_manager:
run_side_effects(
logger=logger,
block=block,
session=session,
web3=web3,
redis=redis,
challenge_bus=challenge_bus,
)

# get block parenthash, in none case also use None
# this would be the case in solana cutover where the previous
# block to the cutover isn't indexed either
Expand Down
58 changes: 58 additions & 0 deletions packages/discovery-provider/src/tasks/index_core_side_effects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from logging import LoggerAdapter

from redis import Redis
from sqlalchemy.orm.session import Session
from web3 import Web3

from src.challenges.challenge_event_bus import ChallengeEventBus
from src.challenges.trending_challenge import should_trending_challenge_update
from src.tasks.calculate_trending_challenges import enqueue_trending_challenges
from src.tasks.celery_app import celery
from src.tasks.core.gen.protocol_pb2 import BlockResponse


def run_side_effects(
logger: LoggerAdapter,
block: BlockResponse,
session: Session,
web3: Web3,
redis: Redis,
challenge_bus: ChallengeEventBus,
):
block_number = block.height
# should be an int like before
block_time = block.timestamp.seconds

try:
# Only dispatch trending challenge computation on a similar block, modulo 100
# so things are consistent. Note that if a discovery node is behind, this will be
# inconsistent.
# TODO: Consider better alternatives for consistency with behind nodes. Maybe this
# should not be calculated.
if block_number % 100 == 0:
# Check the last block's timestamp for updating the trending challenge
[should_update, date] = should_trending_challenge_update(
session, block_time
)
if should_update and date is not None:
enqueue_trending_challenges(session, web3, redis, challenge_bus, date)

except Exception as e:
# Do not throw error, as this should not stop indexing
logger.error(
f"Error in calling update trending challenge {e}",
exc_info=True,
)
try:
# Every 100 blocks, poll and apply delist statuses from trusted notifier
if block_number % 100 == 0:
celery.send_task(
"update_delist_statuses",
kwargs={"current_block_timestamp": block_time},
)
except Exception as e:
# Do not throw error, as this should not stop indexing
logger.error(
f"Error in calling update_delist_statuses {e}",
exc_info=True,
)
30 changes: 30 additions & 0 deletions packages/discovery-provider/src/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import base58
import psutil
import requests

# pylint: disable=no-name-in-module
from eth_account.messages import encode_defunct
from flask import g, request
from hashids import Hashids
from jsonformatter import JsonFormatter
Expand All @@ -23,9 +26,11 @@
from solders.transaction_status import UiTransactionStatusMeta
from sqlalchemy import inspect
from web3 import Web3
from web3.auto import w3

from src import exceptions
from src.solana.solana_helpers import MEMO_PROGRAM_ID, MEMO_V2_PROGRAM_ID
from src.utils.config import shared_config

from . import multihash

Expand Down Expand Up @@ -631,3 +636,28 @@ def get_final_poa_block() -> int:

def format_total_audio_balance(balance: str) -> int:
return int(int(balance) / 1e18)


# Generate signature and timestamp using data
# copied from src/api_helpers.py because of circular imports
def generate_signature(data):
# convert sorted dictionary to string with no white spaces
to_sign_str = json.dumps(
data,
sort_keys=True,
ensure_ascii=False,
separators=(",", ":"),
cls=DateTimeEncoder,
)

# generate hash for if data contains unicode chars
to_sign_hash = Web3.keccak(text=to_sign_str).hex()

# generate SignableMessage for sign_message()
encoded_to_sign = encode_defunct(hexstr=to_sign_hash)

# sign to get signature
signed_message = w3.eth.account.sign_message(
encoded_to_sign, private_key=shared_config["delegate"]["private_key"]
)
return signed_message.signature.hex()

0 comments on commit 3656b57

Please sign in to comment.