Skip to content

Commit

Permalink
feat: state listeners & logger overhaul (#10)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: changed the api for logger & initialization for endpoints
  • Loading branch information
Mettwasser authored Sep 1, 2023
1 parent e14fa66 commit f657dad
Show file tree
Hide file tree
Showing 10 changed files with 306 additions and 135 deletions.
9 changes: 4 additions & 5 deletions examples/worldstate/custom_logger.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import asyncio
import logging

from warframe.worldstate import WorldstateClient, WorldstateLogger
from warframe.worldstate import WorldstateClient, utils
from warframe.worldstate.models import Cetus


async def main():
# Note that the default logger is pretty much empty (nothing will be logged)
# so if you want to make use of the logger, make your own:
logger = WorldstateLogger("name whatever you want", logging.DEBUG)
logger.addHandler(logging.StreamHandler())
# so if you want to make use of the logger, you can use this helper function:
utils.setup_logging(handler=logging.StreamHandler(), level=logging.DEBUG, root=True)

async with WorldstateClient(logger=logger) as client: # pass the logger
async with WorldstateClient() as client:
cetus = await client.query(Cetus)

print(cetus)
Expand Down
8 changes: 3 additions & 5 deletions examples/worldstate/listeners.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import asyncio
import logging

from warframe.worldstate import WorldstateClient, WorldstateLogger
from warframe.worldstate import WorldstateClient, utils
from warframe.worldstate.models import Cetus, OrbVallis

# define logger
logger = WorldstateLogger("main_wsclient", logging.DEBUG)
logger.addHandler(logging.StreamHandler())
utils.setup_logging(level=logging.DEBUG)

# define client
client = WorldstateClient(logger=logger)
client = WorldstateClient()


# listed to any type of SingleQueryModel and TimedEvent
Expand Down
2 changes: 1 addition & 1 deletion warframe/worldstate/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from . import utils
from .client import *
from .common.logger import *
from .endpoints import *
from .enums import *
from .exceptions import *
from .utils import *
156 changes: 56 additions & 100 deletions warframe/worldstate/client.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,8 @@
import asyncio
import logging
from datetime import datetime, timezone
from functools import wraps
from typing import (
Any,
Callable,
ClassVar,
Coroutine,
List,
Optional,
Protocol,
Type,
TypeVar,
)
from typing import Any, Callable, Coroutine, List, Optional, Type, TypeVar, Union

import aiohttp
import msgspec
Expand All @@ -20,11 +11,12 @@
MultiQueryModel,
SingleQueryModel,
TimedEvent,
WorldstateLogger,
WorldstateObject,
_TimedAndSingleQuery,
)
from .endpoints import Language, build_endpoint
from .exceptions import ErrorMessage, SessionNotFound, WorldstateAPIError
from .listeners import TypeListener
from .models import Alert, CambionDrift, Cetus, OrbVallis

__all__ = ["WorldstateClient"]
Expand All @@ -33,60 +25,10 @@
SupportsMultiQuery = TypeVar("SupportsMultiQuery", bound=MultiQueryModel)


def _get_default_logger() -> WorldstateLogger:
logger = WorldstateLogger("main_wsclient_logger")

return logger


class _TaskHelper:
def __init__(self, loop: Callable[..., Coroutine[Any, Any, None]]) -> None:
self._loop_function = loop
self._task: Optional[asyncio.Task] = None

def start(self) -> None:
if self._task is None:
self._task = asyncio.create_task(self._loop_function())

def stop(self) -> None:
if self._task:
self._task.cancel()
self._task = None


T = TypeVar("T", bound=WorldstateObject)


class _SingleQueryTimedEvent(Protocol):
__endpoint__: ClassVar[str]

activation: datetime
"The time the event began"

expiry: datetime
"The time the event ends"

@property
def start_string(self) -> str: # type: ignore
"Short-time-formatted duration string representing the start of the event"
pass

@property
def eta(self) -> str: # type: ignore
"Short-time-formatted duration string representing the end of the event / cycle"
pass

@property
def expired(self) -> bool: # type: ignore
pass

@property
def active(self) -> bool: # type: ignore
pass

@classmethod
def _from_json(cls: Type[T], response: str) -> T: # type: ignore
pass
SingleQueryTimedEvent = TypeVar("SingleQueryTimedEvent", bound=_TimedAndSingleQuery)


class WorldstateClient:
Expand All @@ -101,7 +43,6 @@ def __init__(
*,
session: Optional[aiohttp.ClientSession] = None,
default_language: Language = Language.EN,
logger: Optional[WorldstateLogger] = None,
) -> None:
"""
Parameters
Expand All @@ -117,14 +58,17 @@ def __init__(
self._session_created = False

self._default_lang = default_language

self._logger = logger or _get_default_logger()
self._listeners: List[TypeListener] = []

#
# Request
#

async def _request(self, endpoint: str, language: Optional[Language]) -> str:
async def _request(
self,
type: Type[Union[SingleQueryModel, MultiQueryModel]],
language: Optional[Language],
) -> str:
"""
Sends a request to the given `endpoint` and returns its JSON content as string.
Expand Down Expand Up @@ -157,9 +101,11 @@ async def _request(self, endpoint: str, language: Optional[Language]) -> str:

language = language or self._default_lang

url = build_endpoint(endpoint, language)
url = build_endpoint(type, language)

self._logger.debug(f"Sending request to {url}...")
logging.getLogger(__name__).debug(
f"Sending request to the {type.__name__} endpoint"
)

async with self._session.get(url) as response:
response_text = await response.text()
Expand All @@ -169,7 +115,9 @@ async def _request(self, endpoint: str, language: Optional[Language]) -> str:
msgspec.json.decode(response_text, type=ErrorMessage)
)

self._logger.debug(f"Got request:\n{response_text}")
logging.getLogger(__name__).debug(
f"Got request from the {type.__name__} endpoint"
)

return response_text

Expand Down Expand Up @@ -200,13 +148,7 @@ async def query(
SupportsSingleQuery
The queried model.
"""

if not issubclass(cls, SingleQueryModel):
raise TypeError(
f"{cls.__name__} is required to be of type SingleQueryModel."
)
json = await self._request(cls.__endpoint__, language)

json = await self._request(cls, language)
return cls._from_json(json)

async def query_list_of(
Expand All @@ -232,48 +174,56 @@ async def query_list_of(
Optional[List[SupportsMultiQuery]]
A list of the queried model.
"""

if not issubclass(cls, MultiQueryModel):
raise TypeError(
f"{cls.__name__} is required to be of type MultiQueryModel."
)
json = await self._request(cls.__endpoint__, language)

json = await self._request(cls, language)
return cls._from_json(json)

#
# Type-Specific commands
#

async def get_cetus(self, language: Optional[Language] = None) -> Cetus:
json = await self._request(Cetus.__endpoint__, language)
json = await self._request(Cetus, language)
return Cetus._from_json(json)

async def get_cambion_drift(
self, language: Optional[Language] = None
) -> CambionDrift:
json = await self._request(CambionDrift.__endpoint__, language)
json = await self._request(CambionDrift, language)
return CambionDrift._from_json(json)

async def get_orb_vallis(self, language: Optional[Language] = None) -> OrbVallis:
json = await self._request(OrbVallis.__endpoint__, language)
json = await self._request(OrbVallis, language)
return OrbVallis._from_json(json)

async def get_alerts(
self, language: Optional[Language] = None
) -> Optional[List[Alert]]:
json = await self._request(OrbVallis.__endpoint__, language)
json = await self._request(OrbVallis, language)
return Alert._from_json(json)

#
# Event-hook related
#

def listen_to(self, type: Type[_SingleQueryTimedEvent]):
def register_listener(self, listener: TypeListener):
"""
Registers an event listener.
Parameters
----------
listener : _TaskHelper
The listener you want to register.
"""
logging.getLogger(__name__ + "_listeners").debug(
f"Registered a listener of type {listener._type.__name__}"
)
self._listeners.append(listener)

def listen_to(self, type: Type[SingleQueryTimedEvent]):
"""A decorator that makes a function an event listener. This will trigger on state changes (e.g. on Cetus: Day -> Night / Night -> Day)
Args:
type (Type[IsTimed]): Any type that inherits SingleQueryObject and TimedEvent
type (Type[SingleQueryTimedEvent]): Any type that inherits SingleQueryObject and TimedEvent
Raises:
TypeError: If the type requirements are not met
Expand All @@ -288,24 +238,28 @@ def listen_to(self, type: Type[_SingleQueryTimedEvent]):
f"{type.__name__} has to implement SingleQueryModel and TimedEvent"
)

def decorator(func: Callable[..., Coroutine[Any, Any, None]]) -> _TaskHelper:
def decorator(
func: Callable[[SingleQueryTimedEvent], Coroutine[Any, Any, None]]
) -> TypeListener:
@wraps(func)
async def inner() -> None:
item: _SingleQueryTimedEvent = await self.query(type) # type: ignore
item: SingleQueryTimedEvent = await self.query(type) # type: ignore
while True:
try:
# check if the event is over, if so, retry in 1 minute (API doesn't refresh at the exact point of expiry)
if item.expiry <= datetime.now(tz=timezone.utc):
self._logger.listener_debug(
"Retry started. Looking for state change from the API",
type,
logging.getLogger(__name__ + "_listeners").debug(
f"{type.__name__} :: Looking for state change from the API"
)

await asyncio.sleep(60)

new_item: _SingleQueryTimedEvent = await self.query(type) # type: ignore
new_item: SingleQueryTimedEvent = await self.query(type) # type: ignore

if not item.expiry < new_item.expiry:
continue

if item.expiry < new_item.expiry:
else:
# we now know that it is a different event, so we can call the callback function
# with the new item after the last one's expiry
await func(new_item)
Expand All @@ -319,8 +273,8 @@ async def inner() -> None:
).total_seconds()

if seconds_to_wait > 0:
self._logger.listener_debug(
f"Sleeping {seconds_to_wait} seconds", type
logging.getLogger(__name__ + "_listeners").debug(
f"{type.__name__} :: Sleeping {seconds_to_wait} seconds"
)
await asyncio.sleep(
seconds_to_wait
Expand All @@ -330,7 +284,9 @@ async def inner() -> None:
except asyncio.CancelledError:
break

return _TaskHelper(inner)
t_helper = TypeListener(inner, type)
self.register_listener(t_helper)
return t_helper

return decorator

Expand Down
1 change: 0 additions & 1 deletion warframe/worldstate/common/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
from .core import *
from .logger import *
from .types_ import *
Loading

0 comments on commit f657dad

Please sign in to comment.