-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
75 lines (62 loc) · 2.37 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from typing import Dict, Optional
from fastapi import FastAPI
from httpx import (
AsyncClient,
RemoteProtocolError,
UnsupportedProtocol,
Response as HttpxResponse,
ConnectError,
)
from starlette.requests import Request
from starlette.responses import Response
from env import proxy
# FastAPI application instance that serves the proxy.
app = FastAPI()
# Upstream domains that get_proxy is willing to forward to; requests for any
# other target are answered with a 400 response.
WHITE_LIST = ["mihoyo.com", "miyoushe.com", "hoyolab.com", "hoyoverse.com"]
async def req_client(method: str, target_url: str, headers, body, _proxy: Optional[str]) -> Response:
    """Send one request upstream and relay the upstream reply.

    The request is issued with an httpx ``AsyncClient`` (120 second timeout,
    redirects followed) that is routed through ``_proxy`` when it is not
    ``None``. The upstream body is collected with ``aiter_raw`` and returned
    together with the upstream headers and status code.

    If the attempt raises ``RemoteProtocolError``, ``UnsupportedProtocol`` or
    ``ConnectError`` while a proxy was in use, the request is retried once
    without a proxy. Without a proxy the same errors produce a 400 response.

    Args:
        method: HTTP method to use upstream.
        target_url: Absolute URL of the upstream resource.
        headers: Request headers to send upstream.
        body: Raw request body to send upstream.
        _proxy: Proxy URL for this attempt, or ``None`` for a direct connection.

    Returns:
        The upstream reply, or a 400 ``UnsupportedProtocol`` response when the
        request could not be completed.
    """
    # Use the proxy chosen for *this* attempt. Reading the module-level
    # setting here would make the proxy-less retry below go through the proxy
    # again.
    async with AsyncClient(timeout=120, follow_redirects=True, proxy=_proxy) as client:
        try:
            r: HttpxResponse
            # Raw bytes are passed as ``content=``; httpx deprecates sending
            # bytes through ``data=``.
            async with client.stream(
                method, target_url, headers=headers, content=body
            ) as r:
                # Keep the reply headers in their own name so that a failure
                # while streaming the body still retries with the original
                # request headers.
                upstream_headers = dict(r.headers)
                payload = b"".join([part async for part in r.aiter_raw(1024 * 10)])
                return Response(payload, headers=upstream_headers, status_code=r.status_code)
        except (RemoteProtocolError, UnsupportedProtocol, ConnectError):
            if _proxy is not None:
                # One more attempt with a direct connection.
                return await req_client(method, target_url, headers, body, None)
            return Response(content="UnsupportedProtocol", status_code=400)
def rewrite_headers(old_headers: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of ``old_headers`` with the ``Host`` header left out.

    Header names are compared case-insensitively; every other header is kept
    unchanged and in its original order.
    """
    dropped = ("host",)
    return {
        name: value
        for name, value in old_headers.items()
        if name.lower() not in dropped
    }
async def get_proxy(req: Request) -> Response:
    """Proxy the incoming request to the URL given in its ``path`` parameter.

    The target URL is taken from the ``path`` path parameter, with the
    request's query string appended. The request is forwarded only when the
    target's hostname is one of the ``WHITE_LIST`` domains or a subdomain of
    one. Method, body and headers (minus ``Host``) are passed to
    ``req_client`` together with the configured proxy.

    Args:
        req: The incoming request.

    Returns:
        The upstream reply, or a 400 response when the path is missing, the
        target host is not whitelisted, or the request body cannot be read.
    """
    from urllib.parse import urlsplit

    path = req.path_params.get("path")
    if not path:
        return Response(status_code=400, content="path is required")

    # Match the whitelist against the parsed hostname. A substring test over
    # the whole URL would also accept targets such as
    # "http://evil.example/mihoyo.com" or "http://mihoyo.com.evil.example/".
    try:
        host = urlsplit(path).hostname or ""
    except ValueError:
        # urlsplit rejects malformed authorities (e.g. broken IPv6 literals).
        host = ""
    if not any(host == domain or host.endswith("." + domain) for domain in WHITE_LIST):
        return Response(status_code=400, content="domain is not allowed")

    query = str(req.query_params)
    headers = rewrite_headers(dict(req.headers))
    method = req.method
    try:
        body = await req.body()
    except Exception as e:
        # Request boundary: report any failure to read the body as a 400.
        return Response(status_code=400, content=f"get request body info error: {e}")
    q = "?" + query if query else ""
    target_url = path + q
    return await req_client(method, target_url, headers, body, proxy)
# Catch-all route: everything after the leading "/" is captured as the
# ``path`` parameter and handled by get_proxy for each HTTP method listed.
app.add_route(
    "/{path:path}",
    get_proxy,
    methods=["OPTIONS", "HEAD", "GET", "POST", "PUT", "PATCH", "DELETE"],
)
if __name__ == "__main__":
    # uvicorn is only needed when this file is executed as a script.
    import uvicorn

    # Listen on every interface (0.0.0.0) on port 5677.
    server_options = {"host": "0.0.0.0", "port": 5677}
    uvicorn.run(app, **server_options)