diff --git a/asyncio_extensions.html b/asyncio_extensions.html index a216041c0..81a525037 100644 --- a/asyncio_extensions.html +++ b/asyncio_extensions.html @@ -41,8 +41,10 @@

Module server.asyncio_extensions

AsyncContextManager, Callable, Coroutine, + Iterable, Optional, Protocol, + TypeVar, cast, overload ) @@ -51,6 +53,7 @@

Module server.asyncio_extensions

AsyncFunc = Callable[..., Coroutine[Any, Any, Any]] AsyncDecorator = Callable[[AsyncFunc], AsyncFunc] +T = TypeVar("T") class AsyncLock(Protocol, AsyncContextManager["AsyncLock"]): @@ -59,23 +62,24 @@

Module server.asyncio_extensions

def release(self) -> None: ... -async def gather_without_exceptions( - tasks: list[asyncio.Task], - *exceptions: type[BaseException], -) -> list[Any]: - """ - Run coroutines in parallel, raising the first exception that dosen't - match any of the specified exception classes. - """ - results = [] - for fut in asyncio.as_completed(tasks): - try: - results.append(await fut) - except exceptions: - logger.debug( - "Ignoring error in gather_without_exceptions", exc_info=True +async def map_suppress( + func: Callable[[T], Coroutine[Any, Any, Any]], + iterable: Iterable[T], + logger: logging.Logger = logger, + msg: str = "" +): + results = await asyncio.gather( + *(func(item) for item in iterable), + return_exceptions=True + ) + for result, item in zip(results, iterable): + if isinstance(result, BaseException): + logger.exception( + "Unexpected error %s%s", + msg, + item, + exc_info=result ) - return results # Based on python3.8 asyncio.Lock @@ -239,33 +243,33 @@

Module server.asyncio_extensions

Functions

-
-async def gather_without_exceptions(tasks: list[_asyncio.Task], *exceptions: type[BaseException]) ‑> list[typing.Any] +
+async def map_suppress(func: Callable[[~T], Coroutine[Any, Any, Any]], iterable: Iterable[~T], logger: logging.Logger = <Logger server.asyncio_extensions (WARNING)>, msg: str = '')
-

Run coroutines in parallel, raising the first exception that dosen't -match any of the specified exception classes.

+
Expand source code -
async def gather_without_exceptions(
-    tasks: list[asyncio.Task],
-    *exceptions: type[BaseException],
-) -> list[Any]:
-    """
-    Run coroutines in parallel, raising the first exception that dosen't
-    match any of the specified exception classes.
-    """
-    results = []
-    for fut in asyncio.as_completed(tasks):
-        try:
-            results.append(await fut)
-        except exceptions:
-            logger.debug(
-                "Ignoring error in gather_without_exceptions", exc_info=True
-            )
-    return results
+
async def map_suppress(
+    func: Callable[[T], Coroutine[Any, Any, Any]],
+    iterable: Iterable[T],
+    logger: logging.Logger = logger,
+    msg: str = ""
+):
+    results = await asyncio.gather(
+        *(func(item) for item in iterable),
+        return_exceptions=True
+    )
+    for result, item in zip(results, iterable):
+        if isinstance(result, BaseException):
+            logger.exception(
+                "Unexpected error %s%s",
+                msg,
+                item,
+                exc_info=result
+            )
@@ -552,7 +556,7 @@

Index

  • Functions

    diff --git a/broadcast_service.html b/broadcast_service.html index f2f02b8d4..3d11b13ca 100644 --- a/broadcast_service.html +++ b/broadcast_service.html @@ -26,7 +26,10 @@

    Module server.broadcast_service

    Expand source code -
    from aio_pika import DeliveryMode
    +
    import asyncio
    +
    +import humanize
    +from aio_pika import DeliveryMode
     
     from .config import config
     from .core import Service
    @@ -55,13 +58,14 @@ 

    Module server.broadcast_service

    self.message_queue_service = message_queue_service self.game_service = game_service self.player_service = player_service + self._report_dirties_event = None async def initialize(self): # Using a lazy interval timer so that the intervals can be changed # without restarting the server. self._broadcast_dirties_timer = LazyIntervalTimer( lambda: config.DIRTY_REPORT_INTERVAL, - self.report_dirties, + self._monitored_report_dirties, start=True ) self._broadcast_ping_timer = LazyIntervalTimer( @@ -70,6 +74,14 @@

    Module server.broadcast_service

    start=True ) + async def _monitored_report_dirties(self): + event = asyncio.Event() + self._report_dirties_event = event + try: + await self.report_dirties() + finally: + event.set() + async def report_dirties(self): """ Send updates about any dirty (changed) entities to connected players. @@ -143,7 +155,52 @@

    Module server.broadcast_service

    ) def broadcast_ping(self): - self.server.write_broadcast({"command": "ping"})
    + self.server.write_broadcast({"command": "ping"}) + + async def wait_report_dirtes(self): + """ + Wait for the current report_dirties task to complete. + """ + if self._report_dirties_event is None: + return + + await self._report_dirties_event.wait() + + async def graceful_shutdown(self): + if config.SHUTDOWN_KICK_IDLE_PLAYERS: + message = ( + "If you're in a game you can continue to play, otherwise you " + "will be disconnected. If you aren't reconnected automatically " + "please wait a few minutes and try to connect again." + ) + else: + message = ( + "If you're in a game you can continue to play, however, you " + "will not be able to create any new games until the server has " + "been restarted." + ) + + delta = humanize.precisedelta(config.SHUTDOWN_GRACE_PERIOD) + self.server.write_broadcast({ + "command": "notice", + "style": "info", + "text": ( + f"The server will be shutting down for maintenance in {delta}! " + f"{message}" + ) + }) + + async def shutdown(self): + self.server.write_broadcast({ + "command": "notice", + "style": "info", + "text": ( + "The server has been shut down for maintenance " + "but should be back online soon. If you experience any " + "problems, please restart your client. <br/><br/>" + "We apologize for this interruption." + ) + })
  • @@ -182,13 +239,14 @@

    Classes

    self.message_queue_service = message_queue_service self.game_service = game_service self.player_service = player_service + self._report_dirties_event = None async def initialize(self): # Using a lazy interval timer so that the intervals can be changed # without restarting the server. self._broadcast_dirties_timer = LazyIntervalTimer( lambda: config.DIRTY_REPORT_INTERVAL, - self.report_dirties, + self._monitored_report_dirties, start=True ) self._broadcast_ping_timer = LazyIntervalTimer( @@ -197,6 +255,14 @@

    Classes

    start=True ) + async def _monitored_report_dirties(self): + event = asyncio.Event() + self._report_dirties_event = event + try: + await self.report_dirties() + finally: + event.set() + async def report_dirties(self): """ Send updates about any dirty (changed) entities to connected players. @@ -270,7 +336,52 @@

    Classes

    ) def broadcast_ping(self): - self.server.write_broadcast({"command": "ping"})
    + self.server.write_broadcast({"command": "ping"}) + + async def wait_report_dirtes(self): + """ + Wait for the current report_dirties task to complete. + """ + if self._report_dirties_event is None: + return + + await self._report_dirties_event.wait() + + async def graceful_shutdown(self): + if config.SHUTDOWN_KICK_IDLE_PLAYERS: + message = ( + "If you're in a game you can continue to play, otherwise you " + "will be disconnected. If you aren't reconnected automatically " + "please wait a few minutes and try to connect again." + ) + else: + message = ( + "If you're in a game you can continue to play, however, you " + "will not be able to create any new games until the server has " + "been restarted." + ) + + delta = humanize.precisedelta(config.SHUTDOWN_GRACE_PERIOD) + self.server.write_broadcast({ + "command": "notice", + "style": "info", + "text": ( + f"The server will be shutting down for maintenance in {delta}! " + f"{message}" + ) + }) + + async def shutdown(self): + self.server.write_broadcast({ + "command": "notice", + "style": "info", + "text": ( + "The server has been shut down for maintenance " + "but should be back online soon. If you experience any " + "problems, please restart your client. <br/><br/>" + "We apologize for this interruption." + ) + })

    Ancestors

    @@ -109,54 +102,13 @@

    Module server.control

    -

    Functions

    -
    -
    -async def run_control_server(player_service: PlayerService, game_service: GameService) ‑> ControlServer -
    -
    -

    Initialize the http control server

    -
    - -Expand source code - -
    async def run_control_server(
    -    player_service: PlayerService,
    -    game_service: GameService
    -) -> ControlServer:
    -    """
    -    Initialize the http control server
    -    """
    -    host = socket.gethostbyname(socket.gethostname())
    -    port = config.CONTROL_SERVER_PORT
    -
    -    ctrl_server = ControlServer(game_service, player_service, host, port)
    -    await ctrl_server.start()
    -
    -    return ctrl_server
    -
    -
    -
    -def to_dict_list(list_) -
    -
    -
    -
    - -Expand source code - -
    def to_dict_list(list_):
    -    return list(map(lambda p: p.to_dict(), list_))
    -
    -
    -

    Classes

    class ControlServer -(game_service: GameService, player_service: PlayerService, host: str, port: int) +(lobby_server: ServerInstance)
    @@ -168,47 +120,63 @@

    Classes

    class ControlServer: def __init__( self, - game_service: GameService, - player_service: PlayerService, - host: str, - port: int + lobby_server: "ServerInstance", ): - self.game_service = game_service - self.player_service = player_service - self.host = host - self.port = port + self.lobby_server = lobby_server + self.game_service = lobby_server.services["game_service"] + self.player_service = lobby_server.services["player_service"] + self.host = None + self.port = None self.app = web.Application() self.runner = web.AppRunner(self.app) self.app.add_routes([ web.get("/games", self.games), - web.get("/players", self.players) + web.get("/players", self.players), ]) - async def start(self) -> None: + async def run_from_config(self) -> None: + """ + Initialize the http control server + """ + host = socket.gethostbyname(socket.gethostname()) + port = config.CONTROL_SERVER_PORT + + await self.shutdown() + await self.start(host, port) + + async def start(self, host: str, port: int) -> None: + self.host = host + self.port = port await self.runner.setup() - self.site = web.TCPSite(self.runner, self.host, self.port) + self.site = web.TCPSite(self.runner, host, port) await self.site.start() self._logger.info( - "Control server listening on http://%s:%s", self.host, self.port + "Control server listening on http://%s:%s", host, port ) async def shutdown(self) -> None: await self.runner.cleanup() + self.host = None + self.port = None - async def games(self, request): - body = dumps(to_dict_list(self.game_service.all_games)) - return web.Response(body=body.encode(), content_type="application/json") + async def games(self, request) -> web.Response: + return web.json_response([ + game.to_dict() + for game in self.game_service.all_games + ]) - async def players(self, request): - body = dumps(to_dict_list(self.player_service.all_players)) - return web.Response(body=body.encode(), content_type="application/json") + async def players(self, request) -> web.Response: + return web.json_response([ + player.to_dict() + for player in self.player_service.all_players + ])

    Methods

    -async def games(self, request) +async def games(self, request) ‑> aiohttp.web_response.Response
    @@ -216,13 +184,15 @@

    Methods

    Expand source code -
    async def games(self, request):
    -    body = dumps(to_dict_list(self.game_service.all_games))
    -    return web.Response(body=body.encode(), content_type="application/json")
    +
    async def games(self, request) -> web.Response:
    +    return web.json_response([
    +        game.to_dict()
    +        for game in self.game_service.all_games
    +    ])
    -async def players(self, request) +async def players(self, request) ‑> aiohttp.web_response.Response
    @@ -230,9 +200,31 @@

    Methods

    Expand source code -
    async def players(self, request):
    -    body = dumps(to_dict_list(self.player_service.all_players))
    -    return web.Response(body=body.encode(), content_type="application/json")
    +
    async def players(self, request) -> web.Response:
    +    return web.json_response([
    +        player.to_dict()
    +        for player in self.player_service.all_players
    +    ])
    + +
    +
    +async def run_from_config(self) ‑> None +
    +
    +

    Initialize the http control server

    +
    + +Expand source code + +
    async def run_from_config(self) -> None:
    +    """
    +    Initialize the http control server
    +    """
    +    host = socket.gethostbyname(socket.gethostname())
    +    port = config.CONTROL_SERVER_PORT
    +
    +    await self.shutdown()
    +    await self.start(host, port)
    @@ -245,11 +237,13 @@

    Methods

    Expand source code
    async def shutdown(self) -> None:
    -    await self.runner.cleanup()
    + await self.runner.cleanup() + self.host = None + self.port = None
    -async def start(self) ‑> None +async def start(self, host: str, port: int) ‑> None
    @@ -257,12 +251,14 @@

    Methods

    Expand source code -
    async def start(self) -> None:
    +
    async def start(self, host: str, port: int) -> None:
    +    self.host = host
    +    self.port = port
         await self.runner.setup()
    -    self.site = web.TCPSite(self.runner, self.host, self.port)
    +    self.site = web.TCPSite(self.runner, host, port)
         await self.site.start()
         self._logger.info(
    -        "Control server listening on http://%s:%s", self.host, self.port
    +        "Control server listening on http://%s:%s", host, port
         )
    @@ -282,12 +278,6 @@

    Index

  • server
  • -
  • Functions

    - -
  • Classes

    diff --git a/core/index.html b/core/index.html index d31131310..c08265432 100644 --- a/core/index.html +++ b/core/index.html @@ -366,6 +366,17 @@

    Methods

    """ pass # pragma: no cover + async def graceful_shutdown(self) -> None: + """ + Called once after the graceful shutdown period is initiated. + + This signals that the service should stop accepting new events but + continue to wait for existing ones to complete normally. The hook + funciton `shutdown` will be called after the grace period has ended to + fully shutdown the service. + """ + pass # pragma: no cover + async def shutdown(self) -> None: """ Called once after the server received the shutdown signal. @@ -397,6 +408,31 @@

    Subclasses

    Methods

    +
    +async def graceful_shutdown(self) ‑> None +
    +
    +

    Called once after the graceful shutdown period is initiated.

    +

    This signals that the service should stop accepting new events but +continue to wait for existing ones to complete normally. The hook +funciton shutdown will be called after the grace period has ended to +fully shutdown the service.

    +
    + +Expand source code + +
    async def graceful_shutdown(self) -> None:
    +    """
    +    Called once after the graceful shutdown period is initiated.
    +
    +    This signals that the service should stop accepting new events but
    +    continue to wait for existing ones to complete normally. The hook
    +    funciton `shutdown` will be called after the grace period has ended to
    +    fully shutdown the service.
    +    """
    +    pass  # pragma: no cover
    +
    +
    async def initialize(self) ‑> None
    @@ -484,6 +520,7 @@

    Service

      +
    • graceful_shutdown
    • initialize
    • on_connection_lost
    • shutdown
    • diff --git a/core/service.html b/core/service.html index 01f2dde61..2477992be 100644 --- a/core/service.html +++ b/core/service.html @@ -58,6 +58,17 @@

      Module server.core.service

      """ pass # pragma: no cover + async def graceful_shutdown(self) -> None: + """ + Called once after the graceful shutdown period is initiated. + + This signals that the service should stop accepting new events but + continue to wait for existing ones to complete normally. The hook + funciton `shutdown` will be called after the grace period has ended to + fully shutdown the service. + """ + pass # pragma: no cover + async def shutdown(self) -> None: """ Called once after the server received the shutdown signal. @@ -171,6 +182,17 @@

      Classes

      """ pass # pragma: no cover + async def graceful_shutdown(self) -> None: + """ + Called once after the graceful shutdown period is initiated. + + This signals that the service should stop accepting new events but + continue to wait for existing ones to complete normally. The hook + funciton `shutdown` will be called after the grace period has ended to + fully shutdown the service. + """ + pass # pragma: no cover + async def shutdown(self) -> None: """ Called once after the server received the shutdown signal. @@ -202,6 +224,31 @@

      Subclasses

    Methods

    +
    +async def graceful_shutdown(self) ‑> None +
    +
    +

    Called once after the graceful shutdown period is initiated.

    +

    This signals that the service should stop accepting new events but +continue to wait for existing ones to complete normally. The hook +funciton shutdown will be called after the grace period has ended to +fully shutdown the service.

    +
    + +Expand source code + +
    async def graceful_shutdown(self) -> None:
    +    """
    +    Called once after the graceful shutdown period is initiated.
    +
    +    This signals that the service should stop accepting new events but
    +    continue to wait for existing ones to complete normally. The hook
    +    funciton `shutdown` will be called after the grace period has ended to
    +    fully shutdown the service.
    +    """
    +    pass  # pragma: no cover
    +
    +
    async def initialize(self) ‑> None
    @@ -277,6 +324,7 @@

    Index

  • Service

      +
    • graceful_shutdown
    • initialize
    • on_connection_lost
    • shutdown
    • diff --git a/exceptions.html b/exceptions.html index 9ca047cf1..8f1607de4 100644 --- a/exceptions.html +++ b/exceptions.html @@ -85,7 +85,13 @@

      Module server.exceptions

      def __init__(self, message, method, *args, **kwargs): super().__init__(*args, **kwargs) self.message = message - self.method = method
      + self.method = method + + +class DisabledError(Exception): + """ + The operation is disabled due to an impending server shutdown. + """
  • @@ -215,6 +221,27 @@

    Ancestors

  • builtins.BaseException
  • +
    +class DisabledError +(*args, **kwargs) +
    +
    +

    The operation is disabled due to an impending server shutdown.

    +
    + +Expand source code + +
    class DisabledError(Exception):
    +    """
    +    The operation is disabled due to an impending server shutdown.
    +    """
    +
    +

    Ancestors

    +
      +
    • builtins.Exception
    • +
    • builtins.BaseException
    • +
    +
    @@ -243,6 +270,9 @@

    ClientError

    +
  • +

    DisabledError

    +
  • diff --git a/game_service.html b/game_service.html index a2b409dc0..b17168da3 100644 --- a/game_service.html +++ b/game_service.html @@ -31,6 +31,7 @@

    Module server.game_service

    Manages the lifecycle of active games """ +import asyncio from collections import Counter from typing import Optional, Union, ValuesView @@ -44,6 +45,7 @@

    Module server.game_service

    from .db import FAFDatabase from .db.models import game_featuredMods from .decorators import with_logger +from .exceptions import DisabledError from .games import ( CustomGame, FeaturedMod, @@ -81,6 +83,8 @@

    Module server.game_service

    self._rating_service = rating_service self._message_queue_service = message_queue_service self.game_id_counter = 0 + self._allow_new_games = False + self._drain_event = None # Populated below in really_update_static_ish_data. self.featured_mods = dict() @@ -97,6 +101,7 @@

    Module server.game_service

    self._update_cron = aiocron.crontab( "*/10 * * * *", func=self.update_data ) + self._allow_new_games = True async def initialise_game_counter(self): async with self._db.acquire() as conn: @@ -182,6 +187,9 @@

    Module server.game_service

    """ Main entrypoint for creating new games """ + if not self._allow_new_games: + raise DisabledError() + game_id = self.create_uid() game_args = { "database": self._db, @@ -236,10 +244,17 @@

    Module server.game_service

    rating_type_counter[(rating_type, state)] ) + @property + def all_games(self) -> ValuesView[Game]: + return self._games.values() + @property def live_games(self) -> list[Game]: - return [game for game in self._games.values() - if game.state is GameState.LIVE] + return [ + game + for game in self.all_games + if game.state is GameState.LIVE + ] @property def open_games(self) -> list[Game]: @@ -254,22 +269,32 @@

    Module server.game_service

    The client ignores everything "closed". This property fetches all such not-closed games. """ - return [game for game in self._games.values() - if game.state is GameState.LOBBY or game.state is GameState.LIVE] - - @property - def all_games(self) -> ValuesView[Game]: - return self._games.values() + return [ + game + for game in self.all_games + if game.state in (GameState.LOBBY, GameState.LIVE) + ] @property def pending_games(self) -> list[Game]: - return [game for game in self._games.values() - if game.state is GameState.LOBBY or game.state is GameState.INITIALIZING] + return [ + game + for game in self.all_games + if game.state in (GameState.LOBBY, GameState.INITIALIZING) + ] def remove_game(self, game: Game): if game.id in self._games: + self._logger.debug("Removing game %s", game) del self._games[game.id] + if ( + self._drain_event is not None + and not self._drain_event.is_set() + and not self._games + ): + self._drain_event.set() + def __getitem__(self, item: int) -> Game: return self._games[item] @@ -290,7 +315,35 @@

    Module server.game_service

    ): metrics.rated_games.labels(game_results.rating_type).inc() # TODO: Remove when rating service starts listening to message queue - await self._rating_service.enqueue(result_dict) + await self._rating_service.enqueue(result_dict) + + async def drain_games(self): + """ + Wait for all games to finish. + """ + if not self._games: + return + + if not self._drain_event: + self._drain_event = asyncio.Event() + + await self._drain_event.wait() + + async def graceful_shutdown(self): + self._allow_new_games = False + + await self.close_lobby_games() + + async def close_lobby_games(self): + self._logger.info("Closing all games currently in lobby") + for game in self.pending_games: + for game_connection in list(game.connections): + # Tell the client to kill the FA process + game_connection.player.write_message({ + "command": "notice", + "style": "kill" + }) + await game_connection.abort()
    @@ -334,6 +387,8 @@

    Classes

    self._rating_service = rating_service self._message_queue_service = message_queue_service self.game_id_counter = 0 + self._allow_new_games = False + self._drain_event = None # Populated below in really_update_static_ish_data. self.featured_mods = dict() @@ -350,6 +405,7 @@

    Classes

    self._update_cron = aiocron.crontab( "*/10 * * * *", func=self.update_data ) + self._allow_new_games = True async def initialise_game_counter(self): async with self._db.acquire() as conn: @@ -435,6 +491,9 @@

    Classes

    """ Main entrypoint for creating new games """ + if not self._allow_new_games: + raise DisabledError() + game_id = self.create_uid() game_args = { "database": self._db, @@ -489,10 +548,17 @@

    Classes

    rating_type_counter[(rating_type, state)] ) + @property + def all_games(self) -> ValuesView[Game]: + return self._games.values() + @property def live_games(self) -> list[Game]: - return [game for game in self._games.values() - if game.state is GameState.LIVE] + return [ + game + for game in self.all_games + if game.state is GameState.LIVE + ] @property def open_games(self) -> list[Game]: @@ -507,22 +573,32 @@

    Classes

    The client ignores everything "closed". This property fetches all such not-closed games. """ - return [game for game in self._games.values() - if game.state is GameState.LOBBY or game.state is GameState.LIVE] - - @property - def all_games(self) -> ValuesView[Game]: - return self._games.values() + return [ + game + for game in self.all_games + if game.state in (GameState.LOBBY, GameState.LIVE) + ] @property def pending_games(self) -> list[Game]: - return [game for game in self._games.values() - if game.state is GameState.LOBBY or game.state is GameState.INITIALIZING] + return [ + game + for game in self.all_games + if game.state in (GameState.LOBBY, GameState.INITIALIZING) + ] def remove_game(self, game: Game): if game.id in self._games: + self._logger.debug("Removing game %s", game) del self._games[game.id] + if ( + self._drain_event is not None + and not self._drain_event.is_set() + and not self._games + ): + self._drain_event.set() + def __getitem__(self, item: int) -> Game: return self._games[item] @@ -543,7 +619,35 @@

    Classes

    ): metrics.rated_games.labels(game_results.rating_type).inc() # TODO: Remove when rating service starts listening to message queue - await self._rating_service.enqueue(result_dict) + await self._rating_service.enqueue(result_dict) + + async def drain_games(self): + """ + Wait for all games to finish. + """ + if not self._games: + return + + if not self._drain_event: + self._drain_event = asyncio.Event() + + await self._drain_event.wait() + + async def graceful_shutdown(self): + self._allow_new_games = False + + await self.close_lobby_games() + + async def close_lobby_games(self): + self._logger.info("Closing all games currently in lobby") + for game in self.pending_games: + for game_connection in list(game.connections): + # Tell the client to kill the FA process + game_connection.player.write_message({ + "command": "notice", + "style": "kill" + }) + await game_connection.abort()

    Ancestors

    @@ -476,10 +513,18 @@

    Sub-modules

    Manages the GeoIP database

    +
    server.health
    +
    +

    Kubernetes compatible HTTP health check server.

    +
    server.ice_servers

    ICE server configuration

    +
    server.info
    +
    +

    Static meta information about the container/process

    +
    server.ladder_service
    @@ -561,34 +606,6 @@

    Sub-modules

    -

    Functions

    -
    -
    -async def run_control_server(player_service: PlayerService, game_service: GameService) ‑> ControlServer -
    -
    -

    Initialize the http control server

    -
    - -Expand source code - -
    async def run_control_server(
    -    player_service: PlayerService,
    -    game_service: GameService
    -) -> ControlServer:
    -    """
    -    Initialize the http control server
    -    """
    -    host = socket.gethostbyname(socket.gethostname())
    -    port = config.CONTROL_SERVER_PORT
    -
    -    ctrl_server = ControlServer(game_service, player_service, host, port)
    -    await ctrl_server.start()
    -
    -    return ctrl_server
    -
    -
    -

    Classes

    @@ -620,13 +637,14 @@

    Classes

    self.message_queue_service = message_queue_service self.game_service = game_service self.player_service = player_service + self._report_dirties_event = None async def initialize(self): # Using a lazy interval timer so that the intervals can be changed # without restarting the server. self._broadcast_dirties_timer = LazyIntervalTimer( lambda: config.DIRTY_REPORT_INTERVAL, - self.report_dirties, + self._monitored_report_dirties, start=True ) self._broadcast_ping_timer = LazyIntervalTimer( @@ -635,6 +653,14 @@

    Classes

    start=True ) + async def _monitored_report_dirties(self): + event = asyncio.Event() + self._report_dirties_event = event + try: + await self.report_dirties() + finally: + event.set() + async def report_dirties(self): """ Send updates about any dirty (changed) entities to connected players. @@ -708,7 +734,52 @@

    Classes

    ) def broadcast_ping(self): - self.server.write_broadcast({"command": "ping"}) + self.server.write_broadcast({"command": "ping"}) + + async def wait_report_dirtes(self): + """ + Wait for the current report_dirties task to complete. + """ + if self._report_dirties_event is None: + return + + await self._report_dirties_event.wait() + + async def graceful_shutdown(self): + if config.SHUTDOWN_KICK_IDLE_PLAYERS: + message = ( + "If you're in a game you can continue to play, otherwise you " + "will be disconnected. If you aren't reconnected automatically " + "please wait a few minutes and try to connect again." + ) + else: + message = ( + "If you're in a game you can continue to play, however, you " + "will not be able to create any new games until the server has " + "been restarted." + ) + + delta = humanize.precisedelta(config.SHUTDOWN_GRACE_PERIOD) + self.server.write_broadcast({ + "command": "notice", + "style": "info", + "text": ( + f"The server will be shutting down for maintenance in {delta}! " + f"{message}" + ) + }) + + async def shutdown(self): + self.server.write_broadcast({ + "command": "notice", + "style": "info", + "text": ( + "The server has been shut down for maintenance " + "but should be back online soon. If you experience any " + "problems, please restart your client. <br/><br/>" + "We apologize for this interruption." + ) + })

    Ancestors

      @@ -814,11 +885,31 @@

      Methods

      )
    +
    +async def wait_report_dirtes(self) +
    +
    +

    Wait for the current report_dirties task to complete.

    +
    + +Expand source code + +
    async def wait_report_dirtes(self):
    +    """
    +    Wait for the current report_dirties task to complete.
    +    """
    +    if self._report_dirties_event is None:
    +        return
    +
    +    await self._report_dirties_event.wait()
    +
    +

    Inherited members

    @@ -166,6 +174,7 @@

    Classes

    self._last_queue_pop = time() # Optimistically schedule first pop for half of the max pop time self.next_queue_pop = self._last_queue_pop + (config.QUEUE_POP_TIME_MAX / 2) + self._wait_task = None async def next_pop(self): """ Wait for the timer to pop. """ @@ -173,7 +182,10 @@

    Classes

    time_remaining = self.next_queue_pop - time() self._logger.info("Next %s wave happening in %is", self.queue.name, time_remaining) metrics.matchmaker_queue_pop.labels(self.queue.name).set(int(time_remaining)) - await asyncio.sleep(time_remaining) + + self._wait_task = asyncio.create_task(asyncio.sleep(time_remaining)) + await self._wait_task + num_players = self.queue.num_players metrics.matchmaker_players.labels(self.queue.name).set(num_players) @@ -212,10 +224,28 @@

    Classes

    next_pop_time, self.queue.name, config.QUEUE_POP_TIME_MAX ) return config.QUEUE_POP_TIME_MAX - return next_pop_time + return next_pop_time + + def cancel(self): + if self._wait_task: + self._wait_task.cancel()

    Methods

    +
    +def cancel(self) +
    +
    +
    +
    + +Expand source code + +
    def cancel(self):
    +    if self._wait_task:
    +        self._wait_task.cancel()
    +
    +
    async def next_pop(self)
    @@ -231,7 +261,10 @@

    Methods

    time_remaining = self.next_queue_pop - time() self._logger.info("Next %s wave happening in %is", self.queue.name, time_remaining) metrics.matchmaker_queue_pop.labels(self.queue.name).set(int(time_remaining)) - await asyncio.sleep(time_remaining) + + self._wait_task = asyncio.create_task(asyncio.sleep(time_remaining)) + await self._wait_task + num_players = self.queue.num_players metrics.matchmaker_players.labels(self.queue.name).set(num_players) @@ -305,6 +338,7 @@

    Index

  • PopTimer

    diff --git a/message_queue_service.html b/message_queue_service.html index 51da20530..1330f19c1 100644 --- a/message_queue_service.html +++ b/message_queue_service.html @@ -564,6 +564,7 @@

    Inherited members

    • Service:
        +
      • graceful_shutdown
      • initialize
      • on_connection_lost
      • shutdown
      • diff --git a/oauth_service.html b/oauth_service.html index c814b3cba..2e0de3936 100644 --- a/oauth_service.html +++ b/oauth_service.html @@ -333,6 +333,7 @@

        Inherited members

        • Service:
            +
          • graceful_shutdown
          • initialize
          • on_connection_lost
          • shutdown
          • diff --git a/party_service.html b/party_service.html index 851c56f15..2e98a5b77 100644 --- a/party_service.html +++ b/party_service.html @@ -676,6 +676,7 @@

            Inherited members

            • Service:
                +
              • graceful_shutdown
              • initialize
              • on_connection_lost
              • shutdown
              • diff --git a/player_service.html b/player_service.html index aed7794c9..e3419638d 100644 --- a/player_service.html +++ b/player_service.html @@ -31,7 +31,7 @@

                Module server.player_service

                Manages connected and authenticated players """ -import contextlib +import asyncio from typing import Optional, ValuesView import aiocron @@ -39,10 +39,12 @@

                Module server.player_service

                from trueskill import Rating import server.metrics as metrics +from server.config import config from server.db import FAFDatabase from server.decorators import with_logger -from server.players import Player +from server.players import Player, PlayerState from server.rating import RatingType +from server.timing import at_interval from .core import Service from .db.models import ( @@ -294,16 +296,20 @@

                Module server.player_service

                ) self.uniqueid_exempt = frozenset(map(lambda x: x[0], result)) - async def shutdown(self): - for player in self: - if player.lobby_connection is not None: - with contextlib.suppress(Exception): - player.lobby_connection.write_warning( - "The server has been shut down for maintenance, " - "but should be back online soon. If you experience any " - "problems, please restart your client. <br/><br/>" - "We apologize for this interruption." - ) + async def kick_idle_players(self): + for fut in asyncio.as_completed([ + player.lobby_connection.abort("Graceful shutdown.") + for player in self.all_players + if player.state == PlayerState.IDLE + if player.lobby_connection is not None + ]): + try: + await fut + except Exception: + self._logger.debug( + "Error while aborting connection", + exc_info=True + ) def on_connection_lost(self, conn: "LobbyConnection") -> None: if not conn.player: @@ -316,7 +322,11 @@

                Module server.player_service

                conn.player.id, conn.player.login, conn.session - ) + ) + + async def graceful_shutdown(self): + if config.SHUTDOWN_KICK_IDLE_PLAYERS: + self._kick_idle_task = at_interval(1, self.kick_idle_players)
  • @@ -571,16 +581,20 @@

    Classes

    ) self.uniqueid_exempt = frozenset(map(lambda x: x[0], result)) - async def shutdown(self): - for player in self: - if player.lobby_connection is not None: - with contextlib.suppress(Exception): - player.lobby_connection.write_warning( - "The server has been shut down for maintenance, " - "but should be back online soon. If you experience any " - "problems, please restart your client. <br/><br/>" - "We apologize for this interruption." - ) + async def kick_idle_players(self): + for fut in asyncio.as_completed([ + player.lobby_connection.abort("Graceful shutdown.") + for player in self.all_players + if player.state == PlayerState.IDLE + if player.lobby_connection is not None + ]): + try: + await fut + except Exception: + self._logger.debug( + "Error while aborting connection", + exc_info=True + ) def on_connection_lost(self, conn: "LobbyConnection") -> None: if not conn.player: @@ -593,7 +607,11 @@

    Classes

    conn.player.id, conn.player.login, conn.session - ) + ) + + async def graceful_shutdown(self): + if config.SHUTDOWN_KICK_IDLE_PLAYERS: + self._kick_idle_task = at_interval(1, self.kick_idle_players)

    Ancestors