diff --git a/horao/api/synchronization.py b/horao/api/synchronization.py index 586cea8..e55a1c9 100644 --- a/horao/api/synchronization.py +++ b/horao/api/synchronization.py @@ -33,7 +33,7 @@ async def synchronize(request: Request) -> JSONResponse: """ logging.debug(f"Calling Synchronize ({request})") try: - data = await request.body() + data = await request.json() logical_infrastructure = json.loads(data, cls=HoraoDecoder) except Exception as e: logging.error(f"Error parsing request: {e}") @@ -45,14 +45,14 @@ async def synchronize(request: Request) -> JSONResponse: try: session = init_session() for k, v in logical_infrastructure.infrastructure.items(): - local_dc = session.load(k.identity) + local_dc = await session.load(k.name) if not local_dc: - session.save(k.identity, k) + await session.save(k.name, k) else: local_dc.merge(k) - local_dc_content = session.load(f"{k.identity}.content") + local_dc_content = await session.load(f"{k.name}.content") if not local_dc_content: - session.save(f"{k.identity}.content", v) + await session.save(f"{k.name}.content", v) else: local_dc_content.merge(v) except Exception as e: diff --git a/horao/conceptual/crdt.py b/horao/conceptual/crdt.py index 757b2f0..39a538b 100644 --- a/horao/conceptual/crdt.py +++ b/horao/conceptual/crdt.py @@ -14,6 +14,7 @@ Set, Tuple, TypeVar, + Any, ) from horao.conceptual.decorators import instrument_class_function @@ -472,6 +473,7 @@ def __init__( self, names: Optional[ObservedRemovedSet] = None, registers: Optional[Dict[Hashable, LastWriterWinsRegister]] = None, + clock: Optional[LogicalClock] = None, listeners: Optional[List[Callable]] = None, ) -> None: """ @@ -479,6 +481,7 @@ def __init__( names to LastWriterWinsRegisters, and a shared clock. :param names: ObservedRemovedSet :param registers: dict of names (keys) to LastWriterWinsRegisters (values) + :param clock: LogicalClock :param listeners: list[Callable] :raises TypeError: registers not filled with correct values (and/or types) """ @@ -487,7 +490,7 @@ def __init__( names = ObservedRemovedSet() if names is None else names registers = {} if registers is None else registers - clock = LogicalClock() + clock = LogicalClock() if clock is None else clock names.clock = clock @@ -499,12 +502,6 @@ def __init__( self.clock = clock self.listeners = listeners - def __getstate__(self): - return self.__dict__.copy() - - def __setstate__(self, state): - self.__dict__ = state - def read(self) -> dict: """ Return the eventually consistent data view. @@ -744,7 +741,7 @@ def __eq__(self, other) -> bool: T = TypeVar("T", bound=Hashable) -class CRDTList(Generic[T]): +class CRDTList(List[T]): """CRDTList behaves as a list of instances T that can be updated concurrently.""" def __init__( @@ -757,8 +754,9 @@ def __init__( :param content: list of T instances :param items: LastWriterWinsMap of T items """ + super().__init__() self.log = logging.getLogger(__name__) - self.hardware = LastWriterWinsMap() if not items else items + self.items = LastWriterWinsMap() if not items else items if content: self.extend(content) self.iterator = 0 @@ -770,42 +768,30 @@ def append(self, item: T) -> T: :param item: instance of Hardware :return: inserted item """ - self.hardware.set(len(self), item, hash(item)) + self.items.set(len(self), item, hash(item)) return item - def clear(self) -> None: - """ - Clear the list, not the history - :return: None - """ - # todo check history is consistent - self.iterator = 0 - self.hardware = LastWriterWinsMap() - def copy(self) -> CRDTList[T]: results = CRDTList[T]() - for _, item in self.hardware.read().items(): + for _, item in self.items.read().items(): results.append(item.copy()) return results - def count(self): - return len(self) - - def extend(self, other: Iterable[T]) -> CRDTList[T]: + def extend(self, other: Iterable[T]) -> None: for item in other: - if item not in self.hardware.read(): - self.hardware.set(len(self), item, hash(item)) - return self + if item not in self.items.read(): + self.items.set(len(self), item, hash(item)) - def index(self, item: T) -> int: + def index(self, item: T, **kwargs: Any) -> int: """ Return the index of the hardware instance :param item: instance to search for + :param kwargs: additional arguments :return: int :raises ValueError: item not found """ result = next( - iter([i for i, h in self.hardware.read() if h == item]), + iter([i for i, h in self.items.read() if h == item]), None, ) if result is None: @@ -814,15 +800,15 @@ def index(self, item: T) -> int: return result def insert(self, index: int, item: T) -> None: - self.hardware.set(index, item, hash(item)) + self.items.set(index, item, hash(item)) @instrument_class_function(name="pop", level=logging.DEBUG) def pop(self, index: int, default=None) -> Optional[T]: if index >= len(self): self.log.debug(f"Index {index} out of bounds, returning default.") return default - item = self.hardware.read()[index] - self.hardware.unset(item, hash(item)) + item = self.items.read()[index] + self.items.unset(item, hash(item)) return item @instrument_class_function(name="remove", level=logging.DEBUG) @@ -834,61 +820,45 @@ def remove(self, item: T) -> None: :raises ValueError: item not found """ local_item = next( - iter([h for _, h in self.hardware.read() if h == item]), + iter([h for _, h in self.items.read() if h == item]), None, ) if not local_item: self.log.debug(f"{item} not found.") raise ValueError(f"{item} not found.") - self.hardware.unset(local_item, hash(item)) - - def reverse(self) -> None: - """ - cannot reverse a list inplace in a CRDT - :return: None - :raises: NotImplementedError - """ - raise NotImplementedError("Cannot reverse a list inplace in a CRDT") - - def sort(self, item=None, reverse: bool = False) -> None: - """ - cannot sort a list inplace in a CRDT - :return: None - :raises: NotImplementedError - """ - raise NotImplementedError("Cannot sort a list inplace in a CRDT") + self.items.unset(local_item, hash(item)) def __len__(self) -> int: - return len(self.hardware.read()) + return len(self.items.read()) def __eq__(self, other) -> bool: if not isinstance(other, CRDTList): return False - return self.hardware.read() == other.hardware.read() + return self.items.read() == other.items.read() def __contains__(self, item: T) -> bool: - return item in self.hardware.read() + return item in self.items.read() def __delitem__(self, item: T) -> None: - if item not in self.hardware.read(): + if item not in self.items.read(): raise KeyError(f"{item} not found.") self.remove(item) def __getitem__(self, index: int) -> T: - return self.hardware.read()[index] + return self.items.read()[index] def __setitem__(self, index: int, value: T) -> None: - self.hardware.set(index, value, hash(value)) + self.items.set(index, value, hash(value)) def __iter__(self) -> Iterable[T]: - for _, item in self.hardware.read().items(): + for _, item in self.items.read().items(): yield item def __next__(self) -> T: if self.iterator >= len(self): self.iterator = 0 raise StopIteration - item = self.hardware.read()[self.iterator] + item = self.items.read()[self.iterator] self.iterator += 1 return item @@ -901,13 +871,13 @@ def __sub__(self, other: CRDTList[T]) -> CRDTList[T]: return self def __repr__(self) -> str: - return f"HardwareList({self.hardware.read()})" + return f"HardwareList({self.items.read()})" def __reversed__(self) -> CRDTList[T]: - return self.hardware.read()[::-1] + return self.items.read()[::-1] def __sizeof__(self) -> int: return self.count() def __hash__(self): - return hash(self.hardware) + return hash(self.items) diff --git a/horao/conceptual/support.py b/horao/conceptual/support.py index 49e19da..ca1e3d6 100644 --- a/horao/conceptual/support.py +++ b/horao/conceptual/support.py @@ -2,7 +2,7 @@ """Internal classes for the CRDT implementation.""" from __future__ import annotations -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import datetime, timezone from enum import Enum, auto from hashlib import sha256 diff --git a/horao/persistance/serialize.py b/horao/persistance/serialize.py index 418caa5..5344c74 100644 --- a/horao/persistance/serialize.py +++ b/horao/persistance/serialize.py @@ -4,7 +4,6 @@ from datetime import date, datetime from json import JSONDecodeError -from networkx.algorithms.structuralholes import constraint from networkx.convert import from_dict_of_dicts, to_dict_of_dicts # type: ignore from horao.conceptual.claim import Reservation @@ -12,6 +11,7 @@ LastWriterWinsMap, LastWriterWinsRegister, ObservedRemovedSet, + CRDTList, ) from horao.conceptual.osi_layers import LinkLayer from horao.conceptual.support import LogicalClock, Update, UpdateType @@ -21,7 +21,7 @@ from horao.logical.resource import Compute, Storage from horao.physical.component import CPU, RAM, Accelerator, Disk from horao.physical.composite import Blade, Cabinet, Chassis, Node -from horao.physical.computer import Module, Server +from horao.physical.computer import Module, Server, ComputerList from horao.physical.hardware import HardwareList from horao.physical.network import ( NIC, @@ -32,6 +32,7 @@ RouterType, Switch, SwitchType, + NetworkList, ) from horao.physical.status import DeviceStatus from horao.physical.storage import StorageType @@ -104,17 +105,13 @@ def default(self, obj): if obj.registers else None ), + "clock": json.dumps(obj.clock, cls=HoraoEncoder) if obj.clock else None, "listeners": ( json.dumps(obj.listeners, cls=HoraoEncoder) if obj.listeners else None ), } - if isinstance(obj, HardwareList): - return { - "type": "HardwareList", - "hardware": json.dumps(obj.hardware, cls=HoraoEncoder), - } if isinstance(obj, Port): return { "type": "Port", @@ -480,14 +477,15 @@ def object_hook(obj): if obj["registers"] else None ), + clock=( + json.loads(obj["clock"], cls=HoraoDecoder) if obj["clock"] else None + ), listeners=( json.loads(obj["listeners"], cls=HoraoDecoder) if obj["listeners"] else None ), ) - if "type" in obj and obj["type"] == "HardwareList": - return HardwareList(items=json.loads(obj["hardware"], cls=HoraoDecoder)) if "type" in obj and obj["type"] == "Port": return Port( serial_number=obj["serial_number"], @@ -703,21 +701,21 @@ def object_hook(obj): tenants = json.loads(obj["tenants"], cls=HoraoDecoder) data_centers = json.loads(obj["data_centers"], cls=HoraoDecoder) infrastructure = {} - for k, v in json.loads(obj["infrastructure"], cls=HoraoDecoder): + for k, v in json.loads(obj["infrastructure"], cls=HoraoDecoder).items(): data_centre = next( iter([dc for dc in data_centers if dc.name == k]), None ) - if not data_centre: + if data_centre is None: raise JSONDecodeError(f"DataCenter {k} not found", obj, 0) infrastructure[data_centre] = v constraints = {} - for k, v in json.loads(obj["constraints"], cls=HoraoDecoder): + for k, v in json.loads(obj["constraints"], cls=HoraoDecoder).items(): tenant = next(iter([t for t in tenants if t.name == k]), None) if not tenant: raise JSONDecodeError(f"Tenant {k} not found", obj, 0) constraints[tenant] = v claims = {} - for k, v in json.loads(obj["claims"], cls=HoraoDecoder): + for k, v in json.loads(obj["claims"], cls=HoraoDecoder).items(): tenant = next(iter([t for t in tenants if t.name == k]), None) if not tenant: raise JSONDecodeError(f"Tenant {k} not found", obj, 0) diff --git a/horao/physical/computer.py b/horao/physical/computer.py index 0f3f5b3..b1b4ed1 100644 --- a/horao/physical/computer.py +++ b/horao/physical/computer.py @@ -6,9 +6,8 @@ from typing import List, Optional, TypeVar from horao.conceptual.crdt import CRDTList, LastWriterWinsMap -from horao.logical.resource import Compute from horao.physical.component import CPU, RAM, Accelerator, Disk -from horao.physical.hardware import Hardware, HardwareList +from horao.physical.hardware import HardwareList from horao.physical.network import NIC from horao.physical.status import DeviceStatus @@ -20,21 +19,36 @@ def __init__( name: str, model: str, number: int, - cpus: List[CPU], - rams: List[RAM], - nics: List[NIC], - disks: Optional[List[Disk]], - accelerators: Optional[List[Accelerator]], + cpus: List[CPU] | HardwareList[CPU], + rams: List[RAM] | HardwareList[RAM], + nics: List[NIC] | HardwareList[NIC], + disks: Optional[List[Disk]] | Optional[HardwareList[Disk]], + accelerators: Optional[List[Accelerator]] | Optional[HardwareList[Accelerator]], ): self.serial_number = serial_number self.name = name self.model = model self.number = number - self.cpus = HardwareList[CPU](cpus) - self.rams = HardwareList[RAM](rams) - self.nics = HardwareList[NIC](nics) - self.disks = HardwareList[Disk](disks) - self.accelerators = HardwareList[Accelerator](accelerators) + self.cpus = HardwareList[CPU]( + hardware=cpus if isinstance(cpus, list) else None, + items=cpus if isinstance(cpus, HardwareList) else None, + ) + self.rams = HardwareList[RAM]( + hardware=rams if isinstance(rams, list) else None, + items=rams if isinstance(rams, HardwareList) else None, + ) + self.nics = HardwareList[NIC]( + hardware=nics if isinstance(nics, list) else None, + items=nics if isinstance(nics, HardwareList) else None, + ) + self.disks = HardwareList[Disk]( + hardware=disks if isinstance(disks, list) else None, + items=disks if isinstance(disks, HardwareList) else None, + ) + self.accelerators = HardwareList[Accelerator]( + hardware=accelerators if isinstance(accelerators, list) else None, + items=accelerators if isinstance(accelerators, HardwareList) else None, + ) def __copy__(self): return Computer( @@ -57,11 +71,16 @@ def __eq__(self, other) -> bool: """ if not isinstance(other, Computer): return False - if self.serial_number != other.serial_number: - return False - if not self.name == other.name: - return False - return True + return ( + self.serial_number == other.serial_number + and self.name == other.name + and self.model == other.model + and self.cpus == other.cpus + and self.rams == other.rams + and self.nics == other.nics + and self.disks == other.disks + and self.accelerators == other.accelerators + ) def __gt__(self, other) -> bool: return self.number > other.number @@ -80,11 +99,11 @@ def __init__( name: str, model: str, number: int, - cpus: List[CPU], - rams: List[RAM], - nics: List[NIC], - disks: Optional[List[Disk]], - accelerators: Optional[List[Accelerator]], + cpus: List[CPU] | HardwareList[CPU], + rams: List[RAM] | HardwareList[RAM], + nics: List[NIC] | HardwareList[NIC], + disks: Optional[List[Disk]] | Optional[HardwareList[Disk]], + accelerators: Optional[List[Accelerator]] | Optional[HardwareList[Accelerator]], status: DeviceStatus, ): super().__init__( @@ -116,11 +135,11 @@ def __init__( name: str, model: str, number: int, - cpus: List[CPU], - rams: List[RAM], - nics: List[NIC], - disks: Optional[List[Disk]], - accelerators: Optional[List[Accelerator]], + cpus: List[CPU] | HardwareList[CPU], + rams: List[RAM] | HardwareList[RAM], + nics: List[NIC] | HardwareList[NIC], + disks: Optional[List[Disk]] | Optional[HardwareList[Disk]], + accelerators: Optional[List[Accelerator]] | Optional[HardwareList[Accelerator]], status: DeviceStatus, ): super().__init__( diff --git a/horao/physical/network.py b/horao/physical/network.py index e193d2a..2f43e3d 100644 --- a/horao/physical/network.py +++ b/horao/physical/network.py @@ -14,10 +14,28 @@ from horao.conceptual.crdt import CRDTList, LastWriterWinsMap from horao.conceptual.osi_layers import LinkLayer -from horao.physical.hardware import Hardware +from horao.physical.hardware import Hardware, HardwareList from horao.physical.status import DeviceStatus +class Port(Hardware): + def __init__( + self, + serial_number: str, + model: str, + number: int, + mac: str, + status: DeviceStatus, + connected: bool, + speed_gb: int, + ): + super().__init__(serial_number, model, number) + self.mac = mac + self.status = status + self.connected = connected + self.speed_gb = speed_gb + + class NetworkTopology(Enum): """Network topologies that should be able to manage.""" @@ -95,14 +113,22 @@ class SwitchType(Enum): class NetworkDevice(Hardware): - def __init__(self, serial_number, model, number, ports: List[Port]): + def __init__( + self, serial_number, model, number, ports: List[Port] | HardwareList[Port] + ): super().__init__(serial_number, model, number) - self.ports = ports + self.ports = ( + ports if isinstance(ports, HardwareList) else HardwareList[Port](ports) + ) def __eq__(self, other) -> bool: if not isinstance(other, NetworkDevice): return False - return self.serial_number == other.serial_number and self.model == other.model + return ( + self.serial_number == other.serial_number + and self.model == other.model + and len(self.ports) == len(other.ports) + ) def __gt__(self, other) -> bool: return self.number > other.number @@ -115,7 +141,13 @@ def __hash__(self) -> int: class NIC(NetworkDevice): - def __init__(self, serial_number: str, model: str, number: int, ports: List[Port]): + def __init__( + self, + serial_number: str, + model: str, + number: int, + ports: List[Port] | HardwareList[Port], + ): super().__init__(serial_number, model, number, ports) @@ -127,14 +159,31 @@ def __init__( model: str, number: int, status: DeviceStatus, - lan_ports: List[Port], - wan_ports: Optional[List[Port]], + lan_ports: List[Port] | HardwareList[Port], + wan_ports: Optional[List[Port]] | Optional[HardwareList[Port]], ): super().__init__(serial_number, model, number, lan_ports) self.name = name self.status = status - self.wan_ports = wan_ports + self.wan_ports = ( + wan_ports + if isinstance(wan_ports, HardwareList) + else HardwareList[Port](wan_ports) + ) + + def __eq__(self, other): + if not isinstance(other, Firewall): + return False + return ( + self.serial_number == other.serial_number + and self.model == other.model + and len(self.ports) == len(other.ports) + and len(self.wan_ports) == len(other.wan_ports) + ) + + def __hash__(self): + return super().__hash__() class Router(NetworkDevice): @@ -146,14 +195,32 @@ def __init__( number: int, router_type: RouterType, status: DeviceStatus, - lan_ports: List[Port], - wan_ports: Optional[List[Port]], + lan_ports: List[Port] | HardwareList[Port], + wan_ports: Optional[List[Port]] | Optional[HardwareList[Port]], ): super().__init__(serial_number, model, number, lan_ports) self.name = name self.router_type = router_type self.status = status - self.wan_ports = wan_ports + self.wan_ports = ( + wan_ports + if isinstance(wan_ports, HardwareList) + else HardwareList[Port](wan_ports) + ) + + def __eq__(self, other): + if not isinstance(other, Router): + return + return ( + self.serial_number == other.serial_number + and self.model == other.model + and self.router_type == other.router_type + and len(self.ports) == len(other.ports) + and len(self.wan_ports) == len(other.wan_ports) + ) + + def __hash__(self): + return hash((self.serial_number, self.model, self.router_type)) class Switch(NetworkDevice): @@ -167,8 +234,8 @@ def __init__( switch_type: SwitchType, status: DeviceStatus, managed: bool, - lan_ports: List[Port], - uplink_ports: Optional[List[Port]], + lan_ports: List[Port] | HardwareList[Port], + uplink_ports: Optional[List[Port]] | Optional[HardwareList[Port]], ): super().__init__(serial_number, model, number, lan_ports) self.name = name @@ -176,7 +243,26 @@ def __init__( self.switch_type = switch_type self.status = status self.managed = managed - self.uplink_ports = uplink_ports + self.uplink_ports = ( + uplink_ports + if isinstance(uplink_ports, HardwareList) + else HardwareList[Port](uplink_ports) + ) + + def __eq__(self, other): + if not isinstance(other, Switch): + return False + return ( + self.serial_number == other.serial_number + and self.model == other.model + and self.layer == other.layer + and self.switch_type == other.switch_type + and len(self.ports) == len(other.ports) + and len(self.uplink_ports) == len(other.uplink_ports) + ) + + def __hash__(self): + return hash((self.serial_number, self.model, self.layer, self.switch_type)) T = TypeVar("T", bound=NetworkDevice) @@ -189,21 +275,3 @@ def __init__( items: Optional[LastWriterWinsMap] = None, ): super().__init__(devices, items) - - -class Port(Hardware): - def __init__( - self, - serial_number: str, - model: str, - number: int, - mac: str, - status: DeviceStatus, - connected: bool, - speed_gb: int, - ): - super().__init__(serial_number, model, number) - self.mac = mac - self.status = status - self.connected = connected - self.speed_gb = speed_gb diff --git a/tests/test_persistance.py b/tests/test_persistance.py index f91fead..6ad1a95 100644 --- a/tests/test_persistance.py +++ b/tests/test_persistance.py @@ -1,14 +1,40 @@ +import json + import pytest from horao.conceptual.crdt import LastWriterWinsMap, LastWriterWinsRegister +from horao.conceptual.osi_layers import LinkLayer from horao.conceptual.support import LogicalClock from horao.logical.infrastructure import LogicalInfrastructure +from horao.persistance import HoraoDecoder, HoraoEncoder from horao.persistance.store import Store +from horao.physical.component import CPU, RAM +from horao.physical.computer import Server +from horao.physical.hardware import HardwareList +from horao.physical.network import Switch, Port, SwitchType, NIC +from horao.physical.status import DeviceStatus from tests.logical.test_scheduler import initialize_logical_infrastructure pytest_plugins = ("pytest_asyncio",) +def test_direct_encode_decode_lists(): + l = HardwareList[CPU]() + l.append( + CPU( + serial_number="123", + model="bar", + number=1, + clock_speed=1.0, + cores=1, + features=None, + ) + ) + ser = json.dumps(l, cls=HoraoEncoder) + deser = json.loads(ser, cls=HoraoDecoder) + assert list(l) == deser + + @pytest.mark.asyncio async def test_storing_loading_logical_clock(): clock = LogicalClock() @@ -53,6 +79,109 @@ async def test_storing_loading_last_writer_wins_map(): assert lww_map == loaded_lww_map +@pytest.mark.asyncio +async def test_storing_loading_switch(): + switch = Switch( + serial_number="123", + name="foo", + model="bar", + number=1, + layer=LinkLayer.Layer2, + switch_type=SwitchType.Access, + status=DeviceStatus.Up, + managed=False, + lan_ports=[ + Port( + serial_number="123", + model="bar", + number=1, + mac="00:00:00:00:00:00", + status=DeviceStatus.Up, + connected=False, + speed_gb=1, + ) + ], + uplink_ports=[], + ) + store = Store(None) + await store.save("switch", switch) + loaded_switch = await store.load("switch") + assert switch == loaded_switch + + +@pytest.mark.asyncio +async def test_storing_loading_server(): + server = Server( + serial_number="123", + name="foo", + model="bar", + number=1, + cpus=[ + CPU( + serial_number="123", + model="bar", + number=1, + clock_speed=1.0, + cores=1, + features=None, + ) + ], + rams=[ + RAM( + serial_number="123", + model="bar", + number=1, + size_gb=1, + speed_mhz=1, + ) + ], + nics=[ + NIC( + serial_number="123", + model="bar", + number=1, + ports=[ + Port( + serial_number="123", + model="bar", + number=1, + mac="00:00:00:00:00:00", + status=DeviceStatus.Up, + connected=False, + speed_gb=1, + ) + ], + ) + ], + disks=[], + accelerators=[], + status=DeviceStatus.Up, + ) + store = Store(None) + await store.save("server", server) + loaded_server = await store.load("server") + assert server == loaded_server + + +@pytest.mark.asyncio +async def test_storing_loading_list(): + l = HardwareList[CPU]() + l.append( + CPU( + serial_number="123", + model="bar", + number=1, + clock_speed=1.0, + cores=1, + features=None, + ) + ) + store = Store(None) + await store.save("list", l) + loaded_list = await store.load("list") + assert list(l) == loaded_list + + @pytest.mark.asyncio async def test_storing_loading_logical_infrastructure(): dc, dcn = initialize_logical_infrastructure()