Skip to content

Commit

Permalink
Improve automatic server info generation when --root-path is specified.
Browse files Browse the repository at this point in the history
  • Loading branch information
MikuAuahDark committed Apr 15, 2024
1 parent 651dca9 commit 4e93159
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 51 deletions.
2 changes: 1 addition & 1 deletion npps4/game/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async def download_update(context: idol.SchoolIdolAuthParams, request: DownloadU

# Inject autogenerated server info
if config.inject_server_info():
filehash, size = svinfo.generate_server_info(context.request, int(platform), download.get_server_version())
filehash, size = svinfo.generate_server_info(context.request, download.get_server_version())
target_version_str = util.sif_version_string(download.get_server_version())
result.append(
DownloadUpdateInfo(
Expand Down
116 changes: 66 additions & 50 deletions npps4/svinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io
import json
import os
from typing import Annotated, Literal
import zipfile

import fastapi
Expand Down Expand Up @@ -38,28 +39,35 @@
}


def get_root_path(request: fastapi.Request):
root_path: str = request.scope.get("root_path") or ""
if len(root_path) > 0:
if root_path[0] != "/":
root_path = "/" + root_path
if root_path[-1] == "/":
root_path = root_path[:-1]
return root_path


def make_endpoint(route_prefix: str, scheme: str, request: fastapi.Request, endpoint: str = ""):
# Get root path
root_path: str = request.scope.get("root_path") or "/"
if root_path[-1] != "/":
root_path = root_path + "/"
if root_path[0] != "/":
root_path = "/" + root_path
root_path = get_root_path(request)

# Get main endpoint
if route_prefix[0] == "/":
route_prefix = route_prefix[1:]
if route_prefix[-1] == "/":
route_prefix = route_prefix[:-1]
if len(route_prefix) > 0:
if route_prefix[0] != "/":
route_prefix = "/" + route_prefix[1:]
if route_prefix[-1] == "/":
route_prefix = route_prefix[:-1]

if len(endpoint) > 0 and endpoint[0] != "/":
endpoint = "/" + endpoint

return f"{scheme}://{request.url.netloc}{root_path}{route_prefix}{endpoint}"


def get_hash_key(request: fastapi.Request, platform: int, version: tuple[int, int]):
root_path: str = request.scope.get("root_path") or "/"
def get_hash_key(request: fastapi.Request, version: tuple[int, int]):
root_path = get_root_path(request)
return (
root_path
+ request.url.scheme
Expand All @@ -68,61 +76,69 @@ def get_hash_key(request: fastapi.Request, platform: int, version: tuple[int, in
+ app.webview.prefix
+ config.get_consumer_key()
+ str(config.get_application_key(), "UTF-8")
+ str(platform)
+ util.sif_version_string(version)
)


def generate_server_info(request: fastapi.Request, platform: int, version: tuple[int, int]):
def build_server_info(request: fastapi.Request, version: tuple[int, int]):
# Get root path
root_path = get_root_path(request)
scheme = request.url.scheme
ver: str = util.sif_version_string(version)
server_info = copy.deepcopy(SERVERINFO_TEMPLATE)
server_info["domain"] = f"{scheme}://{request.url.netloc}{root_path}"
server_info["maintenance_uri"] = make_endpoint("", scheme, request, "/resources/maintenance/maintenance.php")
server_info["update_uri"] = make_endpoint("", scheme, request, "/resources/maintenance/update.php")
server_info["login_news_uri"] = make_endpoint(app.webview.prefix, scheme, request, "/announce/index")
server_info["locked_user_uri"] = make_endpoint(app.webview.prefix, scheme, request, "/static/index?id=13")
server_info["server_version"] = ver
server_info["end_point"] = app.main.prefix
server_info["consumer_key"] = config.get_consumer_key()
server_info["application_key"] = str(config.get_application_key(), "UTF-8")
server_info["api_uri"] = dict(
(k, make_endpoint(app.main.prefix, scheme, request, k)) for k in server_info["api_uri"].keys()
)
return server_info


def generate_server_info(request: fastapi.Request, version: tuple[int, int]):
datadir = config.get_data_directory() + "/server_info/"
key = hashlib.sha1(get_hash_key(request, platform, version).encode("UTF-8"), usedforsecurity=False).hexdigest()
key = hashlib.sha1(get_hash_key(request, version).encode("UTF-8"), usedforsecurity=False).hexdigest()

os.makedirs(datadir, exist_ok=True)
dest = datadir + key + ".zip"
if os.path.isfile(dest):
stat = os.stat(dest)
size = stat.st_size
else:
# Get root path
root_path: str = request.scope.get("root_path") or "/"
if root_path[-1] != "/":
root_path = root_path + "/"

scheme = request.url.scheme
ver: str = util.sif_version_string(version)
end_point = make_endpoint(app.main.prefix, scheme, request)
end_point = end_point[end_point.index("/", 8) :]
server_info = copy.deepcopy(SERVERINFO_TEMPLATE)
server_info["domain"] = f"{scheme}://{request.url.netloc}"
server_info["maintenance_uri"] = make_endpoint("/resources/maintenance", scheme, request, "maintenance.php")
server_info["update_uri"] = make_endpoint("/resources/maintenance", scheme, request, "update.php")
server_info["login_news_uri"] = make_endpoint(app.webview.prefix, scheme, request, "/announce/index")
server_info["locked_user_uri"] = make_endpoint(app.webview.prefix, scheme, request, "/static/index?id=13")
server_info["server_version"] = ver
server_info["end_point"] = end_point
server_info["consumer_key"] = config.get_consumer_key()
server_info["application_key"] = str(config.get_application_key(), "UTF-8")
server_info["api_uri"] = dict(
(k, make_endpoint(app.main.prefix, scheme, request, k)) for k in server_info["api_uri"].keys()
)

server_info = build_server_info(request, version)
jsondata = json.dumps(server_info).encode("UTF-8")
print(jsondata)
result = io.BytesIO()
with zipfile.ZipFile(result, "w") as z:
now = datetime.datetime.utcnow()
info = zipfile.ZipInfo(
"config/server_info.json", (now.year, now.month, now.day, now.hour, now.minute, now.second)
)
with z.open(info, "w") as f:
dctx = honkypy.encrypt_setup_by_gametype("JP", "config/server_info.json", 3)
f.write(dctx.emit_header())
f.write(dctx.decrypt_block(jsondata))

bytesresult = result.getvalue()

with io.BytesIO() as result:
with zipfile.ZipFile(result, "w") as z:
now = datetime.datetime.now(datetime.UTC)
info = zipfile.ZipInfo(
"config/server_info.json", (now.year, now.month, now.day, now.hour, now.minute, now.second)
)
with z.open(info, "w") as f:
dctx = honkypy.encrypt_setup_by_gametype("JP", "config/server_info.json", 3)
f.write(dctx.emit_header())
f.write(dctx.decrypt_block(jsondata))

bytesresult = result.getvalue()
with open(dest, "wb") as f:
f.write(bytesresult)

size = len(bytesresult)

return key, size


@app.core.get("/server_info.json")
async def server_info_json(
request: fastapi.Request,
version: Annotated[str, fastapi.Query(alias="v")] = "59.0",
):
server_info = build_server_info(request, (59, 0))
server_info["server_version"] = version
return fastapi.responses.JSONResponse(server_info)

0 comments on commit 4e93159

Please sign in to comment.