Skip to content

Commit

Permalink
initial
Browse files Browse the repository at this point in the history
  • Loading branch information
MagicTheDev committed Dec 18, 2024
1 parent 3f4896e commit a7d71b2
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 163 deletions.
271 changes: 109 additions & 162 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,131 +1,23 @@
import uvloop
import asyncio
from fastapi import FastAPI, HTTPException, Request
import aiohttp
import os
from datetime import datetime
from base64 import b64decode
from json import loads as json_loads
from multiprocessing import shared_memory, Lock as ProcessLock
import numpy as np
from typing import List, Deque
from collections import deque
import uvicorn
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from dotenv import load_dotenv
load_dotenv()
from utils import create_keys, get_public_ip, generate_credentials

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# ------------------------------------------
# Utility Functions
# ------------------------------------------
def generate_credentials() -> tuple[List[str], List[str]]:
"""Generate email and password lists from environment variables."""
min_idx = int(os.getenv('MIN_EMAIL_INDEX'))
max_idx = int(os.getenv('MAX_EMAIL_INDEX'))

email_template = os.getenv('EMAIL_TEMPLATE')
password = os.getenv('API_PASSWORD')

emails = [email_template.format(x=x) for x in range(min_idx, max_idx + 1)]
passwords = [password] * (max_idx + 1 - min_idx)

return emails, passwords


async def get_keys(emails: list, passwords: list, key_names: str, key_count: int) -> list:
"""Fetch API keys or create new ones."""
total_keys = []

for email, password in zip(emails, passwords):
keys = []
async with aiohttp.ClientSession() as session:
# Authenticate and get IP
login_resp = await session.post(
"https://developer.clashofclans.com/api/login",
json={"email": email, "password": password},
)
if login_resp.status == 403:
raise RuntimeError("Invalid Credentials")

login_payload = await login_resp.json()
ip = json_loads(
b64decode(login_payload["temporaryAPIToken"].split(".")[1] + "====").decode()
)["limits"][1]["cidrs"][0].split("/")[0]

# List keys
keys_resp = await session.post("https://developer.clashofclans.com/api/apikey/list")
existing_keys = (await keys_resp.json())["keys"]

# Revoke keys with incorrect IP
for key in existing_keys:
if ip not in key["cidrRanges"]:
await session.post(
"https://developer.clashofclans.com/api/apikey/revoke", json={"id": key["id"]}
)

# Create new keys if needed
while len(keys) < key_count:
data = {
"name": key_names,
"description": f"Created on {datetime.now().strftime('%c')}",
"cidrRanges": [ip],
"scopes": ["clash"],
}
create_resp = await session.post(
"https://developer.clashofclans.com/api/apikey/create", json=data
)
new_key = await create_resp.json()

# Debug response structure
print("API Key Creation Response:", new_key)

if "error" in new_key:
raise RuntimeError(f"API Key Creation Failed: {new_key['error']}")

if "key" not in new_key or "key" not in new_key["key"]:
raise RuntimeError("Invalid response structure: Missing 'key' field.")

keys.append(new_key["key"]["key"])

total_keys.extend(keys)

return total_keys


# ------------------------------------------
# Shared Key Manager
# ------------------------------------------
class SharedKeyManager:
"""Manages shared API keys across processes."""

def __init__(self):
try:
self.shm = shared_memory.SharedMemory(name="key_index", create=True, size=8)
except FileExistsError:
self.shm = shared_memory.SharedMemory(name="key_index")

self.current_index = np.ndarray((1,), dtype=np.int64, buffer=self.shm.buf)
self.current_index[0] = 0
self.lock = ProcessLock()
self.keys = []

def initialize_keys(self, keys: list[str]):
self.keys = keys

def get_next_key(self) -> str:
with self.lock:
index = self.current_index[0]
self.current_index[0] = (index + 1) % len(self.keys)
return self.keys[index]

def cleanup(self):
self.shm.close()
try:
self.shm.unlink()
except Exception:
pass


# ------------------------------------------
# CoCProxy
# ------------------------------------------
class CoCProxy:
def __init__(self):
self.key_manager = SharedKeyManager()
self.keys = deque()
self.session: aiohttp.ClientSession | None = None

async def startup(self):
Expand All @@ -136,63 +28,118 @@ async def startup(self):
async def cleanup(self):
if self.session:
await self.session.close()
self.key_manager.cleanup()


# ------------------------------------------
# FastAPI Application
# ------------------------------------------
app = FastAPI()

middleware = [
Middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
),
Middleware(
GZipMiddleware,
minimum_size=500
)
]

app = FastAPI(middleware=middleware)
proxy = CoCProxy()


@app.on_event("startup")
async def startup_event():
await proxy.startup()
if os.getenv("WORKER_ID", "0") == "0":
print("Main process initializing keys...")
emails, passwords = generate_credentials()
try:
keys = await get_keys(emails, passwords, "test", 10)
proxy.key_manager.initialize_keys(keys)
except Exception as e:
print(f"Error during key initialization: {e}")
raise RuntimeError("Failed to initialize API keys.")
emails, passwords = generate_credentials()
try:
keys = await create_keys(emails=emails, passwords=passwords, ip=get_public_ip())
proxy.keys = deque(keys)
if not proxy.keys:
raise RuntimeError("No API keys available after initialization.")
print(f"Initialized {len(proxy.keys)} API keys.")
except Exception as e:
print(f"Error during key initialization: {e}")
raise RuntimeError("Failed to initialize API keys.")


@app.on_event("shutdown")
async def shutdown_event():
await proxy.cleanup()


@app.get("/v1/{url:path}")
async def proxy_get(url: str, request: Request):
try:
key = proxy.key_manager.get_next_key()
full_url = f"https://api.clashofclans.com/v1/{url}?{request.query_params}"
headers = {"Accept": "application/json", "Authorization": f"Bearer {key}"}

async with proxy.session.get(full_url, headers=headers) as resp:
if resp.status != 200:
raise HTTPException(status_code=resp.status, detail=await resp.text())
return await resp.json()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/")
async def read_root():
return {"message": "CoC Proxy Server is running."}

@app.get("/v1/{url:path}",
name="Test a coc api endpoint, very high ratelimit, only for testing without auth",
include_in_schema=False)
async def test_endpoint(url: str, request: Request):
global KEYS

@app.post("/v1/{url:path}")
async def proxy_post(url: str, request: Request):
try:
key = proxy.key_manager.get_next_key()
full_url = f"https://api.clashofclans.com/v1/{url}?{request.query_params}"
headers = {"Accept": "application/json", "Authorization": f"Bearer {key}"}
body = await request.json()

async with proxy.session.post(full_url, json=body, headers=headers) as resp:
if resp.status != 200:
raise HTTPException(status_code=resp.status, detail=await resp.text())
return await resp.json()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
query_string = "&".join([f"{key}={value}" for key, value in request.query_params.items()])

headers = {"Accept": "application/json", "Authorization": f"Bearer {proxy.keys[0]}"}
proxy.keys.rotate(1)

full_url = f"https://api.clashofclans.com/v1/{url}"
full_url = full_url.replace("#", '%23').replace("!", '%23')
if query_string:
full_url = f"{full_url}?{query_string}"

async with aiohttp.ClientSession() as session:
async with session.get(full_url, headers=headers) as api_response:
if api_response.status != 200:
content = await api_response.text()
raise HTTPException(status_code=api_response.status, detail=content)
item = await api_response.json()

return item


@app.post("/v1/{url:path}",
name="Test a coc api endpoint, very high ratelimit, only for testing without auth",
include_in_schema=False)
async def test_post_endpoint(url: str, request: Request):
global KEYS

# Extract query parameters
query_params = request.query_params
fields = query_params.get("fields")

# Remove the "fields" parameter from the query parameters
query_params = {key: value for key, value in query_params.items() if key != "fields"}
query_string = "&".join([f"{key}={value}" for key, value in query_params.items()])

headers = {"Accept": "application/json", "Authorization": f"Bearer {proxy.keys[0]}"}
proxy.keys.rotate(1)

# Construct the full URL with query parameters if any
full_url = f"https://api.clashofclans.com/v1/{url}"
if query_string:
full_url = f"{full_url}?{query_string}"

full_url = full_url.replace("#", '%23').replace("!", '%23')

# Extract JSON body from the request
body = await request.json()

async with aiohttp.ClientSession() as session:
async with session.post(full_url, json=body, headers=headers) as api_response:
if api_response.status != 200:
content = await api_response.text()
raise HTTPException(status_code=api_response.status, detail=content)
item = await api_response.json()

return item


if __name__ == "__main__":
# Determine host and port based on environment variables or default values
host = os.getenv("HOST", "0.0.0.0")
port = int(os.getenv("PORT", "8011"))
reload = os.getenv("RELOAD", "false").lower() == "true"

# Run with: WORKER_ID=0 uvicorn main:app --workers 4 --loop uvloop
# Run the Uvicorn server with uvloop already set as the event loop policy
uvicorn.run(app, host=host, port=port, reload=reload)
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ fastapi>=0.104.0
uvicorn[standard]>=0.24.0
uvloop>=0.19.0
aiohttp>=3.9.0
numpy>=2.2.0
requests==2.32.3
python-dotenv==1.0.1
Loading

0 comments on commit a7d71b2

Please sign in to comment.