Skip to content

Commit

Permalink
Accept matchmaking requests from rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
Askaholic committed Aug 6, 2021
1 parent 0dbdc1a commit 4b597cc
Show file tree
Hide file tree
Showing 7 changed files with 463 additions and 29 deletions.
153 changes: 152 additions & 1 deletion server/ladder_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from collections import defaultdict
from typing import Dict, List, Optional, Set, Tuple

import aio_pika
import aiocron
from sqlalchemy import and_, func, select, text, true

Expand All @@ -30,9 +31,12 @@
matchmaker_queue_map_pool
)
from .decorators import with_logger
from .factions import Faction
from .game_service import GameService
from .games import Game, InitMode, LadderGame
from .matchmaker import MapPool, MatchmakerQueue, OnMatchedCallback, Search
from .message_queue_service import MessageQueueService
from .player_service import PlayerService
from .players import Player, PlayerState
from .protocol import DisconnectedError
from .types import GameLaunchOptions, Map, NeroxisGeneratedMap
Expand All @@ -55,17 +59,35 @@ def __init__(
self,
database: FAFDatabase,
game_service: GameService,
player_service: PlayerService,
message_queue_service: MessageQueueService
):
self._db = database
self._informed_players: Set[Player] = set()
self.game_service = game_service
self.player_service = player_service
self.message_queue_service = message_queue_service
self.queues = {}
self._initialized = False

self._searches: Dict[Player, Dict[str, Search]] = defaultdict(dict)

async def initialize(self) -> None:
if self._initialized:
return

await self.update_data()
await self.message_queue_service.declare_exchange(
config.MQ_EXCHANGE_NAME
)
await self.message_queue_service.consume(
config.MQ_EXCHANGE_NAME,
"request.match.create",
self.handle_mq_matchmaking_request
)

self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data)
self._initialized = True

async def update_data(self) -> None:
async with self._db.acquire() as conn:
Expand Down Expand Up @@ -325,6 +347,135 @@ def write_rating_progress(self, player: Player, rating_type: str) -> None:
)
})

async def handle_mq_matchmaking_request(
self,
message: aio_pika.IncomingMessage
):
try:
game = await self._handle_mq_matchmaking_request(message)
except Exception as e:
if isinstance(e, GameLaunchError):
code = "launch_failed"
args = [{"player_id": player.id} for player in e.players]
elif isinstance(e, json.JSONDecodeError):
code = "malformed_request"
args = [{"message": str(e)}]
elif isinstance(e, KeyError):
code = "malformed_request"
args = [{"message": f"missing {e.args[0]}"}]
else:
code, *args = e.args

await self.message_queue_service.publish(
config.MQ_EXCHANGE_NAME,
"error.match.create",
{"error_code": code, "args": args},
correlation_id=message.correlation_id
)
else:
await self.message_queue_service.publish(
config.MQ_EXCHANGE_NAME,
"success.match.create",
{"game_id": game.id},
correlation_id=message.correlation_id
)

async def _handle_mq_matchmaking_request(
self,
message: aio_pika.IncomingMessage
):
self._logger.debug(
"Got matchmaking request: %s", message.correlation_id
)
request = json.loads(message.body)
# TODO: Use id instead of name?
queue_name = request.get("matchmaker_queue")
map_name = request["map_name"]
game_name = request["game_name"]
participants = request["participants"]
featured_mod = request.get("featured_mod")
if not featured_mod and not queue_name:
raise KeyError("featured_mod")

if queue_name and queue_name not in self.queues:
raise Exception("invalid_request", "invalid queue")

if not participants:
raise Exception("invalid_request", "empty participants")

player_ids = [participant["player_id"] for participant in participants]
missing_players = [
id for id in player_ids if self.player_service[id] is None
]
if missing_players:
raise Exception(
"players_not_found",
*[{"player_id": id} for id in missing_players]
)

all_players = [
self.player_service[player_id] for player_id in player_ids
]
non_idle_players = [
player for player in all_players
if player.state != PlayerState.IDLE
]
if non_idle_players:
raise Exception(
"invalid_state",
[
{"player_id": player.id, "state": player.state.name}
for player in all_players
]
)

queue = self.queues[queue_name] if queue_name else None
featured_mod = featured_mod or queue.featured_mod
host = all_players[0]
guests = all_players[1:]

for player in all_players:
player.state = PlayerState.STARTING_AUTOMATCH

try:
game = self.game_service.create_game(
game_class=LadderGame,
game_mode=featured_mod,
host=host,
name="Matchmaker Game",
mapname=map_name,
matchmaker_queue_id=queue.id if queue else None,
rating_type=queue.rating_type if queue else None,
max_players=len(participants)
)
game.init_mode = InitMode.AUTO_LOBBY
game.set_name_unchecked(game_name)

for participant in participants:
player_id = participant["player_id"]
faction = Faction.from_value(participant["faction"])
team = participant["team"]
slot = participant["slot"]

game.set_player_option(player_id, "Faction", faction.value)
game.set_player_option(player_id, "Team", team)
game.set_player_option(player_id, "StartSpot", slot)
game.set_player_option(player_id, "Army", slot)
game.set_player_option(player_id, "Color", slot)

await self.launch_game(game, host, guests)

return game
except Exception:
self._logger.exception("")
await game.on_game_end()

for player in all_players:
if player.state == PlayerState.STARTING_AUTOMATCH:
player.state = PlayerState.IDLE

raise

def on_match_found(
self,
s1: Search,
Expand Down Expand Up @@ -465,7 +616,7 @@ async def launch_game(
def game_options(player: Player) -> GameLaunchOptions:
return options._replace(
team=game.get_player_option(player.id, "Team"),
faction=player.faction,
faction=game.get_player_option(player.id, "Faction"),
map_position=game.get_player_option(player.id, "StartSpot")
)

Expand Down
52 changes: 43 additions & 9 deletions server/message_queue_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import asyncio
import json
from typing import Dict
from typing import Callable, Dict, Optional

import aio_pika
from aio_pika import DeliveryMode, ExchangeType
Expand Down Expand Up @@ -125,34 +125,68 @@ async def _shutdown(self) -> None:
async def publish(
self,
exchange_name: str,
routing: str,
routing_key: str,
payload: Dict,
mandatory: bool = False,
delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
correlation_id: Optional[str] = None
) -> None:
if not self._is_ready:
self._logger.warning(
"Not connected to RabbitMQ, unable to publish message."
)
return

exchange = self._exchanges.get(exchange_name)
if exchange is None:
raise KeyError(f"Unknown exchange {exchange_name}.")
exchange = self._get_exchange(exchange_name)

message = aio_pika.Message(
json.dumps(payload).encode(), delivery_mode=delivery_mode
json.dumps(payload).encode(),
delivery_mode=delivery_mode,
correlation_id=correlation_id,
)

async with self._channel.transaction():
await exchange.publish(
message,
routing_key=routing,
mandatory=mandatory
routing_key=routing_key,
mandatory=mandatory,
)
self._logger.log(
TRACE, "Published message %s to %s/%s", payload, exchange_name, routing
TRACE, "Published message %s to %s/%s",
payload,
exchange_name,
routing_key
)

async def consume(
self,
exchange_name: str,
routing_key: str,
process_message: Callable[[aio_pika.IncomingMessage], None]
) -> None:
await self.initialize()
if not self._is_ready:
self._logger.warning(
"Not connected to RabbitMQ, unable to declare queue."
)
return

exchange = self._get_exchange(exchange_name)
queue = await self._channel.declare_queue(
None,
auto_delete=True,
durable=False
)

await queue.bind(exchange, routing_key)
await queue.consume(process_message, exclusive=True)

def _get_exchange(self, exchange_name: str) -> aio_pika.Exchange:
exchange = self._exchanges.get(exchange_name)
if exchange is None:
raise KeyError(f"Unknown exchange {exchange_name}.")

return exchange

@synchronizedmethod("initialization_lock")
async def reconnect(self) -> None:
Expand Down
32 changes: 24 additions & 8 deletions tests/integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,20 @@ def mock_games():


@pytest.fixture
async def ladder_service(mocker, database, game_service):
async def ladder_service(
mocker,
database,
game_service,
player_service,
message_queue_service
):
mocker.patch("server.matchmaker.pop_timer.config.QUEUE_POP_TIME_MAX", 1)
ladder_service = LadderService(database, game_service)
ladder_service = LadderService(
database,
game_service,
player_service,
message_queue_service
)
await ladder_service.initialize()
yield ladder_service
await ladder_service.shutdown()
Expand Down Expand Up @@ -388,17 +399,22 @@ async def channel():
await connection.close()


async def connect_mq_consumer(server, channel, routing_key):
"""
Returns a subclass of Protocol that yields messages read from a rabbitmq
exchange.
"""
async def connect_mq_queue(channel, routing_key):
exchange = await channel.declare_exchange(
config.MQ_EXCHANGE_NAME,
aio_pika.ExchangeType.TOPIC
)
queue = await channel.declare_queue("", exclusive=True)
queue = await channel.declare_queue(None, exclusive=True)
await queue.bind(exchange, routing_key=routing_key)
return queue


async def connect_mq_consumer(channel, routing_key):
"""
Returns a subclass of Protocol that yields messages read from a rabbitmq
exchange.
"""
queue = await connect_mq_queue(channel, routing_key)
proto = AioQueueProtocol(queue)
await proto.consume()

Expand Down
Loading

0 comments on commit 4b597cc

Please sign in to comment.