Skip to content

Commit

Permalink
Accept matchmaking requests from rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
Askaholic committed Dec 25, 2023
1 parent 6d97003 commit ff2e28c
Show file tree
Hide file tree
Showing 8 changed files with 754 additions and 37 deletions.
184 changes: 182 additions & 2 deletions server/ladder_service/ladder_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from collections import defaultdict
from typing import Awaitable, Callable, Optional

import aio_pika
import aiocron
import humanize
from sqlalchemy import and_, func, select, text, true
Expand All @@ -34,6 +35,7 @@
)
from server.decorators import with_logger
from server.exceptions import DisabledError
from server.factions import Faction
from server.game_service import GameService
from server.games import InitMode, LadderGame
from server.games.ladder_game import GameClosedError
Expand All @@ -45,7 +47,9 @@
OnMatchedCallback,
Search
)
from server.message_queue_service import MessageQueueService
from server.metrics import MatchLaunch
from server.player_service import PlayerService
from server.players import Player, PlayerState
from server.types import GameLaunchOptions, Map, NeroxisGeneratedMap

Expand All @@ -61,20 +65,38 @@ def __init__(
self,
database: FAFDatabase,
game_service: GameService,
message_queue_service: MessageQueueService,
player_service: PlayerService,
violation_service: ViolationService,
):
self._db = database
self._informed_players: set[Player] = set()
self.game_service = game_service
self.queues = {}
self.message_queue_service = message_queue_service
self.player_service = player_service
self.violation_service = violation_service
self.queues = {}

self._initialized = False
self._informed_players: set[Player] = set()
self._searches: dict[Player, dict[str, Search]] = defaultdict(dict)
self._allow_new_searches = True

async def initialize(self) -> None:
if self._initialized:
return

await self.update_data()
await self.message_queue_service.declare_exchange(
config.MQ_EXCHANGE_NAME
)
await self.message_queue_service.consume(
config.MQ_EXCHANGE_NAME,
"request.match.create",
self.handle_mq_matchmaking_request
)

self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data)
self._initialized = True

async def update_data(self) -> None:
async with self._db.acquire() as conn:
Expand Down Expand Up @@ -432,6 +454,158 @@ def write_rating_progress(self, player: Player, rating_type: str) -> None:
)
})

async def handle_mq_matchmaking_request(
self,
message: aio_pika.IncomingMessage
):
try:
game = await self._handle_mq_matchmaking_request(message)
except Exception as e:
if isinstance(e, NotConnectedError):
code = "launch_failed"
args = [{"player_id": player.id} for player in e.players]
elif isinstance(e, json.JSONDecodeError):
code = "invalid_request"
args = [{"message": str(e)}]
elif isinstance(e, KeyError):
code = "invalid_request"
args = [{"message": f"missing '{e.args[0]}'"}]
elif isinstance(e, InvalidRequestError):
code = e.code
args = e.args
else:
code = "unknown"
args = e.args

await self.message_queue_service.publish(
config.MQ_EXCHANGE_NAME,
"error.match.create",
{"error_code": code, "args": args},
correlation_id=message.correlation_id
)
else:
await self.message_queue_service.publish(
config.MQ_EXCHANGE_NAME,
"success.match.create",
{"game_id": game.id},
correlation_id=message.correlation_id
)

async def _handle_mq_matchmaking_request(
self,
message: aio_pika.IncomingMessage
):
self._logger.debug(
"Got matchmaking request: %s", message.correlation_id
)
request = json.loads(message.body)
# TODO: Use id instead of name?
queue_name = request.get("matchmaker_queue")
map_name = request["map_name"]
game_name = request["game_name"]
participants = request["participants"]
featured_mod = request.get("featured_mod")
game_options = request.get("game_options")

if not featured_mod and not queue_name:
raise KeyError("featured_mod")

if queue_name and queue_name not in self.queues:
raise InvalidRequestError(
"invalid_request",
{"message": f"invalid queue '{queue_name}'"},
)

if not participants:
raise InvalidRequestError(
"invalid_request",
{"message": "empty participants"},
)

player_ids = [participant["player_id"] for participant in participants]
missing_players = [
id for id in player_ids if self.player_service[id] is None
]
if missing_players:
raise InvalidRequestError(
"players_not_found",
*[{"player_id": id} for id in missing_players]
)

all_players = [
self.player_service[player_id] for player_id in player_ids
]
non_idle_players = [
player for player in all_players
if player.state != PlayerState.IDLE
]
if non_idle_players:
raise InvalidRequestError(
"invalid_state",
*[
{"player_id": player.id, "state": player.state.name}
for player in all_players
]
)

queue = self.queues[queue_name] if queue_name else None
featured_mod = featured_mod or queue.featured_mod
host = all_players[0]
guests = all_players[1:]

for player in all_players:
player.state = PlayerState.STARTING_AUTOMATCH

game = None
try:
game = self.game_service.create_game(
game_class=LadderGame,
game_mode=featured_mod,
host=host,
name="Matchmaker Game",
mapname=map_name,
matchmaker_queue_id=queue.id if queue else None,
rating_type=queue.rating_type if queue else None,
max_players=len(participants)
)
game.init_mode = InitMode.AUTO_LOBBY
game.set_name_unchecked(game_name)

for participant in participants:
player_id = participant["player_id"]
faction = Faction.from_value(participant["faction"])
team = participant["team"]
slot = participant["slot"]

game.set_player_option(player_id, "Faction", faction.value)
game.set_player_option(player_id, "Team", team)
game.set_player_option(player_id, "StartSpot", slot)
game.set_player_option(player_id, "Army", slot)
game.set_player_option(player_id, "Color", slot)

def make_game_options(player: Player) -> GameLaunchOptions:
return GameLaunchOptions(
mapname=game.map_name,
expected_players=len(all_players),
game_options=game_options,
team=game.get_player_option(player.id, "Team"),
faction=game.get_player_option(player.id, "Faction"),
map_position=game.get_player_option(player.id, "StartSpot")
)

await self.launch_match(game, host, guests, make_game_options)

return game
except Exception:
if game:
await game.on_game_finish()

for player in all_players:
if player.state == PlayerState.STARTING_AUTOMATCH:
player.state = PlayerState.IDLE

raise

def on_match_found(
self,
s1: Search,
Expand Down Expand Up @@ -729,3 +903,9 @@ async def graceful_shutdown(self):
class NotConnectedError(asyncio.TimeoutError):
def __init__(self, players: list[Player]):
self.players = players


class InvalidRequestError(Exception):
def __init__(self, code: str, *args):
super().__init__(*args)
self.code = code
54 changes: 43 additions & 11 deletions server/message_queue_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import asyncio
import json
from typing import Iterable
from typing import Callable, Iterable, Optional

import aio_pika
from aio_pika import DeliveryMode, ExchangeType
Expand Down Expand Up @@ -125,56 +125,88 @@ async def _shutdown(self) -> None:
async def publish(
self,
exchange_name: str,
routing: str,
routing_key: str,
payload: dict,
mandatory: bool = False,
delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
correlation_id: Optional[str] = None,
) -> None:
await self.publish_many(
exchange_name,
routing,
routing_key,
[payload],
mandatory=mandatory,
delivery_mode=delivery_mode
delivery_mode=delivery_mode,
correlation_id=correlation_id,
)

async def publish_many(
self,
exchange_name: str,
routing: str,
routing_key: str,
payloads: Iterable[dict],
mandatory: bool = False,
delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
correlation_id: Optional[str] = None,
) -> None:
if not self._is_ready:
self._logger.warning(
"Not connected to RabbitMQ, unable to publish message."
)
return

exchange = self._exchanges.get(exchange_name)
if exchange is None:
raise KeyError(f"Unknown exchange {exchange_name}.")
exchange = self._get_exchange(exchange_name)

async with self._channel.transaction():
for payload in payloads:
message = aio_pika.Message(
json.dumps(payload).encode(),
delivery_mode=delivery_mode
delivery_mode=delivery_mode,
correlation_id=correlation_id,
)
await exchange.publish(
message,
routing_key=routing,
routing_key=routing_key,
mandatory=mandatory
)
self._logger.log(
TRACE,
"Published message %s to %s/%s",
payload,
exchange_name,
routing
routing_key
)

async def consume(
self,
exchange_name: str,
routing_key: str,
process_message: Callable[[aio_pika.IncomingMessage], None],
) -> None:
await self.initialize()
if not self._is_ready:
self._logger.warning(
"Not connected to RabbitMQ, unable to declare queue."
)
return

exchange = self._get_exchange(exchange_name)
queue = await self._channel.declare_queue(
None,
auto_delete=True,
durable=False
)

await queue.bind(exchange, routing_key)
await queue.consume(process_message, exclusive=True)

def _get_exchange(self, exchange_name: str) -> aio_pika.Exchange:
exchange = self._exchanges.get(exchange_name)
if exchange is None:
raise KeyError(f"Unknown exchange {exchange_name}.")

return exchange

@synchronizedmethod("initialization_lock")
async def reconnect(self) -> None:
self._is_ready = False
Expand Down
34 changes: 26 additions & 8 deletions tests/integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,22 @@ def mock_games():


@pytest.fixture
async def ladder_service(mocker, database, game_service, violation_service):
async def ladder_service(
mocker,
database,
game_service,
message_queue_service,
player_service,
violation_service,
):
mocker.patch("server.matchmaker.pop_timer.config.QUEUE_POP_TIME_MAX", 1)
ladder_service = LadderService(database, game_service, violation_service)
ladder_service = LadderService(
database,
game_service,
message_queue_service,
player_service,
violation_service,
)
await ladder_service.initialize()
yield ladder_service
await ladder_service.shutdown()
Expand Down Expand Up @@ -515,18 +528,23 @@ async def channel():
yield channel


async def connect_mq_consumer(server, channel, routing_key):
"""
Returns a subclass of Protocol that yields messages read from a rabbitmq
exchange.
"""
async def connect_mq_queue(channel, routing_key):
exchange = await channel.declare_exchange(
config.MQ_EXCHANGE_NAME,
aio_pika.ExchangeType.TOPIC,
durable=True
)
queue = await channel.declare_queue("", exclusive=True)
queue = await channel.declare_queue(None, exclusive=True)
await queue.bind(exchange, routing_key=routing_key)
return queue


async def connect_mq_consumer(channel, routing_key):
"""
Returns a subclass of Protocol that yields messages read from a rabbitmq
exchange.
"""
queue = await connect_mq_queue(channel, routing_key)
proto = AioQueueProtocol(queue)
await proto.consume()

Expand Down
Loading

0 comments on commit ff2e28c

Please sign in to comment.