Skip to content

Commit

Permalink
Merge pull request #1090 from skalenetwork/fix-archive-rotation
Browse files Browse the repository at this point in the history
Fix schain config on archive node after rotation
  • Loading branch information
DmytroNazarenko authored Jul 11, 2024
2 parents 1c28151 + 6310af1 commit 3f60fac
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 148 deletions.
154 changes: 67 additions & 87 deletions core/schains/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
get_base_port_from_config,
get_node_ips_from_config,
get_own_ip_from_config,
get_local_schain_http_endpoint_from_config
get_local_schain_http_endpoint_from_config,
)
from core.schains.config.main import (
get_skaled_config_rotations_ids,
get_upstream_config_rotation_ids
get_upstream_config_rotation_ids,
)
from core.schains.dkg.utils import get_secret_key_share_filepath
from core.schains.firewall.types import IRuleController
Expand All @@ -45,14 +45,14 @@
from core.schains.rpc import (
check_endpoint_alive,
check_endpoint_blocks,
get_endpoint_alive_check_timeout
get_endpoint_alive_check_timeout,
)
from core.schains.external_config import ExternalConfig, ExternalState
from core.schains.runner import (
get_container_name,
get_ima_container_time_frame,
get_image_name,
is_new_image_pulled
is_new_image_pulled,
)
from core.schains.skaled_exit_codes import SkaledExitCodes
from core.schains.volume import is_volume_exists
Expand All @@ -79,7 +79,7 @@
'rpc',
'blocks',
'process',
'ima_container'
'ima_container',
]

TG_ALLOWED_CHECKS = [
Expand All @@ -90,7 +90,7 @@
'rpc',
'blocks',
'process',
'ima_container'
'ima_container',
]


Expand All @@ -111,11 +111,13 @@ class IChecks(ABC):
def get_name(self) -> str:
pass

def get_all(self,
log: bool = True,
save: bool = False,
expose: bool = False,
needed: Optional[List[str]] = None) -> Dict:
def get_all(
self,
log: bool = True,
save: bool = False,
expose: bool = False,
needed: Optional[List[str]] = None,
) -> Dict:
if needed:
names = needed
else:
Expand All @@ -140,37 +142,38 @@ def is_healthy(self) -> bool:

@classmethod
def get_check_names(cls):
return list(filter(
lambda c: not c.startswith('_') and isinstance(
getattr(cls, c), property),
dir(cls)
))
return list(
filter(
lambda c: not c.startswith('_') and isinstance(getattr(cls, c), property), dir(cls)
)
)


class ConfigChecks(IChecks):
def __init__(self,
schain_name: str,
node_id: int,
schain_record: SChainRecord,
rotation_id: int,
stream_version: str,
current_nodes: list[ExtendedManagerNodeInfo],
estate: ExternalState,
sync_node: bool = False,
econfig: Optional[ExternalConfig] = None
) -> None:
def __init__(
self,
schain_name: str,
node_id: int,
schain_record: SChainRecord,
rotation_id: int,
stream_version: str,
current_nodes: list[ExtendedManagerNodeInfo],
estate: ExternalState,
last_dkg_successful: bool,
sync_node: bool = False,
econfig: Optional[ExternalConfig] = None,
) -> None:
self.name = schain_name
self.node_id = node_id
self.schain_record = schain_record
self.rotation_id = rotation_id
self.stream_version = stream_version
self.current_nodes = current_nodes
self.estate = estate
self._last_dkg_successful = last_dkg_successful
self.sync_node = sync_node
self.econfig = econfig or ExternalConfig(schain_name)
self.cfm: ConfigFileManager = ConfigFileManager(
schain_name=schain_name
)
self.cfm: ConfigFileManager = ConfigFileManager(schain_name=schain_name)
self.statsd_client = get_statsd_client()

def get_name(self) -> str:
Expand All @@ -182,13 +185,15 @@ def config_dir(self) -> CheckRes:
dir_path = self.cfm.dirname
return CheckRes(os.path.isdir(dir_path))

@property
def last_dkg_successful(self) -> CheckRes:
"""Checks that last DKG was successfully completed"""
return CheckRes(self._last_dkg_successful)

@property
def dkg(self) -> CheckRes:
"""Checks that DKG procedure is completed"""
secret_key_share_filepath = get_secret_key_share_filepath(
self.name,
self.rotation_id
)
secret_key_share_filepath = get_secret_key_share_filepath(self.name, self.rotation_id)
return CheckRes(os.path.isfile(secret_key_share_filepath))

@property
Expand Down Expand Up @@ -227,17 +232,14 @@ def upstream_config(self) -> CheckRes:
exists,
node_ips_updated,
stream_updated,
triggered
triggered,
)
return CheckRes(exists and node_ips_updated and stream_updated and not triggered)

@property
def external_state(self) -> CheckRes:
actual_state = self.econfig.get()
logger.debug(
'Checking external config. Current %s. Saved %s',
self.estate, actual_state
)
logger.debug('Checking external config. Current %s. Saved %s', self.estate, actual_state)
return CheckRes(self.econfig.synced(self.estate))


Expand All @@ -250,7 +252,7 @@ def __init__(
*,
econfig: Optional[ExternalConfig] = None,
dutils: Optional[DockerUtils] = None,
sync_node: bool = False
sync_node: bool = False,
):
self.name = schain_name
self.schain_record = schain_record
Expand All @@ -259,9 +261,7 @@ def __init__(
self.econfig = econfig or ExternalConfig(name=schain_name)
self.sync_node = sync_node
self.rc = rule_controller
self.cfm: ConfigFileManager = ConfigFileManager(
schain_name=schain_name
)
self.cfm: ConfigFileManager = ConfigFileManager(schain_name=schain_name)
self.statsd_client = get_statsd_client()

def get_name(self) -> str:
Expand All @@ -278,9 +278,7 @@ def rotation_id_updated(self) -> CheckRes:
upstream_rotations = get_upstream_config_rotation_ids(self.cfm)
config_rotations = get_skaled_config_rotations_ids(self.cfm)
logger.debug(
'Comparing rotation_ids. Upstream: %s. Config: %s',
upstream_rotations,
config_rotations
'Comparing rotation_ids. Upstream: %s. Config: %s', upstream_rotations, config_rotations
)
return CheckRes(upstream_rotations == config_rotations)

Expand All @@ -292,19 +290,14 @@ def config_updated(self) -> CheckRes:

@property
def config(self) -> CheckRes:
""" Checks that sChain config file exists """
"""Checks that sChain config file exists"""
return CheckRes(self.cfm.skaled_config_exists())

@property
def volume(self) -> CheckRes:
"""Checks that sChain volume exists"""

return CheckRes(
is_volume_exists(
self.name,
sync_node=self.sync_node,
dutils=self.dutils)
)
return CheckRes(is_volume_exists(self.name, sync_node=self.sync_node, dutils=self.dutils))

@property
def firewall_rules(self) -> CheckRes:
Expand All @@ -316,10 +309,7 @@ def firewall_rules(self) -> CheckRes:
own_ip = get_own_ip_from_config(conf)
ranges = self.econfig.ranges
self.rc.configure(
base_port=base_port,
own_ip=own_ip,
node_ips=node_ips,
sync_ip_ranges=ranges
base_port=base_port, own_ip=own_ip, node_ips=node_ips, sync_ip_ranges=ranges
)
logger.debug(f'Rule controller {self.rc.expected_rules()}')
return CheckRes(self.rc.is_rules_synced())
Expand Down Expand Up @@ -364,19 +354,19 @@ def ima_container(self) -> CheckRes:
updated_time_frame = time_frame == container_time_frame
logger.debug(
'IMA image %s, container image %s, time frame %d, container_time_frame %d',
expected_image, image, time_frame, container_time_frame
expected_image,
image,
time_frame,
container_time_frame,
)

data = {
'container_running': container_running,
'updated_image': updated_image,
'new_image_pulled': new_image_pulled,
'updated_time_frame': updated_time_frame
'updated_time_frame': updated_time_frame,
}
logger.debug(
'%s, IMA check - %s',
self.name, data
)
logger.debug('%s, IMA check - %s', self.name, data)
result: bool = all(data.values())
return CheckRes(result, data=data)

Expand All @@ -387,9 +377,7 @@ def rpc(self) -> CheckRes:
if self.config:
config = self.cfm.skaled_config
http_endpoint = get_local_schain_http_endpoint_from_config(config)
timeout = get_endpoint_alive_check_timeout(
self.schain_record.failed_rpc_count
)
timeout = get_endpoint_alive_check_timeout(self.schain_record.failed_rpc_count)
res = check_endpoint_alive(http_endpoint, timeout=timeout)
return CheckRes(res)

Expand Down Expand Up @@ -426,11 +414,12 @@ def __init__(
stream_version: str,
estate: ExternalState,
current_nodes: list[ExtendedManagerNodeInfo],
last_dkg_successful: bool,
rotation_id: int = 0,
*,
econfig: Optional[ExternalConfig] = None,
dutils: DockerUtils = None,
sync_node: bool = False
sync_node: bool = False,
):
self._subjects = [
ConfigChecks(
Expand All @@ -440,18 +429,19 @@ def __init__(
rotation_id=rotation_id,
stream_version=stream_version,
current_nodes=current_nodes,
last_dkg_successful=last_dkg_successful,
estate=estate,
econfig=econfig,
sync_node=sync_node
sync_node=sync_node,
),
SkaledChecks(
schain_name=schain_name,
schain_record=schain_record,
rule_controller=rule_controller,
econfig=econfig,
dutils=dutils,
sync_node=sync_node
)
sync_node=sync_node,
),
]

def __getattr__(self, attr: str) -> Any:
Expand All @@ -469,11 +459,7 @@ def get_all(self, log: bool = True, save: bool = False, needed: Optional[List[st
plain_checks = {}
for subj in self._subjects:
logger.debug('Running checks for %s', subj)
subj_checks = subj.get_all(
log=False,
save=False,
needed=needed
)
subj_checks = subj.get_all(log=False, save=False, needed=needed)
plain_checks.update(subj_checks)
if not self.estate or not self.estate.ima_linked:
if 'ima_container' in plain_checks:
Expand All @@ -492,13 +478,9 @@ def get_api_checks_status(status: Dict, allowed: List = API_ALLOWED_CHECKS) -> D

def save_checks_dict(schain_name, checks_dict):
schain_check_path = get_schain_check_filepath(schain_name)
logger.info(
f'Saving checks for the chain {schain_name}: {schain_check_path}')
logger.info(f'Saving checks for the chain {schain_name}: {schain_check_path}')
try:
write_json(schain_check_path, {
'time': time.time(),
'checks': checks_dict
})
write_json(schain_check_path, {'time': time.time(), 'checks': checks_dict})
except Exception:
logger.exception(f'Failed to save checks: {schain_check_path}')

Expand All @@ -510,14 +492,12 @@ def log_checks_dict(schain_name, checks_dict):
if not checks_dict[check]:
failed_checks.append(check)
if len(failed_checks) != 0:
failed_checks_str = ", ".join(failed_checks)
failed_checks_str = ', '.join(failed_checks)
logger.info(
arguments_list_string(
{
'sChain name': schain_name,
'Failed checks': failed_checks_str
},
'Failed sChain checks', 'error'
{'sChain name': schain_name, 'Failed checks': failed_checks_str},
'Failed sChain checks',
'error',
)
)

Expand Down
2 changes: 2 additions & 0 deletions core/schains/cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def cleanup_schain(
schain_name,
sync_agent_ranges,
rotation_id,
last_dkg_successful,
current_nodes,
estate,
dutils=None
Expand All @@ -245,6 +246,7 @@ def cleanup_schain(
current_nodes=current_nodes,
rotation_id=rotation_id,
estate=estate,
last_dkg_successful=last_dkg_successful,
dutils=dutils,
sync_node=SYNC_NODE
)
Expand Down
10 changes: 3 additions & 7 deletions core/schains/monitor/config_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@


class BaseConfigMonitor(IMonitor):
def __init__(
self,
action_manager: ConfigActionManager,
checks: ConfigChecks
) -> None:
def __init__(self, action_manager: ConfigActionManager, checks: ConfigChecks) -> None:
self.am = action_manager
self.checks = checks

Expand Down Expand Up @@ -73,7 +69,7 @@ def execute(self) -> None:
self.am.config_dir()
if not self.checks.external_state:
self.am.external_state()
if not self.checks.upstream_config:
if self.checks.last_dkg_successful and not self.checks.upstream_config:
self.am.upstream_config()
self.am.update_reload_ts(self.checks.skaled_node_ips, sync_node=True)
self.am.update_reload_ts(self.checks.skaled_node_ips, sync_node=True)
self.am.reset_config_record()
Loading

0 comments on commit 3f60fac

Please sign in to comment.