Run coroutines in parallel, raising the first exception that dosen't
-match any of the specified exception classes.
+
Expand source code
-
async def gather_without_exceptions(
- tasks: list[asyncio.Task],
- *exceptions: type[BaseException],
-) -> list[Any]:
- """
- Run coroutines in parallel, raising the first exception that dosen't
- match any of the specified exception classes.
- """
- results = []
- for fut in asyncio.as_completed(tasks):
- try:
- results.append(await fut)
- except exceptions:
- logger.debug(
- "Ignoring error in gather_without_exceptions", exc_info=True
- )
- return results
+ self.server.write_broadcast({"command": "ping"})
+
+ async def wait_report_dirtes(self):
+ """
+ Wait for the current report_dirties task to complete.
+ """
+ if self._report_dirties_event is None:
+ return
+
+ await self._report_dirties_event.wait()
+
+ async def graceful_shutdown(self):
+ if config.SHUTDOWN_KICK_IDLE_PLAYERS:
+ message = (
+ "If you're in a game you can continue to play, otherwise you "
+ "will be disconnected. If you aren't reconnected automatically "
+ "please wait a few minutes and try to connect again."
+ )
+ else:
+ message = (
+ "If you're in a game you can continue to play, however, you "
+ "will not be able to create any new games until the server has "
+ "been restarted."
+ )
+
+ delta = humanize.precisedelta(config.SHUTDOWN_GRACE_PERIOD)
+ self.server.write_broadcast({
+ "command": "notice",
+ "style": "info",
+ "text": (
+ f"The server will be shutting down for maintenance in {delta}! "
+ f"{message}"
+ )
+ })
+
+ async def shutdown(self):
+ self.server.write_broadcast({
+ "command": "notice",
+ "style": "info",
+ "text": (
+ "The server has been shut down for maintenance "
+ "but should be back online soon. If you experience any "
+ "problems, please restart your client. <br/><br/>"
+ "We apologize for this interruption."
+ )
+ })
@@ -182,13 +239,14 @@
Classes
self.message_queue_service = message_queue_service
self.game_service = game_service
self.player_service = player_service
+ self._report_dirties_event = None
async def initialize(self):
# Using a lazy interval timer so that the intervals can be changed
# without restarting the server.
self._broadcast_dirties_timer = LazyIntervalTimer(
lambda: config.DIRTY_REPORT_INTERVAL,
- self.report_dirties,
+ self._monitored_report_dirties,
start=True
)
self._broadcast_ping_timer = LazyIntervalTimer(
@@ -197,6 +255,14 @@
)
def broadcast_ping(self):
- self.server.write_broadcast({"command": "ping"})
+ self.server.write_broadcast({"command": "ping"})
+
+ async def wait_report_dirtes(self):
+ """
+ Wait for the current report_dirties task to complete.
+ """
+ if self._report_dirties_event is None:
+ return
+
+ await self._report_dirties_event.wait()
+
+ async def graceful_shutdown(self):
+ if config.SHUTDOWN_KICK_IDLE_PLAYERS:
+ message = (
+ "If you're in a game you can continue to play, otherwise you "
+ "will be disconnected. If you aren't reconnected automatically "
+ "please wait a few minutes and try to connect again."
+ )
+ else:
+ message = (
+ "If you're in a game you can continue to play, however, you "
+ "will not be able to create any new games until the server has "
+ "been restarted."
+ )
+
+ delta = humanize.precisedelta(config.SHUTDOWN_GRACE_PERIOD)
+ self.server.write_broadcast({
+ "command": "notice",
+ "style": "info",
+ "text": (
+ f"The server will be shutting down for maintenance in {delta}! "
+ f"{message}"
+ )
+ })
+
+ async def shutdown(self):
+ self.server.write_broadcast({
+ "command": "notice",
+ "style": "info",
+ "text": (
+ "The server has been shut down for maintenance "
+ "but should be back online soon. If you experience any "
+ "problems, please restart your client. <br/><br/>"
+ "We apologize for this interruption."
+ )
+ })
Ancestors
@@ -376,11 +487,31 @@
Methods
)
+
+async def wait_report_dirtes(self)
+
+
+
Wait for the current report_dirties task to complete.
+
+
+Expand source code
+
+
async def wait_report_dirtes(self):
+ """
+ Wait for the current report_dirties task to complete.
+ """
+ if self._report_dirties_event is None:
+ return
+
+ await self._report_dirties_event.wait()
self.DIRTY_REPORT_INTERVAL = 1
self.PING_INTERVAL = 45
+ # How many seconds to wait for games to end before doing a hard shutdown.
+ # If using kubernetes, you must set terminationGracePeriodSeconds
+ # on the pod to be larger than this value. With docker compose, use
+ # --timeout (-t) to set a longer timeout.
+ self.SHUTDOWN_GRACE_PERIOD = 30 * 60
+ self.SHUTDOWN_KICK_IDLE_PLAYERS = False
self.CONTROL_SERVER_PORT = 4000
+ self.HEALTH_SERVER_PORT = 2000
self.METRICS_PORT = 8011
self.ENABLE_METRICS = False
@@ -126,10 +133,9 @@
Module server.config
self.FAF_POLICY_SERVER_BASE_URL = "http://faf-policy-server"
self.USE_POLICY_SERVER = True
- self.FORCE_STEAM_LINK_AFTER_DATE = 1536105599 # 5 september 2018 by default
- self.FORCE_STEAM_LINK = False
-
self.ALLOW_PASSWORD_LOGIN = True
+ # How many seconds a connection has to authenticate before being killed
+ self.LOGIN_TIMEOUT = 5 * 60
self.NEWBIE_BASE_MEAN = 500
self.NEWBIE_MIN_GAMES = 10
@@ -200,10 +206,14 @@
Module server.config
with open(config_file) as f:
new_values.update(yaml.safe_load(f))
except FileNotFoundError:
- self._logger.info("No configuration file found at %s", config_file)
+ self._logger.warning(
+ "No configuration file found at %s",
+ config_file
+ )
except TypeError:
self._logger.info(
- "Configuration file at %s appears to be empty", config_file
+ "Configuration file at %s appears to be empty",
+ config_file
)
triggered_callback_keys = tuple(
@@ -313,8 +323,15 @@
Classes
self.DIRTY_REPORT_INTERVAL = 1
self.PING_INTERVAL = 45
+ # How many seconds to wait for games to end before doing a hard shutdown.
+ # If using kubernetes, you must set terminationGracePeriodSeconds
+ # on the pod to be larger than this value. With docker compose, use
+ # --timeout (-t) to set a longer timeout.
+ self.SHUTDOWN_GRACE_PERIOD = 30 * 60
+ self.SHUTDOWN_KICK_IDLE_PLAYERS = False
self.CONTROL_SERVER_PORT = 4000
+ self.HEALTH_SERVER_PORT = 2000
self.METRICS_PORT = 8011
self.ENABLE_METRICS = False
@@ -343,10 +360,9 @@
Classes
self.FAF_POLICY_SERVER_BASE_URL = "http://faf-policy-server"
self.USE_POLICY_SERVER = True
- self.FORCE_STEAM_LINK_AFTER_DATE = 1536105599 # 5 september 2018 by default
- self.FORCE_STEAM_LINK = False
-
self.ALLOW_PASSWORD_LOGIN = True
+ # How many seconds a connection has to authenticate before being killed
+ self.LOGIN_TIMEOUT = 5 * 60
self.NEWBIE_BASE_MEAN = 500
self.NEWBIE_MIN_GAMES = 10
@@ -417,10 +433,14 @@
Classes
with open(config_file) as f:
new_values.update(yaml.safe_load(f))
except FileNotFoundError:
- self._logger.info("No configuration file found at %s", config_file)
+ self._logger.warning(
+ "No configuration file found at %s",
+ config_file
+ )
except TypeError:
self._logger.info(
- "Configuration file at %s appears to be empty", config_file
+ "Configuration file at %s appears to be empty",
+ config_file
)
triggered_callback_keys = tuple(
@@ -472,10 +492,14 @@
Methods
with open(config_file) as f:
new_values.update(yaml.safe_load(f))
except FileNotFoundError:
- self._logger.info("No configuration file found at %s", config_file)
+ self._logger.warning(
+ "No configuration file found at %s",
+ config_file
+ )
except TypeError:
self._logger.info(
- "Configuration file at %s appears to be empty", config_file
+ "Configuration file at %s appears to be empty",
+ config_file
)
triggered_callback_keys = tuple(
diff --git a/configuration_service.html b/configuration_service.html
index 67083733e..bc48e3c2a 100644
--- a/configuration_service.html
+++ b/configuration_service.html
@@ -120,6 +120,7 @@
"""
pass # pragma: no cover
+ async def graceful_shutdown(self) -> None:
+ """
+ Called once after the graceful shutdown period is initiated.
+
+ This signals that the service should stop accepting new events but
+ continue to wait for existing ones to complete normally. The hook
+ funciton `shutdown` will be called after the grace period has ended to
+ fully shutdown the service.
+ """
+ pass # pragma: no cover
+
async def shutdown(self) -> None:
"""
Called once after the server received the shutdown signal.
@@ -397,6 +408,31 @@
Subclasses
Methods
+
+async def graceful_shutdown(self) ‑> None
+
+
+
Called once after the graceful shutdown period is initiated.
+
This signals that the service should stop accepting new events but
+continue to wait for existing ones to complete normally. The hook
+funciton shutdown will be called after the grace period has ended to
+fully shutdown the service.
+
+
+Expand source code
+
+
async def graceful_shutdown(self) -> None:
+ """
+ Called once after the graceful shutdown period is initiated.
+
+ This signals that the service should stop accepting new events but
+ continue to wait for existing ones to complete normally. The hook
+ funciton `shutdown` will be called after the grace period has ended to
+ fully shutdown the service.
+ """
+ pass # pragma: no cover
"""
pass # pragma: no cover
+ async def graceful_shutdown(self) -> None:
+ """
+ Called once after the graceful shutdown period is initiated.
+
+ This signals that the service should stop accepting new events but
+ continue to wait for existing ones to complete normally. The hook
+ funciton `shutdown` will be called after the grace period has ended to
+ fully shutdown the service.
+ """
+ pass # pragma: no cover
+
async def shutdown(self) -> None:
"""
Called once after the server received the shutdown signal.
@@ -171,6 +182,17 @@
Classes
"""
pass # pragma: no cover
+ async def graceful_shutdown(self) -> None:
+ """
+ Called once after the graceful shutdown period is initiated.
+
+ This signals that the service should stop accepting new events but
+ continue to wait for existing ones to complete normally. The hook
+ funciton `shutdown` will be called after the grace period has ended to
+ fully shutdown the service.
+ """
+ pass # pragma: no cover
+
async def shutdown(self) -> None:
"""
Called once after the server received the shutdown signal.
@@ -202,6 +224,31 @@
Subclasses
Methods
+
+async def graceful_shutdown(self) ‑> None
+
+
+
Called once after the graceful shutdown period is initiated.
+
This signals that the service should stop accepting new events but
+continue to wait for existing ones to complete normally. The hook
+funciton shutdown will be called after the grace period has ended to
+fully shutdown the service.
+
+
+Expand source code
+
+
async def graceful_shutdown(self) -> None:
+ """
+ Called once after the graceful shutdown period is initiated.
+
+ This signals that the service should stop accepting new events but
+ continue to wait for existing ones to complete normally. The hook
+ funciton `shutdown` will be called after the grace period has ended to
+ fully shutdown the service.
+ """
+ pass # pragma: no cover
Manages the lifecycle of active games
"""
+import asyncio
from collections import Counter
from typing import Optional, Union, ValuesView
@@ -44,6 +45,7 @@
Module server.game_service
from .db import FAFDatabase
from .db.models import game_featuredMods
from .decorators import with_logger
+from .exceptions import DisabledError
from .games import (
CustomGame,
FeaturedMod,
@@ -81,6 +83,8 @@
"""
Main entrypoint for creating new games
"""
+ if not self._allow_new_games:
+ raise DisabledError()
+
game_id = self.create_uid()
game_args = {
"database": self._db,
@@ -236,10 +244,17 @@
Module server.game_service
rating_type_counter[(rating_type, state)]
)
+ @property
+ def all_games(self) -> ValuesView[Game]:
+ return self._games.values()
+
@property
def live_games(self) -> list[Game]:
- return [game for game in self._games.values()
- if game.state is GameState.LIVE]
+ return [
+ game
+ for game in self.all_games
+ if game.state is GameState.LIVE
+ ]
@property
def open_games(self) -> list[Game]:
@@ -254,22 +269,32 @@
Module server.game_service
The client ignores everything "closed". This property fetches all such not-closed games.
"""
- return [game for game in self._games.values()
- if game.state is GameState.LOBBY or game.state is GameState.LIVE]
-
- @property
- def all_games(self) -> ValuesView[Game]:
- return self._games.values()
+ return [
+ game
+ for game in self.all_games
+ if game.state in (GameState.LOBBY, GameState.LIVE)
+ ]
@property
def pending_games(self) -> list[Game]:
- return [game for game in self._games.values()
- if game.state is GameState.LOBBY or game.state is GameState.INITIALIZING]
+ return [
+ game
+ for game in self.all_games
+ if game.state in (GameState.LOBBY, GameState.INITIALIZING)
+ ]
def remove_game(self, game: Game):
if game.id in self._games:
+ self._logger.debug("Removing game %s", game)
del self._games[game.id]
+ if (
+ self._drain_event is not None
+ and not self._drain_event.is_set()
+ and not self._games
+ ):
+ self._drain_event.set()
+
def __getitem__(self, item: int) -> Game:
return self._games[item]
@@ -290,7 +315,35 @@
Module server.game_service
):
metrics.rated_games.labels(game_results.rating_type).inc()
# TODO: Remove when rating service starts listening to message queue
- await self._rating_service.enqueue(result_dict)
+ await self._rating_service.enqueue(result_dict)
+
+ async def drain_games(self):
+ """
+ Wait for all games to finish.
+ """
+ if not self._games:
+ return
+
+ if not self._drain_event:
+ self._drain_event = asyncio.Event()
+
+ await self._drain_event.wait()
+
+ async def graceful_shutdown(self):
+ self._allow_new_games = False
+
+ await self.close_lobby_games()
+
+ async def close_lobby_games(self):
+ self._logger.info("Closing all games currently in lobby")
+ for game in self.pending_games:
+ for game_connection in list(game.connections):
+ # Tell the client to kill the FA process
+ game_connection.player.write_message({
+ "command": "notice",
+ "style": "kill"
+ })
+ await game_connection.abort()
@@ -334,6 +387,8 @@
"""
Main entrypoint for creating new games
"""
+ if not self._allow_new_games:
+ raise DisabledError()
+
game_id = self.create_uid()
game_args = {
"database": self._db,
@@ -489,10 +548,17 @@
Classes
rating_type_counter[(rating_type, state)]
)
+ @property
+ def all_games(self) -> ValuesView[Game]:
+ return self._games.values()
+
@property
def live_games(self) -> list[Game]:
- return [game for game in self._games.values()
- if game.state is GameState.LIVE]
+ return [
+ game
+ for game in self.all_games
+ if game.state is GameState.LIVE
+ ]
@property
def open_games(self) -> list[Game]:
@@ -507,22 +573,32 @@
Classes
The client ignores everything "closed". This property fetches all such not-closed games.
"""
- return [game for game in self._games.values()
- if game.state is GameState.LOBBY or game.state is GameState.LIVE]
-
- @property
- def all_games(self) -> ValuesView[Game]:
- return self._games.values()
+ return [
+ game
+ for game in self.all_games
+ if game.state in (GameState.LOBBY, GameState.LIVE)
+ ]
@property
def pending_games(self) -> list[Game]:
- return [game for game in self._games.values()
- if game.state is GameState.LOBBY or game.state is GameState.INITIALIZING]
+ return [
+ game
+ for game in self.all_games
+ if game.state in (GameState.LOBBY, GameState.INITIALIZING)
+ ]
def remove_game(self, game: Game):
if game.id in self._games:
+ self._logger.debug("Removing game %s", game)
del self._games[game.id]
+ if (
+ self._drain_event is not None
+ and not self._drain_event.is_set()
+ and not self._games
+ ):
+ self._drain_event.set()
+
def __getitem__(self, item: int) -> Game:
return self._games[item]
@@ -543,7 +619,35 @@
Classes
):
metrics.rated_games.labels(game_results.rating_type).inc()
# TODO: Remove when rating service starts listening to message queue
- await self._rating_service.enqueue(result_dict)
+ await self._rating_service.enqueue(result_dict)
+
+ async def drain_games(self):
+ """
+ Wait for all games to finish.
+ """
+ if not self._games:
+ return
+
+ if not self._drain_event:
+ self._drain_event = asyncio.Event()
+
+ await self._drain_event.wait()
+
+ async def graceful_shutdown(self):
+ self._allow_new_games = False
+
+ await self.close_lobby_games()
+
+ async def close_lobby_games(self):
+ self._logger.info("Closing all games currently in lobby")
+ for game in self.pending_games:
+ for game_connection in list(game.connections):
+ # Tell the client to kill the FA process
+ game_connection.player.write_message({
+ "command": "notice",
+ "style": "kill"
+ })
+ await game_connection.abort()
Ancestors
@@ -572,8 +676,11 @@
Instance variables
@property
def live_games(self) -> list[Game]:
- return [game for game in self._games.values()
- if game.state is GameState.LIVE]
+ return [
+ game
+ for game in self.all_games
+ if game.state is GameState.LIVE
+ ]
The client ignores everything "closed". This property fetches all such not-closed games.
"""
- return [game for game in self._games.values()
- if game.state is GameState.LOBBY or game.state is GameState.LIVE]
+ return [
+ game
+ for game in self.all_games
+ if game.state in (GameState.LOBBY, GameState.LIVE)
+ ]
@property
def pending_games(self) -> list[Game]:
- return [game for game in self._games.values()
- if game.state is GameState.LOBBY or game.state is GameState.INITIALIZING]
+ return [
+ game
+ for game in self.all_games
+ if game.state in (GameState.LOBBY, GameState.INITIALIZING)
+ ]
Methods
+
+async def close_lobby_games(self)
+
+
+
+
+
+Expand source code
+
+
async def close_lobby_games(self):
+ self._logger.info("Closing all games currently in lobby")
+ for game in self.pending_games:
+ for game_connection in list(game.connections):
+ # Tell the client to kill the FA process
+ game_connection.player.write_message({
+ "command": "notice",
+ "style": "kill"
+ })
+ await game_connection.abort()
"""
Main entrypoint for creating new games
"""
+ if not self._allow_new_games:
+ raise DisabledError()
+
game_id = self.create_uid()
game_args = {
"database": self._db,
@@ -686,6 +823,28 @@
Methods
return self.game_id_counter
+
+async def drain_games(self)
+
+
+
Wait for all games to finish.
+
+
+Expand source code
+
+
async def drain_games(self):
+ """
+ Wait for all games to finish.
+ """
+ if not self._games:
+ return
+
+ if not self._drain_event:
+ self._drain_event = asyncio.Event()
+
+ await self._drain_event.wait()
+
+
async def initialise_game_counter(self)
@@ -797,7 +956,15 @@
Methods
def remove_game(self, game: Game):
if game.id in self._games:
- del self._games[game.id]
+ self._logger.debug("Removing game %s", game)
+ del self._games[game.id]
+
+ if (
+ self._drain_event is not None
+ and not self._drain_event.is_set()
+ and not self._games
+ ):
+ self._drain_event.set()
Depending on the state, it is either:
- (LOBBY) The currently connected players
- (LIVE) Players who participated in the game
- - Empty list
"""
if self.state is GameState.LOBBY:
return self.get_connected_players()
@@ -1116,7 +1115,6 @@
Classes
Depending on the state, it is either:
- (LOBBY) The currently connected players
- (LIVE) Players who participated in the game
- - Empty list
"""
if self.state is GameState.LOBBY:
return self.get_connected_players()
@@ -2057,8 +2055,7 @@
Returns
Players in the game
Depending on the state, it is either:
- (LOBBY) The currently connected players
-- (LIVE) Players who participated in the game
-- Empty list
+- (LIVE) Players who participated in the game
Expand source code
@@ -2071,7 +2068,6 @@
Returns
Depending on the state, it is either:
- (LOBBY) The currently connected players
- (LIVE) Players who participated in the game
- - Empty list
"""
if self.state is GameState.LOBBY:
return self.get_connected_players()
diff --git a/games/index.html b/games/index.html
index 6d60c5e07..afa40ae85 100644
--- a/games/index.html
+++ b/games/index.html
@@ -539,7 +539,6 @@
Class variables
Depending on the state, it is either:
- (LOBBY) The currently connected players
- (LIVE) Players who participated in the game
- - Empty list
"""
if self.state is GameState.LOBBY:
return self.get_connected_players()
@@ -1480,8 +1479,7 @@
Returns
Players in the game
Depending on the state, it is either:
- (LOBBY) The currently connected players
-- (LIVE) Players who participated in the game
-- Empty list
+- (LIVE) Players who participated in the game
Expand source code
@@ -1494,7 +1492,6 @@
Returns
Depending on the state, it is either:
- (LOBBY) The currently connected players
- (LIVE) Players who participated in the game
- - Empty list
"""
if self.state is GameState.LOBBY:
return self.get_connected_players()
diff --git a/geoip_service.html b/geoip_service.html
index 707d29b86..7d4ef6050 100644
--- a/geoip_service.html
+++ b/geoip_service.html
@@ -758,6 +758,7 @@
)
def broadcast_ping(self):
- self.server.write_broadcast({"command": "ping"})
+ self.server.write_broadcast({"command": "ping"})
+
+ async def wait_report_dirtes(self):
+ """
+ Wait for the current report_dirties task to complete.
+ """
+ if self._report_dirties_event is None:
+ return
+
+ await self._report_dirties_event.wait()
+
+ async def graceful_shutdown(self):
+ if config.SHUTDOWN_KICK_IDLE_PLAYERS:
+ message = (
+ "If you're in a game you can continue to play, otherwise you "
+ "will be disconnected. If you aren't reconnected automatically "
+ "please wait a few minutes and try to connect again."
+ )
+ else:
+ message = (
+ "If you're in a game you can continue to play, however, you "
+ "will not be able to create any new games until the server has "
+ "been restarted."
+ )
+
+ delta = humanize.precisedelta(config.SHUTDOWN_GRACE_PERIOD)
+ self.server.write_broadcast({
+ "command": "notice",
+ "style": "info",
+ "text": (
+ f"The server will be shutting down for maintenance in {delta}! "
+ f"{message}"
+ )
+ })
+
+ async def shutdown(self):
+ self.server.write_broadcast({
+ "command": "notice",
+ "style": "info",
+ "text": (
+ "The server has been shut down for maintenance "
+ "but should be back online soon. If you experience any "
+ "problems, please restart your client. <br/><br/>"
+ "We apologize for this interruption."
+ )
+ })
Ancestors
@@ -814,11 +885,31 @@
Methods
)
+
+async def wait_report_dirtes(self)
+
+
+
Wait for the current report_dirties task to complete.
+
+
+Expand source code
+
+
async def wait_report_dirtes(self):
+ """
+ Wait for the current report_dirties task to complete.
+ """
+ if self._report_dirties_event is None:
+ return
+
+ await self._report_dirties_event.wait()
"""
Main entrypoint for creating new games
"""
+ if not self._allow_new_games:
+ raise DisabledError()
+
game_id = self.create_uid()
game_args = {
"database": self._db,
@@ -2540,10 +2638,17 @@
Inherited members
rating_type_counter[(rating_type, state)]
)
+ @property
+ def all_games(self) -> ValuesView[Game]:
+ return self._games.values()
+
@property
def live_games(self) -> list[Game]:
- return [game for game in self._games.values()
- if game.state is GameState.LIVE]
+ return [
+ game
+ for game in self.all_games
+ if game.state is GameState.LIVE
+ ]
@property
def open_games(self) -> list[Game]:
@@ -2558,22 +2663,32 @@
Inherited members
The client ignores everything "closed". This property fetches all such not-closed games.
"""
- return [game for game in self._games.values()
- if game.state is GameState.LOBBY or game.state is GameState.LIVE]
-
- @property
- def all_games(self) -> ValuesView[Game]:
- return self._games.values()
+ return [
+ game
+ for game in self.all_games
+ if game.state in (GameState.LOBBY, GameState.LIVE)
+ ]
@property
def pending_games(self) -> list[Game]:
- return [game for game in self._games.values()
- if game.state is GameState.LOBBY or game.state is GameState.INITIALIZING]
+ return [
+ game
+ for game in self.all_games
+ if game.state in (GameState.LOBBY, GameState.INITIALIZING)
+ ]
def remove_game(self, game: Game):
if game.id in self._games:
+ self._logger.debug("Removing game %s", game)
del self._games[game.id]
+ if (
+ self._drain_event is not None
+ and not self._drain_event.is_set()
+ and not self._games
+ ):
+ self._drain_event.set()
+
def __getitem__(self, item: int) -> Game:
return self._games[item]
@@ -2594,7 +2709,35 @@
Inherited members
):
metrics.rated_games.labels(game_results.rating_type).inc()
# TODO: Remove when rating service starts listening to message queue
- await self._rating_service.enqueue(result_dict)
+ await self._rating_service.enqueue(result_dict)
+
+ async def drain_games(self):
+ """
+ Wait for all games to finish.
+ """
+ if not self._games:
+ return
+
+ if not self._drain_event:
+ self._drain_event = asyncio.Event()
+
+ await self._drain_event.wait()
+
+ async def graceful_shutdown(self):
+ self._allow_new_games = False
+
+ await self.close_lobby_games()
+
+ async def close_lobby_games(self):
+ self._logger.info("Closing all games currently in lobby")
+ for game in self.pending_games:
+ for game_connection in list(game.connections):
+ # Tell the client to kill the FA process
+ game_connection.player.write_message({
+ "command": "notice",
+ "style": "kill"
+ })
+ await game_connection.abort()
Ancestors
@@ -2623,8 +2766,11 @@
Instance variables
@property
def live_games(self) -> list[Game]:
- return [game for game in self._games.values()
- if game.state is GameState.LIVE]
+ return [
+ game
+ for game in self.all_games
+ if game.state is GameState.LIVE
+ ]
The client ignores everything "closed". This property fetches all such not-closed games.
"""
- return [game for game in self._games.values()
- if game.state is GameState.LOBBY or game.state is GameState.LIVE]
+ return [
+ game
+ for game in self.all_games
+ if game.state in (GameState.LOBBY, GameState.LIVE)
+ ]
@property
def pending_games(self) -> list[Game]:
- return [game for game in self._games.values()
- if game.state is GameState.LOBBY or game.state is GameState.INITIALIZING]
+ return [
+ game
+ for game in self.all_games
+ if game.state in (GameState.LOBBY, GameState.INITIALIZING)
+ ]
Methods
+
+async def close_lobby_games(self)
+
+
+
+
+
+Expand source code
+
+
async def close_lobby_games(self):
+ self._logger.info("Closing all games currently in lobby")
+ for game in self.pending_games:
+ for game_connection in list(game.connections):
+ # Tell the client to kill the FA process
+ game_connection.player.write_message({
+ "command": "notice",
+ "style": "kill"
+ })
+ await game_connection.abort()
"""
Main entrypoint for creating new games
"""
+ if not self._allow_new_games:
+ raise DisabledError()
+
game_id = self.create_uid()
game_args = {
"database": self._db,
@@ -2737,6 +2913,28 @@
Methods
return self.game_id_counter
+
+async def drain_games(self)
+
+
+
Wait for all games to finish.
+
+
+Expand source code
+
+
async def drain_games(self):
+ """
+ Wait for all games to finish.
+ """
+ if not self._games:
+ return
+
+ if not self._drain_event:
+ self._drain_event = asyncio.Event()
+
+ await self._drain_event.wait()
+
+
async def initialise_game_counter(self)
@@ -2848,7 +3046,15 @@
Methods
def remove_game(self, game: Game):
if game.id in self._games:
- del self._games[game.id]
+ self._logger.debug("Removing game %s", game)
+ del self._games[game.id]
+
+ if (
+ self._drain_event is not None
+ and not self._drain_event.is_set()
+ and not self._games
+ ):
+ self._drain_event.set()
queue_name: str,
on_matched: OnMatchedCallback = lambda _1, _2: None
):
+ if not self._allow_new_searches:
+ raise DisabledError()
+
timeouts = self.violation_service.get_violations(players)
if timeouts:
self._logger.debug("timeouts: %s", timeouts)
@@ -4396,9 +4609,15 @@
Inherited members
if player in self._informed_players:
self._informed_players.remove(player)
- async def shutdown(self):
+ async def graceful_shutdown(self):
+ self._allow_new_searches = False
+
for queue in self.queues.values():
- queue.shutdown()
+ queue.shutdown()
+
+ for player, searches in self._searches.items():
+ for queue_name in list(searches.keys()):
+ self._cancel_search(player, queue_name)
Ancestors
@@ -4800,6 +5019,9 @@
Methods
queue_name: str,
on_matched: OnMatchedCallback = lambda _1, _2: None
):
+ if not self._allow_new_searches:
+ raise DisabledError()
+
timeouts = self.violation_service.get_violations(players)
if timeouts:
self._logger.debug("timeouts: %s", timeouts)
@@ -4965,6 +5187,7 @@
)
self.uniqueid_exempt = frozenset(map(lambda x: x[0], result))
- async def shutdown(self):
- for player in self:
- if player.lobby_connection is not None:
- with contextlib.suppress(Exception):
- player.lobby_connection.write_warning(
- "The server has been shut down for maintenance, "
- "but should be back online soon. If you experience any "
- "problems, please restart your client. <br/><br/>"
- "We apologize for this interruption."
- )
+ async def kick_idle_players(self):
+ for fut in asyncio.as_completed([
+ player.lobby_connection.abort("Graceful shutdown.")
+ for player in self.all_players
+ if player.state == PlayerState.IDLE
+ if player.lobby_connection is not None
+ ]):
+ try:
+ await fut
+ except Exception:
+ self._logger.debug(
+ "Error while aborting connection",
+ exc_info=True
+ )
def on_connection_lost(self, conn: "LobbyConnection") -> None:
if not conn.player:
@@ -6214,7 +6444,11 @@
async def kick_idle_players(self):
+ for fut in asyncio.as_completed([
+ player.lobby_connection.abort("Graceful shutdown.")
+ for player in self.all_players
+ if player.state == PlayerState.IDLE
+ if player.lobby_connection is not None
+ ]):
+ try:
+ await fut
+ except Exception:
+ self._logger.debug(
+ "Error while aborting connection",
+ exc_info=True
+ )
return ctx
- async def shutdown(self):
- results = await asyncio.gather(
- *(ctx.stop() for ctx in self.contexts),
- return_exceptions=True
+ async def graceful_shutdown(self):
+ """
+ Start a graceful shut down of the server.
+
+ 1. Notify all services of graceful shutdown
+ """
+ self._logger.info("Initiating graceful shutdown")
+
+ await map_suppress(
+ lambda service: service.graceful_shutdown(),
+ self.services.values(),
+ logger=self._logger,
+ msg="when starting graceful shutdown of service "
)
- for result, ctx in zip(results, self.contexts):
- if isinstance(result, BaseException):
- self._logger.exception(
- "Unexpected error when stopping context %s",
- ctx,
- exc_info=result
- )
- results = await asyncio.gather(
- *(service.shutdown() for service in self.services.values()),
- return_exceptions=True
+ async def shutdown(self):
+ """
+ Immediately shutdown the server.
+
+ 1. Stop accepting new connections
+ 2. Stop all services
+ 3. Close all existing connections
+ """
+ self._logger.info("Initiating full shutdown")
+
+ await self._stop_contexts()
+ await self._shutdown_services()
+ await self._shutdown_contexts()
+
+ self.contexts.clear()
+ self.started = False
+
+ async def drain(self):
+ """
+ Wait for all games to end.
+ """
+ game_service: GameService = self.services["game_service"]
+ broadcast_service: BroadcastService = self.services["broadcast_service"]
+ try:
+ await asyncio.wait_for(
+ game_service.drain_games(),
+ timeout=config.SHUTDOWN_GRACE_PERIOD
+ )
+ except asyncio.CancelledError:
+ self._logger.debug(
+ "Stopped waiting for games to end due to forced shutdown"
+ )
+ except asyncio.TimeoutError:
+ self._logger.warning(
+ "Graceful shutdown period ended! %s games are still live!",
+ len(game_service.live_games)
+ )
+ finally:
+ # The report_dirties loop is responsible for clearing dirty games
+ # and broadcasting the update messages to players and to RabbitMQ.
+ # We need to wait here for that loop to complete otherwise it is
+ # possible for the services to be shut down inbetween clearing the
+ # games and posting the messages, causing the posts to fail.
+ await broadcast_service.wait_report_dirtes()
+
+ async def _shutdown_services(self):
+ await map_suppress(
+ lambda service: service.shutdown(),
+ self.services.values(),
+ logger=self._logger,
+ msg="when shutting down service "
)
- for result, service in zip(results, self.services.values()):
- if isinstance(result, BaseException):
- self._logger.error(
- "Unexpected error when shutting down service %s",
- service
- )
- results = await asyncio.gather(
- *(ctx.shutdown() for ctx in self.contexts),
- return_exceptions=True
+ async def _stop_contexts(self):
+ await map_suppress(
+ lambda ctx: ctx.stop(),
+ self.contexts,
+ logger=self._logger,
+ msg="when stopping context "
)
- for result, ctx in zip(results, self.contexts):
- if isinstance(result, BaseException):
- self._logger.error(
- "Unexpected error when shutting down context %s",
- ctx
- )
- self.contexts.clear()
- self.started = False
+ async def _shutdown_contexts(self):
+ await map_suppress(
+ lambda ctx: ctx.shutdown(),
+ self.contexts,
+ logger=self._logger,
+ msg="when shutting down context "
+ )
Methods
+
+async def drain(self)
+
+
+
Wait for all games to end.
+
+
+Expand source code
+
+
async def drain(self):
+ """
+ Wait for all games to end.
+ """
+ game_service: GameService = self.services["game_service"]
+ broadcast_service: BroadcastService = self.services["broadcast_service"]
+ try:
+ await asyncio.wait_for(
+ game_service.drain_games(),
+ timeout=config.SHUTDOWN_GRACE_PERIOD
+ )
+ except asyncio.CancelledError:
+ self._logger.debug(
+ "Stopped waiting for games to end due to forced shutdown"
+ )
+ except asyncio.TimeoutError:
+ self._logger.warning(
+ "Graceful shutdown period ended! %s games are still live!",
+ len(game_service.live_games)
+ )
+ finally:
+ # The report_dirties loop is responsible for clearing dirty games
+ # and broadcasting the update messages to players and to RabbitMQ.
+ # We need to wait here for that loop to complete otherwise it is
+ # possible for the services to be shut down inbetween clearing the
+ # games and posting the messages, causing the posts to fail.
+ await broadcast_service.wait_report_dirtes()
+
+
+
+async def graceful_shutdown(self)
+
+
+
Start a graceful shut down of the server.
+
+
Notify all services of graceful shutdown
+
+
+
+Expand source code
+
+
async def graceful_shutdown(self):
+ """
+ Start a graceful shut down of the server.
+
+ 1. Notify all services of graceful shutdown
+ """
+ self._logger.info("Initiating graceful shutdown")
+
+ await map_suppress(
+ lambda service: service.graceful_shutdown(),
+ self.services.values(),
+ logger=self._logger,
+ msg="when starting graceful shutdown of service "
+ )
Static meta information about the container/process
+
+
+Expand source code
+
+
"""
+Static meta information about the container/process
+"""
+
+import os
+import platform
+
+PYTHON_VERSION = platform.python_version()
+
+# Environment variables
+VERSION = os.getenv("VERSION") or "dev"
+CONTAINER_NAME = os.getenv("CONTAINER_NAME") or "faf-python-server"
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/ladder_service/index.html b/ladder_service/index.html
index 67eae1ad7..5063e09fd 100644
--- a/ladder_service/index.html
+++ b/ladder_service/index.html
@@ -106,6 +106,7 @@
queue_name: str,
on_matched: OnMatchedCallback = lambda _1, _2: None
):
+ if not self._allow_new_searches:
+ raise DisabledError()
+
timeouts = self.violation_service.get_violations(players)
if timeouts:
self._logger.debug("timeouts: %s", timeouts)
@@ -751,9 +755,15 @@
Classes
if player in self._informed_players:
self._informed_players.remove(player)
- async def shutdown(self):
+ async def graceful_shutdown(self):
+ self._allow_new_searches = False
+
for queue in self.queues.values():
- queue.shutdown()
+ queue.shutdown()
+
+ for player, searches in self._searches.items():
+ for queue_name in list(searches.keys()):
+ self._cancel_search(player, queue_name)
Ancestors
@@ -1155,6 +1165,9 @@
Methods
queue_name: str,
on_matched: OnMatchedCallback = lambda _1, _2: None
):
+ if not self._allow_new_searches:
+ raise DisabledError()
+
timeouts = self.violation_service.get_violations(players)
if timeouts:
self._logger.debug("timeouts: %s", timeouts)
@@ -1320,6 +1333,7 @@
queue_name: str,
on_matched: OnMatchedCallback = lambda _1, _2: None
):
+ if not self._allow_new_searches:
+ raise DisabledError()
+
timeouts = self.violation_service.get_violations(players)
if timeouts:
self._logger.debug("timeouts: %s", timeouts)
@@ -744,10 +749,16 @@
Module server.ladder_service.ladder_service
if player in self._informed_players:
self._informed_players.remove(player)
- async def shutdown(self):
+ async def graceful_shutdown(self):
+ self._allow_new_searches = False
+
for queue in self.queues.values():
queue.shutdown()
+ for player, searches in self._searches.items():
+ for queue_name in list(searches.keys()):
+ self._cancel_search(player, queue_name)
+
class NotConnectedError(asyncio.TimeoutError):
def __init__(self, players: list[Player]):
@@ -794,6 +805,7 @@
queue_name: str,
on_matched: OnMatchedCallback = lambda _1, _2: None
):
+ if not self._allow_new_searches:
+ raise DisabledError()
+
timeouts = self.violation_service.get_violations(players)
if timeouts:
self._logger.debug("timeouts: %s", timeouts)
@@ -1439,9 +1454,15 @@
Classes
if player in self._informed_players:
self._informed_players.remove(player)
- async def shutdown(self):
+ async def graceful_shutdown(self):
+ self._allow_new_searches = False
+
for queue in self.queues.values():
- queue.shutdown()
+ queue.shutdown()
+
+ for player, searches in self._searches.items():
+ for queue_name in list(searches.keys()):
+ self._cancel_search(player, queue_name)
Ancestors
@@ -1843,6 +1864,9 @@
Methods
queue_name: str,
on_matched: OnMatchedCallback = lambda _1, _2: None
):
+ if not self._allow_new_searches:
+ raise DisabledError()
+
timeouts = self.violation_service.get_violations(players)
if timeouts:
self._logger.debug("timeouts: %s", timeouts)
@@ -2008,6 +2032,7 @@
async def on_connection_made(self, protocol: Protocol, peername: Address):
self.protocol = protocol
self.peer_address = peername
+ self._timeout_task = asyncio.create_task(self.timeout_login())
metrics.server_connections.inc()
+ async def timeout_login(self):
+ with contextlib.suppress(asyncio.CancelledError):
+ await asyncio.sleep(config.LOGIN_TIMEOUT)
+ if not self._authenticated:
+ await self.abort("Client took too long to log in.")
+
async def abort(self, logspam=""):
self._authenticated = False
@@ -196,37 +209,45 @@
Module server.lobbyconnection
handler = getattr(self, f"command_{cmd}")
await handler(message)
- except AuthenticationError as ex:
- metrics.user_logins.labels("failure", ex.method).inc()
+ except AuthenticationError as e:
+ metrics.user_logins.labels("failure", e.method).inc()
await self.send({
"command": "authentication_failed",
- "text": ex.message
+ "text": e.message
})
- except BanError as ex:
+ except BanError as e:
await self.send({
"command": "notice",
"style": "error",
- "text": ex.message()
+ "text": e.message()
})
- await self.abort(ex.message())
- except ClientError as ex:
- self._logger.warning("Client error: %s", ex.message)
+ await self.abort(e.message())
+ except ClientError as e:
+ self._logger.warning("Client error: %s", e.message)
await self.send({
"command": "notice",
"style": "error",
- "text": ex.message
+ "text": e.message
})
- if not ex.recoverable:
- await self.abort(ex.message)
- except (KeyError, ValueError) as ex:
- self._logger.exception(ex)
+ if not e.recoverable:
+ await self.abort(e.message)
+ except (KeyError, ValueError) as e:
+ self._logger.exception(e)
await self.abort(f"Garbage command: {message}")
except ConnectionError as e:
# Propagate connection errors to the ServerContext error handler.
raise e
- except Exception as ex: # pragma: no cover
+ except DisabledError:
+ # TODO: Respond with correlation uid for original message
+ await self.send({"command": "disabled", "request": cmd})
+ self._logger.info(
+ "Ignoring disabled command for %s: %s",
+ self.get_user_identifier(),
+ cmd
+ )
+ except Exception as e: # pragma: no cover
await self.send({"command": "invalid"})
- self._logger.exception(ex)
+ self._logger.exception(e)
await self.abort("Error processing command")
async def command_ping(self, msg):
@@ -269,7 +290,11 @@
Module server.lobbyconnection
async def command_matchmaker_info(self, message):
await self.send({
"command": "matchmaker_info",
- "queues": [queue.to_dict() for queue in self.ladder_service.queues.values()]
+ "queues": [
+ queue.to_dict()
+ for queue in self.ladder_service.queues.values()
+ if queue.is_running
+ ]
})
async def send_game_list(self):
@@ -348,11 +373,10 @@
Module server.lobbyconnection
"Administrative action: %s closed game for %s",
self.player, player
)
- with contextlib.suppress(DisconnectedError):
- await player.send_message({
- "command": "notice",
- "style": "kill",
- })
+ player.write_message({
+ "command": "notice",
+ "style": "kill",
+ })
elif action == "closelobby":
if await self.player_service.has_permission_role(
@@ -1260,6 +1284,9 @@
Module server.lobbyconnection
return
self.send = nop
+ if self._timeout_task and not self._timeout_task.done():
+ self._timeout_task.cancel()
+
if self.game_connection:
self._logger.debug(
"Lost lobby connection killing game connection for player %s",
@@ -1343,6 +1370,7 @@
async def on_connection_made(self, protocol: Protocol, peername: Address):
self.protocol = protocol
self.peer_address = peername
+ self._timeout_task = asyncio.create_task(self.timeout_login())
metrics.server_connections.inc()
+ async def timeout_login(self):
+ with contextlib.suppress(asyncio.CancelledError):
+ await asyncio.sleep(config.LOGIN_TIMEOUT)
+ if not self._authenticated:
+ await self.abort("Client took too long to log in.")
+
async def abort(self, logspam=""):
self._authenticated = False
@@ -1418,37 +1453,45 @@
Classes
handler = getattr(self, f"command_{cmd}")
await handler(message)
- except AuthenticationError as ex:
- metrics.user_logins.labels("failure", ex.method).inc()
+ except AuthenticationError as e:
+ metrics.user_logins.labels("failure", e.method).inc()
await self.send({
"command": "authentication_failed",
- "text": ex.message
+ "text": e.message
})
- except BanError as ex:
+ except BanError as e:
await self.send({
"command": "notice",
"style": "error",
- "text": ex.message()
+ "text": e.message()
})
- await self.abort(ex.message())
- except ClientError as ex:
- self._logger.warning("Client error: %s", ex.message)
+ await self.abort(e.message())
+ except ClientError as e:
+ self._logger.warning("Client error: %s", e.message)
await self.send({
"command": "notice",
"style": "error",
- "text": ex.message
+ "text": e.message
})
- if not ex.recoverable:
- await self.abort(ex.message)
- except (KeyError, ValueError) as ex:
- self._logger.exception(ex)
+ if not e.recoverable:
+ await self.abort(e.message)
+ except (KeyError, ValueError) as e:
+ self._logger.exception(e)
await self.abort(f"Garbage command: {message}")
except ConnectionError as e:
# Propagate connection errors to the ServerContext error handler.
raise e
- except Exception as ex: # pragma: no cover
+ except DisabledError:
+ # TODO: Respond with correlation uid for original message
+ await self.send({"command": "disabled", "request": cmd})
+ self._logger.info(
+ "Ignoring disabled command for %s: %s",
+ self.get_user_identifier(),
+ cmd
+ )
+ except Exception as e: # pragma: no cover
await self.send({"command": "invalid"})
- self._logger.exception(ex)
+ self._logger.exception(e)
await self.abort("Error processing command")
async def command_ping(self, msg):
@@ -1491,7 +1534,11 @@
Classes
async def command_matchmaker_info(self, message):
await self.send({
"command": "matchmaker_info",
- "queues": [queue.to_dict() for queue in self.ladder_service.queues.values()]
+ "queues": [
+ queue.to_dict()
+ for queue in self.ladder_service.queues.values()
+ if queue.is_running
+ ]
})
async def send_game_list(self):
@@ -1570,11 +1617,10 @@
Classes
"Administrative action: %s closed game for %s",
self.player, player
)
- with contextlib.suppress(DisconnectedError):
- await player.send_message({
- "command": "notice",
- "style": "kill",
- })
+ player.write_message({
+ "command": "notice",
+ "style": "kill",
+ })
elif action == "closelobby":
if await self.player_service.has_permission_role(
@@ -2482,6 +2528,9 @@
Classes
return
self.send = nop
+ if self._timeout_task and not self._timeout_task.done():
+ self._timeout_task.cancel()
+
if self.game_connection:
self._logger.debug(
"Lost lobby connection killing game connection for player %s",
@@ -2741,11 +2790,10 @@
Methods
"Administrative action: %s closed game for %s",
self.player, player
)
- with contextlib.suppress(DisconnectedError):
- await player.send_message({
- "command": "notice",
- "style": "kill",
- })
+ player.write_message({
+ "command": "notice",
+ "style": "kill",
+ })
elif action == "closelobby":
if await self.player_service.has_permission_role(
@@ -3309,7 +3357,11 @@
Methods
async def command_matchmaker_info(self, message):
await self.send({
"command": "matchmaker_info",
- "queues": [queue.to_dict() for queue in self.ladder_service.queues.values()]
+ "queues": [
+ queue.to_dict()
+ for queue in self.ladder_service.queues.values()
+ if queue.is_running
+ ]
})
@@ -3666,6 +3718,9 @@
Methods
return
self.send = nop
+ if self._timeout_task and not self._timeout_task.done():
+ self._timeout_task.cancel()
+
if self.game_connection:
self._logger.debug(
"Lost lobby connection killing game connection for player %s",
@@ -3686,6 +3741,7 @@
handler = getattr(self, f"command_{cmd}")
await handler(message)
- except AuthenticationError as ex:
- metrics.user_logins.labels("failure", ex.method).inc()
+ except AuthenticationError as e:
+ metrics.user_logins.labels("failure", e.method).inc()
await self.send({
"command": "authentication_failed",
- "text": ex.message
+ "text": e.message
})
- except BanError as ex:
+ except BanError as e:
await self.send({
"command": "notice",
"style": "error",
- "text": ex.message()
+ "text": e.message()
})
- await self.abort(ex.message())
- except ClientError as ex:
- self._logger.warning("Client error: %s", ex.message)
+ await self.abort(e.message())
+ except ClientError as e:
+ self._logger.warning("Client error: %s", e.message)
await self.send({
"command": "notice",
"style": "error",
- "text": ex.message
+ "text": e.message
})
- if not ex.recoverable:
- await self.abort(ex.message)
- except (KeyError, ValueError) as ex:
- self._logger.exception(ex)
+ if not e.recoverable:
+ await self.abort(e.message)
+ except (KeyError, ValueError) as e:
+ self._logger.exception(e)
await self.abort(f"Garbage command: {message}")
except ConnectionError as e:
# Propagate connection errors to the ServerContext error handler.
raise e
- except Exception as ex: # pragma: no cover
+ except DisabledError:
+ # TODO: Respond with correlation uid for original message
+ await self.send({"command": "disabled", "request": cmd})
+ self._logger.info(
+ "Ignoring disabled command for %s: %s",
+ self.get_user_identifier(),
+ cmd
+ )
+ except Exception as e: # pragma: no cover
await self.send({"command": "invalid"})
- self._logger.exception(ex)
+ self._logger.exception(e)
await self.abort("Error processing command")
@@ -4011,6 +4075,22 @@
Params
await self.abort(message)
+
+async def timeout_login(self)
+
+
+
+
+
+Expand source code
+
+
async def timeout_login(self):
+ with contextlib.suppress(asyncio.CancelledError):
+ await asyncio.sleep(config.LOGIN_TIMEOUT)
+ if not self._authenticated:
+ await self.abort("Client took too long to log in.")
in the queue.
"""
self._logger.debug("MatchmakerQueue initialized for %s", self.name)
- while self._is_running:
+ while self.is_running:
try:
await self.timer.next_pop()
@@ -718,6 +722,7 @@
in the queue.
"""
self._logger.debug("MatchmakerQueue initialized for %s", self.name)
- while self._is_running:
+ while self.is_running:
try:
await self.timer.next_pop()
@@ -1035,7 +1055,8 @@
Returns
Expand source code
def shutdown(self):
- self._is_running = False
+ self._is_running = False
+ self.timer.cancel()
@@ -1056,7 +1077,10 @@
Returns
"queue_pop_time": datetime.fromtimestamp(
self.timer.next_queue_pop, timezone.utc
).isoformat(),
- "queue_pop_time_delta": self.timer.next_queue_pop - time.time(),
+ "queue_pop_time_delta": round(
+ self.timer.next_queue_pop - time.time(),
+ ndigits=2
+ ),
"num_players": self.num_players,
"boundary_80s": [search.boundary_80 for search in self._queue.keys()],
"boundary_75s": [search.boundary_75 for search in self._queue.keys()],
@@ -1113,6 +1137,7 @@
Returns
self._last_queue_pop = time()
# Optimistically schedule first pop for half of the max pop time
self.next_queue_pop = self._last_queue_pop + (config.QUEUE_POP_TIME_MAX / 2)
+ self._wait_task = None
async def next_pop(self):
""" Wait for the timer to pop. """
@@ -1120,7 +1145,10 @@
in the queue.
"""
self._logger.debug("MatchmakerQueue initialized for %s", self.name)
- while self._is_running:
+ while self.is_running:
try:
await self.timer.next_pop()
@@ -296,6 +300,7 @@
in the queue.
"""
self._logger.debug("MatchmakerQueue initialized for %s", self.name)
- while self._is_running:
+ while self.is_running:
try:
await self.timer.next_pop()
@@ -567,6 +579,7 @@
in the queue.
"""
self._logger.debug("MatchmakerQueue initialized for %s", self.name)
- while self._is_running:
+ while self.is_running:
try:
await self.timer.next_pop()
@@ -884,7 +912,8 @@
Returns
Expand source code
def shutdown(self):
- self._is_running = False
+ self._is_running = False
+ self.timer.cancel()
@@ -905,7 +934,10 @@
Returns
"queue_pop_time": datetime.fromtimestamp(
self.timer.next_queue_pop, timezone.utc
).isoformat(),
- "queue_pop_time_delta": self.timer.next_queue_pop - time.time(),
+ "queue_pop_time_delta": round(
+ self.timer.next_queue_pop - time.time(),
+ ndigits=2
+ ),
"num_players": self.num_players,
"boundary_80s": [search.boundary_80 for search in self._queue.keys()],
"boundary_75s": [search.boundary_75 for search in self._queue.keys()],
@@ -970,6 +1002,7 @@
self._last_queue_pop = time()
# Optimistically schedule first pop for half of the max pop time
self.next_queue_pop = self._last_queue_pop + (config.QUEUE_POP_TIME_MAX / 2)
+ self._wait_task = None
async def next_pop(self):
""" Wait for the timer to pop. """
@@ -69,7 +70,10 @@
self._last_queue_pop = time()
# Optimistically schedule first pop for half of the max pop time
self.next_queue_pop = self._last_queue_pop + (config.QUEUE_POP_TIME_MAX / 2)
+ self._wait_task = None
async def next_pop(self):
""" Wait for the timer to pop. """
@@ -173,7 +182,10 @@
from trueskill import Rating
import server.metrics as metrics
+from server.config import config
from server.db import FAFDatabase
from server.decorators import with_logger
-from server.players import Player
+from server.players import Player, PlayerState
from server.rating import RatingType
+from server.timing import at_interval
from .core import Service
from .db.models import (
@@ -294,16 +296,20 @@
Module server.player_service
)
self.uniqueid_exempt = frozenset(map(lambda x: x[0], result))
- async def shutdown(self):
- for player in self:
- if player.lobby_connection is not None:
- with contextlib.suppress(Exception):
- player.lobby_connection.write_warning(
- "The server has been shut down for maintenance, "
- "but should be back online soon. If you experience any "
- "problems, please restart your client. <br/><br/>"
- "We apologize for this interruption."
- )
+ async def kick_idle_players(self):
+ for fut in asyncio.as_completed([
+ player.lobby_connection.abort("Graceful shutdown.")
+ for player in self.all_players
+ if player.state == PlayerState.IDLE
+ if player.lobby_connection is not None
+ ]):
+ try:
+ await fut
+ except Exception:
+ self._logger.debug(
+ "Error while aborting connection",
+ exc_info=True
+ )
def on_connection_lost(self, conn: "LobbyConnection") -> None:
if not conn.player:
@@ -316,7 +322,11 @@
)
self.uniqueid_exempt = frozenset(map(lambda x: x[0], result))
- async def shutdown(self):
- for player in self:
- if player.lobby_connection is not None:
- with contextlib.suppress(Exception):
- player.lobby_connection.write_warning(
- "The server has been shut down for maintenance, "
- "but should be back online soon. If you experience any "
- "problems, please restart your client. <br/><br/>"
- "We apologize for this interruption."
- )
+ async def kick_idle_players(self):
+ for fut in asyncio.as_completed([
+ player.lobby_connection.abort("Graceful shutdown.")
+ for player in self.all_players
+ if player.state == PlayerState.IDLE
+ if player.lobby_connection is not None
+ ]):
+ try:
+ await fut
+ except Exception:
+ self._logger.debug(
+ "Error while aborting connection",
+ exc_info=True
+ )
def on_connection_lost(self, conn: "LobbyConnection") -> None:
if not conn.player:
@@ -593,7 +607,11 @@
async def kick_idle_players(self):
+ for fut in asyncio.as_completed([
+ player.lobby_connection.abort("Graceful shutdown.")
+ for player in self.all_players
+ if player.state == PlayerState.IDLE
+ if player.lobby_connection is not None
+ ]):
+ try:
+ await fut
+ except Exception:
+ self._logger.debug(
+ "Error while aborting connection",
+ exc_info=True
+ )
self._server.close()
await self._server.wait_closed()
+ async def drain_connections(self):
+ """
+ Wait for all connections to terminate.
+ """
+ if not self.connections:
+ return
+
+ if not self._drain_event:
+ self._drain_event = asyncio.Event()
+
+ await self._drain_event.wait()
+
def write_broadcast(self, message, validate_fn=lambda _: True):
self.write_broadcast_raw(
self.protocol_class.encode_message(message),
@@ -262,6 +275,14 @@
Module server.servercontext
self.name,
connection.get_user_identifier()
)
+
+ if (
+ self._drain_event is not None
+ and not self._drain_event.is_set()
+ and not self.connections
+ ):
+ self._drain_event.set()
+
metrics.user_connections.labels(
connection.user_agent,
connection.version
@@ -318,6 +339,7 @@
self._server.close()
await self._server.wait_closed()
+ async def drain_connections(self):
+ """
+ Wait for all connections to terminate.
+ """
+ if not self.connections:
+ return
+
+ if not self._drain_event:
+ self._drain_event = asyncio.Event()
+
+ await self._drain_event.wait()
+
def write_broadcast(self, message, validate_fn=lambda _: True):
self.write_broadcast_raw(
self.protocol_class.encode_message(message),
@@ -510,6 +544,14 @@
Classes
self.name,
connection.get_user_identifier()
)
+
+ if (
+ self._drain_event is not None
+ and not self._drain_event.is_set()
+ and not self.connections
+ ):
+ self._drain_event.set()
+
metrics.user_connections.labels(
connection.user_agent,
connection.version
@@ -600,6 +642,28 @@
async def drain_connections(self):
+ """
+ Wait for all connections to terminate.
+ """
+ if not self.connections:
+ return
+
+ if not self._drain_event:
+ self._drain_event = asyncio.Event()
+
+ await self._drain_event.wait()
self.name,
connection.get_user_identifier()
)
+
+ if (
+ self._drain_event is not None
+ and not self._drain_event.is_set()
+ and not self.connections
+ ):
+ self._drain_event.set()
+
metrics.user_connections.labels(
connection.user_agent,
connection.version
@@ -849,6 +921,7 @@