Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/#924 sync queues #927

Draft
wants to merge 3 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
766 changes: 766 additions & 0 deletions .editorconfig

Large diffs are not rendered by default.

66 changes: 58 additions & 8 deletions server/ladder_service/ladder_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,21 @@
MapPool,
MatchmakerQueue,
OnMatchedCallback,
PopTimer,
Search
)
from server.matchmaker.algorithm.team_matchmaker import TeamMatchMaker
from server.matchmaker.search import Match, are_searches_disjoint
from server.metrics import MatchLaunch
from server.players import Player, PlayerState
from server.types import GameLaunchOptions, Map, NeroxisGeneratedMap


def has_no_overlap(match: Match, matches_tmm_searches: set[Search]) -> bool:
    """Return True if none of the searches making up `match` also appear in
    `matches_tmm_searches`.

    Teams are expanded via `get_original_searches()` so the comparison is
    done on the underlying searches, not on combined team objects.
    """
    # Set-literal comprehension instead of set(generator) (ruff C401).
    searches_in_match = {
        search
        for team in match
        for search in team.get_original_searches()
    }
    return are_searches_disjoint(searches_in_match, matches_tmm_searches)


@with_logger
class LadderService(Service):
"""
Expand All @@ -63,17 +71,59 @@ def __init__(
game_service: GameService,
violation_service: ViolationService,
):
self._is_running = True
self._db = database
self._informed_players: set[Player] = set()
self.game_service = game_service
self.queues = {}
self.violation_service = violation_service

self._searches: dict[Player, dict[str, Search]] = defaultdict(dict)
self.timer = None
self.matchmaker = TeamMatchMaker()
self.timer = PopTimer()

async def initialize(self) -> None:
    """Load matchmaker data, schedule periodic refreshes, and start popping.

    Runs `update_data` once up front, then registers an aiocron job that
    re-runs it every 10 minutes, and finally starts the queue pop timer.
    """
    await self.update_data()
    # Cron spec "*/10 * * * *": refresh queue/map-pool data every 10 minutes.
    self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data)
    await self._initialize_pop_timer()

async def _initialize_pop_timer(self) -> None:
    """Point the shared pop timer at the current queues and start the
    background pop loop."""
    self.timer.queues = list(self.queues.values())
    # Keep a strong reference to the task: the event loop only holds weak
    # references to tasks, so an otherwise-unreferenced task may be
    # garbage collected before it finishes.
    self._pop_timer_task = asyncio.create_task(self._queue_pop_timer())

async def _queue_pop_timer(self) -> None:
    """Drive the matchmaker pop cycle until shutdown.

    Waits on the shared PopTimer for the next queue "pop", then runs one
    matching iteration over all queues. Unexpected errors are logged and
    the loop keeps running; cancellation ends it.
    """
    self._logger.debug("MatchmakerQueue pop timer initialized")
    while True:
        if not self._is_running:
            break
        try:
            await self.timer.next_pop()
            await self._queue_pop_iteration()
        except asyncio.CancelledError:
            break
        except Exception:
            self._logger.exception(
                "Unexpected error during queue pop timer loop!"
            )
            # Back off briefly so a persistent failure cannot busy-loop.
            await asyncio.sleep(1)
    self._logger.error("popping queues stopped")

async def _queue_pop_iteration(self) -> None:
    """Run one matchmaking pass over every queue.

    Gathers candidate games from all queues, lets the team matchmaker
    pick a non-colliding subset, and hands the chosen matches back to
    the queues via `_found_matches`.
    """
    possible_games = []  # list literal instead of list() (idiom)
    for queue in self.queues.values():
        possible_games.extend(await queue.find_matches())
    matches_tmm = self.matchmaker.pick_noncolliding_games(possible_games)
    await self._found_matches(matches_tmm)

async def _found_matches(self, matches: list[Match]):
    """Distribute `matches` to the queue each one belongs to and mark
    that queue dirty so its state gets broadcast."""
    for queue in self.queues.values():
        # A match belongs to the queue of its first team's search.
        queue_matches = []
        for match in matches:
            if match[0].queue == queue:
                queue_matches.append(match)
        await queue.found_matches(queue_matches)
        self.game_service.mark_dirty(queue)

async def update_data(self) -> None:
async with self._db.acquire() as conn:
Expand All @@ -83,8 +133,10 @@ async def update_data(self) -> None:
for name, info in db_queues.items():
if name not in self.queues:
queue = MatchmakerQueue(
self.game_service,
self.on_match_found,
game_service=self.game_service,
on_match_found=self.on_match_found,
timer=self.timer,
matchmaker=self.matchmaker,
name=name,
queue_id=info["id"],
featured_mod=info["mod"],
Expand All @@ -93,7 +145,6 @@ async def update_data(self) -> None:
params=info.get("params")
)
self.queues[name] = queue
queue.initialize()
else:
queue = self.queues[name]
queue.featured_mod = info["mod"]
Expand All @@ -118,7 +169,6 @@ async def update_data(self) -> None:
# Remove queues that don't exist anymore
for queue_name in list(self.queues.keys()):
if queue_name not in db_queues:
self.queues[queue_name].shutdown()
del self.queues[queue_name]

async def fetch_map_pools(self, conn) -> dict[int, tuple[str, list[Map]]]:
Expand Down Expand Up @@ -323,9 +373,10 @@ def start_search(

queue = self.queues[queue_name]
search = Search(
players,
players=players,
rating_type=queue.rating_type,
on_matched=on_matched
on_matched=on_matched,
queue=queue
)

for player in players:
Expand Down Expand Up @@ -723,8 +774,7 @@ def on_connection_lost(self, conn: "LobbyConnection") -> None:
self._informed_players.remove(player)

async def shutdown(self):
    """Stop the ladder service: shut down every queue and signal the
    pop timer loop (`_queue_pop_timer`) to exit."""
    for queue in self.queues.values():
        queue.shutdown()
    # The pop timer loop checks this flag at the top of each iteration.
    self._is_running = False


class NotConnectedError(asyncio.TimeoutError):
Expand Down
263 changes: 0 additions & 263 deletions server/matchmaker/algorithm/bucket_teams.py
Original file line number Diff line number Diff line change
@@ -1,263 +0,0 @@
import itertools
import random
from collections import OrderedDict
from typing import Iterable, Iterator, Optional, TypeVar

from ...decorators import with_logger
from ..search import CombinedSearch, Match, Search
from .stable_marriage import Matchmaker, StableMarriageMatchmaker, avg_mean

# Generic element type used by `rotate`.
T = TypeVar("T")
# Maps a pivot search to the (search, avg_mean) pairs bucketed around it;
# built by `_make_buckets`.
Buckets = dict[Search, list[tuple[Search, float]]]


@with_logger
class BucketTeamMatchmaker(Matchmaker):
    """
    Groups searches of arbitrary party size into CombinedSearches of
    exactly `team_size` players using heuristics, then delegates to
    StableMarriageMatchmaker to pair those teams into matches.
    """

    def find(
        self, searches: Iterable[Search], team_size: int, rating_peak: float
    ) -> tuple[list[Match], list[Search]]:
        # First assemble full teams, then match the teams against each
        # other as if they were 1v1 participants.
        teams, leftover_parties = self._find_teams(searches, team_size)

        matches, unmatched_searches = StableMarriageMatchmaker().find(
            teams, 1, rating_peak
        )

        unmatched_searches.extend(leftover_parties)
        return matches, unmatched_searches

    @staticmethod
    def _find_teams(
        searches: Iterable[Search], team_size: int
    ) -> tuple[list[Search], list[Search]]:
        # Partition into parties that already form a full team and parties
        # that still need teammates.
        full_teams: list[Search] = []
        need_team: list[Search] = []
        for search in searches:
            target = full_teams if len(search.players) == team_size else need_team
            target.append(search)

        # Solo-only input gets the rating-balanced grouping; mixed party
        # sizes fall back to the greedy combiner.
        solo_only = all(len(s.players) == 1 for s in need_team)
        make = _make_teams_from_single if solo_only else _make_teams
        teams, unmatched = make(need_team, team_size)

        full_teams.extend(teams)
        return full_teams, unmatched


def _make_teams_from_single(
    searches: list[Search], size: int
) -> tuple[list[Search], list[Search]]:
    """
    Make teams in the special case where all players are solo queued (no
    parties).

    Tries to put players of similar skill on the same team as long as there are
    enough such players to form at least 2 teams. If there are not enough
    similar players for two teams, then distributes similarly rated players
    across different teams.

    # Algorithm
    1. Group players into "buckets" by rating. This is a sort of heuristic for
    determining which players have similar rating.
    2. Create as many games as possible within each bucket.
    3. Create games from remaining players by balancing teams with players from
    different buckets.
    """
    assert all(len(s.players) == 1 for s in searches)

    # Make buckets
    buckets = _make_buckets(searches)
    remaining: list[tuple[Search, float]] = []

    new_searches: list[Search] = []
    # Match up players within buckets
    for bucket in buckets.values():
        # Always produce an even number of teams
        num_groups = len(bucket) // (size * 2)
        num_teams = num_groups * 2
        num_players = num_teams * size

        selected = random.sample(bucket, num_players)
        # sample() returns the bucket's own items, so filtering by object
        # identity is equivalent to `s not in selected` but O(n) instead
        # of O(n^2) (resolves the old "TODO: Optimize?").
        selected_ids = {id(item) for item in selected}
        remaining.extend(item for item in bucket if id(item) not in selected_ids)
        # Sort by trueskill mean
        selected.sort(key=lambda item: item[1])
        new_searches.extend(_distribute(selected, size))

    # Match up players across buckets
    remaining.sort(key=lambda item: item[1])
    while len(remaining) >= size:
        if len(remaining) >= 2 * size:
            # enough for at least 2 teams
            selected = remaining[: 2 * size]
            new_searches.extend(_distribute(selected, size))
        else:
            selected = remaining[:size]
            new_searches.append(CombinedSearch(*[s for s, m in selected]))

        # `selected` is always a prefix of `remaining`, so dropping it is a
        # simple slice (the previous list comprehension was quadratic).
        remaining = remaining[len(selected):]

    return new_searches, [search for search, _ in remaining]


def _make_buckets(searches: list[Search]) -> Buckets:
    """
    Group players together by similar rating.

    # Algorithm
    1. Choose a random player as the "pivot".
    2. Find all players with rating within 100 pts of this player and place
    them in a bucket.
    3. Repeat with remaining players.
    """
    unbucketed = [(search, avg_mean(search)) for search in searches]
    buckets: Buckets = {}

    while unbucketed:
        # Pick an arbitrary pivot and bucket everyone within 100 rating
        # points of it.
        pivot, pivot_mean = random.choice(unbucketed)
        low, high = pivot_mean - 100, pivot_mean + 100

        bucket: list[tuple[Search, float]] = []
        rest: list[tuple[Search, float]] = []
        for entry in unbucketed:
            _, mean = entry
            target = bucket if low <= mean <= high else rest
            target.append(entry)

        buckets[pivot] = bucket
        unbucketed = rest

    return buckets


def _distribute(
    items: list[tuple[Search, float]], team_size: int
) -> Iterator[CombinedSearch]:
    """
    Deal a rating-sorted list of (search, mean) pairs into equally sized,
    balanced teams. An item's "skill" is simply its position in the list.

    For example (using numbers to represent list positions)
    ```
    _distribute([1,2,3,4], 2) == [[1,4], [2,3]]
    ```
    One team gets the best and the worst player while the other team gets
    the two middle players — the only split of these 4 items into 2 teams
    with no obviously favored team.
    """
    num_teams = len(items) // team_size
    half = len(items) // 2
    # Rotating the upper half before the round-robin deal pairs strong
    # positions with weak ones on the same team.
    dealt = items[:half] + rotate(items[half:], half // 2)

    teams: list[list[Search]] = [[] for _ in range(num_teams)]
    for position, (search, _) in enumerate(dealt):
        teams[position % num_teams].append(search)

    return (CombinedSearch(*squad) for squad in teams)


def _make_teams(searches: list[Search], size: int) -> tuple[list[Search], list[Search]]:
    """
    Greedily groups as many searches as possible into teams of `size`
    players. Returns the newly formed teams and the searches that were not
    successfully grouped.

    Does not try to balance teams, so it should be used only as a last
    resort.
    """
    searches_by_size = _make_searches_by_size(searches)

    new_searches = []
    for search in searches:
        # A party larger than the team size can never fit.
        if len(search.players) <= size:
            team = _make_team_for_search(search, searches_by_size, size)
            if team:
                new_searches.append(team)

    leftovers = list(itertools.chain.from_iterable(searches_by_size.values()))
    return new_searches, leftovers


def _make_searches_by_size(searches: list[Search]) -> dict[int, set[Search]]:
"""
Creates a lookup table indexed by number of players in the search.
"""

searches_by_size: dict[int, set[Search]] = OrderedDict()

# Would be easier with defaultdict, but we want to preserve key order
for search in searches:
size = len(search.players)
if size not in searches_by_size:
searches_by_size[size] = set()
searches_by_size[size].add(search)

return searches_by_size


def _make_team_for_search(
search: Search, searches_by_size: dict[int, set[Search]], size: int
) -> Optional[Search]:
"""
Match this search with other searches to create a new team of `size`
members.
"""

num_players = len(search.players)
if search not in searches_by_size[num_players]:
return None
searches_by_size[num_players].remove(search)

if num_players == size:
return search

num_needed = size - num_players
try_size = num_needed
new_search = search
while num_needed > 0:
if try_size == 0:
_uncombine(new_search, searches_by_size)
return None

try:
other = searches_by_size[try_size].pop()
new_search = CombinedSearch(new_search, other)
num_needed -= try_size
try_size = num_needed
except KeyError:
try_size -= 1

return new_search


def _uncombine(search: Search, searches_by_size: dict[int, set[Search]]) -> None:
    """
    Return `search` to its slot(s) in `searches_by_size`, recursively
    flattening any CombinedSearch back into its constituent searches.
    """
    if isinstance(search, CombinedSearch):
        # A combined search is returned piecewise as its parts.
        for part in search.searches:
            _uncombine(part, searches_by_size)
    else:
        searches_by_size[len(search.players)].add(search)


def rotate(list_: list[T], amount: int) -> list[T]:
    """Return a copy of `list_` rotated left by `amount` positions."""
    head, tail = list_[:amount], list_[amount:]
    return tail + head
Loading