diff --git a/packages/discovery-provider/src/api_helpers.py b/packages/discovery-provider/src/api_helpers.py index b6dc57edb8b..424f975e206 100644 --- a/packages/discovery-provider/src/api_helpers.py +++ b/packages/discovery-provider/src/api_helpers.py @@ -15,6 +15,7 @@ # pylint: disable=R0401 from src.utils import helpers, web3_provider from src.utils.config import shared_config +from src.utils.helpers import generate_signature from src.utils.redis_connection import get_redis from src.utils.redis_constants import most_recent_indexed_block_redis_key @@ -108,30 +109,6 @@ def response_dict_with_metadata(response_dictionary, sign_response): return response_dictionary -# Generate signature and timestamp using data -def generate_signature(data): - # convert sorted dictionary to string with no white spaces - to_sign_str = json.dumps( - data, - sort_keys=True, - ensure_ascii=False, - separators=(",", ":"), - cls=DateTimeEncoder, - ) - - # generate hash for if data contains unicode chars - to_sign_hash = Web3.keccak(text=to_sign_str).hex() - - # generate SignableMessage for sign_message() - encoded_to_sign = encode_defunct(hexstr=to_sign_hash) - - # sign to get signature - signed_message = w3.eth.account.sign_message( - encoded_to_sign, private_key=shared_config["delegate"]["private_key"] - ) - return signed_message.signature.hex() - - # Accepts raw data with timestamp key and relevant fields, converts data to hash, and recovers the wallet def recover_wallet(data, signature): json_dump = json.dumps( diff --git a/packages/discovery-provider/src/gated_content/signature.py b/packages/discovery-provider/src/gated_content/signature.py index acad311cb0c..3b53ea96b72 100644 --- a/packages/discovery-provider/src/gated_content/signature.py +++ b/packages/discovery-provider/src/gated_content/signature.py @@ -4,8 +4,8 @@ from typing_extensions import NotRequired -from src.api_helpers import generate_signature from src.gated_content.types import GatedContentType +from src.utils.helpers import generate_signature class GatedContentSignatureArgs(TypedDict): diff --git a/packages/discovery-provider/src/tasks/index_core.py b/packages/discovery-provider/src/tasks/index_core.py index fe537d4bf8c..34c730d9d8a 100644 --- a/packages/discovery-provider/src/tasks/index_core.py +++ b/packages/discovery-provider/src/tasks/index_core.py @@ -5,6 +5,7 @@ from redis import Redis from sqlalchemy import desc +from web3 import Web3 from src.challenges.challenge_event_bus import ChallengeEventBus from src.models.core.core_indexed_blocks import CoreIndexedBlocks @@ -24,6 +25,7 @@ index_core_entity_manager, ) from src.tasks.index_core_plays import index_core_plays +from src.tasks.index_core_side_effects import run_side_effects from src.utils.config import shared_config from src.utils.core import ( core_health_check_cache_key, @@ -126,6 +128,7 @@ def index_core(self): redis: Redis = index_core.redis db: SessionManager = index_core.db challenge_bus: ChallengeEventBus = index_core.challenge_event_bus + web3: Web3 = index_core.web3 update_lock = redis.lock(index_core_lock_key, blocking_timeout=25, timeout=600) have_lock = False @@ -238,12 +241,22 @@ def index_core(self): indexed_em_block = index_core_entity_manager( logger=logger, update_task=self, - web3=self.web3, + web3=web3, session=session, indexing_entity_manager=indexing_entity_manager, block=block, ) + if indexing_entity_manager: + run_side_effects( + logger=logger, + block=block, + session=session, + web3=web3, + redis=redis, + challenge_bus=challenge_bus, + ) + # get block parenthash, in none case also use None # this would be the case in solana cutover where the previous # block to the cutover isn't indexed either diff --git a/packages/discovery-provider/src/tasks/index_core_side_effects.py b/packages/discovery-provider/src/tasks/index_core_side_effects.py new file mode 100644 index 00000000000..fdaaea30474 --- /dev/null +++ b/packages/discovery-provider/src/tasks/index_core_side_effects.py @@ -0,0 +1,58 @@ +from logging import LoggerAdapter + +from redis import Redis +from sqlalchemy.orm.session import Session +from web3 import Web3 + +from src.challenges.challenge_event_bus import ChallengeEventBus +from src.challenges.trending_challenge import should_trending_challenge_update +from src.tasks.calculate_trending_challenges import enqueue_trending_challenges +from src.tasks.celery_app import celery +from src.tasks.core.gen.protocol_pb2 import BlockResponse + + +def run_side_effects( + logger: LoggerAdapter, + block: BlockResponse, + session: Session, + web3: Web3, + redis: Redis, + challenge_bus: ChallengeEventBus, +): + block_number = block.height + # should be an int like before + block_time = block.timestamp.seconds + + try: + # Only dispatch trending challenge computation on a similar block, modulo 100 + # so things are consistent. Note that if a discovery node is behind, this will be + # inconsistent. + # TODO: Consider better alternatives for consistency with behind nodes. Maybe this + # should not be calculated. + if block_number % 100 == 0: + # Check the last block's timestamp for updating the trending challenge + [should_update, date] = should_trending_challenge_update( + session, block_time + ) + if should_update and date is not None: + enqueue_trending_challenges(session, web3, redis, challenge_bus, date) + + except Exception as e: + # Do not throw error, as this should not stop indexing + logger.error( + f"Error in calling update trending challenge {e}", + exc_info=True, + ) + try: + # Every 100 blocks, poll and apply delist statuses from trusted notifier + if block_number % 100 == 0: + celery.send_task( + "update_delist_statuses", + kwargs={"current_block_timestamp": block_time}, + ) + except Exception as e: + # Do not throw error, as this should not stop indexing + logger.error( + f"Error in calling update_delist_statuses {e}", + exc_info=True, + ) diff --git a/packages/discovery-provider/src/utils/helpers.py b/packages/discovery-provider/src/utils/helpers.py index d78d06f1aa3..2daf8b0f8a1 100644 --- a/packages/discovery-provider/src/utils/helpers.py +++ b/packages/discovery-provider/src/utils/helpers.py @@ -14,6 +14,9 @@ import base58 import psutil import requests + +# pylint: disable=no-name-in-module +from eth_account.messages import encode_defunct from flask import g, request from hashids import Hashids from jsonformatter import JsonFormatter @@ -23,9 +26,11 @@ from solders.transaction_status import UiTransactionStatusMeta from sqlalchemy import inspect from web3 import Web3 +from web3.auto import w3 from src import exceptions from src.solana.solana_helpers import MEMO_PROGRAM_ID, MEMO_V2_PROGRAM_ID +from src.utils.config import shared_config from . import multihash @@ -631,3 +636,28 @@ def get_final_poa_block() -> int: def format_total_audio_balance(balance: str) -> int: return int(int(balance) / 1e18) + + +# Generate signature and timestamp using data +# copied from src/api_helpers.py because of circular imports +def generate_signature(data): + # convert sorted dictionary to string with no white spaces + to_sign_str = json.dumps( + data, + sort_keys=True, + ensure_ascii=False, + separators=(",", ":"), + cls=DateTimeEncoder, + ) + + # generate hash for if data contains unicode chars + to_sign_hash = Web3.keccak(text=to_sign_str).hex() + + # generate SignableMessage for sign_message() + encoded_to_sign = encode_defunct(hexstr=to_sign_hash) + + # sign to get signature + signed_message = w3.eth.account.sign_message( + encoded_to_sign, private_key=shared_config["delegate"]["private_key"] + ) + return signed_message.signature.hex()