Skip to content

Commit

Permalink
improved but not fixed yet
Browse files Browse the repository at this point in the history
  • Loading branch information
witlox committed Nov 4, 2024
1 parent 9397108 commit be3d293
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 139 deletions.
10 changes: 5 additions & 5 deletions horao/api/synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def synchronize(request: Request) -> JSONResponse:
"""
logging.debug(f"Calling Synchronize ({request})")
try:
data = await request.body()
data = await request.json()
logical_infrastructure = json.loads(data, cls=HoraoDecoder)
except Exception as e:
logging.error(f"Error parsing request: {e}")
Expand All @@ -45,14 +45,14 @@ async def synchronize(request: Request) -> JSONResponse:
try:
session = init_session()
for k, v in logical_infrastructure.infrastructure.items():
local_dc = session.load(k.identity)
local_dc = await session.load(k.name)
if not local_dc:
session.save(k.identity, k)
await session.save(k.name, k)
else:
local_dc.merge(k)
local_dc_content = session.load(f"{k.identity}.content")
local_dc_content = await session.load(f"{k.name}.content")
if not local_dc_content:
session.save(f"{k.identity}.content", v)
await session.save(f"{k.name}.content", v)
else:
local_dc_content.merge(v)
except Exception as e:
Expand Down
92 changes: 31 additions & 61 deletions horao/conceptual/crdt.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Set,
Tuple,
TypeVar,
Any,
)

from horao.conceptual.decorators import instrument_class_function
Expand Down Expand Up @@ -472,13 +473,15 @@ def __init__(
self,
names: Optional[ObservedRemovedSet] = None,
registers: Optional[Dict[Hashable, LastWriterWinsRegister]] = None,
clock: Optional[LogicalClock] = None,
listeners: Optional[List[Callable]] = None,
) -> None:
"""
Initialize an LastWriterWinsMap from an ObservedRemovedSet of names, a dict mapping
names to LastWriterWinsRegisters, and a shared clock.
:param names: ObservedRemovedSet
:param registers: dict of names (keys) to LastWriterWinsRegisters (values)
:param clock: LogicalClock
:param listeners: list[Callable]
:raises TypeError: registers not filled with correct values (and/or types)
"""
Expand All @@ -487,7 +490,7 @@ def __init__(

names = ObservedRemovedSet() if names is None else names
registers = {} if registers is None else registers
clock = LogicalClock()
clock = LogicalClock() if clock is None else clock

names.clock = clock

Expand All @@ -499,12 +502,6 @@ def __init__(
self.clock = clock
self.listeners = listeners

def __getstate__(self):
return self.__dict__.copy()

def __setstate__(self, state):
self.__dict__ = state

def read(self) -> dict:
"""
Return the eventually consistent data view.
Expand Down Expand Up @@ -744,7 +741,7 @@ def __eq__(self, other) -> bool:
T = TypeVar("T", bound=Hashable)


class CRDTList(Generic[T]):
class CRDTList(List[T]):
"""CRDTList behaves as a list of instances T that can be updated concurrently."""

def __init__(
Expand All @@ -757,8 +754,9 @@ def __init__(
:param content: list of T instances
:param items: LastWriterWinsMap of T items
"""
super().__init__()
self.log = logging.getLogger(__name__)
self.hardware = LastWriterWinsMap() if not items else items
self.items = LastWriterWinsMap() if not items else items
if content:
self.extend(content)
self.iterator = 0
Expand All @@ -770,42 +768,30 @@ def append(self, item: T) -> T:
:param item: instance of Hardware
:return: inserted item
"""
self.hardware.set(len(self), item, hash(item))
self.items.set(len(self), item, hash(item))
return item

def clear(self) -> None:
"""
Clear the list, not the history
:return: None
"""
# todo check history is consistent
self.iterator = 0
self.hardware = LastWriterWinsMap()

def copy(self) -> CRDTList[T]:
results = CRDTList[T]()
for _, item in self.hardware.read().items():
for _, item in self.items.read().items():
results.append(item.copy())
return results

def count(self):
return len(self)

def extend(self, other: Iterable[T]) -> CRDTList[T]:
def extend(self, other: Iterable[T]) -> None:
for item in other:
if item not in self.hardware.read():
self.hardware.set(len(self), item, hash(item))
return self
if item not in self.items.read():
self.items.set(len(self), item, hash(item))

def index(self, item: T) -> int:
def index(self, item: T, **kwargs: Any) -> int:
"""
Return the index of the hardware instance
:param item: instance to search for
:param kwargs: additional arguments
:return: int
:raises ValueError: item not found
"""
result = next(
iter([i for i, h in self.hardware.read() if h == item]),
iter([i for i, h in self.items.read() if h == item]),
None,
)
if result is None:
Expand All @@ -814,15 +800,15 @@ def index(self, item: T) -> int:
return result

def insert(self, index: int, item: T) -> None:
self.hardware.set(index, item, hash(item))
self.items.set(index, item, hash(item))

@instrument_class_function(name="pop", level=logging.DEBUG)
def pop(self, index: int, default=None) -> Optional[T]:
if index >= len(self):
self.log.debug(f"Index {index} out of bounds, returning default.")
return default
item = self.hardware.read()[index]
self.hardware.unset(item, hash(item))
item = self.items.read()[index]
self.items.unset(item, hash(item))
return item

@instrument_class_function(name="remove", level=logging.DEBUG)
Expand All @@ -834,61 +820,45 @@ def remove(self, item: T) -> None:
:raises ValueError: item not found
"""
local_item = next(
iter([h for _, h in self.hardware.read() if h == item]),
iter([h for _, h in self.items.read() if h == item]),
None,
)
if not local_item:
self.log.debug(f"{item} not found.")
raise ValueError(f"{item} not found.")
self.hardware.unset(local_item, hash(item))

def reverse(self) -> None:
"""
cannot reverse a list inplace in a CRDT
:return: None
:raises: NotImplementedError
"""
raise NotImplementedError("Cannot reverse a list inplace in a CRDT")

def sort(self, item=None, reverse: bool = False) -> None:
"""
cannot sort a list inplace in a CRDT
:return: None
:raises: NotImplementedError
"""
raise NotImplementedError("Cannot sort a list inplace in a CRDT")
self.items.unset(local_item, hash(item))

def __len__(self) -> int:
return len(self.hardware.read())
return len(self.items.read())

def __eq__(self, other) -> bool:
if not isinstance(other, CRDTList):
return False
return self.hardware.read() == other.hardware.read()
return self.items.read() == other.items.read()

def __contains__(self, item: T) -> bool:
return item in self.hardware.read()
return item in self.items.read()

def __delitem__(self, item: T) -> None:
if item not in self.hardware.read():
if item not in self.items.read():
raise KeyError(f"{item} not found.")
self.remove(item)

def __getitem__(self, index: int) -> T:
return self.hardware.read()[index]
return self.items.read()[index]

def __setitem__(self, index: int, value: T) -> None:
self.hardware.set(index, value, hash(value))
self.items.set(index, value, hash(value))

def __iter__(self) -> Iterable[T]:
for _, item in self.hardware.read().items():
for _, item in self.items.read().items():
yield item

def __next__(self) -> T:
if self.iterator >= len(self):
self.iterator = 0
raise StopIteration
item = self.hardware.read()[self.iterator]
item = self.items.read()[self.iterator]
self.iterator += 1
return item

Expand All @@ -901,13 +871,13 @@ def __sub__(self, other: CRDTList[T]) -> CRDTList[T]:
return self

def __repr__(self) -> str:
return f"HardwareList({self.hardware.read()})"
return f"HardwareList({self.items.read()})"

def __reversed__(self) -> CRDTList[T]:
return self.hardware.read()[::-1]
return self.items.read()[::-1]

def __sizeof__(self) -> int:
return self.count()

def __hash__(self):
return hash(self.hardware)
return hash(self.items)
2 changes: 1 addition & 1 deletion horao/conceptual/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Internal classes for the CRDT implementation."""
from __future__ import annotations

from dataclasses import dataclass, field
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum, auto
from hashlib import sha256
Expand Down
24 changes: 11 additions & 13 deletions horao/persistance/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
from datetime import date, datetime
from json import JSONDecodeError

from networkx.algorithms.structuralholes import constraint
from networkx.convert import from_dict_of_dicts, to_dict_of_dicts # type: ignore

from horao.conceptual.claim import Reservation
from horao.conceptual.crdt import (
LastWriterWinsMap,
LastWriterWinsRegister,
ObservedRemovedSet,
CRDTList,
)
from horao.conceptual.osi_layers import LinkLayer
from horao.conceptual.support import LogicalClock, Update, UpdateType
Expand All @@ -21,7 +21,7 @@
from horao.logical.resource import Compute, Storage
from horao.physical.component import CPU, RAM, Accelerator, Disk
from horao.physical.composite import Blade, Cabinet, Chassis, Node
from horao.physical.computer import Module, Server
from horao.physical.computer import Module, Server, ComputerList
from horao.physical.hardware import HardwareList
from horao.physical.network import (
NIC,
Expand All @@ -32,6 +32,7 @@
RouterType,
Switch,
SwitchType,
NetworkList,
)
from horao.physical.status import DeviceStatus
from horao.physical.storage import StorageType
Expand Down Expand Up @@ -104,17 +105,13 @@ def default(self, obj):
if obj.registers
else None
),
"clock": json.dumps(obj.clock, cls=HoraoEncoder) if obj.clock else None,
"listeners": (
json.dumps(obj.listeners, cls=HoraoEncoder)
if obj.listeners
else None
),
}
if isinstance(obj, HardwareList):
return {
"type": "HardwareList",
"hardware": json.dumps(obj.hardware, cls=HoraoEncoder),
}
if isinstance(obj, Port):
return {
"type": "Port",
Expand Down Expand Up @@ -480,14 +477,15 @@ def object_hook(obj):
if obj["registers"]
else None
),
clock=(
json.loads(obj["clock"], cls=HoraoDecoder) if obj["clock"] else None
),
listeners=(
json.loads(obj["listeners"], cls=HoraoDecoder)
if obj["listeners"]
else None
),
)
if "type" in obj and obj["type"] == "HardwareList":
return HardwareList(items=json.loads(obj["hardware"], cls=HoraoDecoder))
if "type" in obj and obj["type"] == "Port":
return Port(
serial_number=obj["serial_number"],
Expand Down Expand Up @@ -703,21 +701,21 @@ def object_hook(obj):
tenants = json.loads(obj["tenants"], cls=HoraoDecoder)
data_centers = json.loads(obj["data_centers"], cls=HoraoDecoder)
infrastructure = {}
for k, v in json.loads(obj["infrastructure"], cls=HoraoDecoder):
for k, v in json.loads(obj["infrastructure"], cls=HoraoDecoder).items():
data_centre = next(
iter([dc for dc in data_centers if dc.name == k]), None
)
if not data_centre:
if data_centre is None:
raise JSONDecodeError(f"DataCenter {k} not found", obj, 0)
infrastructure[data_centre] = v
constraints = {}
for k, v in json.loads(obj["constraints"], cls=HoraoDecoder):
for k, v in json.loads(obj["constraints"], cls=HoraoDecoder).items():
tenant = next(iter([t for t in tenants if t.name == k]), None)
if not tenant:
raise JSONDecodeError(f"Tenant {k} not found", obj, 0)
constraints[tenant] = v
claims = {}
for k, v in json.loads(obj["claims"], cls=HoraoDecoder):
for k, v in json.loads(obj["claims"], cls=HoraoDecoder).items():
tenant = next(iter([t for t in tenants if t.name == k]), None)
if not tenant:
raise JSONDecodeError(f"Tenant {k} not found", obj, 0)
Expand Down
Loading

0 comments on commit be3d293

Please sign in to comment.