From c2ee6bff5bddf723252f5c6190d2643fa4bd326e Mon Sep 17 00:00:00 2001 From: Magic <82341152+MagicTheDev@users.noreply.github.com> Date: Mon, 29 Jan 2024 00:26:35 -0600 Subject: [PATCH] ClashKing v4 --- requirements.txt | 1 + tracking/player/config.py | 20 + tracking/player/constants.py | 3 - tracking/player/main.py | 22 + tracking/player/tracking.py | 954 +++++++++++++++-------------------- tracking/player/utils.py | 86 ++++ utility/components.py | 1 - 7 files changed, 541 insertions(+), 546 deletions(-) create mode 100644 tracking/player/config.py delete mode 100644 tracking/player/constants.py create mode 100644 tracking/player/main.py diff --git a/requirements.txt b/requirements.txt index 3af72fb1..3a4741e5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ emoji==1.6.3 expiring-dict==1.1.0 ipython==8.13.0 kafka-python==2.0.2 +loguru==0.7.2 matplotlib==3.7.1 motor==3.3.2 msgspec==0.15.1 diff --git a/tracking/player/config.py b/tracking/player/config.py new file mode 100644 index 00000000..42fd6297 --- /dev/null +++ b/tracking/player/config.py @@ -0,0 +1,20 @@ +from os import getenv +from dotenv import load_dotenv +from dataclasses import dataclass +load_dotenv() + + +@dataclass(frozen=True, slots=True) +class Config: + min_coc_email = 1 + max_coc_email = 12 + coc_password = getenv("COC_PASSWORD") + + secondary_loop_change = 15 + tertiary_loop_change = 150 + max_tag_split = 50_000 + + static_mongodb = getenv("STATIC_MONGODB") + stats_mongodb = getenv("STATS_MONGODB") + redis_ip = getenv("REDIS_IP") + redis_pw = getenv("REDIS_PW") diff --git a/tracking/player/constants.py b/tracking/player/constants.py deleted file mode 100644 index 24a165d2..00000000 --- a/tracking/player/constants.py +++ /dev/null @@ -1,3 +0,0 @@ - -SECONDARY_LOOP_CHANGE = 15 -TERTIARY_LOOP_CHANGE = 150 \ No newline at end of file diff --git a/tracking/player/main.py b/tracking/player/main.py new file mode 100644 index 00000000..4c42ba20 --- /dev/null +++ b/tracking/player/main.py @@ -0,0 +1,22 @@ +import motor.motor_asyncio +import asyncio + +from collections import deque +from redis import asyncio as redis +from .config import Config +from .tracking import main +from ..utils import create_keys + + +if __name__ == '__main__': + config = Config() + loop = asyncio.get_event_loop() + + stats_mongo_client = motor.motor_asyncio.AsyncIOMotorClient(config.stats_mongodb) + static_mongo_client = motor.motor_asyncio.AsyncIOMotorClient(config.static_mongodb) + + redis_host = redis.Redis(host=config.redis_ip, port=6379, db=0, password=config.redis_pw, decode_responses=False, max_connections=2500) + keys = create_keys([f"apiclashofclans+test{x}@gmail.com" for x in range(config.min_coc_email, config.max_coc_email + 1)], [config.coc_password] * config.max_coc_email) + keys = deque(keys) + loop.create_task(main(keys=keys, cache=redis_host, stats_mongo_client=stats_mongo_client, static_mongo_client=static_mongo_client)) + loop.run_forever() diff --git a/tracking/player/tracking.py b/tracking/player/tracking.py index b1f04840..a64ecdab 100644 --- a/tracking/player/tracking.py +++ b/tracking/player/tracking.py @@ -1,543 +1,460 @@ -from typing import List, Dict, Tuple, Union -from pydantic import BaseModel -from base64 import b64decode as base64_b64decode -from json import loads as json_loads -from datetime import datetime -from dotenv import load_dotenv -from msgspec.json import decode -from msgspec import Struct -from pymongo import UpdateOne, InsertOne -from datetime import timedelta - -from os import getenv -from aiomultiprocess import Worker -from expiring_dict import ExpiringDict -from redis.commands.json.path import Path - -from kafka import KafkaProducer - -import ujson -import coc -import fastapi -import motor.motor_asyncio -import collections -import time +from collections import deque +from redis import asyncio as redis import aiohttp import asyncio -import pytz -import redis -from redis import asyncio as redis +import pendulum as pend +import ujson +import time import snappy -from ..utils import create_keys +from msgspec.msgpack import decode +from loguru import logger +from .utils import Player, get_player_changes, gen_legend_date, gen_season_date, gen_raid_date, gen_games_season + +from pymongo import InsertOne, UpdateOne +from .config import Config + + +async def get_clan_member_tags(clan_db, keys: deque): + clan_tags = await clan_db.distinct("tag") + + tasks = [] + connector = aiohttp.TCPConnector(limit=250) + async with aiohttp.ClientSession(connector=connector) as session: + for tag in clan_tags: + headers = {"Authorization": f"Bearer {keys[0]}"} + tag = tag.replace("#", "%23") + url = f"https://api.clashofclans.com/v1/clans/{tag}" + keys.rotate(1) + + async def fetch(url, session, headers): + async with session.get(url, headers=headers) as response: + try: + clan = await response.json() + return clan + except: + return None + + tasks.append(fetch(url, session, headers)) + responses = await asyncio.gather(*tasks, return_exceptions=True) + await session.close() -utc = pytz.utc -load_dotenv() + CLAN_MEMBERS = [] + for response in responses: + try: + CLAN_MEMBERS += [member["tag"] for member in response["memberList"]] + except: + pass + return CLAN_MEMBERS, set(clan_tags) + + +async def get_tags_to_track(CLAN_MEMBERS: list, loop_spot: int, player_stats): + # people only become unpaused by joining a clan again or being tracked in legends again + gone_for_a_month = int(pend.now(tz=pend.UTC).timestamp()) - 2_592_000 + # 1/15 loops, only track legend members & clan members + if loop_spot % 15 != 0 and loop_spot % 150 != 0: + db_tags = await player_stats.distinct("tag", filter={"league": "Legend League"}) + all_tags_to_track = list(set(db_tags + CLAN_MEMBERS)) + else: + # every 150 loops track those that have been gone for a month, every 15th loop we check those that have been gone for less (active) + if loop_spot % 150 == 0: + pipeline = [{"$match": {"last_online": {"$lte": gone_for_a_month}}}, {"$project": {"tag": "$tag"}}, {"$unset": "_id"}] + else: + pipeline = [{"$match": {"last_online": {"$gte": gone_for_a_month}}}, {"$project": {"tag": "$tag"}}, {"$unset": "_id"}] + db_tags = [x["tag"] for x in (await player_stats.aggregate(pipeline).to_list(length=None))] + all_tags_to_track = list(set(db_tags + CLAN_MEMBERS)) + + return all_tags_to_track + + +async def add_new_autocomplete_additions(cache: redis.Redis, all_tags: list, player_search): + # add any new additions to the autocomplete + pipeline = [{"$match": {}}, {"$project": {"tag": "$tag"}}, {"$unset": "_id"}] + autocomplete_tags = [x["tag"] for x in (await player_search.aggregate(pipeline).to_list(length=None))] + autocomplete_tags = set(autocomplete_tags) + add_to_autocomplete = [tag for tag in all_tags if tag not in autocomplete_tags] + auto_changes = [] + for tag in add_to_autocomplete: + try: + r = await cache.get(tag) + if r is None: + continue + r = ujson.loads(r) + clan_tag = r.get("clan", {}).get("tag", "Unknown") + league = r.get("league", {}).get("name", "Unranked") + auto_changes.append(InsertOne({"name": r.get("name"), "clan": clan_tag, "league": league, "tag": r.get("tag"), "th": r.get("townHallLevel")})) + except: + continue + if auto_changes: + await player_search.bulk_write(auto_changes) + + +async def get_player_responses(keys: deque, tags: list[str], cache: redis.Redis, player_stats, player_search): + tasks = [] + connector = aiohttp.TCPConnector(limit=2000, ttl_dns_cache=300) + timeout = aiohttp.ClientTimeout(total=1800) + async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: + for tag in tags: + keys.rotate(1) + async def fetch(url, session: aiohttp.ClientSession, headers): + async with session.get(url, headers=headers) as new_response: + if new_response.status == 404: # remove banned players + t = url.split("%23")[-1] + await player_stats.delete_one({"tag": f"#{t}"}) + await cache.getdel(f"#{t}") + await player_search.delete_one({"tag": f"#{t}"}) + return None + elif new_response.status != 200: + return None + new_response = await new_response.read() + return new_response + + tasks.append(fetch(url=f'https://api.clashofclans.com/v1/players/{tag.replace("#", "%23")}', session=session, headers={"Authorization": f"Bearer {keys[0]}"})) + + results = await asyncio.gather(*tasks) + return results + + +async def player_response_handler(new_response: bytes, cache: redis.Redis, bulk_db_changes: list, auto_complete: list, set_clan_tags: set, bulk_insert: list, bulk_clan_changes: list): + obj = decode(new_response, type=Player) + compressed_new_response = snappy.compress(new_response) + previous_compressed_response = await cache.get(obj.tag) + + if compressed_new_response != previous_compressed_response: + await cache.set(obj.tag, compressed_new_response, ex=2_592_000) + if previous_compressed_response is None: + return None + + BEEN_ONLINE = False + new_response = ujson.loads(new_response) + + previous_response = snappy.decompress(previous_compressed_response) + previous_response = ujson.loads(previous_response) + + season = gen_season_date() + raid_date = gen_raid_date() + games_season = gen_games_season() + legend_date = gen_legend_date() + + tag = obj.tag + clan_tag = new_response.get("clan", {}).get("tag", "Unknown") + league = new_response.get("league", {}).get("name", "Unranked") + prev_league = previous_response.get("league", {}).get("name", "Unranked") + if league != prev_league: + bulk_db_changes.append(UpdateOne({"tag": tag}, {"$set": {"league": league}})) + auto_complete.append(UpdateOne({"tag": tag}, {"$set": {"league": league}})) + + is_clan_member = clan_tag in set_clan_tags + + changes, fields_to_update = get_player_changes(previous_response, new_response) + online_types = {"donations", "Gold Grab", "Most Valuable Clanmate", "attackWins", + "War League Legend", + "Wall Buster", "name", "Well Seasoned", "Games Champion", "Elixir Escapade", + "Heroic Heist", + "warPreference", "warStars", "Nice and Tidy", "builderBaseTrophies"} + skip_store_types = {"War League Legend", "Wall Buster", "Aggressive Capitalism", + "Baby Dragon", + "Elixir Escapade", + "Gold Grab", "Heroic Heist", "Nice and Tidy", "Well Seasoned", + "attackWins", + "builderBaseTrophies", "donations", "donationsReceived", "trophies", + "versusBattleWins", "versusTrophies"} + + special_types = {"War League Legend", "warStars", "Aggressive Capitalism", "Nice and Tidy", "Well Seasoned", + "clanCapitalContributions", "Games Champion"} + ws_types = {"clanCapitalContributions", "name", "troops", "heroes", "spells", "heroEquipment", + "townHallLevel", + "league", "trophies", "Most Valuable Clanmate"} + only_once = {"troops": 0, "heroes": 0, "spells": 0, "heroEquipment": 0} + ws_tasks = [] + if changes: + player_level_changes = [] + clan_level_changes = [] + for (parent, type_), (old_value, value) in changes.items(): + if type_ in special_types: + bulk_db_changes.append(UpdateOne({"tag": tag}, + {"$set": {f"{type_.replace(' ', '_').lower()}": value}}, + upsert=True)) + + if type_ not in skip_store_types: + if old_value is None: + bulk_insert.append(InsertOne({"tag": tag, "type": type_, "value": value, + "time": int(pend.now(tz=pend.UTC).timestamp()), + "clan": clan_tag, "th": new_response.get("townHallLevel")})) + else: + bulk_insert.append(InsertOne( + {"tag": tag, "type": type_, "p_value": old_value, "value": value, + "time": int(pend.now(tz=pend.UTC).timestamp()), "clan": clan_tag, "th": new_response.get("townHallLevel")})) -BETA = False + if type_ == "donations": + previous_dono = 0 if (previous_dono := previous_response["donations"]) > (current_dono := new_response["donations"]) else previous_dono + player_level_changes.append({"$inc": {f"donations.{season}.donated": (current_dono - previous_dono)}}) + clan_level_changes.append({"$inc": {f"{season}.{tag}.donations": (current_dono - previous_dono)}}) -keys = create_keys([f"apiclashofclans+test{x}@gmail.com" for x in range(1, 13)], [getenv("COC_PASSWORD")] * 12) + elif type_ == "donationsReceived": + previous_dono = 0 if (previous_dono := previous_response["donationsReceived"]) > (current_dono := new_response["donationsReceived"]) else previous_dono + player_level_changes.append({"$inc": {f"donations.{season}.received": (current_dono - previous_dono)}}) + clan_level_changes.append({"$inc": {f"{season}.{tag}.received": (current_dono - previous_dono)}}) + elif type_ == "clanCapitalContributions": + player_level_changes.append({"$push": {f"capital_gold.{raid_date}.donate": (new_response["clanCapitalContributions"] -previous_response["clanCapitalContributions"])}}) + clan_level_changes.append({"$inc": {f"{season}.{tag}.capital_gold_dono": (new_response["clanCapitalContributions"] - previous_response["clanCapitalContributions"])}}) + type_ = "Most Valuable Clanmate" # temporary + elif type_ == "Gold Grab": + diff = value - old_value + player_level_changes.append({"$inc": {f"gold.{season}": diff}}) + clan_level_changes.append({"$inc": {f"{season}.{tag}.gold_looted": diff}}) -async def main(producer: KafkaProducer): - global keys + elif type_ == "Elixir Escapade": + diff = value - old_value + player_level_changes.append({"$inc": {f"elixir.{season}": diff}}) + clan_level_changes.append({"$inc": {f"{season}.{tag}.elixir_looted": diff}}) - client = motor.motor_asyncio.AsyncIOMotorClient(getenv("LOOPER_DB_LOGIN")) - player_stats = client.new_looper.player_stats - clan_stats = client.new_looper.clan_stats + elif type_ == "Heroic Heist": + diff = value - old_value + player_level_changes.append({"$inc": {f"dark_elixir.{season}": diff}}) + clan_level_changes.append({"$inc": {f"{season}.{tag}.dark_elixir_looted": diff}}) - db_client = motor.motor_asyncio.AsyncIOMotorClient(getenv("DB_LOGIN")) - clan_db = db_client.usafam.clans - player_search = db_client.usafam.player_search + elif type_ == "Well Seasoned": + diff = value - old_value + player_level_changes.append({"$inc": {f"season_pass.{games_season}": diff}}) + elif type_ == "Games Champion": + diff = value - old_value + player_level_changes.append({"$inc": {f"clan_games.{games_season}.points": diff}, + "$set": {f"clan_games.{games_season}.clan": clan_tag}}) + clan_level_changes.append({"$inc": {f"{games_season}.{tag}.clan_games": diff}}) - cache = redis.Redis(host='localhost', port=6379, db=0, password=getenv("REDIS_PW"), decode_responses=False, max_connections=2500) + elif type_ == "attackWins": + player_level_changes.append({"$set": {f"attack_wins.{season}": value}}) + clan_level_changes.append({"$set": {f"{season}.{tag}.attack_wins": value}}) - def get_changes(previous_response: dict, response: dict): - new_json = {} - fields_to_update = [] - ok_achievements = {"Gold Grab", "Elixir Escapade", "Heroic Heist", "Games Champion", "Aggressive Capitalism", - "Well Seasoned", "Nice and Tidy", "War League Legend", "Wall Buster"} - for key, item in response.items(): - old_item = previous_response.get(key) - if old_item != item: - fields_to_update.append(key) - not_ok_fields = {"labels", "legendStatistics", "playerHouse", "versusBattleWinCount"} - if key in not_ok_fields: - continue - if old_item != item: - if isinstance(item, list): - for count, spot in enumerate(item): - spot_name = spot["name"] - if key == "achievements" and spot_name not in ok_achievements: + + elif type_ == "trophies": + player_level_changes.append({"$set": {f"season_trophies.{season}": value}}) + clan_level_changes.append({"$set": {f"{season}.{tag}.trophies": value}}) + + + elif type_ == "name": + player_level_changes.append({"$set": {f"name": value}}) + auto_complete.append(UpdateOne({"tag": tag}, {"$set": {"name": value}})) + + elif type_ == "clan": + player_level_changes.append({"$set": {f"clan_tag": clan_tag}}) + auto_complete.append(UpdateOne({"tag": tag}, {"$set": {"clan": clan_tag}})) + + elif type_ == "townHallLevel": + player_level_changes.append({"$set": {f"townhall": value}}) + auto_complete.append(UpdateOne({"tag": tag}, {"$set": {"th": value}})) + + elif parent in {"troops", "heroes", "spells", "heroEquipment"}: + type_ = parent + if only_once[parent] == 1: + continue + only_once[parent] += 1 + + if type_ in online_types: + BEEN_ONLINE = True + + if type_ in ws_types and is_clan_member: + if type_ == "trophies": + if not (value >= 4900 and league == "Legend League"): continue - old_ = next((item for item in old_item if item["name"] == spot_name), None) - if old_ != spot: - if key == "achievements": - if old_ is not None: - new_json[(key, spot_name.replace(".", ""))] = (old_["value"], spot["value"]) - else: - new_json[(key, spot_name.replace(".", ""))] = (None, spot["value"]) - else: - if old_ is not None: - new_json[(key, spot_name.replace(".", ""))] = (old_["level"], spot["level"]) - else: - new_json[(key, spot_name.replace(".", ""))] = (None, spot["level"]) - else: - if key == "clan": - new_json[(key, key)] = (None, {"tag" : item["tag"], "name" : item["name"]}) - elif key == "league": - new_json[(key, key)] = (None, {"tag" : item["id"], "name" : item["name"]}) + json_data = {"type": type_, "old_player": previous_response, + "new_player": new_response, "timestamp" : int(pend.now(tz=pend.UTC).timestamp())} + await cache.publish(channel="player", message=ujson.dumps(json_data).encode("utf-8")) + + if player_level_changes: + bulk_db_changes.append(UpdateOne( + {"tag": tag}, + player_level_changes, + upsert=True + )) + + if clan_level_changes and is_clan_member: + clan_level_changes.append({f"{season}.{tag}.name": new_response.get("name"), f"{season}.{tag}.townhall" : new_response.get("townHallLevel")}) + bulk_clan_changes.append(UpdateOne( + {"tag": clan_tag}, + clan_level_changes, + upsert=True + )) + # LEGENDS CODE, dont fix what aint broke + + + if new_response["trophies"] != previous_response["trophies"] and new_response["trophies"] >= 4900 and league == "Legend League": + diff_trophies = new_response["trophies"] - previous_response["trophies"] + diff_attacks = new_response["attackWins"] - previous_response["attackWins"] + + if diff_trophies <= - 1: + diff_trophies = abs(diff_trophies) + if diff_trophies <= 100: + bulk_db_changes.append(UpdateOne({"tag": tag}, + {"$push": {f"legends.{legend_date}.defenses": diff_trophies}}, upsert=True)) + + bulk_db_changes.append(UpdateOne({"tag": tag}, + {"$push": {f"legends.{legend_date}.new_defenses": { + "change": diff_trophies, + "time": int(pend.now(tz=pend.UTC).timestamp()), + "trophies": new_response["trophies"] + }}}, upsert=True)) + + elif diff_trophies >= 1: + heroes = new_response.get("heroes", []) + equipment = [] + for hero in heroes: + for gear in hero.get("equipment", []): + equipment.append({"name": gear.get("name"), "level": gear.get("level")}) + + bulk_db_changes.append( + UpdateOne({"tag": tag}, + {"$inc": {f"legends.{legend_date}.num_attacks": diff_attacks}}, + upsert=True)) + # if one attack + if diff_attacks == 1: + bulk_db_changes.append(UpdateOne({"tag": tag}, + {"$push": { + f"legends.{legend_date}.attacks": diff_trophies}}, + upsert=True)) + bulk_db_changes.append(UpdateOne({"tag": tag}, + {"$push": {f"legends.{legend_date}.new_attacks": { + "change": diff_trophies, + "time": int(pend.now(tz=pend.UTC).timestamp()), + "trophies": new_response["trophies"], + "hero_gear": equipment + }}}, upsert=True)) + if diff_trophies == 40: + bulk_db_changes.append( + UpdateOne({"tag": tag}, {"$inc": {f"legends.streak": 1}})) + else: - new_json[(key, key)] = (old_item, item) + bulk_db_changes.append( + UpdateOne({"tag": tag}, {"$set": {f"legends.streak": 0}})) + + # if multiple attacks, but divisible by 40 + elif diff_attacks > 1 and diff_trophies / 40 == diff_attacks: + for x in range(0, diff_attacks): + bulk_db_changes.append( + UpdateOne({"tag": tag}, + {"$push": {f"legends.{legend_date}.attacks": 40}}, + upsert=True)) + bulk_db_changes.append(UpdateOne({"tag": tag}, + {"$push": {f"legends.{legend_date}.new_attacks": { + "change": 40, + "time": int(pend.now(tz=pend.UTC).timestamp()), + "trophies": new_response["trophies"], + "hero_gear": equipment + }}}, upsert=True)) + bulk_db_changes.append( + UpdateOne({"tag": tag}, {"$inc": {f"legends.streak": diff_attacks}})) + else: + bulk_db_changes.append(UpdateOne({"tag": tag}, + {"$push": { + f"legends.{legend_date}.attacks": diff_trophies}}, + upsert=True)) + bulk_db_changes.append(UpdateOne({"tag": tag}, + {"$push": {f"legends.{legend_date}.new_attacks": { + "change": diff_trophies, + "time": int(pend.now(tz=pend.UTC).timestamp()), + "trophies": new_response["trophies"], + "hero_gear": equipment + }}}, upsert=True)) + + bulk_db_changes.append( + UpdateOne({"tag": tag}, {"$set": {f"legends.streak": 0}}, upsert=True)) + + if new_response["defenseWins"] != previous_response["defenseWins"]: + diff_defenses = new_response["defenseWins"] - previous_response["defenseWins"] + for x in range(0, diff_defenses): + bulk_db_changes.append( + UpdateOne({"tag": tag}, {"$push": {f"legends.{legend_date}.defenses": 0}}, + upsert=True)) + bulk_db_changes.append(UpdateOne({"tag": tag}, + {"$push": {f"legends.{legend_date}.new_defenses": { + "change": 0, + "time": int(pend.now(tz=pend.UTC).timestamp()), + "trophies": new_response["trophies"] + }}}, upsert=True)) + + if BEEN_ONLINE: + _time = int(pend.now(tz=pend.UTC).timestamp()) + bulk_db_changes.append( + UpdateOne({"tag": tag}, { + "$inc": {f"activity.{season}": 1}, + "$push": {f"last_online_times.{season}": _time}, + "$set": {"last_online": _time} + }, upsert=True)) + bulk_clan_changes.append( + UpdateOne({"tag": clan_tag}, + {"$inc": {f"{season}.{tag}.activity": 1}}, + upsert=True)) + + await asyncio.gather(*ws_tasks) + + +async def main(keys: deque, cache: redis.Redis, stats_mongo_client, static_mongo_client, config: Config): + player_stats = stats_mongo_client.new_looper.player_stats + clan_stats = stats_mongo_client.new_looper.clan_stats + + clan_db = static_mongo_client.usafam.clans + player_search = static_mongo_client.usafam.player_search - return (new_json, fields_to_update) loop_spot = 0 while True: try: loop_spot += 1 + CLAN_MEMBERS, clan_tag_set = await get_clan_member_tags(clan_db=clan_db, keys=keys) + all_tags = await get_tags_to_track(CLAN_MEMBERS=CLAN_MEMBERS, loop_spot=loop_spot, player_stats=player_stats) + await add_new_autocomplete_additions(cache=cache, all_tags=all_tags, player_search=player_search) - clan_tags = await clan_db.distinct("tag") - - tasks = [] - connector = aiohttp.TCPConnector(limit=250) - keys = collections.deque(keys) - async with aiohttp.ClientSession(connector=connector) as session: - for tag in clan_tags: - headers = {"Authorization": f"Bearer {keys[0]}"} - tag = tag.replace("#", "%23") - url = f"https://api.clashofclans.com/v1/clans/{tag}" - keys.rotate(1) - async def fetch(url, session, headers): - async with session.get(url, headers=headers) as response: - try: - clan = await response.json() - return clan - except: - return None - tasks.append(fetch(url, session, headers)) - responses = await asyncio.gather(*tasks, return_exceptions=True) - await session.close() - - CLAN_MEMBERS = [] - for response in responses: - try: - CLAN_MEMBERS += [member["tag"] for member in response["memberList"]] - except: - pass - - #people only become unpaused by joining a clan again or being tracked in legends again - gone_for_a_month = int(datetime.now().timestamp()) - 2_592_000 - #1/15 loops, only track legend members & clan members - if loop_spot % 15 != 0 and loop_spot % 150 != 0: - db_tags = await player_stats.distinct("tag", filter= {"league": "Legend League"}) - all_tags_to_track = list(set(db_tags + CLAN_MEMBERS)) - else: - # every 150 loops track those that have been gone for a month, every 15th loop we check those that have been gone for less (active) - if loop_spot % 150 == 0: - pipeline = [{"$match": {"last_online" : {"$lte" : gone_for_a_month}}}, {"$project": {"tag": "$tag"}}, {"$unset": "_id"}] - else: - pipeline = [{"$match": {"last_online" : {"$gte" : gone_for_a_month}}}, {"$project": {"tag": "$tag"}}, {"$unset": "_id"}] - db_tags = [x["tag"] for x in (await player_stats.aggregate(pipeline).to_list(length=None))] - all_tags_to_track = list(set(db_tags + CLAN_MEMBERS)) - - #add any new additions to the autocomplete - pipeline = [{"$match": {}}, {"$project" : {"tag" : "$tag"}}, {"$unset" : "_id"}] - autocomplete_tags = [x["tag"] for x in (await player_search.aggregate(pipeline).to_list(length=None))] - autocomplete_tags = set(autocomplete_tags) - add_to_autocomplete = [tag for tag in all_tags_to_track if tag not in autocomplete_tags] - auto_changes = [] - for tag in add_to_autocomplete: - try: - r = await cache.get(tag) - if r is None: - continue - r = ujson.loads(r) - clan_tag = r.get("clan", {}).get("tag", "Unknown") - league = r.get("league", {}).get("name", "Unranked") - auto_changes.append(InsertOne({"name": r.get("name"), "clan": clan_tag, "league": league, "tag": r.get("tag"), "th": r.get("townHallLevel")})) - except: - continue - if auto_changes: - await player_search.bulk_write(auto_changes) - - print(f"{len(all_tags_to_track)} tags") - + logger.info(f"{len(all_tags)} tags") time_inside = time.time() - class Player(Struct): - tag: str + split_tags = [all_tags[i:i + config.max_tag_split] for i in range(0, len(all_tags), config.max_tag_split)] bulk_db_changes = [] bulk_insert = [] bulk_clan_changes = [] auto_complete = [] - season = gen_season_date() - raid_date = gen_raid_date() - games_season = gen_games_season() - legend_date = gen_legend_date() - - set_clan_tags = set(await clan_db.distinct("tag")) - - players_tracked = set() - - tasks = [] - deque = collections.deque - connector = aiohttp.TCPConnector(limit=2000, ttl_dns_cache=300) - keys = deque(keys) - url = "https://api.clashofclans.com/v1/players/" - timeout = aiohttp.ClientTimeout(total=1800) - async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session3: - for tag in all_tags_to_track: - tag = tag.replace("#", "%23") - keys.rotate(1) - - async def fetch(url, session: aiohttp.ClientSession, headers): - async with session.get(url, headers=headers) as new_response: - if new_response.status == 404: # remove banned players - t = url.split("%23")[-1] - await player_stats.delete_one({"tag": f"#{t}"}) - await cache.getdel(f"#{t}") - await player_search.delete_one({"tag": f"#{t}"}) - return None - new_response = await new_response.read() - - compressed_new_response = snappy.compress(new_response) - - obj = decode(new_response, type=Player) - previous_response = await cache.get(obj.tag) - players_tracked.add(obj.tag) - - if compressed_new_response != previous_response: - await cache.set(obj.tag, compressed_new_response, ex=2_592_000) - if previous_response is None: - return None - BEEN_ONLINE = False - new_response = ujson.loads(new_response) - previous_response = ujson.loads(previous_response) - - tag = obj.tag - clan_tag = new_response.get("clan", {}).get("tag", "Unknown") - league = new_response.get("league", {}).get("name", "Unranked") - prev_league = previous_response.get("league", {}).get("name", "Unranked") - if league != prev_league: - bulk_db_changes.append(UpdateOne({"tag": tag}, {"$set": {"league": league}})) - auto_complete.append(UpdateOne({"tag": tag}, {"$set": {"league": league}})) - - is_clan_member = clan_tag in set_clan_tags - - changes, fields_to_update = get_changes(previous_response, new_response) - online_types = {"donations", "Gold Grab", "Most Valuable Clanmate", "attackWins", - "War League Legend", - "Wall Buster", "name", "Well Seasoned", "Games Champion", "Elixir Escapade", - "Heroic Heist", - "warPreference", "warStars", "Nice and Tidy", "builderBaseTrophies"} - skip_store_types = {"War League Legend", "Wall Buster", "Aggressive Capitalism", - "Baby Dragon", - "Elixir Escapade", - "Gold Grab", "Heroic Heist", "Nice and Tidy", "Well Seasoned", - "attackWins", - "builderBaseTrophies", "donations", "donationsReceived", "trophies", - "versusBattleWins", "versusTrophies"} - - special_types = {"War League Legend", "warStars", "Aggressive Capitalism", "Nice and Tidy", "Well Seasoned", - "clanCapitalContributions", "Games Champion"} - ws_types = {"clanCapitalContributions", "name", "troops", "heroes", "spells", "heroEquipment", - "townHallLevel", - "league", "trophies", "Most Valuable Clanmate"} - only_once = {"troops": 0, "heroes": 0, "spells": 0, "heroEquipment" : 0} - ws_tasks = [] - if changes: - for (parent, type_), (old_value, value) in changes.items(): - if type_ in special_types: - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$set": {f"{type_.replace(' ','_').lower()}": value}}, - upsert=True)) - - if type_ not in skip_store_types: - if old_value is None: - bulk_insert.append(InsertOne({"tag": tag, "type": type_, "value": value, - "time": int(datetime.now().timestamp()), - "clan": clan_tag, "th" : new_response.get("townHallLevel")})) - else: - bulk_insert.append(InsertOne( - {"tag": tag, "type": type_, "p_value": old_value, "value": value, - "time": int(datetime.now().timestamp()), "clan": clan_tag, "th" : new_response.get("townHallLevel")})) - - if type_ == "donations": - previous_dono = 0 if (previous_dono := previous_response["donations"]) > ( - current_dono := new_response["donations"]) else previous_dono - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$inc": {f"donations.{season}.donated": ( - current_dono - previous_dono)}}, - upsert=True)) - if clan_tag != "Unknown": - bulk_clan_changes.append(UpdateOne({"tag": clan_tag}, { - "$inc": {f"{season}.{tag}.donated": (current_dono - previous_dono)}}, - upsert=True)) - elif type_ == "donationsReceived": - previous_dono = 0 if (previous_dono := previous_response[ - "donationsReceived"]) > ( - current_dono := new_response[ - "donationsReceived"]) else previous_dono - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$inc": {f"donations.{season}.received": ( - current_dono - previous_dono)}}, - upsert=True)) - if clan_tag != "Unknown": - bulk_clan_changes.append(UpdateOne({"tag": clan_tag}, { - "$inc": {f"{season}.{tag}.received": (current_dono - previous_dono)}}, - upsert=True)) - elif type_ == "clanCapitalContributions": - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$push": - {f"capital_gold.{raid_date}.donate": - (new_response[ - "clanCapitalContributions"] - - previous_response[ - "clanCapitalContributions"])}}, - upsert=True)) - type_ = "Most Valuable Clanmate" # temporary - elif type_ == "Gold Grab": - diff = value - old_value - bulk_db_changes.append( - UpdateOne({"tag": tag}, {"$inc": {f"gold.{season}": diff}}, upsert=True)) - if clan_tag != "Unknown": - bulk_clan_changes.append( - UpdateOne({"tag": clan_tag}, - {"$inc": {f"{season}.{tag}.gold_looted": diff}}, - upsert=True)) - elif type_ == "Elixir Escapade": - diff = value - old_value - bulk_db_changes.append( - UpdateOne({"tag": tag}, {"$inc": {f"elixir.{season}": diff}}, upsert=True)) - if clan_tag != "Unknown": - bulk_clan_changes.append( - UpdateOne({"tag": clan_tag}, - {"$inc": {f"{season}.{tag}.elixir_looted": diff}}, - upsert=True)) - elif type_ == "Heroic Heist": - diff = value - old_value - bulk_db_changes.append( - UpdateOne({"tag": tag}, {"$inc": {f"dark_elixir.{season}": diff}}, - upsert=True)) - if clan_tag != "Unknown": - bulk_clan_changes.append(UpdateOne({"tag": clan_tag}, { - "$inc": {f"{season}.{tag}.dark_elixir_looted": diff}}, upsert=True)) - elif type_ == "Well Seasoned": - diff = value - old_value - bulk_db_changes.append( - UpdateOne({"tag": tag}, {"$inc": {f"season_pass.{games_season}": diff}}, - upsert=True)) - elif type_ == "Games Champion": - diff = value - old_value - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$inc": { - f"clan_games.{games_season}.points": diff}, - "$set": { - f"clan_games.{games_season}.clan": clan_tag} - }, upsert=True)) - if clan_tag != "Unknown": - bulk_clan_changes.append( - UpdateOne({"tag": clan_tag}, - {"$inc": {f"{games_season}.{tag}.clan_games": diff}}, - upsert=True)) - elif type_ == "attackWins": # remove in a future version, use cache instead - bulk_db_changes.append( - UpdateOne({"tag": tag}, {"$set": {f"attack_wins.{season}": value}}, - upsert=True)) - if clan_tag != "Unknown": - bulk_clan_changes.append( - UpdateOne({"tag": clan_tag}, - {"$set": {f"{season}.{tag}.attack_wins": value}}, - upsert=True)) - - elif type_ == "trophies": # remove in a future version, use cache instead - bulk_db_changes.append( - UpdateOne({"tag": tag}, {"$set": {f"season_trophies.{season}": value}},upsert=True)) - if clan_tag != "Unknown": - bulk_clan_changes.append( - UpdateOne({"tag": clan_tag}, - {"$set": {f"{season}.{tag}.trophies": value}}, - upsert=True)) - - elif type_ == "name": - bulk_db_changes.append(UpdateOne({"tag": tag}, {"$set": {"name": value}}, upsert=True)) - auto_complete.append(UpdateOne({"tag": tag}, {"$set": {"name": value}})) - elif type_ == "clan": - bulk_db_changes.append(UpdateOne({"tag": tag}, {"$set": {"clan_tag": clan_tag}}, upsert=True)) - auto_complete.append(UpdateOne({"tag": tag}, {"$set": {"clan": clan_tag}})) - - elif type_ == "townHallLevel": - bulk_db_changes.append(UpdateOne({"tag": tag}, {"$set": {"townhall": value}}, upsert=True)) - auto_complete.append(UpdateOne({"tag": tag}, {"$set": {"th": value}})) - - elif parent in {"troops", "heroes", "spells", "heroEquipment"}: - type_ = parent - if only_once[parent] == 1: - continue - only_once[parent] += 1 - - if type_ in online_types: - BEEN_ONLINE = True - - if type_ in ws_types and is_clan_member: - if type_ == "trophies": - if not (value >= 4900 and league == "Legend League"): - continue - json_data = {"type": type_, "old_player": previous_response, - "new_player": new_response} - producer.send("player", ujson.dumps(json_data).encode("utf-8"), timestamp_ms=int(datetime.now().timestamp()) * 1000) - - - # LEGENDS CODE, dont fix what aint broke - if new_response["trophies"] != previous_response["trophies"] and new_response["trophies"] >= 4900 and league == "Legend League": - diff_trophies = new_response["trophies"] - previous_response["trophies"] - diff_attacks = new_response["attackWins"] - previous_response["attackWins"] - - if diff_trophies <= - 1: - diff_trophies = abs(diff_trophies) - if diff_trophies <= 100: - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$push": {f"legends.{legend_date}.defenses": diff_trophies}}, upsert=True)) - - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$push": {f"legends.{legend_date}.new_defenses": { - "change" : diff_trophies, - "time" : int(datetime.now().timestamp()), - "trophies" : new_response["trophies"] - }}}, upsert=True)) - - elif diff_trophies >= 1: - heroes = new_response.get("heroes", []) - equipment = [] - for hero in heroes: - for gear in hero.get("equipment", []): - equipment.append({"name" : gear.get("name"), "level" : gear.get("level")}) - - bulk_db_changes.append( - UpdateOne({"tag": tag}, - {"$inc": {f"legends.{legend_date}.num_attacks": diff_attacks}}, - upsert=True)) - # if one attack - if diff_attacks == 1: - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$push": { - f"legends.{legend_date}.attacks": diff_trophies}}, - upsert=True)) - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$push": {f"legends.{legend_date}.new_attacks": { - "change": diff_trophies, - "time": int(datetime.now().timestamp()), - "trophies": new_response["trophies"], - "hero_gear": equipment - }}}, upsert=True)) - if diff_trophies == 40: - bulk_db_changes.append( - UpdateOne({"tag": tag}, {"$inc": {f"legends.streak": 1}})) - - else: - bulk_db_changes.append( - UpdateOne({"tag": tag}, {"$set": {f"legends.streak": 0}})) - - # if multiple attacks, but divisible by 40 - elif int(diff_trophies / 40) == diff_attacks: - for x in range(0, diff_attacks): - bulk_db_changes.append( - UpdateOne({"tag": tag}, - {"$push": {f"legends.{legend_date}.attacks": 40}}, - upsert=True)) - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$push": {f"legends.{legend_date}.new_attacks": { - "change": 40, - "time": int(datetime.now().timestamp()), - "trophies": new_response["trophies"], - "hero_gear" : equipment - }}}, upsert=True)) - bulk_db_changes.append( - UpdateOne({"tag": tag}, {"$inc": {f"legends.streak": diff_attacks}})) - else: - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$push": { - f"legends.{legend_date}.attacks": diff_trophies}}, - upsert=True)) - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$push": {f"legends.{legend_date}.new_attacks": { - "change": diff_trophies, - "time": int(datetime.now().timestamp()), - "trophies": new_response["trophies"], - "hero_gear": equipment - }}}, upsert=True)) - - bulk_db_changes.append( - UpdateOne({"tag": tag}, {"$set": {f"legends.streak": 0}}, upsert=True)) - - if new_response["defenseWins"] != previous_response["defenseWins"]: - diff_defenses = new_response["defenseWins"] - previous_response["defenseWins"] - for x in range(0, diff_defenses): - bulk_db_changes.append( - UpdateOne({"tag": tag}, {"$push": {f"legends.{legend_date}.defenses": 0}}, - upsert=True)) - bulk_db_changes.append(UpdateOne({"tag": tag}, - {"$push": {f"legends.{legend_date}.new_defenses": { - "change": 0, - "time": int(datetime.now().timestamp()), - "trophies": new_response["trophies"] - }}}, upsert=True)) - - if BEEN_ONLINE: - _time = int(datetime.now().timestamp()) - bulk_db_changes.append( - UpdateOne({"tag": tag}, { - "$inc": {f"activity.{season}": 1}, - "$push" : {f"last_online_times.{season}" : _time}, - "$set": {"last_online": _time} - }, upsert=True)) - bulk_clan_changes.append( - UpdateOne({"tag": clan_tag}, - {"$inc": {f"{season}.{tag}.activity": 1}}, - upsert=True)) - - await asyncio.gather(*ws_tasks) - - tasks.append(fetch(f"{url}{tag}", session3, {"Authorization": f"Bearer {keys[0]}"})) - await asyncio.gather(*tasks, return_exceptions=True) - await session3.close() - - - print(f"{time.time() - time_inside} seconds inside") - print(f"{len(players_tracked)} players tracked") - - - print(f"{len(bulk_db_changes)} db changes") + for count, tag_group in enumerate(split_tags, 1): + responses = await get_player_responses(keys=keys, tags=tag_group, cache=cache, player_stats=player_stats, player_search=player_search) + for response in responses: + if response is None: + continue + await player_response_handler(new_response=response, cache=cache, bulk_db_changes=bulk_db_changes, + bulk_insert=bulk_insert, bulk_clan_changes=bulk_clan_changes, auto_complete=auto_complete, + set_clan_tags=clan_tag_set) + logger.info(f"GROUP {count}: {time.time() - time_inside} seconds inside") + + + logger.info(f"{len(bulk_db_changes)} db changes") if bulk_db_changes != []: results = await player_stats.bulk_write(bulk_db_changes) - print(results.bulk_api_result) - print(f"STAT CHANGES INSERT: {time.time() - time_inside}") + logger.info(results.bulk_api_result) + logger.info(f"STAT CHANGES INSERT: {time.time() - time_inside}") if auto_complete != []: results = await player_search.bulk_write(auto_complete) - print(results.bulk_api_result) - print(f"AUTOCOMPLETE CHANGES: {time.time() - time_inside}") + logger.info(results.bulk_api_result) + logger.info(f"AUTOCOMPLETE CHANGES: {time.time() - time_inside}") if bulk_insert != []: - results = await client.new_looper.player_history.bulk_write(bulk_insert) - print(results.bulk_api_result) - print(f"HISTORY CHANGES INSERT: {time.time() - time_inside}") + results = await stats_mongo_client.new_looper.player_history.bulk_write(bulk_insert) + logger.info(results.bulk_api_result) + logger.info(f"HISTORY CHANGES INSERT: {time.time() - time_inside}") if bulk_clan_changes != []: results = await clan_stats.bulk_write(bulk_clan_changes) - print(results.bulk_api_result) - print(f"CLAN CHANGES UPDATE: {time.time() - time_inside}") + logger.info(results.bulk_api_result) + logger.info(f"CLAN CHANGES UPDATE: {time.time() - time_inside}") fix_changes = [] not_set_entirely = await player_stats.distinct("tag", filter={"$and": [{"paused": {"$ne" : False}}, {"$or": [{"name": None}, {"league": None}, {"townhall": None}, {"clan_tag": None}]}]}) - print(f'{len(not_set_entirely)} tags to fix') + logger.info(f'{len(not_set_entirely)} tags to fix') for tag in not_set_entirely: try: response = await cache.get(tag) @@ -552,63 +469,16 @@ async def fetch(url, session: aiohttp.ClientSession, headers): if fix_changes != []: results = await player_stats.bulk_write(fix_changes) - print(results.bulk_api_result) - print(f"FIX CHANGES: {time.time() - time_inside}") + logger.info(results.bulk_api_result) + logger.info(f"FIX CHANGES: {time.time() - time_inside}") except Exception: continue -def gen_raid_date(): - now = datetime.utcnow().replace(tzinfo=utc) - current_dayofweek = now.weekday() - if (current_dayofweek == 4 and now.hour >= 7) or (current_dayofweek == 5) or (current_dayofweek == 6) or ( - current_dayofweek == 0 and now.hour < 7): - if current_dayofweek == 0: - current_dayofweek = 7 - fallback = current_dayofweek - 4 - raidDate = (now - timedelta(fallback)).date() - return str(raidDate) - else: - forward = 4 - current_dayofweek - raidDate = (now + timedelta(forward)).date() - return str(raidDate) - -def gen_season_date(): - end = coc.utils.get_season_end().replace(tzinfo=utc).date() - month = end.month - if month <= 9: - month = f"0{month}" - return f"{end.year}-{month}" - -def gen_legend_date(): - now = datetime.utcnow() - hour = now.hour - if hour < 5: - date = (now - timedelta(1)).date() - else: - date = now.date() - return str(date) - -def gen_games_season(): - now = datetime.utcnow() - month = now.month - if month <= 9: - month = f"0{month}" - return f"{now.year}-{month}" - - -if __name__ == '__main__': - loop = asyncio.get_event_loop() - producer = KafkaProducer(bootstrap_servers=["85.10.200.219:9092"], api_version=(3, 6, 0)) - coc_client = coc.Client(key_count=100, throttle_limit=1000, cache_max_size=0, raw_attribute=True) - keys = create_keys() - loop.run_until_complete(coc_client.login_with_tokens(*keys[:100])) - loop.create_task(main(producer=producer)) - loop.run_forever() diff --git a/tracking/player/utils.py b/tracking/player/utils.py index e69de29b..b30fcce1 100644 --- a/tracking/player/utils.py +++ b/tracking/player/utils.py @@ -0,0 +1,86 @@ +from msgspec import Struct +import pendulum as pend +from datetime import timedelta +import coc + +def gen_raid_date(): + now = pend.now(tz=pend.UTC) + current_dayofweek = now.weekday() + if (current_dayofweek == 4 and now.hour >= 7) or (current_dayofweek == 5) or (current_dayofweek == 6) or ( + current_dayofweek == 0 and now.hour < 7): + if current_dayofweek == 0: + current_dayofweek = 7 + fallback = current_dayofweek - 4 + raidDate = (now - timedelta(fallback)).date() + return str(raidDate) + else: + forward = 4 - current_dayofweek + raidDate = (now + timedelta(forward)).date() + return str(raidDate) + +def gen_season_date(): + end = coc.utils.get_season_end().replace(tzinfo=pend.UTC).date() + month = end.month + if month <= 9: + month = f"0{month}" + return f"{end.year}-{month}" + +def gen_legend_date(): + now = pend.now(tz=pend.UTC) + hour = now.hour + if hour < 5: + date = (now - timedelta(1)).date() + else: + date = now.date() + return str(date) + +def gen_games_season(): + now = pend.now(tz=pend.UTC) + month = now.month + if month <= 9: + month = f"0{month}" + return f"{now.year}-{month}" + +def get_player_changes(previous_response: dict, response: dict): + new_json = {} + fields_to_update = [] + ok_achievements = {"Gold Grab", "Elixir Escapade", "Heroic Heist", "Games Champion", "Aggressive Capitalism", + "Well Seasoned", "Nice and Tidy", "War League Legend", "Wall Buster"} + for key, item in response.items(): + old_item = previous_response.get(key) + if old_item != item: + fields_to_update.append(key) + not_ok_fields = {"labels", "legendStatistics", "playerHouse", "versusBattleWinCount"} + if key in not_ok_fields: + continue + if old_item != item: + if isinstance(item, list): + for count, spot in enumerate(item): + spot_name = spot["name"] + if key == "achievements" and spot_name not in ok_achievements: + continue + old_ = next((item for item in old_item if item["name"] == spot_name), None) + if old_ != spot: + if key == "achievements": + if old_ is not None: + new_json[(key, spot_name.replace(".", ""))] = (old_["value"], spot["value"]) + else: + new_json[(key, spot_name.replace(".", ""))] = (None, spot["value"]) + else: + if old_ is not None: + new_json[(key, spot_name.replace(".", ""))] = (old_["level"], spot["level"]) + else: + new_json[(key, spot_name.replace(".", ""))] = (None, spot["level"]) + else: + if key == "clan": + new_json[(key, key)] = (None, {"tag": item["tag"], "name": item["name"]}) + elif key == "league": + new_json[(key, key)] = (None, {"tag": item["id"], "name": item["name"]}) + else: + new_json[(key, key)] = (old_item, item) + + return (new_json, fields_to_update) + + +class Player(Struct): + tag: str \ No newline at end of file diff --git a/utility/components.py b/utility/components.py index 4399c316..4f56bb53 100644 --- a/utility/components.py +++ b/utility/components.py @@ -3,7 +3,6 @@ from classes.player import MyCustomPlayer from typing import List import coc -from utility.clash.other import gen_season_date from utility.constants import BOARD_TYPES from typing import Union