Skip to content

Commit

Permalink
listeners on higher abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
witlox committed Nov 27, 2024
1 parent f07ddcc commit 2cd8ead
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 15 deletions.
2 changes: 1 addition & 1 deletion horao/conceptual/crdt.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ def __init__(
self.listeners = listeners if listeners else []
self._change_count = 0
self.items = (
LastWriterWinsMap(listeners=[self.increase_change_count])
LastWriterWinsMap(listeners=[self.increase_change_count] + self.listeners)
if not items
else items
)
Expand Down
91 changes: 91 additions & 0 deletions horao/controllers/synchronization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-#
"""Controller for synchronization calls to peers.
There are 2 mechanisms for synchronization:
- Time based: a preset interval to synchronize with all peers
- Event based: a preset number of changes to synchronize with all peers
"""
from __future__ import annotations

import json
import logging
import os
import platform
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import httpx
import jwt

from horao.logical.infrastructure import LogicalInfrastructure
from horao.persistance import HoraoEncoder, init_session


class SynchronizePeers:
"""
Synchronize with all peers.
"""

def __init__(
self,
infrastructure: LogicalInfrastructure,
peers: Optional[List[str]] = None,
max_changes: Optional[int] = None,
sync_delta: Optional[int] = None,
) -> None:
"""
Create instance
:param infrastructure: LogicalInfrastructure
:param peers: list of peers
:param max_changes: number of changes to synchronize
:param sync_delta: interval in seconds between synchronizations
"""
self.logger = logging.getLogger(__name__)
self.infrastructure = infrastructure
self.peers = (
peers
if not os.getenv("PEERS", None)
else peers + os.getenv("PEERS", "").split(",") # type: ignore
)
self.max_changes = (
max_changes if max_changes else int(os.getenv("MAX_CHANGES", 100))
)
self.sync_delta = (
sync_delta if sync_delta else int(os.getenv("SYNC_DELTA", 300))
)
self.session = init_session()
# note that the event handler should support async
for dc, dcs in self.infrastructure.items():
dc.add_listeners(self.synchronize)

async def synchronize(self) -> None:
"""
Synchronize with all peers.
:return: Dict of Peer:Exception for each peer that failed to synchronize
"""
timedelta_exceeded = False
last_sync = await self.session.load("last_sync")
if last_sync and datetime.now() - last_sync < timedelta(
seconds=self.sync_delta
):
timedelta_exceeded = True
max_changes_exceeded = False
if self.infrastructure.change_count() > self.max_changes:
max_changes_exceeded = True
if not timedelta_exceeded and not max_changes_exceeded:
return None
for peer in self.peers: # type: ignore
token = jwt.encode(
dict(peer=os.getenv("HOST_ID", platform.node())),
os.environ["PEER_SECRET"],
algorithm="HS256",
)
try:
lg = httpx.post(
"/synchronize",
headers={"Peer": "true", "Authorization": f"Bearer {token}"},
json=json.dumps(await self.session.items(), cls=HoraoEncoder),
)
lg.raise_for_status()
except httpx.HTTPError as e:
self.logger.error(f"Error synchronizing with {peer}: {e}")
await self.session.save("last_sync", datetime.now())
58 changes: 44 additions & 14 deletions horao/logical/data_center.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
"""Data Center composites"""
from __future__ import annotations

import asyncio
import logging
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Callable, Dict, Iterable, List, Optional, Tuple

import networkx as nx # type: ignore
from networkx.classes import Graph # type: ignore
Expand Down Expand Up @@ -39,49 +40,78 @@ def __init__(
number: int,
rows: Optional[LastWriterWinsMap] = None,
items: Optional[Dict[int, List[Cabinet]]] = None,
listeners: Optional[List[Callable]] = None,
) -> None:
"""
Initialize a data center
:param name: unique name
:param number: unique number referring to potential AZ
:param rows: optional LastWriterWinsMap of rows
:param items: optional dictionary of rows (number, list of cabinets)
:param listeners: optional list of listeners
"""
self.log = logging.getLogger(__name__)
self.name = name
self.number = number
self.listeners = listeners if listeners else []
self.rows = LastWriterWinsMap() if not rows else rows
if items:
for k, v in items.items():
self.rows.set(k, v, hash(k)) # type: ignore

def attach_change_listeners(c: Computer):
c.cpus.add_listeners(self.change_count)
c.rams.add_listeners(self.change_count)
c.disks.add_listeners(self.change_count)
c.nics.add_listeners(self.change_count)
c.accelerators.add_listeners(self.change_count)
c.cpus.add_listeners(self.invoke_listeners)
c.rams.add_listeners(self.invoke_listeners)
c.disks.add_listeners(self.invoke_listeners)
c.nics.add_listeners(self.invoke_listeners)
c.accelerators.add_listeners(self.invoke_listeners)

for _, v in self.rows.read().items():
for cabinet in v:
cabinet.servers.add_listeners(self.change_count)
cabinet.servers.add_listeners(self.invoke_listeners)
for server in cabinet.servers:
attach_change_listeners(server)
cabinet.chassis.add_listeners(self.change_count)
cabinet.chassis.add_listeners(self.invoke_listeners)
for chassis in cabinet.chassis:
chassis.servers.add_listeners(self.change_count)
chassis.servers.add_listeners(self.invoke_listeners)
for server in chassis.servers:
attach_change_listeners(server)
chassis.blades.add_listeners(self.change_count)
chassis.blades.add_listeners(self.invoke_listeners)
for blade in chassis.blades:
blade.nodes.add_listeners(self.change_count)
blade.nodes.add_listeners(self.invoke_listeners)
for node in blade.nodes:
node.modules.add_listeners(self.change_count)
node.modules.add_listeners(self.invoke_listeners)
for module in node.modules:
attach_change_listeners(module)
cabinet.switches.add_listeners(self.change_count)
cabinet.switches.add_listeners(self.invoke_listeners)
for switch in cabinet.switches:
switch.ports.add_listeners(self.change_count)
switch.ports.add_listeners(self.invoke_listeners)

def add_listeners(self, listener: Callable) -> None:
"""
Adds an async listener that is called on each update.
:param listener: Callable
:return: None
"""
if not listener in self.listeners:
self.listeners.append(listener)

def remove_listeners(self, listener: Callable) -> None:
"""
Removes a listener if it was previously added.
:param listener: Callable[[Update], None]
:return: None
"""
if listener in self.listeners:
self.listeners.remove(listener)

def invoke_listeners(self) -> None:
"""
Invokes all async event listeners.
:return: None
"""
for listener in self.listeners:
asyncio.create_task(listener())

def change_count(self) -> int:
"""
Expand Down
7 changes: 7 additions & 0 deletions horao/logical/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ def __init__(
self.constraints = constraints or {}
self.claims = claims or {}

def change_count(self) -> int:
"""
Count the number of changes in the infrastructure.
:return: number of changes
"""
return sum([d.change_count() for d in self.infrastructure.keys()])

def clear(self) -> None:
self.infrastructure.clear()

Expand Down
27 changes: 27 additions & 0 deletions horao/persistance/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,33 @@ def __init__(self, url: Optional[str] = None) -> None:
self.redis = redis.Redis.from_url(url)
self.memory: Dict[str, Any] = {}

async def keys(self) -> Dict[str, Any] | Any:
"""
Return all keys in the store
:return: keys
"""
if hasattr(self, "redis"):
return await self.redis.keys()
return self.memory.keys()

async def values(self) -> Dict[str, Any] | Any:
"""
Return all values in the store
:return: values
"""
if hasattr(self, "redis"):
return await self.redis.values()
return self.memory.values()

async def items(self) -> Dict[str, Any] | Any:
"""
Return all items in the store
:return: items
"""
if hasattr(self, "redis"):
return await self.redis.items()
return self.memory.items()

@instrument_class_function(name="load", level=logging.DEBUG)
async def load(self, key: str) -> Any | None:
"""
Expand Down
9 changes: 9 additions & 0 deletions horao/physical/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ def __init__(
else HardwareList[Port](wan_ports)
)

def change_count(self) -> int:
return super().change_count() + self.wan_ports.change_count()

def __eq__(self, other):
if not isinstance(other, Firewall):
return False
Expand Down Expand Up @@ -214,6 +217,9 @@ def __init__(
else HardwareList[Port](wan_ports)
)

def change_count(self) -> int:
return super().change_count() + self.wan_ports.change_count()

def __eq__(self, other):
if not isinstance(other, Router):
return
Expand Down Expand Up @@ -255,6 +261,9 @@ def __init__(
else HardwareList[Port](uplink_ports)
)

def change_count(self) -> int:
return super().change_count() + self.uplink_ports.change_count()

def __eq__(self, other):
if not isinstance(other, Switch):
return False
Expand Down

0 comments on commit 2cd8ead

Please sign in to comment.