diff --git a/horao/api/synchronization.py b/horao/api/synchronization.py index f85102b..09f6d76 100644 --- a/horao/api/synchronization.py +++ b/horao/api/synchronization.py @@ -47,23 +47,23 @@ async def synchronize(request: Request) -> JSONResponse: return JSONResponse(status_code=400, content={"error": "Error parsing request"}) try: session = init_session() - for k, v in logical_infrastructure.logical_infrastructure.items(): - local_dc = await session.load(k.name) + for k, v in logical_infrastructure.infrastructure.items(): + local_dc = await session.async_load(k.name) if not local_dc: - await session.save(k.name, k) + await session.async_save(k.name, k) else: local_dc.merge(k) - local_dc_content = await session.load(f"{k.name}.content") + local_dc_content = await session.async_load(f"{k.name}.content") if not local_dc_content: - await session.save(f"{k.name}.content", v) + await session.async_save(f"{k.name}.content", v) else: local_dc_content.merge(v) if logical_infrastructure.claims: for k, v in logical_infrastructure.claims.items(): - await session.save(k.name, k) + await session.async_save(k.name, k) if logical_infrastructure.constraints: for k, v in logical_infrastructure.contraints.items(): - await session.save(k.name, k) + await session.async_save(k.name, k) except Exception as e: logging.error(f"Error synchronizing: {e}") if os.getenv("DEBUG", "False") == "True": diff --git a/horao/conceptual/crdt.py b/horao/conceptual/crdt.py index 1c07d6b..7ca6430 100644 --- a/horao/conceptual/crdt.py +++ b/horao/conceptual/crdt.py @@ -764,11 +764,9 @@ def __init__( super().__init__() self.log = logging.getLogger(__name__) self.listeners = listeners if listeners else [] - self._change_count = 0 + self.changes: List[Update] = [] self.items = ( - LastWriterWinsMap(listeners=[self.increase_change_count]) - if not items - else items + LastWriterWinsMap(listeners=[self.invoke_listeners]) if not items else items ) if content: self.extend(content) @@ -797,31 +795,17 @@ def invoke_listeners(self, update: Optional[Update] = None) -> None: Invokes all event listeners. :return: None """ + if update and not update in self.changes: + self.changes.append(update) for listener in self.listeners: - listener(update) + listener(self.changes) - def increase_change_count(self, update: Optional[Update] = None) -> None: + def clear_history(self) -> None: """ - Increase the change count. - :param update: change to process + Clear the history of changes. :return: None """ - self._change_count += 1 - self.invoke_listeners(update) - - def change_count(self) -> int: - """ - Return the number of changes. - :return: int - """ - return self._change_count - - def reset_change_count(self) -> None: - """ - Reset the change count. - :return: None - """ - self._change_count = 0 + self.changes.clear() @instrument_class_function(name="append", level=logging.DEBUG) def append(self, item: T) -> T: diff --git a/horao/controllers/synchronization.py b/horao/controllers/synchronization.py index e8f47e5..ed196ed 100644 --- a/horao/controllers/synchronization.py +++ b/horao/controllers/synchronization.py @@ -6,7 +6,6 @@ """ from __future__ import annotations -import asyncio import json import logging import os @@ -54,7 +53,7 @@ def __init__( for dc in self.logical_infrastructure.infrastructure.keys(): dc.add_listeners(self.synchronize) - def synchronize(self) -> None: + def synchronize(self, changes: Optional[List] = None) -> None: """ Synchronize with all peers, if one of the following conditions is met: - there was no previous synchronization time stamp. @@ -62,18 +61,19 @@ def synchronize(self) -> None: - the amount of changes on the stack exceed the threshold that was set will only call synchronize if there are any changes if the timer expires note: currently only the infrastructure is tracked, changes to claims and constraints are not tracked + :param changes: optional list of changes :return: None """ if not self.peers: return None timedelta_exceeded = False - last_sync = asyncio.run(self.session.load("last_sync")) + last_sync = self.session.load("last_sync") if not last_sync or datetime.now() - last_sync < timedelta( seconds=self.sync_delta ): timedelta_exceeded = True max_changes_exceeded = False - if self.logical_infrastructure.change_count() > self.max_changes: + if changes and len(changes) > self.max_changes: max_changes_exceeded = True if not timedelta_exceeded and not max_changes_exceeded: return None @@ -93,4 +93,4 @@ def synchronize(self) -> None: lg.raise_for_status() except httpx.HTTPError as e: self.logger.error(f"Error synchronizing with {peer}: {e}") - asyncio.run(self.session.save("last_sync", datetime.now())) + self.session.save("last_sync", datetime.now()) diff --git a/horao/logical/data_center.py b/horao/logical/data_center.py index 2fdca9d..a7b6956 100644 --- a/horao/logical/data_center.py +++ b/horao/logical/data_center.py @@ -11,8 +11,7 @@ from horao.conceptual.crdt import LastWriterWinsMap from horao.conceptual.decorators import instrument_class_function from horao.conceptual.support import Update -from horao.physical.component import Disk -from horao.physical.composite import Blade, Cabinet, Chassis, Server +from horao.physical.composite import Cabinet from horao.physical.computer import Computer from horao.physical.network import ( NIC, @@ -60,6 +59,7 @@ def __init__( if items: for k, v in items.items(): self.rows.set(k, v, hash(k)) # type: ignore + self.changes: List[Update] = [] def add_listeners(self, listener: Callable) -> None: """ @@ -73,44 +73,25 @@ def add_listeners(self, listener: Callable) -> None: def remove_listeners(self, listener: Callable) -> None: """ Removes a listener if it was previously added. - :param listener: Callable[[Update], None] + :param listener: Callable :return: None """ if listener in self.listeners: self.listeners.remove(listener) - def invoke_listeners(self, update: Optional[Update] = None) -> None: + def invoke_listeners(self, changes: Optional[List] = None) -> None: """ Invokes all event listeners. + :param changes: list of changes :return: None """ + if changes: + if isinstance(changes, List): + self.changes.extend(changes) + else: + self.changes.append(changes) for listener in self.listeners: - listener(update) - - def change_count(self) -> int: - """ - Sum the change count of all components in the data center - :return: int - """ - total = 0 - for _, v in self.rows.read().items(): - for cabinet in v: - for server in cabinet._servers: - total += server.change_count() - total += cabinet._servers.change_count() - for chassis in cabinet._chassis: - total += chassis.change_count() - for blade in chassis._blades: - total += blade.change_count() - for node in blade._nodes: - total += node.change_count() - for module in node._modules: - total += module.change_count() - total += node._modules.change_count() - total += chassis._blades.change_count() - total += cabinet._chassis.change_count() - total += cabinet._switches.change_count() - return total + listener(changes) @instrument_class_function(name="copy", level=logging.DEBUG) def copy(self) -> Dict[int, List[Cabinet]]: @@ -127,7 +108,9 @@ def has_key(self, k: int) -> bool: return False def update(self, key: int, value: List[Cabinet]) -> None: - self.rows.set(key, value, hash(key)) # type: ignore + if key in self.keys(): + self.__delitem__(key) + self.__setitem__(key, value, hash(key)) # type: ignore def keys(self) -> List[int]: return [k for k, _ in self.rows.read()] @@ -163,23 +146,22 @@ def merge(self, other: DataCenter) -> None: else: self[number] = row - # reset change counters + # clear the change history for _, v in self.rows.read().items(): for cabinet in v: for server in cabinet.servers: - server._disks.reset_change_count() - cabinet.servers.reset_change_count() + server.disks.clear_history() + cabinet.servers.clear_history() for chassis in cabinet.chassis: for blade in chassis.blades: for node in blade.nodes: for module in node.modules: - module._disks.reset_change_count() - node.modules.reset_change_count() - blade.nodes.reset_change_count() - chassis.blades.reset_change_count() - cabinet.chassis.reset_change_count() - cabinet.switches.reset_change_count() - v.reset_change_count() + module.disks.clear_history() + node.modules.clear_history() + blade.nodes.clear_history() + chassis.blades.clear_history() + cabinet.chassis.clear_history() + cabinet.switches.clear_history() def __eq__(self, other) -> bool: if not isinstance(other, DataCenter): @@ -187,6 +169,22 @@ def __eq__(self, other) -> bool: return self.name == other.name def __setitem__(self, key: int, item: List[Cabinet]) -> None: + # glue all handlers to event invocation + for cabinet in item: + for server in cabinet.servers: + server.disks.add_listeners(self.invoke_listeners) + cabinet.servers.add_listeners(self.invoke_listeners) + for chassis in cabinet.chassis: + for blade in chassis.blades: + for node in blade.nodes: + for module in node.modules: + module.disks.add_listeners(self.invoke_listeners) + node.modules.add_listeners(self.invoke_listeners) + blade.nodes.add_listeners(self.invoke_listeners) + chassis.blades.add_listeners(self.invoke_listeners) + cabinet.chassis.add_listeners(self.invoke_listeners) + cabinet.switches.add_listeners(self.invoke_listeners) + # insert the row self.rows.set(key, item, hash(key)) # type: ignore @instrument_class_function(name="getitem", level=logging.DEBUG) @@ -200,6 +198,22 @@ def __getitem__(self, key) -> List[Cabinet]: def __delitem__(self, key) -> None: for k, v in self.rows.read().items(): if k == key: + # remove all listeners + for cabinet in v: + for server in cabinet.servers: + server.disks.remove_listeners(self.invoke_listeners) + cabinet.servers.remove_listeners(self.invoke_listeners) + for chassis in cabinet.chassis: + for blade in chassis.blades: + for node in blade.nodes: + for module in node.modules: + module.disks.remove_listeners(self.invoke_listeners) + node.modules.remove_listeners(self.invoke_listeners) + blade.nodes.remove_listeners(self.invoke_listeners) + chassis.blades.remove_listeners(self.invoke_listeners) + cabinet.chassis.remove_listeners(self.invoke_listeners) + cabinet.switches.remove_listeners(self.invoke_listeners) + # remove the row self.rows.unset(key, hash(key)) return raise KeyError(f"Key {key} not found") @@ -223,76 +237,6 @@ def __iter__(self) -> Iterable[Tuple[int, List[Cabinet]]]: def __hash__(self) -> int: return hash((self.name, self.number)) - @staticmethod - @instrument_class_function(name="move_server", level=logging.DEBUG) - def move_server(server: Server, from_cabinet: Cabinet, to_cabinet: Cabinet) -> None: - """ - Move a server from one cabinet to another - :param server: server to move - :param from_cabinet: from - :param to_cabinet: to - :return: None - :raises: ValueError if you try to remove a server that doesn't exist - """ - if not server in from_cabinet.servers: - raise ValueError("Cannot move servers that are not installed.") - from_cabinet.servers.remove(server) - to_cabinet.servers.append(server) - - @staticmethod - @instrument_class_function(name="move_chassis", level=logging.DEBUG) - def move_chassis_server( - server: Server, from_chassis: Chassis, to_chassis: Chassis - ) -> None: - """ - Move a server from one cabinet to another - :param server: server to move - :param from_chassis: from - :param to_chassis: to - :return: None - :raises: ValueError if you try to remove a server that doesn't exist - """ - if not server in from_chassis.servers: - raise ValueError("Cannot move servers that are not installed.") - from_chassis.servers.remove(server) - to_chassis.servers.append(server) - - @staticmethod - @instrument_class_function(name="move_blade", level=logging.DEBUG) - def move_blade(blade: Blade, from_chassis: Chassis, to_chassis: Chassis) -> None: - """ - Move a server from one chassis to another - :param blade: blade to move - :param from_chassis: from - :param to_chassis: to - :return: None - :raises: ValueError if you try to remove a blade that is not installed - """ - if not blade in from_chassis.blades: - raise ValueError("Cannot move blades that are not installed.") - from_chassis.blades.remove(blade) - to_chassis.blades.append(blade) - - @staticmethod - @instrument_class_function(name="swap_disk", level=logging.DEBUG) - def swap_disk( - server: Server, old_disk: Optional[Disk], new_disk: Optional[Disk] - ) -> None: - """ - Swap a (broken) disk in a server - :param server: server to swap disk from/in - :param old_disk: old disk to remove (if any) - :param new_disk: new disk to add (if any) - :return: None - :raises: ValueError if you try to remove a disk that doesn't exist - """ - if new_disk is not None: - server.disks.append(new_disk) - if old_disk: - if not server.disks: - raise ValueError("Cannot remove disks that are not installed.") - server.disks.remove(old_disk) - def fetch_server_nic( self, row: int, cabinet: int, server: int, nic: int, chassis: Optional[int] ) -> NIC: diff --git a/horao/logical/infrastructure.py b/horao/logical/infrastructure.py index 6595d18..5d0f5eb 100644 --- a/horao/logical/infrastructure.py +++ b/horao/logical/infrastructure.py @@ -3,7 +3,7 @@ from __future__ import annotations import logging -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Dict, Iterable, List, Optional, Tuple, Any from horao.conceptual.claim import Claim, Reservation from horao.conceptual.decorators import instrument_class_function @@ -38,12 +38,12 @@ def __init__( self.constraints = constraints or {} self.claims = claims or {} - def change_count(self) -> int: + def changes(self) -> List[Any]: """ Count the number of changes in the infrastructure. :return: number of changes """ - return sum([d.change_count() for d in self.infrastructure.keys()]) + return [d.changes for d in self.infrastructure.keys()] def clear(self) -> None: self.infrastructure.clear() diff --git a/horao/persistance/serialize.py b/horao/persistance/serialize.py index 34883c0..2b1afe9 100644 --- a/horao/persistance/serialize.py +++ b/horao/persistance/serialize.py @@ -9,7 +9,6 @@ from horao.auth.roles import TenantController from horao.conceptual.claim import Reservation from horao.conceptual.crdt import ( - CRDTList, LastWriterWinsMap, LastWriterWinsRegister, ObservedRemovedSet, @@ -22,8 +21,7 @@ from horao.logical.resource import Compute, Storage from horao.physical.component import CPU, RAM, Accelerator, Disk from horao.physical.composite import Blade, Cabinet, Chassis, Node -from horao.physical.computer import ComputerList, Module, Server -from horao.physical.hardware import HardwareList +from horao.physical.computer import Module, Server from horao.physical.network import ( NIC, Firewall, @@ -117,7 +115,7 @@ def default(self, obj): "serial_number": obj.serial_number, "model": obj.model, "number": obj.number, - "ports": json.dumps(obj._ports, cls=HoraoEncoder), + "ports": json.dumps(obj.ports, cls=HoraoEncoder), } if isinstance(obj, Switch): return { @@ -130,10 +128,10 @@ def default(self, obj): "switch_type": obj.switch_type.value, "status": obj.status.value, "managed": obj.managed, - "lan_ports": json.dumps(obj._ports, cls=HoraoEncoder), + "lan_ports": json.dumps(obj.ports, cls=HoraoEncoder), "uplink_ports": ( - json.dumps(obj._uplink_ports, cls=HoraoEncoder) - if obj._uplink_ports + json.dumps(obj.uplink_ports, cls=HoraoEncoder) + if obj.uplink_ports else None ), } @@ -146,10 +144,10 @@ def default(self, obj): "number": obj.number, "router_type": obj.router_type.value, "status": obj.status.value, - "lan_ports": json.dumps(obj._ports, cls=HoraoEncoder), + "lan_ports": json.dumps(obj.ports, cls=HoraoEncoder), "wan_ports": ( - json.dumps(obj._wan_ports, cls=HoraoEncoder) - if obj._wan_ports + json.dumps(obj.wan_ports, cls=HoraoEncoder) + if obj.wan_ports else None ), } @@ -161,10 +159,10 @@ def default(self, obj): "model": obj.model, "number": obj.number, "status": obj.status.value, - "lan_ports": json.dumps(obj._ports, cls=HoraoEncoder), + "lan_ports": json.dumps(obj.ports, cls=HoraoEncoder), "wan_ports": ( - json.dumps(obj._wan_ports, cls=HoraoEncoder) - if obj._wan_ports + json.dumps(obj.wan_ports, cls=HoraoEncoder) + if obj.wan_ports else None ), } @@ -212,15 +210,15 @@ def default(self, obj): "name": obj.name, "model": obj.model, "number": obj.number, - "cpus": json.dumps(obj._cpus, cls=HoraoEncoder), - "rams": json.dumps(obj._rams, cls=HoraoEncoder), - "nics": json.dumps(obj._nics, cls=HoraoEncoder), + "cpus": json.dumps(obj.cpus, cls=HoraoEncoder), + "rams": json.dumps(obj.rams, cls=HoraoEncoder), + "nics": json.dumps(obj.nics, cls=HoraoEncoder), "disks": ( - json.dumps(obj._disks, cls=HoraoEncoder) if obj._disks else None + json.dumps(obj.disks, cls=HoraoEncoder) if obj.disks else None ), "accelerators": ( - json.dumps(obj._accelerators, cls=HoraoEncoder) - if obj._accelerators + json.dumps(obj.accelerators, cls=HoraoEncoder) + if obj.accelerators else None ), "status": obj.status.value, @@ -232,15 +230,15 @@ def default(self, obj): "name": obj.name, "model": obj.model, "number": obj.number, - "cpus": json.dumps(obj._cpus, cls=HoraoEncoder), - "rams": json.dumps(obj._rams, cls=HoraoEncoder), - "nics": json.dumps(obj._nics, cls=HoraoEncoder), + "cpus": json.dumps(obj.cpus, cls=HoraoEncoder), + "rams": json.dumps(obj.rams, cls=HoraoEncoder), + "nics": json.dumps(obj.nics, cls=HoraoEncoder), "disks": ( - json.dumps(obj._disks, cls=HoraoEncoder) if obj._disks else None + json.dumps(obj.disks, cls=HoraoEncoder) if obj.disks else None ), "accelerators": ( - json.dumps(obj._accelerators, cls=HoraoEncoder) - if obj._accelerators + json.dumps(obj.accelerators, cls=HoraoEncoder) + if obj.accelerators else None ), "status": obj.status.value, @@ -253,7 +251,7 @@ def default(self, obj): "model": obj.model, "number": obj.number, "modules": ( - json.dumps(obj._modules, cls=HoraoEncoder) if obj._modules else None + json.dumps(obj.modules, cls=HoraoEncoder) if obj.modules else None ), } if isinstance(obj, Blade): @@ -264,7 +262,7 @@ def default(self, obj): "model": obj.model, "number": obj.number, "nodes": ( - json.dumps(obj._nodes, cls=HoraoEncoder) if obj._nodes else None + json.dumps(obj.nodes, cls=HoraoEncoder) if obj.nodes else None ), } if isinstance(obj, Chassis): @@ -275,10 +273,10 @@ def default(self, obj): "model": obj.model, "number": obj.number, "servers": ( - json.dumps(obj._servers, cls=HoraoEncoder) if obj._servers else None + json.dumps(obj.servers, cls=HoraoEncoder) if obj.servers else None ), "blades": ( - json.dumps(obj._blades, cls=HoraoEncoder) if obj._blades else None + json.dumps(obj.blades, cls=HoraoEncoder) if obj.blades else None ), } if isinstance(obj, Cabinet): @@ -289,15 +287,13 @@ def default(self, obj): "model": obj.model, "number": obj.number, "servers": ( - json.dumps(obj._servers, cls=HoraoEncoder) if obj._servers else None + json.dumps(obj.servers, cls=HoraoEncoder) if obj.servers else None ), "chassis": ( - json.dumps(obj._chassis, cls=HoraoEncoder) if obj._chassis else None + json.dumps(obj.chassis, cls=HoraoEncoder) if obj.chassis else None ), "switches": ( - json.dumps(obj._switches, cls=HoraoEncoder) - if obj._switches - else None + json.dumps(obj.switches, cls=HoraoEncoder) if obj.switches else None ), } if isinstance(obj, DataCenter): diff --git a/horao/persistance/store.py b/horao/persistance/store.py index c3643a3..1fca482 100644 --- a/horao/persistance/store.py +++ b/horao/persistance/store.py @@ -4,7 +4,8 @@ import logging from typing import Any, Dict, Optional -from redis import asyncio as redis +from redis.asyncio import Redis as RedisAIO +from redis import Redis as Redis from horao.conceptual.decorators import instrument_class_function from horao.persistance.serialize import HoraoDecoder, HoraoEncoder @@ -20,7 +21,8 @@ def __init__(self, url: Optional[str] = None) -> None: :return: None """ if url: - self.redis = redis.Redis.from_url(url) + self.redis_aio = RedisAIO.from_url(url) + self.redis = Redis.from_url(url) self.memory: Dict[str, Any] = {} async def keys(self) -> Dict[str, Any] | Any: @@ -29,7 +31,7 @@ async def keys(self) -> Dict[str, Any] | Any: :return: keys """ if hasattr(self, "redis"): - return await self.redis.keys() + return await self.redis_aio.keys() return self.memory.keys() async def values(self) -> Dict[str, Any] | Any: @@ -38,7 +40,7 @@ async def values(self) -> Dict[str, Any] | Any: :return: values """ if hasattr(self, "redis"): - return await self.redis.values() + return await self.redis_aio.values() return self.memory.values() async def items(self) -> Dict[str, Any] | Any: @@ -47,25 +49,51 @@ async def items(self) -> Dict[str, Any] | Any: :return: items """ if hasattr(self, "redis"): - return await self.redis.items() + return await self.redis_aio.items() return self.memory.items() - @instrument_class_function(name="load", level=logging.DEBUG) - async def load(self, key: str) -> Any | None: + @instrument_class_function(name="async_load", level=logging.DEBUG) + async def async_load(self, key: str) -> Any | None: """ Load the object from memory or redis :param key: key to structure :return: structure or None """ if hasattr(self, "redis"): - structure = await self.redis.get(key) + structure = await self.redis_aio.get(key) return json.loads(structure, cls=HoraoDecoder) if structure else None if key not in self.memory: return None return json.loads(self.memory[key], cls=HoraoDecoder) + @instrument_class_function(name="load", level=logging.DEBUG) + def load(self, key: str) -> Any | None: + """ + Load the object from memory or redis + :param key: key to structure + :return: structure or None + """ + if hasattr(self, "redis"): + structure = self.redis.get(key) + return json.loads(structure, cls=HoraoDecoder) if structure else None # type: ignore + if key not in self.memory: + return None + return json.loads(self.memory[key], cls=HoraoDecoder) + + @instrument_class_function(name="async_save", level=logging.DEBUG) + async def async_save(self, key: str, value: Any) -> None: + """ + Save the object to memory or redis + :param key: key to structure + :param value: structure + :return: None + """ + if hasattr(self, "redis"): + await self.redis_aio.set(key, json.dumps(value, cls=HoraoEncoder)) + self.memory[key] = json.dumps(value, cls=HoraoEncoder) + @instrument_class_function(name="save", level=logging.DEBUG) - async def save(self, key: str, value: Any) -> None: + def save(self, key: str, value: Any) -> None: """ Save the object to memory or redis :param key: key to structure @@ -73,5 +101,15 @@ async def save(self, key: str, value: Any) -> None: :return: None """ if hasattr(self, "redis"): - await self.redis.set(key, json.dumps(value, cls=HoraoEncoder)) + self.redis.set(key, json.dumps(value, cls=HoraoEncoder)) self.memory[key] = json.dumps(value, cls=HoraoEncoder) + + def __del__(self): + """ + Close the redis connection + :return: None + """ + if hasattr(self, "redis"): + self.redis.close() + if hasattr(self, "redis_aio"): + self.redis_aio.close() diff --git a/horao/physical/composite.py b/horao/physical/composite.py index 1e44ca4..f2d7760 100644 --- a/horao/physical/composite.py +++ b/horao/physical/composite.py @@ -22,11 +22,7 @@ def __init__( ): super().__init__(serial_number, model, number) self.name = name - self._modules = ComputerList[Module](modules) - - @property - def modules(self): - return list(iter(self._modules)) + self.modules = ComputerList[Module](modules) def __copy__(self): return Node( @@ -51,22 +47,7 @@ def __init__( ): super().__init__(serial_number, model, number) self.name = name - self._nodes = HardwareList[Node](nodes) - - def add_listener(self, listener): - if listener not in self._nodes.listeners: - self._nodes.add_listeners(listener) - - def remove_listener(self, listener): - if listener in self._nodes.listeners: - self._nodes.remove_listeners(listener) - - @property - def nodes(self): - return list(iter(self._nodes)) - - def change_count(self) -> int: - return self._nodes.change_count() + self.nodes = HardwareList[Node](nodes) def __copy__(self): return Blade( @@ -92,31 +73,8 @@ def __init__( ): super().__init__(serial_number, model, number) self.name = name - self._servers = ComputerList[Server](servers) - self._blades = HardwareList[Blade](blades) - - def add_listener(self, listener): - if listener not in self._servers.listeners: - self._servers.add_listeners(listener) - if listener not in self._blades.listeners: - self._blades.add_listeners(listener) - - def remove_listener(self, listener): - if listener in self._servers.listeners: - self._servers.remove_listeners(listener) - if listener in self._blades.listeners: - self._blades.remove_listeners(listener) - - @property - def servers(self): - return list(iter(self._servers)) - - @property - def blades(self): - return list(iter(self._blades)) - - def change_count(self) -> int: - return self._servers.change_count() + self._blades.change_count() + self.servers = ComputerList[Server](servers) + self.blades = HardwareList[Blade](blades) def __copy__(self): return Chassis( @@ -144,53 +102,24 @@ def __init__( ): super().__init__(serial_number, model, number) self.name = name - self._servers = ComputerList[Server](servers) - self._chassis = HardwareList[Chassis](chassis) - self._switches = NetworkList[Switch](switches) - - def add_listener(self, listener): - if listener not in self._servers.listeners: - self._servers.add_listeners(listener) - if listener not in self._chassis.listeners: - self._chassis.add_listeners(listener) - if listener not in self._switches.listeners: - self._switches.add_listeners(listener) - - def remove_listener(self, listener): - if listener in self._servers.listeners: - self._servers.remove_listeners(listener) - if listener in self._chassis.listeners: - self._chassis.remove_listeners(listener) - if listener in self._switches.listeners: - self._switches.remove_listeners(listener) - - @property - def servers(self): - return list(iter(self._servers)) - - @property - def chassis(self): - return list(iter(self._chassis)) - - @property - def switches(self): - return list(iter(self._switches)) - - def change_count(self) -> int: - return ( - self._servers.change_count() - + self._chassis.change_count() - + self._switches.change_count() - ) - - def merge(self, other: Cabinet, reset_counters: bool = False) -> None: + self.servers = ComputerList[Server](servers) + self.chassis = HardwareList[Chassis](chassis) + self.switches = NetworkList[Switch](switches) + + def merge(self, other: Cabinet, clear_history: bool = True) -> None: + """ + Merge the cabinet with another cabinet + :param other: cabinet to merge with + :param clear_history: clear change history + :return: None + """ self.servers.extend(iter(other.servers)) self.chassis.extend(iter(other.chassis)) self.switches.extend(iter(other.switches)) - if reset_counters: - self._servers.reset_change_count() - self._chassis.reset_change_count() - self._switches.reset_change_count() + if clear_history: + self.servers.clear_history() + self.chassis.clear_history() + self.switches.clear_history() def __copy__(self): return Cabinet( diff --git a/horao/physical/computer.py b/horao/physical/computer.py index c35f4e9..d71f18b 100644 --- a/horao/physical/computer.py +++ b/horao/physical/computer.py @@ -29,68 +29,27 @@ def __init__( self.name = name self.model = model self.number = number - self._cpus = HardwareList[CPU]( + self.cpus = HardwareList[CPU]( hardware=cpus if isinstance(cpus, list) else None, items=cpus if isinstance(cpus, HardwareList) else None, # type: ignore ) - self._rams = HardwareList[RAM]( + self.rams = HardwareList[RAM]( hardware=rams if isinstance(rams, list) else None, items=rams if isinstance(rams, HardwareList) else None, # type: ignore ) - self._nics = HardwareList[NIC]( + self.nics = HardwareList[NIC]( hardware=nics if isinstance(nics, list) else None, items=nics if isinstance(nics, HardwareList) else None, # type: ignore ) - self._disks = HardwareList[Disk]( + self.disks = HardwareList[Disk]( hardware=disks if isinstance(disks, list) else None, items=disks if isinstance(disks, HardwareList) else None, # type: ignore ) - self._accelerators = HardwareList[Accelerator]( + self.accelerators = HardwareList[Accelerator]( hardware=accelerators if isinstance(accelerators, list) else None, items=accelerators if isinstance(accelerators, HardwareList) else None, # type: ignore ) - def add_listener(self, listener): - if listener not in self._disks.listeners: - self._disks.add_listeners(listener) - - def remove_listener(self, listener): - if listener in self._disks.listeners: - self._disks.remove_listeners(listener) - - @property - def cpus(self) -> List[CPU]: - return list(iter(self._cpus)) - - @property - def rams(self) -> List[RAM]: - return list(iter(self._rams)) - - @property - def nics(self) -> List[NIC]: - return list(iter(self._nics)) - - @property - def disks(self) -> List[Disk]: - return list(iter(self._disks)) - - @property - def accelerators(self) -> List[Accelerator]: - return list(iter(self._accelerators)) - - def change_count(self) -> int: - """ - Return the number of changes in the stack - :return: int - """ - return ( - self._cpus.change_count() - + self._rams.change_count() - + self._nics.change_count() - + self._disks.change_count() - + self._accelerators.change_count() - ) - def __copy__(self): return Computer( self.serial_number, diff --git a/horao/physical/hardware.py b/horao/physical/hardware.py index d4988de..a613dd0 100644 --- a/horao/physical/hardware.py +++ b/horao/physical/hardware.py @@ -2,7 +2,7 @@ """Somewhat abstract physical 'hardware'.""" from __future__ import annotations -from abc import ABC +from abc import ABC, abstractmethod from typing import List, TypeVar, Optional from horao.conceptual.crdt import CRDTList, LastWriterWinsMap diff --git a/horao/physical/network.py b/horao/physical/network.py index 598a2c8..cde83a4 100644 --- a/horao/physical/network.py +++ b/horao/physical/network.py @@ -117,14 +117,10 @@ def __init__( self, serial_number, model, number, ports: List[Port] | HardwareList[Port] ): super().__init__(serial_number, model, number) - self._ports = ( + self.ports = ( ports if isinstance(ports, HardwareList) else HardwareList[Port](ports) ) - @property - def ports(self) -> List[Port]: - return list(iter(self._ports)) - def __eq__(self, other) -> bool: if not isinstance(other, NetworkDevice): return False @@ -173,16 +169,12 @@ def __init__( super().__init__(serial_number, model, number, lan_ports) self.name = name self.status = status - self._wan_ports = ( + self.wan_ports = ( wan_ports if isinstance(wan_ports, HardwareList) else HardwareList[Port](wan_ports) ) - @property - def wan_ports(self) -> List[Port]: - return list(iter(self._wan_ports)) - def __eq__(self, other): if not isinstance(other, Firewall): return False @@ -213,16 +205,12 @@ def __init__( self.name = name self.router_type = router_type self.status = status - self._wan_ports = ( + self.wan_ports = ( wan_ports if isinstance(wan_ports, HardwareList) else HardwareList[Port](wan_ports) ) - @property - def wan_ports(self) -> List[Port]: - return list(iter(self._wan_ports)) - def __eq__(self, other): if not isinstance(other, Router): return @@ -258,16 +246,12 @@ def __init__( self.switch_type = switch_type self.status = status self.managed = managed - self._uplink_ports = ( + self.uplink_ports = ( uplink_ports if isinstance(uplink_ports, HardwareList) else HardwareList[Port](uplink_ports) ) - @property - def uplink_ports(self) -> List[Port]: - return list(iter(self._uplink_ports)) - def __eq__(self, other): if not isinstance(other, Switch): return False diff --git a/poetry.lock b/poetry.lock index 518658a..6fdd81a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -37,13 +37,13 @@ tests = ["mypy (>=0.800)", "pytest", "pytest-asyncio"] [[package]] name = "astroid" -version = "3.3.5" +version = "3.3.6" description = "An abstract syntax tree for Python with inference support." optional = false python-versions = ">=3.9.0" files = [ - {file = "astroid-3.3.5-py3-none-any.whl", hash = "sha256:a9d1c946ada25098d790e079ba2a1b112157278f3fb7e718ae6a9252f5835dc8"}, - {file = "astroid-3.3.5.tar.gz", hash = "sha256:5cfc40ae9f68311075d27ef68a4841bdc5cc7f6cf86671b49f00607d30188e2d"}, + {file = "astroid-3.3.6-py3-none-any.whl", hash = "sha256:db676dc4f3ae6bfe31cda227dc60e03438378d7a896aec57422c95634e8d722f"}, + {file = "astroid-3.3.6.tar.gz", hash = "sha256:6aaea045f938c735ead292204afdb977a36e989522b7833ef6fea94de743f442"}, ] [[package]] @@ -1352,23 +1352,6 @@ pytest = ">=4.6" [package.extras] testing = ["fields", "hunter", "process-tests", "pytest-xdist", "virtualenv"] -[[package]] -name = "pytest-mock" -version = "3.14.0" -description = "Thin-wrapper around the mock package for easier use with pytest" -optional = false -python-versions = ">=3.8" -files = [ - {file = "pytest-mock-3.14.0.tar.gz", hash = "sha256:2719255a1efeceadbc056d6bf3df3d1c5015530fb40cf347c0f9afac88410bd0"}, - {file = "pytest_mock-3.14.0-py3-none-any.whl", hash = "sha256:0b72c38033392a5f4621342fe11e9219ac11ec9d375f8e2a0c164539e0d70f6f"}, -] - -[package.dependencies] -pytest = ">=6.2.5" - -[package.extras] -dev = ["pre-commit", "pytest-asyncio", "tox"] - [[package]] name = "pywin32" version = "308" @@ -1785,4 +1768,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "c9fa5f95b9d5fd13ced3e9b31144c2db5d21c06c24a064a78d14386d870000b7" +content-hash = "0132eba0ce49f6d19327ea3da602993a582c470aa9d8a32679d82786d065665b" diff --git a/pyproject.toml b/pyproject.toml index 7647c48..9c66836 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,6 @@ pytest-cov = "^5.0.0" pytest-asyncio = "^0.23.8" httpx = "^0.27.2" testcontainers = "^4.8.2" -pytest-mock = "^3.14.0" [tool.poetry.group.test] optional = true diff --git a/tests/conceptual/test_crdt.py b/tests/conceptual/test_crdt.py index f123dfd..c94ff79 100644 --- a/tests/conceptual/test_crdt.py +++ b/tests/conceptual/test_crdt.py @@ -477,8 +477,8 @@ def add_log(update: Update): def test_crdtlist_changes_counted(): crdtl = CRDTList() - assert crdtl.change_count() == 0 + assert len(crdtl.changes) == 0 crdtl.append(1) - assert crdtl.change_count() == 1 + assert len(crdtl.changes) == 1 crdtl.append(2) - assert crdtl.change_count() == 2 + assert len(crdtl.changes) == 2 diff --git a/tests/test_persistance.py b/tests/test_persistance.py index 6ad1a95..ea09662 100644 --- a/tests/test_persistance.py +++ b/tests/test_persistance.py @@ -39,8 +39,8 @@ def test_direct_encode_decode_lists(): async def test_storing_loading_logical_clock(): clock = LogicalClock() store = Store(None) - await store.save("clock", clock) - loaded_clock = await store.load("clock") + await store.async_save("clock", clock) + loaded_clock = await store.async_load("clock") assert clock == loaded_clock @@ -50,8 +50,8 @@ async def test_storing_loading_observed_removed_set(): observed_removed_set.add("foo") observed_removed_set.add("bar") store = Store(None) - await store.save("observed_removed_set", observed_removed_set) - loaded_observed_removed_set = await store.load("observed_removed_set") + await store.async_save("observed_removed_set", observed_removed_set) + loaded_observed_removed_set = await store.async_load("observed_removed_set") assert observed_removed_set == loaded_observed_removed_set @@ -62,8 +62,8 @@ async def test_storing_loading_last_writer_wins_register(): lww_register.update(update) store = Store(None) - await store.save("lww_register", lww_register) - loaded_lww_register = await store.load("lww_register") + await store.async_save("lww_register", lww_register) + loaded_lww_register = await store.async_load("lww_register") assert lww_register == loaded_lww_register @@ -74,8 +74,8 @@ async def test_storing_loading_last_writer_wins_map(): value = "bar" lww_map.set(name, value, 1) store = Store(None) - await store.save("lww_map", lww_map) - loaded_lww_map = await store.load("lww_map") + await store.async_save("lww_map", lww_map) + loaded_lww_map = await store.async_load("lww_map") assert lww_map == loaded_lww_map @@ -104,8 +104,8 @@ async def test_storing_loading_switch(): uplink_ports=[], ) store = Store(None) - await store.save("switch", switch) - loaded_switch = await store.load("switch") + await store.async_save("switch", switch) + loaded_switch = await store.async_load("switch") assert switch == loaded_switch @@ -158,8 +158,8 @@ async def test_storing_loading_server(): status=DeviceStatus.Up, ) store = Store(None) - await store.save("server", server) - loaded_server = await store.load("server") + await store.async_save("server", server) + loaded_server = await store.async_load("server") assert server == loaded_server @@ -177,8 +177,8 @@ async def test_storing_loading_list(): ) ) store = Store(None) - await store.save("list", l) - loaded_list = await store.load("list") + await store.async_save("list", l) + loaded_list = await store.async_load("list") assert list(l) == loaded_list @@ -187,6 +187,6 @@ async def test_storing_loading_logical_infrastructure(): dc, dcn = initialize_logical_infrastructure() infrastructure = LogicalInfrastructure({dc: [dcn]}) store = Store(None) - await store.save("infrastructure", infrastructure) - loaded_infrastructure = await store.load("infrastructure") + await store.async_save("infrastructure", infrastructure) + loaded_infrastructure = await store.async_load("infrastructure") assert infrastructure == loaded_infrastructure diff --git a/tests/test_synchronize_call.py b/tests/test_synchronize_call.py index e216a9d..f96414c 100644 --- a/tests/test_synchronize_call.py +++ b/tests/test_synchronize_call.py @@ -20,12 +20,9 @@ async def test_backpressure_timed(mocker): os.environ["PEER_SECRET"] = "secret" dc, dcn = initialize_logical_infrastructure() infrastructure = LogicalInfrastructure(infrastructure={dc: [dcn]}) - sync_peers = SynchronizePeers(infrastructure) - spy = mocker.spy(sync_peers, "synchronize") - assert dc.change_count() == 15 - assert spy.call_count == 0 + sp = SynchronizePeers(infrastructure) + assert len(dc.changes) == 1 dc[1][0].servers.append( Server("123", "foo", "42", 42, [], [], [], [], [], DeviceStatus.Up) ) - assert dc.change_count() == 16 - assert spy.call_count == 1 + assert len(dc.changes) == 4