From 2cd8ead7f76c8baa4a922bed38d03e8e7400e692 Mon Sep 17 00:00:00 2001 From: Pim Witlox Date: Wed, 27 Nov 2024 15:35:57 +0100 Subject: [PATCH] listeners on higher abstraction --- horao/conceptual/crdt.py | 2 +- horao/controllers/synchronization.py | 91 ++++++++++++++++++++++++++++ horao/logical/data_center.py | 58 +++++++++++++----- horao/logical/infrastructure.py | 7 +++ horao/persistance/store.py | 27 +++++++++ horao/physical/network.py | 9 +++ 6 files changed, 179 insertions(+), 15 deletions(-) create mode 100644 horao/controllers/synchronization.py diff --git a/horao/conceptual/crdt.py b/horao/conceptual/crdt.py index 8e47c50..1617309 100644 --- a/horao/conceptual/crdt.py +++ b/horao/conceptual/crdt.py @@ -760,7 +760,7 @@ def __init__( self.listeners = listeners if listeners else [] self._change_count = 0 self.items = ( - LastWriterWinsMap(listeners=[self.increase_change_count]) + LastWriterWinsMap(listeners=[self.increase_change_count] + self.listeners) if not items else items ) diff --git a/horao/controllers/synchronization.py b/horao/controllers/synchronization.py new file mode 100644 index 0000000..b0cb5a1 --- /dev/null +++ b/horao/controllers/synchronization.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*-# +"""Controller for synchronization calls to peers. +There are 2 mechanisms for synchronization: +- Time based: a preset interval to synchronize with all peers +- Event based: a preset number of changes to synchronize with all peers +""" +from __future__ import annotations + +import json +import logging +import os +import platform +from datetime import datetime, timedelta +from typing import Dict, List, Optional + +import httpx +import jwt + +from horao.logical.infrastructure import LogicalInfrastructure +from horao.persistance import HoraoEncoder, init_session + + +class SynchronizePeers: + """ + Synchronize with all peers. + """ + + def __init__( + self, + infrastructure: LogicalInfrastructure, + peers: Optional[List[str]] = None, + max_changes: Optional[int] = None, + sync_delta: Optional[int] = None, + ) -> None: + """ + Create instance + :param infrastructure: LogicalInfrastructure + :param peers: list of peers + :param max_changes: number of changes to synchronize + :param sync_delta: interval in seconds between synchronizations + """ + self.logger = logging.getLogger(__name__) + self.infrastructure = infrastructure + self.peers = ( + peers + if not os.getenv("PEERS", None) + else peers + os.getenv("PEERS", "").split(",") # type: ignore + ) + self.max_changes = ( + max_changes if max_changes else int(os.getenv("MAX_CHANGES", 100)) + ) + self.sync_delta = ( + sync_delta if sync_delta else int(os.getenv("SYNC_DELTA", 300)) + ) + self.session = init_session() + # note that the event handler should support async + for dc, dcs in self.infrastructure.items(): + dc.add_listeners(self.synchronize) + + async def synchronize(self) -> None: + """ + Synchronize with all peers. + :return: Dict of Peer:Exception for each peer that failed to synchronize + """ + timedelta_exceeded = False + last_sync = await self.session.load("last_sync") + if last_sync and datetime.now() - last_sync < timedelta( + seconds=self.sync_delta + ): + timedelta_exceeded = True + max_changes_exceeded = False + if self.infrastructure.change_count() > self.max_changes: + max_changes_exceeded = True + if not timedelta_exceeded and not max_changes_exceeded: + return None + for peer in self.peers: # type: ignore + token = jwt.encode( + dict(peer=os.getenv("HOST_ID", platform.node())), + os.environ["PEER_SECRET"], + algorithm="HS256", + ) + try: + lg = httpx.post( + "/synchronize", + headers={"Peer": "true", "Authorization": f"Bearer {token}"}, + json=json.dumps(await self.session.items(), cls=HoraoEncoder), + ) + lg.raise_for_status() + except httpx.HTTPError as e: + self.logger.error(f"Error synchronizing with {peer}: {e}") + await self.session.save("last_sync", datetime.now()) diff --git a/horao/logical/data_center.py b/horao/logical/data_center.py index 0159506..a6b3d74 100644 --- a/horao/logical/data_center.py +++ b/horao/logical/data_center.py @@ -2,8 +2,9 @@ """Data Center composites""" from __future__ import annotations +import asyncio import logging -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Callable, Dict, Iterable, List, Optional, Tuple import networkx as nx # type: ignore from networkx.classes import Graph # type: ignore @@ -39,6 +40,7 @@ def __init__( number: int, rows: Optional[LastWriterWinsMap] = None, items: Optional[Dict[int, List[Cabinet]]] = None, + listeners: Optional[List[Callable]] = None, ) -> None: """ Initialize a data center @@ -46,42 +48,70 @@ def __init__( :param number: unique number referring to potential AZ :param rows: optional LastWriterWinsMap of rows :param items: optional dictionary of rows (number, list of cabinets) + :param listeners: optional list of listeners """ self.log = logging.getLogger(__name__) self.name = name self.number = number + self.listeners = listeners if listeners else [] self.rows = LastWriterWinsMap() if not rows else rows if items: for k, v in items.items(): self.rows.set(k, v, hash(k)) # type: ignore def attach_change_listeners(c: Computer): - c.cpus.add_listeners(self.change_count) - c.rams.add_listeners(self.change_count) - c.disks.add_listeners(self.change_count) - c.nics.add_listeners(self.change_count) - c.accelerators.add_listeners(self.change_count) + c.cpus.add_listeners(self.invoke_listeners) + c.rams.add_listeners(self.invoke_listeners) + c.disks.add_listeners(self.invoke_listeners) + c.nics.add_listeners(self.invoke_listeners) + c.accelerators.add_listeners(self.invoke_listeners) for _, v in self.rows.read().items(): for cabinet in v: - cabinet.servers.add_listeners(self.change_count) + cabinet.servers.add_listeners(self.invoke_listeners) for server in cabinet.servers: attach_change_listeners(server) - cabinet.chassis.add_listeners(self.change_count) + cabinet.chassis.add_listeners(self.invoke_listeners) for chassis in cabinet.chassis: - chassis.servers.add_listeners(self.change_count) + chassis.servers.add_listeners(self.invoke_listeners) for server in chassis.servers: attach_change_listeners(server) - chassis.blades.add_listeners(self.change_count) + chassis.blades.add_listeners(self.invoke_listeners) for blade in chassis.blades: - blade.nodes.add_listeners(self.change_count) + blade.nodes.add_listeners(self.invoke_listeners) for node in blade.nodes: - node.modules.add_listeners(self.change_count) + node.modules.add_listeners(self.invoke_listeners) for module in node.modules: attach_change_listeners(module) - cabinet.switches.add_listeners(self.change_count) + cabinet.switches.add_listeners(self.invoke_listeners) for switch in cabinet.switches: - switch.ports.add_listeners(self.change_count) + switch.ports.add_listeners(self.invoke_listeners) + + def add_listeners(self, listener: Callable) -> None: + """ + Adds an async listener that is called on each update. + :param listener: Callable + :return: None + """ + if not listener in self.listeners: + self.listeners.append(listener) + + def remove_listeners(self, listener: Callable) -> None: + """ + Removes a listener if it was previously added. + :param listener: Callable[[Update], None] + :return: None + """ + if listener in self.listeners: + self.listeners.remove(listener) + + def invoke_listeners(self) -> None: + """ + Invokes all async event listeners. + :return: None + """ + for listener in self.listeners: + asyncio.create_task(listener()) def change_count(self) -> int: """ diff --git a/horao/logical/infrastructure.py b/horao/logical/infrastructure.py index c90decb..6595d18 100644 --- a/horao/logical/infrastructure.py +++ b/horao/logical/infrastructure.py @@ -38,6 +38,13 @@ def __init__( self.constraints = constraints or {} self.claims = claims or {} + def change_count(self) -> int: + """ + Count the number of changes in the infrastructure. + :return: number of changes + """ + return sum([d.change_count() for d in self.infrastructure.keys()]) + def clear(self) -> None: self.infrastructure.clear() diff --git a/horao/persistance/store.py b/horao/persistance/store.py index 83afe87..c3643a3 100644 --- a/horao/persistance/store.py +++ b/horao/persistance/store.py @@ -23,6 +23,33 @@ def __init__(self, url: Optional[str] = None) -> None: self.redis = redis.Redis.from_url(url) self.memory: Dict[str, Any] = {} + async def keys(self) -> Dict[str, Any] | Any: + """ + Return all keys in the store + :return: keys + """ + if hasattr(self, "redis"): + return await self.redis.keys() + return self.memory.keys() + + async def values(self) -> Dict[str, Any] | Any: + """ + Return all values in the store + :return: values + """ + if hasattr(self, "redis"): + return await self.redis.values() + return self.memory.values() + + async def items(self) -> Dict[str, Any] | Any: + """ + Return all items in the store + :return: items + """ + if hasattr(self, "redis"): + return await self.redis.items() + return self.memory.items() + @instrument_class_function(name="load", level=logging.DEBUG) async def load(self, key: str) -> Any | None: """ diff --git a/horao/physical/network.py b/horao/physical/network.py index 1fada75..1bede34 100644 --- a/horao/physical/network.py +++ b/horao/physical/network.py @@ -178,6 +178,9 @@ def __init__( else HardwareList[Port](wan_ports) ) + def change_count(self) -> int: + return super().change_count() + self.wan_ports.change_count() + def __eq__(self, other): if not isinstance(other, Firewall): return False @@ -214,6 +217,9 @@ def __init__( else HardwareList[Port](wan_ports) ) + def change_count(self) -> int: + return super().change_count() + self.wan_ports.change_count() + def __eq__(self, other): if not isinstance(other, Router): return @@ -255,6 +261,9 @@ def __init__( else HardwareList[Port](uplink_ports) ) + def change_count(self) -> int: + return super().change_count() + self.uplink_ports.change_count() + def __eq__(self, other): if not isinstance(other, Switch): return False