diff --git a/core/schains/monitor/main.py b/core/schains/monitor/main.py index 54ffbefe..f4222b62 100644 --- a/core/schains/monitor/main.py +++ b/core/schains/monitor/main.py @@ -17,11 +17,10 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. -import functools import logging import os import time -from typing import Callable, Optional +from typing import Optional from importlib import reload from skale import Skale, SkaleIma @@ -66,8 +65,8 @@ def run_config_pipeline( node_config: NodeConfig, stream_version: str, ) -> None: + logger.info('Gathering initial skale manager data') schain = skale.schains.get_by_name(schain_name) - schain_record = SChainRecord.get_by_name(schain_name) rotation_data = skale.node_rotation.get_rotation(schain_name) allowed_ranges = get_sync_agent_ranges(skale) ima_linked = not SYNC_NODE and skale_ima.linker.has_schain(schain_name) @@ -75,10 +74,14 @@ def run_config_pipeline( last_dkg_successful = skale.dkg.is_last_dkg_successful(group_index) current_nodes = get_current_nodes(skale, schain_name) + logger.info('Initing schain record') + schain_record = SChainRecord.get_by_name(schain_name) + estate = ExternalState( ima_linked=ima_linked, chain_id=skale_ima.web3.eth.chain_id, ranges=allowed_ranges ) econfig = ExternalConfig(schain_name) + logger.info('Initing config checks') config_checks = ConfigChecks( schain_name=schain_name, node_id=node_config.id, @@ -91,6 +94,7 @@ def run_config_pipeline( estate=estate, ) + logger.info('Initing config action manager') config_am = ConfigActionManager( skale=skale, schain=schain, @@ -103,8 +107,9 @@ def run_config_pipeline( econfig=econfig, ) + logger.info('Gathering config status') status = config_checks.get_all(log=False, expose=True) - logger.info('Config checks: %s', status) + logger.info('Config status: %s', status) if SYNC_NODE: logger.info( @@ -128,6 +133,7 @@ def run_skaled_pipeline( schain_name: str, skale: Skale, node_config: 
NodeConfig, dutils: DockerUtils ) -> None: schain = skale.schains.get_by_name(schain_name) + logger.info('Initing schain record') schain_record = SChainRecord.get_by_name(schain_name) logger.info('Record: %s', SChainRecord.to_dict(schain_record)) @@ -135,7 +141,7 @@ def run_skaled_pipeline( dutils = dutils or DockerUtils() rc = get_default_rule_controller(name=schain_name) - logger.info('Initing skaled checks manager') + logger.info('Initing skaled checks') skaled_checks = SkaledChecks( schain_name=schain.name, schain_record=schain_record, @@ -159,7 +165,7 @@ def run_skaled_pipeline( econfig=ExternalConfig(schain_name), dutils=dutils, ) - logger.info('Fetching skaled checks') + logger.info('Gathering skaled status') check_status = skaled_checks.get_all(log=False, expose=True) logger.info('Get automatic repair option') automatic_repair = get_automatic_repair_option() @@ -237,14 +243,16 @@ def needed(self) -> bool: not schain_record.sync_config_run or not schain_record.first_run ) - def create_pipeline(self) -> Callable: - return functools.partial( - run_skaled_pipeline, - schain_name=self.schain_name, - skale=self.skale, - node_config=self.node_config, - dutils=self.dutils, - ) + def run(self) -> None: + try: + run_skaled_pipeline( + schain_name=self.schain_name, + skale=self.skale, + node_config=self.node_config, + dutils=self.dutils, + ) + except Exception: + logger.exception('Task %s failed', self.name) class ConfigTask(ITask): @@ -296,15 +304,17 @@ def start_ts(self, value: int) -> None: def needed(self) -> bool: return SYNC_NODE or is_node_part_of_chain(self.skale, self.schain_name, self.node_config.id) - def create_pipeline(self) -> Callable: - return functools.partial( - run_config_pipeline, - schain_name=self.schain_name, - skale=self.skale, - skale_ima=self.skale_ima, - node_config=self.node_config, - stream_version=self.stream_version, - ) + def run(self) -> None: + try: + run_config_pipeline( + schain_name=self.schain_name, + skale=self.skale, + 
skale_ima=self.skale_ima, + node_config=self.node_config, + stream_version=self.stream_version, + ) + except Exception: + logger.exception('Task %s failed', self.name) def start_tasks( diff --git a/core/schains/monitor/tasks.py b/core/schains/monitor/tasks.py index 7fcc86f8..5a7b6563 100644 --- a/core/schains/monitor/tasks.py +++ b/core/schains/monitor/tasks.py @@ -2,7 +2,6 @@ import logging import time from concurrent.futures import Future, ThreadPoolExecutor -from typing import Callable from core.schains.process import ProcessReport @@ -25,7 +24,7 @@ def stuck_timeout(self) -> int: pass @abc.abstractmethod - def create_pipeline(self) -> Callable: + def run(self) -> None: pass @property @@ -63,11 +62,20 @@ def execute_tasks( stucked = [] while True: for index, task in enumerate(tasks): + logger.info( + 'Status of %s, running: %s needed: %s stucked: %s', + task.name, + task.future.running(), + task.needed, + len(stucked), + ) if not task.future.running() and task.needed and len(stucked) == 0: + if task.future.done(): + logger.info('Done') + logger.info('Result %s', task.future.result()) task.start_ts = int(time.time()) logger.info('Starting task %s at %d', task.name, task.start_ts) - pipeline = task.create_pipeline() - task.future = executor.submit(pipeline) + task.future = executor.submit(task.run) elif task.future.running(): if int(time.time()) - task.start_ts > task.stuck_timeout: logger.info('Canceling future for %s', task.name) diff --git a/tests/schains/monitor/main_test.py b/tests/schains/monitor/main_test.py index 242fbe43..c4f860d3 100644 --- a/tests/schains/monitor/main_test.py +++ b/tests/schains/monitor/main_test.py @@ -1,11 +1,9 @@ -import functools import logging import os import pathlib import shutil import time from concurrent.futures import Future -from typing import Callable from unittest import mock import pytest @@ -85,8 +83,7 @@ def get_monitor_mock(*args, **kwargs): return result with 
mock.patch('core.schains.monitor.main.RegularConfigMonitor', get_monitor_mock): - pipeline = config_task.create_pipeline() - pipeline() + config_task.run() def test_skaled_task(skale, schain_db, schain_on_contracts, node_config, dutils): @@ -114,8 +111,7 @@ def get_monitor_mock(*args, **kwargs): with mock.patch('core.schains.monitor.main.get_skaled_monitor', get_monitor_mock): with mock.patch('core.schains.monitor.main.notify_checks'): - pipeline = skaled_task.create_pipeline() - pipeline() + skaled_task.run() def test_execute_tasks(tmp_dir, _schain_name): @@ -167,8 +163,8 @@ def stuck_timeout(self) -> int: def needed(self) -> bool: return True - def create_pipeline(self) -> Callable: - return functools.partial(run_stuck_pipeline, index=self.index) + def run(self) -> None: + run_stuck_pipeline(index=self.index) class NotNeededTask(StuckedTask): def __init__(self, index: int) -> None: