Skip to content

Commit

Permalink
fixing serialization, not done yet
Browse files Browse the repository at this point in the history
  • Loading branch information
witlox committed Nov 4, 2024
1 parent 2b3a791 commit 9397108
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 155 deletions.
15 changes: 9 additions & 6 deletions horao/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from opentelemetry.sdk.metrics.export import (
PeriodicExportingMetricReader, # type: ignore
)
from starlette.authentication import AuthenticationBackend

if os.getenv("OLTP_HTTP", "False") == "False":
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
Expand All @@ -45,8 +46,7 @@
import horao.api
import horao.api.synchronization
import horao.auth
from horao.auth.basic import BasicAuthBackend
from horao.auth.peer import PeerAuthBackend
from horao.auth.multi import MultiAuthBackend

LoggingInstrumentor().instrument(set_logging_format=True)

Expand Down Expand Up @@ -121,9 +121,10 @@ async def docs(request):
return HTMLResponse(html)


def init() -> Starlette:
def init(authorization: AuthenticationBackend = None) -> Starlette:
"""
Initialize the API
authorization: optional authorization backend to overwrite default behavior (useful for testing)
:return: app instance
"""
if os.getenv("DEBUG", "False") == "True":
Expand All @@ -148,11 +149,13 @@ def init() -> Starlette:
routes.append(Route("/docs", endpoint=docs, methods=["GET"]))
middleware = [
Middleware(CORSMiddleware, allow_origins=[cors]),
Middleware(AuthenticationMiddleware, backend=PeerAuthBackend()),
]
if os.getenv("AUTH", "basic") == "basic":
if authorization:
logger.warning(f"Using custom authorization backend: {type(authorization)}")
middleware.append(Middleware(AuthenticationMiddleware, backend=authorization))
else:
middleware.append(
Middleware(AuthenticationMiddleware, backend=BasicAuthBackend())
Middleware(AuthenticationMiddleware, backend=MultiAuthBackend())
)
app = Starlette(
routes=routes,
Expand Down
23 changes: 16 additions & 7 deletions horao/api/synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""All calls needed for synchronizing HORAO instances."""
import json
import logging
import os

from starlette.authentication import requires
from starlette.requests import Request
Expand Down Expand Up @@ -32,25 +33,33 @@ async def synchronize(request: Request) -> JSONResponse:
"""
logging.debug(f"Calling Synchronize ({request})")
try:
data = await request.json()
data = await request.body()
logical_infrastructure = json.loads(data, cls=HoraoDecoder)
except Exception as e:
logging.error(f"Error parsing request: {e}")
if os.getenv("DEBUG", "False") == "True":
return JSONResponse(
status_code=400, content={"error": f"Error parsing request {str(e)}"}
)
return JSONResponse(status_code=400, content={"error": "Error parsing request"})
try:
logical_infrastructure = json.loads(data, cls=HoraoDecoder)
session = init_session()
for k, v in logical_infrastructure.infrastructure.items():
local_dc = session.load(k.id)
local_dc = session.load(k.identity)
if not local_dc:
session.save(k.id, k)
session.save(k.identity, k)
else:
local_dc.merge(k)
local_dc_content = session.load(f"{k.id}.content")
local_dc_content = session.load(f"{k.identity}.content")
if not local_dc_content:
session.save(f"{k.id}.content", v)
session.save(f"{k.identity}.content", v)
else:
local_dc_content.merge(v)
except Exception as e:
logging.error(f"Error synchronizing: {e}")
return JSONResponse(status_code=500, content={"error": "Error synchronizing"})
if os.getenv("DEBUG", "False") == "True":
return JSONResponse(
status_code=500, content={"error": f"Error synchronizing {str(e)}"}
)
return JSONResponse(status_code=500, content={"error": f"Error synchronizing"})
return JSONResponse(status_code=200, content={"status": "is alive"})
2 changes: 1 addition & 1 deletion horao/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
various implementations that can be used, but some are only meant for development purpose.
"""
from horao.auth.basic import BasicAuthBackend
from horao.auth.peer import PeerAuthBackend, Peer
from horao.auth.multi import MultiAuthBackend, Peer
96 changes: 96 additions & 0 deletions horao/auth/multi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-#
"""Authorization for peers.
Digest authentication using pre-shared key.
"""
import binascii
import logging
import os
from typing import Tuple, Union

import jwt
from starlette.authentication import (
AuthCredentials,
AuthenticationBackend,
AuthenticationError,
BaseUser,
)
from starlette.requests import HTTPConnection


class Peer(BaseUser):
def __init__(self, identity: str, token: str, payload, origin: str) -> None:
self.id = identity
self.token = token
self.payload = payload
self.origin = origin

@property
def is_authenticated(self) -> bool:
return True

@property
def display_name(self) -> str:
return self.origin

@property
def identity(self) -> str:
return self.id

def is_true(self) -> bool:
"""
Check if the identity matches the origin.
:return: bool
"""
return self.identity == self.origin

def __str__(self) -> str:
return f"{self.origin} -> {self.identity}"


class MultiAuthBackend(AuthenticationBackend):
logger = logging.getLogger(__name__)

def digest_authentication(
self, conn: HTTPConnection, token: str
) -> Union[None, Tuple[AuthCredentials, BaseUser]]:
peer_match_source = False
for peer in os.getenv("PEERS").split(","): # type: ignore
if peer in conn.client.host:
self.logger.debug(f"Peer {peer} is trying to authenticate")
peer_match_source = True
if not peer_match_source and os.getenv("PEER_STRICT", "True") == "True":
raise AuthenticationError(f"access not allowed for {conn.client.host}")
payload = jwt.decode(token, os.getenv("PEER_SECRET"), algorithms=["HS256"]) # type: ignore
self.logger.debug(f"valid token for {payload['peer']}")
return AuthCredentials(["authenticated_peer"]), Peer(
identity=payload["peer"],
token=token,
payload=payload,
origin=conn.client.host,
)

async def authenticate(
self, conn: HTTPConnection
) -> Union[None, Tuple[AuthCredentials, BaseUser]]:
if "Authorization" not in conn.headers:
return None
if "PEERS" not in os.environ:
return None
if "PEER_SECRET" not in os.environ:
return None

auth = conn.headers["Authorization"]
try:
scheme, token = auth.split()
if scheme.lower() != "bearer":
return None
return self.digest_authentication(conn, token)
except (
ValueError,
UnicodeDecodeError,
jwt.InvalidTokenError,
binascii.Error,
) as exc:
self.logger.error(f"Invalid token for peer ({exc})")
raise AuthenticationError(f"access not allowed for {conn.client.host}")
85 changes: 0 additions & 85 deletions horao/auth/peer.py

This file was deleted.

11 changes: 11 additions & 0 deletions horao/conceptual/crdt.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,17 @@ def invoke_listeners(self, state_update: Update) -> None:
for listener in self.listeners:
listener(state_update)

def __eq__(self, other):
if not isinstance(other, LastWriterWinsRegister):
return False
return (
self.name == other.name
and self.clock == other.clock
and self.value == other.value
and self.last_update == other.last_update
and self.last_writer == other.last_writer
)


class LastWriterWinsMap(CRDT):
"""Last Writer Wins Map CRDT."""
Expand Down
Loading

0 comments on commit 9397108

Please sign in to comment.