Skip to content

Commit

Permalink
Use monitored types where necessary
Browse files Browse the repository at this point in the history
  • Loading branch information
Askaholic committed Jan 30, 2022
1 parent b819c3f commit bf42d83
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 25 deletions.
3 changes: 2 additions & 1 deletion server/game_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sqlalchemy import select

from server.config import config
from server.metrics import MonitoredDict

from . import metrics
from .core import Service
Expand Down Expand Up @@ -60,7 +61,7 @@ def __init__(
self.ranked_mods: set[str] = set()

# The set of active games
self._games: dict[int, Game] = dict()
self._games: dict[int, Game] = MonitoredDict("game_service._games")

async def initialize(self) -> None:
await self.initialise_game_counter()
Expand Down
16 changes: 11 additions & 5 deletions server/ladder_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import aiocron
from sqlalchemy import and_, func, select, text, true

from server.metrics import MonitoredDefaultDict, MonitoredDict, MonitoredSet

from .config import config
from .core import Service
from .db import FAFDatabase
Expand Down Expand Up @@ -52,11 +54,16 @@ def __init__(
game_service: GameService,
):
self._db = database
self._informed_players: set[Player] = set()
self._informed_players: set[Player] = MonitoredSet(
"ladder_service._informed_players"
)
self.game_service = game_service
self.queues = {}
self.queues = MonitoredDict("ladder_service.queues")

self._searches: dict[Player, dict[str, Search]] = defaultdict(dict)
self._searches = MonitoredDefaultDict(
"ladder_service._searches",
dict
) # type: defaultdict[Player, dict[str, Search]]

async def initialize(self) -> None:
await self.update_data()
Expand Down Expand Up @@ -557,8 +564,7 @@ def on_connection_lost(self, conn: "LobbyConnection") -> None:
player = conn.player
self.cancel_search(player)
del self._searches[player]
if player in self._informed_players:
self._informed_players.remove(player)
self._informed_players.discard(player)

async def shutdown(self):
for queue in self.queues.values():
Expand Down
25 changes: 12 additions & 13 deletions server/matchmaker/matchmaker_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Callable, Iterable, Optional

import server.metrics as metrics
from server.metrics import MonitoredSet

from ..asyncio_extensions import SpinLock, synchronized
from ..decorators import with_logger
Expand Down Expand Up @@ -61,7 +62,9 @@ def __init__(
self.params = params or {}
self.map_pools = {info[0].id: info for info in map_pools}

self._queue: dict[Search, None] = OrderedDict()
self._queue: set[Search] = MonitoredSet(
f"MatchmakerQueue({self.name})._queue"
)
self.on_match_found = on_match_found
self._is_running = True

Expand Down Expand Up @@ -93,7 +96,7 @@ def initialize(self):

@property
def num_players(self) -> int:
return sum(len(search.players) for search in self._queue.keys())
return sum(len(search.players) for search in self._queue)

async def queue_pop_timer(self) -> None:
""" Periodically tries to match all Searches in the queue. The amount
Expand Down Expand Up @@ -145,8 +148,7 @@ async def search(self, search: Search) -> None:
# If the queue was cancelled, or some other error occurred,
# make sure to clean up.
self.game_service.mark_dirty(self)
if search in self._queue:
del self._queue[search]
self._queue.discard(search)

@synchronized(SpinLock(sleep_duration=1))
async def find_matches(self) -> None:
Expand All @@ -159,7 +161,7 @@ async def find_matches(self) -> None:
"""
self._logger.info("Searching for matches: %s", self.name)

searches = list(self._queue.keys())
searches = list(self._queue)

if self.num_players < 2 * self.team_size:
self._register_unmatched_searches(searches)
Expand Down Expand Up @@ -215,8 +217,7 @@ def _register_unmatched_searches(

def push(self, search: Search):
""" Push the given search object onto the queue """

self._queue[search] = None
self._queue.add(search)
self.game_service.mark_dirty(self)

def match(self, s1: Search, s2: Search) -> bool:
Expand Down Expand Up @@ -246,10 +247,8 @@ def match(self, s1: Search, s2: Search) -> bool:

s1.match(s2)
s2.match(s1)
if s1 in self._queue:
del self._queue[s1]
if s2 in self._queue:
del self._queue[s2]
self._queue.discard(s1)
self._queue.discard(s2)

return True

Expand All @@ -267,8 +266,8 @@ def to_dict(self):
).isoformat(),
"queue_pop_time_delta": self.timer.next_queue_pop - time.time(),
"num_players": self.num_players,
"boundary_80s": [search.boundary_80 for search in self._queue.keys()],
"boundary_75s": [search.boundary_75 for search in self._queue.keys()],
"boundary_80s": [search.boundary_80 for search in self._queue],
"boundary_75s": [search.boundary_75 for search in self._queue],
# TODO: Remove, the client should query the API for this
"team_size": self.team_size,
}
Expand Down
6 changes: 4 additions & 2 deletions server/message_queue_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from aio_pika import DeliveryMode, ExchangeType
from aio_pika.exceptions import ProbableAuthenticationError

from server.metrics import MonitoredDict

from .asyncio_extensions import synchronizedmethod
from .config import TRACE, config
from .core import Service
Expand All @@ -29,8 +31,8 @@ class MessageQueueService(Service):
def __init__(self) -> None:
self._connection = None
self._channel = None
self._exchanges = {}
self._exchange_types = {}
self._exchanges = MonitoredDict("message_queue_service._exchanges")
self._exchange_types = MonitoredDict("message_queue_service._exchange_types")
self._is_ready = False

config.register_callback("MQ_USER", self.reconnect)
Expand Down
6 changes: 5 additions & 1 deletion server/party_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Manages interactions between players and parties
"""

from server.metrics import MonitoredDict

from .core import Service
from .decorators import with_logger
from .exceptions import ClientError
Expand All @@ -23,7 +25,9 @@ class PartyService(Service):

def __init__(self, game_service: GameService):
self.game_service = game_service
self.player_parties: dict[Player, PlayerParty] = {}
self.player_parties: dict[Player, PlayerParty] = MonitoredDict(
"party_service.player_parties"
)
self._dirty_parties: set[PlayerParty] = set()

async def initialize(self):
Expand Down
5 changes: 3 additions & 2 deletions server/player_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import server.metrics as metrics
from server.db import FAFDatabase
from server.decorators import with_logger
from server.metrics import MonitoredDict
from server.players import Player
from server.rating import RatingType

Expand All @@ -37,10 +38,10 @@
class PlayerService(Service):
def __init__(self, database: FAFDatabase):
self._db = database
self._players = dict()
self._players: dict[int, Player] = MonitoredDict("player_service._players")

# Static-ish data fields.
self.uniqueid_exempt = {}
self.uniqueid_exempt = frozenset()
self._dirty_players = set()

async def initialize(self) -> None:
Expand Down
5 changes: 4 additions & 1 deletion server/servercontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import humanize

import server.metrics as metrics
from server.metrics import MonitoredDict

from .core import Service
from .decorators import with_logger
Expand All @@ -36,7 +37,9 @@ def __init__(
self._server = None
self._connection_factory = connection_factory
self._services = services
self.connections: dict[LobbyConnection, Protocol] = {}
self.connections: dict[LobbyConnection, Protocol] = MonitoredDict(
f"{repr(self)}.connections"
)
self.protocol_class = protocol_class

def __repr__(self):
Expand Down

0 comments on commit bf42d83

Please sign in to comment.