Skip to content

Commit

Permalink
Issue/#771 send broadcast info to rabbitmq (#779)
Browse files Browse the repository at this point in the history
* Add timer that calls a function to get the next interval

Can be used to make intervals changable without restart

* Refactor message broadcasting to a service

* Add broadcasted messages to existing rabbitmq exchange

* Remove unused import

* Publish messages as non-mandatory

This aviods the "unhandled message" error

* Get dirties and clear them at the same time

* Use default connection filter

* Try to fix flaky test

* Add docstring for report_dirties
  • Loading branch information
Askaholic authored May 3, 2021
1 parent 5dba1f9 commit 38cc8fe
Show file tree
Hide file tree
Showing 18 changed files with 361 additions and 96 deletions.
55 changes: 4 additions & 51 deletions server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@

from .api.api_accessor import ApiAccessor
from .asyncio_extensions import synchronizedmethod
from .broadcast_service import BroadcastService
from .config import TRACE, config
from .configuration_service import ConfigurationService
from .control import run_control_server
from .core import Service, create_services
from .db import FAFDatabase
from .game_service import GameService
from .gameconnection import GameConnection
from .games import GameState
from .geoip_service import GeoIpService
from .ice_servers.nts import TwilioNTS
from .ladder_service import LadderService
Expand All @@ -114,14 +114,14 @@
from .rating_service.rating_service import RatingService
from .servercontext import ServerContext
from .stats.game_stats_service import GameStatsService
from .timing import at_interval

__author__ = "Askaholic, Chris Kitching, Dragonfire, Gael Honorez, Jeroen De Dauw, Crotalus, Michael Søndergaard, Michel Jung"
__contact__ = "[email protected]"
__license__ = "GPLv3"
__copyright__ = "Copyright (c) 2011-2015 " + __author__

__all__ = (
"BroadcastService",
"ConfigurationService",
"GameConnection",
"GameService",
Expand All @@ -130,6 +130,7 @@
"LadderService",
"MessageQueueService",
"PartyService",
"PlayerService",
"RatingService",
"RatingService",
"ServerInstance",
Expand All @@ -139,7 +140,6 @@
"run_control_server",
)

DIRTY_REPORT_INTERVAL = 1 # Seconds
logger = logging.getLogger("server")

if config.ENABLE_METRICS:
Expand Down Expand Up @@ -176,6 +176,7 @@ def __init__(
self.contexts: Set[ServerContext] = set()

self.services = _override_services or create_services({
"server": self,
"database": self.database,
"api_accessor": self.api_accessor,
"loop": self.loop,
Expand Down Expand Up @@ -220,54 +221,6 @@ async def _start_services(self) -> None:
service.initialize() for service in self.services.values()
])

game_service: GameService = self.services["game_service"]
player_service: PlayerService = self.services["player_service"]

@at_interval(DIRTY_REPORT_INTERVAL, loop=self.loop)
def do_report_dirties():
game_service.update_active_game_metrics()
dirty_games = game_service.dirty_games
dirty_queues = game_service.dirty_queues
dirty_players = player_service.dirty_players
game_service.clear_dirty()
player_service.clear_dirty()

if dirty_queues:
self.write_broadcast({
"command": "matchmaker_info",
"queues": [queue.to_dict() for queue in dirty_queues]
})

if dirty_players:
self.write_broadcast(
{
"command": "player_info",
"players": [player.to_dict() for player in dirty_players]
},
lambda lobby_conn: lobby_conn.authenticated
)

# TODO: This spams squillions of messages: we should implement per-
# connection message aggregation at the next abstraction layer down :P
for game in dirty_games:
if game.state == GameState.ENDED:
game_service.remove_game(game)

# So we're going to be broadcasting this to _somebody_...
message = game.to_dict()

self.write_broadcast(
message,
lambda conn: (
conn.authenticated
and game.is_visible_to_player(conn.player)
)
)

@at_interval(45, loop=self.loop)
def ping_broadcast():
self.write_broadcast({"command": "ping"})

self.started = True

async def listen(
Expand Down
123 changes: 123 additions & 0 deletions server/broadcast_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from aio_pika import DeliveryMode

from .config import config
from .core import Service
from .decorators import with_logger
from .game_service import GameService
from .games import GameState
from .message_queue_service import MessageQueueService
from .player_service import PlayerService
from .timing import LazyIntervalTimer


@with_logger
class BroadcastService(Service):
"""
Broadcast updates about changed entities.
"""

def __init__(
self,
server: "ServerInstance",
message_queue_service: MessageQueueService,
game_service: GameService,
player_service: PlayerService,
):
self.server = server
self.message_queue_service = message_queue_service
self.game_service = game_service
self.player_service = player_service

async def initialize(self):
await self.message_queue_service.declare_exchange(
config.MQ_EXCHANGE_NAME
)

# Using a lazy interval timer so that the intervals can be changed
# without restarting the server.
self._broadcast_dirties_timer = LazyIntervalTimer(
lambda: config.DIRTY_REPORT_INTERVAL,
self.report_dirties,
start=True
)
self._broadcast_ping_timer = LazyIntervalTimer(
lambda: config.PING_INTERVAL,
self.broadcast_ping,
start=True
)
self._logger.debug("Broadcast service initialized")

async def report_dirties(self):
"""
Send updates about any dirty (changed) entities to connected players.
This function is called at a fixed interval, which guarantees that any
given object will not be written out to the clients more than once per
interval.
"""
self.game_service.update_active_game_metrics()
dirty_games = self.game_service.pop_dirty_games()
dirty_queues = self.game_service.pop_dirty_queues()
dirty_players = self.player_service.pop_dirty_players()

if dirty_queues:
matchmaker_info = {
"command": "matchmaker_info",
"queues": [queue.to_dict() for queue in dirty_queues]
}
self.server.write_broadcast(matchmaker_info)

if dirty_players:
player_info = {
"command": "player_info",
"players": [player.to_dict() for player in dirty_players]
}
self.server.write_broadcast(player_info)

game_info = {
"command": "game_info",
"games": []
}
# TODO: This spams squillions of messages: we should implement per-
# connection message aggregation at the next abstraction layer down :P
for game in dirty_games:
if game.state == GameState.ENDED:
self.game_service.remove_game(game)

# So we're going to be broadcasting this to _somebody_...
message = game.to_dict()
game_info["games"].append(message)

self.server.write_broadcast(
message,
lambda conn: (
conn.authenticated
and game.is_visible_to_player(conn.player)
)
)

if dirty_queues:
await self.message_queue_service.publish(
config.MQ_EXCHANGE_NAME,
"broadcast.matchmakerInfo.update",
matchmaker_info,
delivery_mode=DeliveryMode.NOT_PERSISTENT
)

if dirty_players:
await self.message_queue_service.publish(
config.MQ_EXCHANGE_NAME,
"broadcast.playerInfo.update",
player_info,
delivery_mode=DeliveryMode.NOT_PERSISTENT
)

if dirty_games:
await self.message_queue_service.publish(
config.MQ_EXCHANGE_NAME,
"broadcast.gameInfo.update",
game_info,
delivery_mode=DeliveryMode.NOT_PERSISTENT
)

def broadcast_ping(self):
self.server.write_broadcast({"command": "ping"})
3 changes: 3 additions & 0 deletions server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def __init__(self):
self.PROFILING_DURATION = 2
self.PROFILING_INTERVAL = -1

self.DIRTY_REPORT_INTERVAL = 1
self.PING_INTERVAL = 45

self.CONTROL_SERVER_PORT = 4000
self.METRICS_PORT = 8011
self.ENABLE_METRICS = False
Expand Down
21 changes: 10 additions & 11 deletions server/game_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

from collections import Counter
from typing import Dict, List, Optional, Type, Union, ValuesView
from typing import Dict, List, Optional, Set, Type, Union, ValuesView

import aiocron

Expand All @@ -16,7 +16,6 @@
from .games import (
CustomGame,
FeaturedMod,
FeaturedModType,
Game,
GameState,
ValidityState,
Expand Down Expand Up @@ -105,24 +104,24 @@ async def update_data(self):
# Turn resultset into a list of uids
self.ranked_mods = set(map(lambda x: x[0], rows))

@property
def dirty_games(self):
return self._dirty_games

@property
def dirty_queues(self):
return self._dirty_queues

def mark_dirty(self, obj: Union[Game, MatchmakerQueue]):
if isinstance(obj, Game):
self._dirty_games.add(obj)
elif isinstance(obj, MatchmakerQueue):
self._dirty_queues.add(obj)

def clear_dirty(self):
def pop_dirty_games(self) -> Set[Game]:
dirty_games = self._dirty_games
self._dirty_games = set()

return dirty_games

def pop_dirty_queues(self) -> Set[MatchmakerQueue]:
dirty_queues = self._dirty_queues
self._dirty_queues = set()

return dirty_queues

def create_uid(self) -> int:
self.game_id_counter += 1

Expand Down
5 changes: 4 additions & 1 deletion server/lobbyconnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,10 @@ async def command_hello(self, message):

old_player = self.player_service.get_player(self.player.id)
if old_player:
self._logger.debug("player {} already signed in: {}".format(self.player.id, old_player))
self._logger.debug(
"player %s already signed in: %s",
self.player.id, old_player
)
if old_player.lobby_connection is not None:
with contextlib.suppress(DisconnectedError):
old_player.lobby_connection.write_warning(
Expand Down
7 changes: 6 additions & 1 deletion server/message_queue_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ async def publish(
exchange_name: str,
routing: str,
payload: Dict,
mandatory: bool = False,
delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
) -> None:
if not self._is_ready:
Expand All @@ -142,7 +143,11 @@ async def publish(
)

async with self._channel.transaction():
await exchange.publish(message, routing_key=routing)
await exchange.publish(
message,
routing_key=routing,
mandatory=mandatory
)
self._logger.log(
TRACE, "Published message %s to %s/%s", payload, exchange_name, routing
)
Expand Down
9 changes: 4 additions & 5 deletions server/player_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,15 @@ def __setitem__(self, player_id: int, player: Player):
def all_players(self) -> ValuesView[Player]:
return self._players.values()

@property
def dirty_players(self) -> Set[Player]:
return self._dirty_players

def mark_dirty(self, player: Player):
self._dirty_players.add(player)

def clear_dirty(self):
def pop_dirty_players(self) -> Set[Player]:
dirty_players = self._dirty_players
self._dirty_players = set()

return dirty_players

async def fetch_player_data(self, player):
async with self._db.acquire() as conn:
result = await conn.execute(
Expand Down
4 changes: 2 additions & 2 deletions server/timing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
Helpers for executing async functions on a timer
"""

from .timer import Timer, at_interval
from .timer import LazyIntervalTimer, Timer, at_interval

__all__ = ("Timer", "at_interval")
__all__ = ("LazyIntervalTimer", "Timer", "at_interval")
26 changes: 25 additions & 1 deletion server/timing/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,35 @@ def __call__(self, func):
return self

def __str__(self):
return f"{self.interval} {self.func}"
return f"{self.get_delay()} {self.func}"

def __repr__(self):
return f"<Timer {str(self)}>"


class LazyIntervalTimer(Timer):
"""A timer that calls a function to get the next interval"""

def __init__(
self,
interval_func,
func=None,
args=(),
start=False,
loop=None
):
super().__init__(
interval=None,
func=func,
args=args,
start=start,
loop=loop
)
self.interval_func = interval_func

def get_delay(self):
return self.interval_func()


def at_interval(interval, func=None, args=(), start=True, loop=None):
return Timer(interval, func=func, args=args, start=start, loop=loop)
Loading

0 comments on commit 38cc8fe

Please sign in to comment.