From 5956b36bf886794ebbf16a2d976b7a4d5cfc57a4 Mon Sep 17 00:00:00 2001 From: "s. rannou" Date: Thu, 4 Apr 2024 13:30:07 +0200 Subject: [PATCH] feat: add basic handling of the main processing loop and introduced watched validators --- etc/config.local.yaml | 4 + eth_validator_watcher/beacon.py | 23 +++ eth_validator_watcher/entrypoint_v2.py | 34 +++- eth_validator_watcher/watched_validators.py | 162 ++++++++++++++++++++ poetry.lock | 2 +- pyproject.toml | 2 +- 6 files changed, 218 insertions(+), 9 deletions(-) create mode 100644 eth_validator_watcher/watched_validators.py diff --git a/etc/config.local.yaml b/etc/config.local.yaml index 6f43721..fb89ab4 100644 --- a/etc/config.local.yaml +++ b/etc/config.local.yaml @@ -10,6 +10,10 @@ slack_token: ~ relays: ~ liveness_file: ~ +# This mapping is reloaded dynamically at the beginning of each +# epoch. If the new mapping is invalid the watcher will crash, be sure +# to use atomic filesystem operations to have a completely updated +# configuration file if you dynamically watch keys. watched_keys: - public_key: '0x832b8286f5d6535fd941c6c4ed8b9b20d214fc6aa726ce4fba1c9dbb4f278132646304f550e557231b6932aa02cf08d3' labels: ['google'] diff --git a/eth_validator_watcher/beacon.py b/eth_validator_watcher/beacon.py index 8fa9f1a..35386f4 100644 --- a/eth_validator_watcher/beacon.py +++ b/eth_validator_watcher/beacon.py @@ -82,6 +82,14 @@ def __init__(self, url: str, timeout_sec: int) -> None: self.__http.mount("http://", adapter) self.__http.mount("https://", adapter) + def get_url(self) -> str: + """Return the URL of the beacon.""" + return self.__url + + def get_timeout_sec(self) -> int: + """Return the timeout in seconds used to query the beacon.""" + return self.__timeout_sec + @retry( stop=stop_after_attempt(5), wait=wait_fixed(3), @@ -208,6 +216,21 @@ def get_status_to_index_to_validator( return result + def get_validators(self) -> Validators: + response = self.__get_retry_not_found( + f"{self.__url}/eth/v1/beacon/states/head/validators", timeout=self.__timeout_sec + ) + + # Unsure if explicit del help with memory here, let's keep it + # for now and benchmark this in real conditions. + response.raise_for_status() + validators_dict = response.json() + del response + validators = Validators(**validators_dict) + del validators_dict + + return validators + @lru_cache(maxsize=1) def get_duty_slot_to_committee_index_to_validators_index( self, epoch: int diff --git a/eth_validator_watcher/entrypoint_v2.py b/eth_validator_watcher/entrypoint_v2.py index 6fc5477..2cdcb49 100644 --- a/eth_validator_watcher/entrypoint_v2.py +++ b/eth_validator_watcher/entrypoint_v2.py @@ -5,13 +5,15 @@ from pathlib import Path from typing import Optional +import logging import typer import time +from .beacon import Beacon from .config import load_config, WatchedKeyConfig +from .watched_validators import WatchedValidators -print = partial(print, flush=True) app = typer.Typer(add_completion=False) @@ -27,23 +29,37 @@ def __init__(self, cfg_path: Path) -> None: cfg_path: Path Path to the configuration file. """ - self.cfg_path = cfg_path - self.cfg = None + self._cfg_path = cfg_path + self._cfg = None + self._beacon = None - def reload_config(self) -> None: + self._reload_config() + + def _reload_config(self) -> None: """Reload the configuration file. """ try: - self.cfg = load_config(self.cfg_path) + self._cfg = load_config(self._cfg_path) except ValidationError as err: raise typer.BadParameter(f'Invalid configuration file: {err}') + if self._beacon is None or self._beacon.get_url() != self._cfg.beacon_url or self._beacon.get_timeout_sec() != self._cfg.beacon_timeout_sec: + self._beacon = Beacon(self._cfg.beacon_url, self._cfg.beacon_timeout_sec) + def run(self) -> None: """Run the Ethereum Validator Watcher. """ + watched_validators = WatchedValidators() while True: - print('Reloading configuration...') - self.reload_config() + logging.info('Processing new epoch') + beacon_validators = self._beacon.get_validators() + watched_validators.process_epoch(beacon_validators) + + logging.info('Processing configuration update') + self._reload_config() + watched_validators.process_config(self._cfg) + + logging.info('Waiting for next iteration') time.sleep(1) @@ -60,6 +76,10 @@ def handler( ), ) -> None: """Run the Ethereum Validator Watcher.""" + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s %(levelname)-8s %(message)s' + ) watcher = ValidatorWatcher(config) watcher.run() diff --git a/eth_validator_watcher/watched_validators.py b/eth_validator_watcher/watched_validators.py new file mode 100644 index 0000000..d486bf5 --- /dev/null +++ b/eth_validator_watcher/watched_validators.py @@ -0,0 +1,162 @@ +"""Watched validators. + +This module provides a wrapper around per-validator computations +before exposing them later to prometheus. There are 4 types of +processing performed: + +- process_config: configuration update (per-key labels) +- process_epoch: new epoch processing (beacon chain status update) +- process_missed_attestations: missed attestation processing (slot 16) +- process_rewards: rewards processing (slot 17) + +WatchedValidator which holds the state of a validator while +WatchedValidators handles the collection of all validators, providing +efficient ways to access them which are then used by the prometheus +exporter. +""" + +import logging + +from typing import Optional + +from .config import Config, WatchedKeyConfig +from .models import Validators + + +class WatchedValidator: + """Watched validator abstraction. + + This needs to be optimized for both CPU and memory usage as it + will be instantiated for every validator of the network. + + Attributes: + index: Validator index + pubkey: Validator public key + status: Validator status for the current epoch + previous_status: Validator previous status for the previous epoch + labels: Validator labels + missed_attestation: Validator missed attestation for the current epoch + previous_missed_attestation: Validator missed previous attestation for the previous epoch + suboptimal_source: Validator suboptimal source for the current epoch + suboptimal_target: Validator suboptimal target for the current epoch + suboptimal_head: Validator suboptimal head for the current epoch + beacon_validator: Latest state of the validator from the beacon chain + """ + + def __init__(self): + self.index : int = 0 + self.previous_status : Validators.DataItem.StatusEnum | None = None + self.labels : Optional[list[str]] = None + self.missed_attestation : bool | None = None + self.previous_missed_attestation : bool | None = None + self.suboptimal_source : bool | None = None + self.suboptimal_target : bool | None = None + self.suboptimal_head : bool | None = None + self.beacon_validator : Validators.DataItem | None = None + + @property + def pubkey(self) -> str: + return self.beacon_validator.validator.pubkey + + @property + def status(self) -> Validators.DataItem.StatusEnum: + return self.beacon_validator.status + + def process_config(self, config: WatchedKeyConfig): + """Processes a new configuration. + + Parameters: + config: New configuration + """ + self.labels = config.labels + + def process_epoch(self, validator: Validators.DataItem): + """Processes a new epoch. + + Parameters: + validator: Validator beacon state + """ + if self.beacon_validator is not None: + self.previous_status = self.status + + self.previous_missed_attestation = self.missed_attestation + self.missed_attestation = None + + self.suboptimal_source = None + self.suboptimal_target = None + self.suboptimal_head = None + + self.beacon_validator = validator + + +class WatchedValidators: + """Wrapper around watched validators. + + Provides facilities to retrieve a validator by index or public + key. This needs to be efficient both in terms of CPU and memory as + there are about ~1 million validators on the network. + """ + + def __init__(self): + self._validators: dict[int, WatchedValidator] = {} + self._pubkey_to_index: dict[str, int] = {} + + def get_validator_by_index(self, index: int) -> Optional[WatchedValidator]: + """Get a validator by index. + + Parameters: + index: Index of the validator to retrieve + """ + return self._validators.get(index) + + def get_validator_by_pubkey(self, pubkey: str) -> Optional[WatchedValidator]: + """Get a validator by public key. + + Parameters: + pubkey: Public key of the validator to retrieve + """ + index = self._pubkey_to_index.get(pubkey) + if index is None: + return None + return self._validators.get(index) + + def process_config(self, config: Config): + """Process a config update + + Parameters: + config: Updated configuration + """ + logging.info('Processing config & validator labels') + + unknown = 0 + for item in config.watched_keys: + updated = False + index = self._pubkey_to_index.get(item.public_key, None) + if index: + validator = self._validators.get(index) + if validator: + validator.process_config(item) + updated = True + if not updated: + unknown += 1 + + logging.info(f'Config processed ({unknown} unknown validators were skipped)') + + def process_epoch(self, validators: Validators): + """Process a new epoch + + Parameters: + validators: New validator state for the epoch from the beaconchain. + """ + logging.info('Processing new epoch') + + for item in validators.data: + validator = self._validators.get(item.index) + if validator is None: + validator = WatchedValidator() + self._validators[item.index] = validator + self._pubkey_to_index[item.validator.pubkey] = item.index + + validator.process_epoch(item) + + logging.info(f'New epoch processed ({len(validators.data)} validators)') diff --git a/poetry.lock b/poetry.lock index b970356..c234659 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.0 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "annotated-types" diff --git a/pyproject.toml b/pyproject.toml index b7630bb..4400286 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,4 +32,4 @@ requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.poetry.scripts] -eth-validator-watcher = "eth_validator_watcher.entrypoint:app" +eth-validator-watcher = "eth_validator_watcher.entrypoint_v2:app"