From ff2e28ca022ae1b4e719539b2dc42eab5a9d475d Mon Sep 17 00:00:00 2001 From: Askaholic Date: Fri, 7 May 2021 15:27:41 -0800 Subject: [PATCH] Accept matchmaking requests from rabbitmq --- server/ladder_service/ladder_service.py | 184 ++++++- server/message_queue_service.py | 54 +- tests/integration_tests/conftest.py | 34 +- tests/integration_tests/test_game.py | 5 - .../test_matchmaker_requests.py | 470 ++++++++++++++++++ tests/integration_tests/test_server.py | 3 - tests/unit_tests/conftest.py | 25 +- tests/unit_tests/test_ladder_service.py | 16 +- 8 files changed, 754 insertions(+), 37 deletions(-) create mode 100644 tests/integration_tests/test_matchmaker_requests.py diff --git a/server/ladder_service/ladder_service.py b/server/ladder_service/ladder_service.py index cb47c4656..e9564808f 100644 --- a/server/ladder_service/ladder_service.py +++ b/server/ladder_service/ladder_service.py @@ -8,6 +8,7 @@ from collections import defaultdict from typing import Awaitable, Callable, Optional +import aio_pika import aiocron import humanize from sqlalchemy import and_, func, select, text, true @@ -34,6 +35,7 @@ ) from server.decorators import with_logger from server.exceptions import DisabledError +from server.factions import Faction from server.game_service import GameService from server.games import InitMode, LadderGame from server.games.ladder_game import GameClosedError @@ -45,7 +47,9 @@ OnMatchedCallback, Search ) +from server.message_queue_service import MessageQueueService from server.metrics import MatchLaunch +from server.player_service import PlayerService from server.players import Player, PlayerState from server.types import GameLaunchOptions, Map, NeroxisGeneratedMap @@ -61,20 +65,38 @@ def __init__( self, database: FAFDatabase, game_service: GameService, + message_queue_service: MessageQueueService, + player_service: PlayerService, violation_service: ViolationService, ): self._db = database - self._informed_players: set[Player] = set() self.game_service = game_service - self.queues = {} + self.message_queue_service = message_queue_service + self.player_service = player_service self.violation_service = violation_service + self.queues = {} + self._initialized = False + self._informed_players: set[Player] = set() self._searches: dict[Player, dict[str, Search]] = defaultdict(dict) self._allow_new_searches = True async def initialize(self) -> None: + if self._initialized: + return + await self.update_data() + await self.message_queue_service.declare_exchange( + config.MQ_EXCHANGE_NAME + ) + await self.message_queue_service.consume( + config.MQ_EXCHANGE_NAME, + "request.match.create", + self.handle_mq_matchmaking_request + ) + self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data) + self._initialized = True async def update_data(self) -> None: async with self._db.acquire() as conn: @@ -432,6 +454,158 @@ def write_rating_progress(self, player: Player, rating_type: str) -> None: ) }) + async def handle_mq_matchmaking_request( + self, + message: aio_pika.IncomingMessage + ): + try: + game = await self._handle_mq_matchmaking_request(message) + except Exception as e: + if isinstance(e, NotConnectedError): + code = "launch_failed" + args = [{"player_id": player.id} for player in e.players] + elif isinstance(e, json.JSONDecodeError): + code = "invalid_request" + args = [{"message": str(e)}] + elif isinstance(e, KeyError): + code = "invalid_request" + args = [{"message": f"missing '{e.args[0]}'"}] + elif isinstance(e, InvalidRequestError): + code = e.code + args = e.args + else: + code = "unknown" + args = e.args + + await self.message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "error.match.create", + {"error_code": code, "args": args}, + correlation_id=message.correlation_id + ) + else: + await self.message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "success.match.create", + {"game_id": game.id}, + correlation_id=message.correlation_id + ) + + async def _handle_mq_matchmaking_request( + self, + message: aio_pika.IncomingMessage + ): + self._logger.debug( + "Got matchmaking request: %s", message.correlation_id + ) + request = json.loads(message.body) + # TODO: Use id instead of name? + queue_name = request.get("matchmaker_queue") + map_name = request["map_name"] + game_name = request["game_name"] + participants = request["participants"] + featured_mod = request.get("featured_mod") + game_options = request.get("game_options") + + if not featured_mod and not queue_name: + raise KeyError("featured_mod") + + if queue_name and queue_name not in self.queues: + raise InvalidRequestError( + "invalid_request", + {"message": f"invalid queue '{queue_name}'"}, + ) + + if not participants: + raise InvalidRequestError( + "invalid_request", + {"message": "empty participants"}, + ) + + player_ids = [participant["player_id"] for participant in participants] + missing_players = [ + id for id in player_ids if self.player_service[id] is None + ] + if missing_players: + raise InvalidRequestError( + "players_not_found", + *[{"player_id": id} for id in missing_players] + ) + + all_players = [ + self.player_service[player_id] for player_id in player_ids + ] + non_idle_players = [ + player for player in all_players + if player.state != PlayerState.IDLE + ] + if non_idle_players: + raise InvalidRequestError( + "invalid_state", + *[ + {"player_id": player.id, "state": player.state.name} + for player in all_players + ] + ) + + queue = self.queues[queue_name] if queue_name else None + featured_mod = featured_mod or queue.featured_mod + host = all_players[0] + guests = all_players[1:] + + for player in all_players: + player.state = PlayerState.STARTING_AUTOMATCH + + game = None + try: + game = self.game_service.create_game( + game_class=LadderGame, + game_mode=featured_mod, + host=host, + name="Matchmaker Game", + mapname=map_name, + matchmaker_queue_id=queue.id if queue else None, + rating_type=queue.rating_type if queue else None, + max_players=len(participants) + ) + game.init_mode = InitMode.AUTO_LOBBY + game.set_name_unchecked(game_name) + + for participant in participants: + player_id = participant["player_id"] + faction = Faction.from_value(participant["faction"]) + team = participant["team"] + slot = participant["slot"] + + game.set_player_option(player_id, "Faction", faction.value) + game.set_player_option(player_id, "Team", team) + game.set_player_option(player_id, "StartSpot", slot) + game.set_player_option(player_id, "Army", slot) + game.set_player_option(player_id, "Color", slot) + + def make_game_options(player: Player) -> GameLaunchOptions: + return GameLaunchOptions( + mapname=game.map_name, + expected_players=len(all_players), + game_options=game_options, + team=game.get_player_option(player.id, "Team"), + faction=game.get_player_option(player.id, "Faction"), + map_position=game.get_player_option(player.id, "StartSpot") + ) + + await self.launch_match(game, host, guests, make_game_options) + + return game + except Exception: + if game: + await game.on_game_finish() + + for player in all_players: + if player.state == PlayerState.STARTING_AUTOMATCH: + player.state = PlayerState.IDLE + + raise + def on_match_found( self, s1: Search, @@ -729,3 +903,9 @@ async def graceful_shutdown(self): class NotConnectedError(asyncio.TimeoutError): def __init__(self, players: list[Player]): self.players = players + + +class InvalidRequestError(Exception): + def __init__(self, code: str, *args): + super().__init__(*args) + self.code = code diff --git a/server/message_queue_service.py b/server/message_queue_service.py index 5015c172a..c0b957c8b 100644 --- a/server/message_queue_service.py +++ b/server/message_queue_service.py @@ -4,7 +4,7 @@ import asyncio import json -from typing import Iterable +from typing import Callable, Iterable, Optional import aio_pika from aio_pika import DeliveryMode, ExchangeType @@ -125,26 +125,29 @@ async def _shutdown(self) -> None: async def publish( self, exchange_name: str, - routing: str, + routing_key: str, payload: dict, mandatory: bool = False, delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT, + correlation_id: Optional[str] = None, ) -> None: await self.publish_many( exchange_name, - routing, + routing_key, [payload], mandatory=mandatory, - delivery_mode=delivery_mode + delivery_mode=delivery_mode, + correlation_id=correlation_id, ) async def publish_many( self, exchange_name: str, - routing: str, + routing_key: str, payloads: Iterable[dict], mandatory: bool = False, delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT, + correlation_id: Optional[str] = None, ) -> None: if not self._is_ready: self._logger.warning( @@ -152,19 +155,18 @@ async def publish_many( ) return - exchange = self._exchanges.get(exchange_name) - if exchange is None: - raise KeyError(f"Unknown exchange {exchange_name}.") + exchange = self._get_exchange(exchange_name) async with self._channel.transaction(): for payload in payloads: message = aio_pika.Message( json.dumps(payload).encode(), - delivery_mode=delivery_mode + delivery_mode=delivery_mode, + correlation_id=correlation_id, ) await exchange.publish( message, - routing_key=routing, + routing_key=routing_key, mandatory=mandatory ) self._logger.log( @@ -172,9 +174,39 @@ async def publish_many( "Published message %s to %s/%s", payload, exchange_name, - routing + routing_key ) + async def consume( + self, + exchange_name: str, + routing_key: str, + process_message: Callable[[aio_pika.IncomingMessage], None], + ) -> None: + await self.initialize() + if not self._is_ready: + self._logger.warning( + "Not connected to RabbitMQ, unable to declare queue." + ) + return + + exchange = self._get_exchange(exchange_name) + queue = await self._channel.declare_queue( + None, + auto_delete=True, + durable=False + ) + + await queue.bind(exchange, routing_key) + await queue.consume(process_message, exclusive=True) + + def _get_exchange(self, exchange_name: str) -> aio_pika.Exchange: + exchange = self._exchanges.get(exchange_name) + if exchange is None: + raise KeyError(f"Unknown exchange {exchange_name}.") + + return exchange + @synchronizedmethod("initialization_lock") async def reconnect(self) -> None: self._is_ready = False diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index 7e0a69184..c414ab476 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -38,9 +38,22 @@ def mock_games(): @pytest.fixture -async def ladder_service(mocker, database, game_service, violation_service): +async def ladder_service( + mocker, + database, + game_service, + message_queue_service, + player_service, + violation_service, +): mocker.patch("server.matchmaker.pop_timer.config.QUEUE_POP_TIME_MAX", 1) - ladder_service = LadderService(database, game_service, violation_service) + ladder_service = LadderService( + database, + game_service, + message_queue_service, + player_service, + violation_service, + ) await ladder_service.initialize() yield ladder_service await ladder_service.shutdown() @@ -515,18 +528,23 @@ async def channel(): yield channel -async def connect_mq_consumer(server, channel, routing_key): - """ - Returns a subclass of Protocol that yields messages read from a rabbitmq - exchange. - """ +async def connect_mq_queue(channel, routing_key): exchange = await channel.declare_exchange( config.MQ_EXCHANGE_NAME, aio_pika.ExchangeType.TOPIC, durable=True ) - queue = await channel.declare_queue("", exclusive=True) + queue = await channel.declare_queue(None, exclusive=True) await queue.bind(exchange, routing_key=routing_key) + return queue + + +async def connect_mq_consumer(channel, routing_key): + """ + Returns a subclass of Protocol that yields messages read from a rabbitmq + exchange. + """ + queue = await connect_mq_queue(channel, routing_key) proto = AioQueueProtocol(queue) await proto.consume() diff --git a/tests/integration_tests/test_game.py b/tests/integration_tests/test_game.py index 8ca9f39e7..a9255aec7 100644 --- a/tests/integration_tests/test_game.py +++ b/tests/integration_tests/test_game.py @@ -390,7 +390,6 @@ async def test_game_ended_rates_game(lobby_server): @fast_forward(30) async def test_game_ended_broadcasts_rating_update(lobby_server, channel): mq_proto_all = await connect_mq_consumer( - lobby_server, channel, "success.rating.update" ) @@ -925,12 +924,10 @@ async def test_game_stats_broadcasts_achievement_updates( channel ): mq_proto_ach = await connect_mq_consumer( - lobby_server, channel, "request.achievement.update" ) mq_proto_evt = await connect_mq_consumer( - lobby_server, channel, "request.event.update" ) @@ -1018,7 +1015,6 @@ async def test_galactic_war_1v1_game_ended_broadcasts_army_results( channel ): mq_proto_all = await connect_mq_consumer( - lobby_server, channel, "success.gameResults.create" ) @@ -1114,7 +1110,6 @@ async def test_galactic_war_1v1_game_ended_broadcasts_army_results( @fast_forward(30) async def test_galactic_war_2v1_game_ended_broadcasts_army_results(lobby_server, channel): mq_proto_all = await connect_mq_consumer( - lobby_server, channel, "success.gameResults.create" ) diff --git a/tests/integration_tests/test_matchmaker_requests.py b/tests/integration_tests/test_matchmaker_requests.py new file mode 100644 index 000000000..97821c7d2 --- /dev/null +++ b/tests/integration_tests/test_matchmaker_requests.py @@ -0,0 +1,470 @@ +# External matchmaker requests over rabbitmq +import asyncio +import json +import uuid + +import pytest + +from server.config import config +from tests.utils import fast_forward + +from .conftest import connect_and_sign_in, connect_mq_queue, read_until_command +from .test_game import client_response, start_search + +pytestmark = pytest.mark.rabbitmq + + +@fast_forward(20) +async def test_valid_request_1v1( + lobby_server, + channel, + message_queue_service +): + test_id, _, proto1 = await connect_and_sign_in( + ("test", "test_password"), lobby_server + ) + rhiza_id, _, proto2 = await connect_and_sign_in( + ("Rhiza", "puff_the_magic_dragon"), lobby_server + ) + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + await asyncio.gather(*( + read_until_command(proto, "game_info") + for proto in (proto1, proto2) + )) + + # Include all the information we could possibly need + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "matchmaker_queue": "ladder1v1", + "featured_mod": "ladder1v1", + "game_name": "test VERSUS Rhiza", + "map_name": "scmp_003", + "participants": [ + { + "player_id": test_id, + "team": 2, + "slot": 1, + "faction": "uef" + }, + { + "player_id": rhiza_id, + "team": 3, + "slot": 2, + "faction": "cybran" + } + ] + }, + correlation_id=correlation_id + ) + + msg1, msg2 = await asyncio.gather( + client_response(proto1), + client_response(proto2) + ) + assert msg1["uid"] == msg2["uid"] + assert msg1["mapname"] == msg2["mapname"] + assert msg1["name"] == msg2["name"] + assert msg1["mod"] == msg2["mod"] + assert msg1["rating_type"] == msg2["rating_type"] + assert msg1["expected_players"] == msg2["expected_players"] + assert "game_options" not in msg1 and "game_options" not in msg2 + + assert msg1["mapname"] == "scmp_003" + assert msg1["name"] == "test VERSUS Rhiza" + assert msg1["mod"] == "ladder1v1" + assert msg1["rating_type"] == "ladder_1v1" + assert msg1["expected_players"] == 2 + + assert msg1["team"] == 2 + assert msg1["map_position"] == 1 + assert msg1["faction"] == 1 + + assert msg2["team"] == 3 + assert msg2["map_position"] == 2 + assert msg2["faction"] == 3 + + await proto1.send_message({ + "target": "game", + "command": "GameState", + "args": ["Launching"] + }) + + message = await success_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "game_id": msg1["uid"] + } + assert await error_queue.get(fail=False) is None + + +@fast_forward(20) +async def test_valid_request_1v1_game_options( + lobby_server, + channel, + message_queue_service +): + test_id, _, proto1 = await connect_and_sign_in( + ("test", "test_password"), lobby_server + ) + rhiza_id, _, proto2 = await connect_and_sign_in( + ("Rhiza", "puff_the_magic_dragon"), lobby_server + ) + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + await asyncio.gather(*( + read_until_command(proto, "game_info") + for proto in (proto1, proto2) + )) + + # Include all the information we could possibly need + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "matchmaker_queue": "ladder1v1", + "featured_mod": "ladder1v1", + "game_name": "test VERSUS Rhiza", + "map_name": "scmp_003", + "participants": [ + { + "player_id": test_id, + "team": 2, + "slot": 1, + "faction": "uef" + }, + { + "player_id": rhiza_id, + "team": 3, + "slot": 2, + "faction": "cybran" + } + ], + "game_options": { + "Share": "ShareUntilDeath", + "RestrictedCategories": ["T3", "T4", "SUBS", "PARAGON"], + } + }, + correlation_id=correlation_id + ) + + msg1, msg2 = await asyncio.gather( + client_response(proto1), + client_response(proto2) + ) + assert msg1["game_options"] == msg2["game_options"] + + assert msg1["game_options"] == { + "Share": "ShareUntilDeath", + "RestrictedCategories": ["T3", "T4", "SUBS", "PARAGON"], + } + + await proto1.send_message({ + "target": "game", + "command": "GameState", + "args": ["Launching"] + }) + + message = await success_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "game_id": msg1["uid"] + } + assert await error_queue.get(fail=False) is None + + +@fast_forward(10) +async def test_invalid_request_empty( + ladder_service, + channel, + message_queue_service, +): + del ladder_service + + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + # Empty payload + }, + correlation_id=correlation_id + ) + + message = await error_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "invalid_request", + "args": [ + {"message": "missing 'map_name'"}, + ] + } + assert await success_queue.get(fail=False) is None + + +@fast_forward(10) +async def test_invalid_request_missing_queue_and_featured_mod( + ladder_service, + channel, + message_queue_service, +): + del ladder_service + + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "game_name": "Test bad game", + "map_name": "scmp_003", + "participants": [], + }, + correlation_id=correlation_id + ) + + message = await error_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "invalid_request", + "args": [ + {"message": "missing 'featured_mod'"}, + ] + } + assert await success_queue.get(fail=False) is None + + +@fast_forward(10) +async def test_invalid_request_invalid_queue_name( + ladder_service, + channel, + message_queue_service, +): + del ladder_service + + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "matchmaker_queue": "bad_queue_name", + "game_name": "Test bad game", + "map_name": "scmp_003", + "participants": [], + }, + correlation_id=correlation_id + ) + + message = await error_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "invalid_request", + "args": [ + {"message": "invalid queue 'bad_queue_name'"}, + ], + } + assert await success_queue.get(fail=False) is None + + +@fast_forward(10) +async def test_invalid_request_empty_participants( + ladder_service, + channel, + message_queue_service +): + del ladder_service + + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "matchmaker_queue": "ladder1v1", + "game_name": "Test bad game", + "map_name": "scmp_003", + "participants": [], + }, + correlation_id=correlation_id + ) + + message = await error_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "invalid_request", + "args": [ + {"message": "empty participants"}, + ], + } + assert await success_queue.get(fail=False) is None + + +@fast_forward(10) +async def test_player_offline( + lobby_server, + channel, + message_queue_service +): + rhiza_id, _, proto = await connect_and_sign_in( + ("Rhiza", "puff_the_magic_dragon"), lobby_server + ) + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + await read_until_command(proto, "game_info") + + # Include all the information we could possibly need + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "matchmaker_queue": "ladder1v1", + "game_name": "test VERSUS Rhiza", + "map_name": "scmp_003", + "participants": [ + { + "player_id": 1, + "team": 2, + "slot": 1, + "faction": "uef" + }, + { + "player_id": rhiza_id, + "team": 3, + "slot": 2, + "faction": "cybran" + } + ] + }, + correlation_id=correlation_id + ) + + message = await error_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "players_not_found", "args": [{"player_id": 1}] + } + assert await success_queue.get(fail=False) is None + + +@fast_forward(10) +async def test_player_already_searching( + lobby_server, + channel, + message_queue_service +): + rhiza_id, _, proto = await connect_and_sign_in( + ("Rhiza", "puff_the_magic_dragon"), lobby_server + ) + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + await read_until_command(proto, "game_info") + await start_search(proto, "ladder1v1") + + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "featured_mod": "faf", + "game_name": "Rhiza solo game", + "map_name": "scmp_003", + "participants": [ + { + "player_id": rhiza_id, + "team": 3, + "slot": 2, + "faction": "cybran" + } + ] + }, + correlation_id=correlation_id + ) + + message = await error_queue.iterator(timeout=5).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "invalid_state", "args": [ + {"player_id": rhiza_id, "state": "SEARCHING_LADDER"}, + ] + } + assert await success_queue.get(fail=False) is None + + +@fast_forward(100) +async def test_players_dont_connect( + lobby_server, + channel, + message_queue_service +): + test_id, _, proto1 = await connect_and_sign_in( + ("test", "test_password"), lobby_server + ) + rhiza_id, _, proto2 = await connect_and_sign_in( + ("Rhiza", "puff_the_magic_dragon"), lobby_server + ) + success_queue = await connect_mq_queue(channel, "success.match.create") + error_queue = await connect_mq_queue(channel, "error.match.create") + + await asyncio.gather(*( + read_until_command(proto, "game_info") + for proto in (proto1, proto2) + )) + + # Include all the information we could possibly need + correlation_id = str(uuid.uuid4()) + await message_queue_service.publish( + config.MQ_EXCHANGE_NAME, + "request.match.create", + { + "matchmaker_queue": "ladder1v1", + "featured_mod": "faf", + "game_name": "test VERSUS Rhiza", + "map_name": "scmp_003", + "participants": [ + { + "player_id": test_id, + "team": 2, + "slot": 1, + "faction": "aeon" + }, + { + "player_id": rhiza_id, + "team": 3, + "slot": 2, + "faction": "seraphim" + } + ] + }, + correlation_id=correlation_id + ) + + msg = await client_response(proto1) + assert msg["faction"] == 2 + # Mod field sould override the mod from queue + assert msg["mod"] == "faf" + + message = await error_queue.iterator(timeout=85).__anext__() + assert message.correlation_id == correlation_id + assert json.loads(message.body.decode()) == { + "error_code": "launch_failed", "args": [{"player_id": rhiza_id}] + } + assert await success_queue.get(fail=False) is None diff --git a/tests/integration_tests/test_server.py b/tests/integration_tests/test_server.py index 6c8f7ba01..b77f6fd5a 100644 --- a/tests/integration_tests/test_server.py +++ b/tests/integration_tests/test_server.py @@ -359,12 +359,10 @@ async def test_player_info_broadcast(lobby_server): @fast_forward(5) async def test_player_info_broadcast_to_rabbitmq(lobby_server, channel): mq_proto = await connect_mq_consumer( - lobby_server, channel, "broadcast.playerInfo.update" ) mq_proto_all = await connect_mq_consumer( - lobby_server, channel, "broadcast.*.update" ) @@ -599,7 +597,6 @@ async def test_game_info_broadcast_to_players_in_lobby(lobby_server): @fast_forward(10) async def test_info_broadcast_to_rabbitmq(lobby_server, channel): mq_proto_all = await connect_mq_consumer( - lobby_server, channel, "broadcast.*.update" ) diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index 7d65cdbb7..d285fbd9b 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -21,21 +21,26 @@ def ladder_and_game_service_context( @asynccontextmanager async def make_ladder_and_game_service(): async with database_context(request) as database: + player_service = mock.Mock() + message_queue_service = mock.Mock( + declare_exchange=mock.AsyncMock(), + consume=mock.AsyncMock() + ) with mock.patch("server.matchmaker.pop_timer.config.QUEUE_POP_TIME_MAX", 1): game_service = GameService( database, - player_service=mock.Mock(), + player_service=player_service, game_stats_service=mock.Mock(), rating_service=mock.Mock(), - message_queue_service=mock.Mock( - declare_exchange=mock.AsyncMock() - ) + message_queue_service=message_queue_service, ) violation_service = ViolationService() ladder_service = LadderService( database, game_service, - violation_service + message_queue_service, + player_service, + violation_service, ) await game_service.initialize() @@ -56,10 +61,18 @@ async def ladder_service( mocker, database, game_service, + message_queue_service, + player_service, violation_service, ): mocker.patch("server.matchmaker.pop_timer.config.QUEUE_POP_TIME_MAX", 1) - ladder_service = LadderService(database, game_service, violation_service) + ladder_service = LadderService( + database, + game_service, + message_queue_service, + player_service, + violation_service, + ) await ladder_service.initialize() yield ladder_service await ladder_service.shutdown() diff --git a/tests/unit_tests/test_ladder_service.py b/tests/unit_tests/test_ladder_service.py index b62b063a3..62557d26a 100644 --- a/tests/unit_tests/test_ladder_service.py +++ b/tests/unit_tests/test_ladder_service.py @@ -21,8 +21,20 @@ from .strategies import st_players -async def test_queue_initialization(database, game_service, violation_service): - ladder_service = LadderService(database, game_service, violation_service) +async def test_queue_initialization( + database, + game_service, + message_queue_service, + player_service, + violation_service, +): + ladder_service = LadderService( + database, + game_service, + message_queue_service, + player_service, + violation_service, + ) def make_mock_queue(*args, **kwargs): queue = mock.create_autospec(MatchmakerQueue)