Skip to content

Commit

Permalink
fixed sync tree
Browse files Browse the repository at this point in the history
  • Loading branch information
witlox committed Dec 9, 2024
1 parent 398a110 commit 99b25b0
Show file tree
Hide file tree
Showing 16 changed files with 211 additions and 398 deletions.
14 changes: 7 additions & 7 deletions horao/api/synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,23 @@ async def synchronize(request: Request) -> JSONResponse:
return JSONResponse(status_code=400, content={"error": "Error parsing request"})
try:
session = init_session()
for k, v in logical_infrastructure.logical_infrastructure.items():
local_dc = await session.load(k.name)
for k, v in logical_infrastructure.infrastructure.items():
local_dc = await session.async_load(k.name)
if not local_dc:
await session.save(k.name, k)
await session.async_save(k.name, k)
else:
local_dc.merge(k)
local_dc_content = await session.load(f"{k.name}.content")
local_dc_content = await session.async_load(f"{k.name}.content")
if not local_dc_content:
await session.save(f"{k.name}.content", v)
await session.async_save(f"{k.name}.content", v)
else:
local_dc_content.merge(v)
if logical_infrastructure.claims:
for k, v in logical_infrastructure.claims.items():
await session.save(k.name, k)
await session.async_save(k.name, k)
if logical_infrastructure.constraints:
for k, v in logical_infrastructure.contraints.items():
await session.save(k.name, k)
await session.async_save(k.name, k)
except Exception as e:
logging.error(f"Error synchronizing: {e}")
if os.getenv("DEBUG", "False") == "True":
Expand Down
32 changes: 8 additions & 24 deletions horao/conceptual/crdt.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,11 +764,9 @@ def __init__(
super().__init__()
self.log = logging.getLogger(__name__)
self.listeners = listeners if listeners else []
self._change_count = 0
self.changes: List[Update] = []
self.items = (
LastWriterWinsMap(listeners=[self.increase_change_count])
if not items
else items
LastWriterWinsMap(listeners=[self.invoke_listeners]) if not items else items
)
if content:
self.extend(content)
Expand Down Expand Up @@ -797,31 +795,17 @@ def invoke_listeners(self, update: Optional[Update] = None) -> None:
Invokes all event listeners.
:return: None
"""
if update and not update in self.changes:
self.changes.append(update)
for listener in self.listeners:
listener(update)
listener(self.changes)

def increase_change_count(self, update: Optional[Update] = None) -> None:
def clear_history(self) -> None:
"""
Increase the change count.
:param update: change to process
Clear the history of changes.
:return: None
"""
self._change_count += 1
self.invoke_listeners(update)

def change_count(self) -> int:
"""
Return the number of changes.
:return: int
"""
return self._change_count

def reset_change_count(self) -> None:
"""
Reset the change count.
:return: None
"""
self._change_count = 0
self.changes.clear()

@instrument_class_function(name="append", level=logging.DEBUG)
def append(self, item: T) -> T:
Expand Down
10 changes: 5 additions & 5 deletions horao/controllers/synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
Expand Down Expand Up @@ -54,26 +53,27 @@ def __init__(
for dc in self.logical_infrastructure.infrastructure.keys():
dc.add_listeners(self.synchronize)

def synchronize(self) -> None:
def synchronize(self, changes: Optional[List] = None) -> None:
"""
Synchronize with all peers, if one of the following conditions is met:
- there was no previous synchronization time stamp.
- the timedelta between previous synchronization and now is greater than set bound
- the amount of changes on the stack exceed the threshold that was set
will only call synchronize if there are any changes if the timer expires
note: currently only the infrastructure is tracked, changes to claims and constraints are not tracked
:param changes: optional list of changes
:return: None
"""
if not self.peers:
return None
timedelta_exceeded = False
last_sync = asyncio.run(self.session.load("last_sync"))
last_sync = self.session.load("last_sync")
if not last_sync or datetime.now() - last_sync < timedelta(
seconds=self.sync_delta
):
timedelta_exceeded = True
max_changes_exceeded = False
if self.logical_infrastructure.change_count() > self.max_changes:
if changes and len(changes) > self.max_changes:
max_changes_exceeded = True
if not timedelta_exceeded and not max_changes_exceeded:
return None
Expand All @@ -93,4 +93,4 @@ def synchronize(self) -> None:
lg.raise_for_status()
except httpx.HTTPError as e:
self.logger.error(f"Error synchronizing with {peer}: {e}")
asyncio.run(self.session.save("last_sync", datetime.now()))
self.session.save("last_sync", datetime.now())
166 changes: 55 additions & 111 deletions horao/logical/data_center.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
from horao.conceptual.crdt import LastWriterWinsMap
from horao.conceptual.decorators import instrument_class_function
from horao.conceptual.support import Update
from horao.physical.component import Disk
from horao.physical.composite import Blade, Cabinet, Chassis, Server
from horao.physical.composite import Cabinet
from horao.physical.computer import Computer
from horao.physical.network import (
NIC,
Expand Down Expand Up @@ -60,6 +59,7 @@ def __init__(
if items:
for k, v in items.items():
self.rows.set(k, v, hash(k)) # type: ignore
self.changes: List[Update] = []

def add_listeners(self, listener: Callable) -> None:
"""
Expand All @@ -73,44 +73,25 @@ def add_listeners(self, listener: Callable) -> None:
def remove_listeners(self, listener: Callable) -> None:
"""
Removes a listener if it was previously added.
:param listener: Callable[[Update], None]
:param listener: Callable
:return: None
"""
if listener in self.listeners:
self.listeners.remove(listener)

def invoke_listeners(self, update: Optional[Update] = None) -> None:
def invoke_listeners(self, changes: Optional[List] = None) -> None:
"""
Invokes all event listeners.
:param changes: list of changes
:return: None
"""
if changes:
if isinstance(changes, List):
self.changes.extend(changes)
else:
self.changes.append(changes)
for listener in self.listeners:
listener(update)

def change_count(self) -> int:
"""
Sum the change count of all components in the data center
:return: int
"""
total = 0
for _, v in self.rows.read().items():
for cabinet in v:
for server in cabinet._servers:
total += server.change_count()
total += cabinet._servers.change_count()
for chassis in cabinet._chassis:
total += chassis.change_count()
for blade in chassis._blades:
total += blade.change_count()
for node in blade._nodes:
total += node.change_count()
for module in node._modules:
total += module.change_count()
total += node._modules.change_count()
total += chassis._blades.change_count()
total += cabinet._chassis.change_count()
total += cabinet._switches.change_count()
return total
listener(changes)

@instrument_class_function(name="copy", level=logging.DEBUG)
def copy(self) -> Dict[int, List[Cabinet]]:
Expand All @@ -127,7 +108,9 @@ def has_key(self, k: int) -> bool:
return False

def update(self, key: int, value: List[Cabinet]) -> None:
self.rows.set(key, value, hash(key)) # type: ignore
if key in self.keys():
self.__delitem__(key)
self.__setitem__(key, value, hash(key)) # type: ignore

def keys(self) -> List[int]:
return [k for k, _ in self.rows.read()]
Expand Down Expand Up @@ -163,30 +146,45 @@ def merge(self, other: DataCenter) -> None:
else:
self[number] = row

# reset change counters
# clear the change history
for _, v in self.rows.read().items():
for cabinet in v:
for server in cabinet.servers:
server._disks.reset_change_count()
cabinet.servers.reset_change_count()
server.disks.clear_history()
cabinet.servers.clear_history()
for chassis in cabinet.chassis:
for blade in chassis.blades:
for node in blade.nodes:
for module in node.modules:
module._disks.reset_change_count()
node.modules.reset_change_count()
blade.nodes.reset_change_count()
chassis.blades.reset_change_count()
cabinet.chassis.reset_change_count()
cabinet.switches.reset_change_count()
v.reset_change_count()
module.disks.clear_history()
node.modules.clear_history()
blade.nodes.clear_history()
chassis.blades.clear_history()
cabinet.chassis.clear_history()
cabinet.switches.clear_history()

def __eq__(self, other) -> bool:
if not isinstance(other, DataCenter):
return False
return self.name == other.name

def __setitem__(self, key: int, item: List[Cabinet]) -> None:
# glue all handlers to event invocation
for cabinet in item:
for server in cabinet.servers:
server.disks.add_listeners(self.invoke_listeners)
cabinet.servers.add_listeners(self.invoke_listeners)
for chassis in cabinet.chassis:
for blade in chassis.blades:
for node in blade.nodes:
for module in node.modules:
module.disks.add_listeners(self.invoke_listeners)
node.modules.add_listeners(self.invoke_listeners)
blade.nodes.add_listeners(self.invoke_listeners)
chassis.blades.add_listeners(self.invoke_listeners)
cabinet.chassis.add_listeners(self.invoke_listeners)
cabinet.switches.add_listeners(self.invoke_listeners)
# insert the row
self.rows.set(key, item, hash(key)) # type: ignore

@instrument_class_function(name="getitem", level=logging.DEBUG)
Expand All @@ -200,6 +198,22 @@ def __getitem__(self, key) -> List[Cabinet]:
def __delitem__(self, key) -> None:
for k, v in self.rows.read().items():
if k == key:
# remove all listeners
for cabinet in v:
for server in cabinet.servers:
server.disks.remove_listeners(self.invoke_listeners)
cabinet.servers.remove_listeners(self.invoke_listeners)
for chassis in cabinet.chassis:
for blade in chassis.blades:
for node in blade.nodes:
for module in node.modules:
module.disks.remove_listeners(self.invoke_listeners)
node.modules.remove_listeners(self.invoke_listeners)
blade.nodes.remove_listeners(self.invoke_listeners)
chassis.blades.remove_listeners(self.invoke_listeners)
cabinet.chassis.remove_listeners(self.invoke_listeners)
cabinet.switches.remove_listeners(self.invoke_listeners)
# remove the row
self.rows.unset(key, hash(key))
return
raise KeyError(f"Key {key} not found")
Expand All @@ -223,76 +237,6 @@ def __iter__(self) -> Iterable[Tuple[int, List[Cabinet]]]:
def __hash__(self) -> int:
return hash((self.name, self.number))

@staticmethod
@instrument_class_function(name="move_server", level=logging.DEBUG)
def move_server(server: Server, from_cabinet: Cabinet, to_cabinet: Cabinet) -> None:
"""
Move a server from one cabinet to another
:param server: server to move
:param from_cabinet: from
:param to_cabinet: to
:return: None
:raises: ValueError if you try to remove a server that doesn't exist
"""
if not server in from_cabinet.servers:
raise ValueError("Cannot move servers that are not installed.")
from_cabinet.servers.remove(server)
to_cabinet.servers.append(server)

@staticmethod
@instrument_class_function(name="move_chassis", level=logging.DEBUG)
def move_chassis_server(
server: Server, from_chassis: Chassis, to_chassis: Chassis
) -> None:
"""
Move a server from one cabinet to another
:param server: server to move
:param from_chassis: from
:param to_chassis: to
:return: None
:raises: ValueError if you try to remove a server that doesn't exist
"""
if not server in from_chassis.servers:
raise ValueError("Cannot move servers that are not installed.")
from_chassis.servers.remove(server)
to_chassis.servers.append(server)

@staticmethod
@instrument_class_function(name="move_blade", level=logging.DEBUG)
def move_blade(blade: Blade, from_chassis: Chassis, to_chassis: Chassis) -> None:
"""
Move a server from one chassis to another
:param blade: blade to move
:param from_chassis: from
:param to_chassis: to
:return: None
:raises: ValueError if you try to remove a blade that is not installed
"""
if not blade in from_chassis.blades:
raise ValueError("Cannot move blades that are not installed.")
from_chassis.blades.remove(blade)
to_chassis.blades.append(blade)

@staticmethod
@instrument_class_function(name="swap_disk", level=logging.DEBUG)
def swap_disk(
server: Server, old_disk: Optional[Disk], new_disk: Optional[Disk]
) -> None:
"""
Swap a (broken) disk in a server
:param server: server to swap disk from/in
:param old_disk: old disk to remove (if any)
:param new_disk: new disk to add (if any)
:return: None
:raises: ValueError if you try to remove a disk that doesn't exist
"""
if new_disk is not None:
server.disks.append(new_disk)
if old_disk:
if not server.disks:
raise ValueError("Cannot remove disks that are not installed.")
server.disks.remove(old_disk)

def fetch_server_nic(
self, row: int, cabinet: int, server: int, nic: int, chassis: Optional[int]
) -> NIC:
Expand Down
6 changes: 3 additions & 3 deletions horao/logical/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

import logging
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Dict, Iterable, List, Optional, Tuple, Any

from horao.conceptual.claim import Claim, Reservation
from horao.conceptual.decorators import instrument_class_function
Expand Down Expand Up @@ -38,12 +38,12 @@ def __init__(
self.constraints = constraints or {}
self.claims = claims or {}

def change_count(self) -> int:
def changes(self) -> List[Any]:
"""
Count the number of changes in the infrastructure.
:return: number of changes
"""
return sum([d.change_count() for d in self.infrastructure.keys()])
return [d.changes for d in self.infrastructure.keys()]

def clear(self) -> None:
self.infrastructure.clear()
Expand Down
Loading

0 comments on commit 99b25b0

Please sign in to comment.