Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy all reports to public api #529

Merged
merged 1 commit into from
May 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 115 additions & 111 deletions src/api/v1/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,134 +244,138 @@ async def insert_report(
"""
Inserts detections into to the plugin database.
"""
if random.randint(1, 10) == 1:
asyncio.create_task(insert_report_v2(detections))
# remove duplicates
df = pd.DataFrame([d.dict() for d in detections])
df.drop_duplicates(subset=["reporter", "reported", "region_id"], inplace=True)

# data validation, there can only be one reporter, and it is unrealistic to send more than 5k reports.
if len(df) > 5000 or df["reporter"].nunique() > 1:
logger.warning({"message": "Too many reports."})
return
asyncio.create_task(insert_report_v2(detections))
return {"detail": "ok"}

# data validation, checks for correct timing
now = int(time.time())
now_upper = int(now + 3600)
now_lower = int(now - 25200)

df_time = df.ts
mask = (df_time > now_upper) | (df_time < now_lower)
if len(df_time[mask].values) > 0:
logger.warning(
{
"message": "Data contains out of bounds time",
"reporter": df["reporter"].unique(),
"time": df_time[mask].values[0],
}
)
return
# if random.randint(1, 10) == 1:
# asyncio.create_task(insert_report_v2(detections))

# # remove duplicates
# df = pd.DataFrame([d.dict() for d in detections])
# df.drop_duplicates(subset=["reporter", "reported", "region_id"], inplace=True)

logger.info({"message": f"Received: {len(df)} from: {df['reporter'].unique()}"})
# # data validation, there can only be one reporter, and it is unrealistic to send more than 5k reports.
# if len(df) > 5000 or df["reporter"].nunique() > 1:
# logger.warning({"message": "Too many reports."})
# return

# Normalize names
df["reporter"] = df["reporter"].apply(
lambda name: name.lower().replace("_", " ").replace("-", " ").strip()
)
df["reported"] = df["reported"].apply(
lambda name: name.lower().replace("_", " ").replace("-", " ").strip()
)
# # data validation, checks for correct timing
# now = int(time.time())
# now_upper = int(now + 3600)
# now_lower = int(now - 25200)

# df_time = df.ts
# mask = (df_time > now_upper) | (df_time < now_lower)
# if len(df_time[mask].values) > 0:
# logger.warning(
# {
# "message": "Data contains out of bounds time",
# "reporter": df["reporter"].unique(),
# "time": df_time[mask].values[0],
# }
# )
# return

# Get a list of unique reported names and reporter name
names = list(df["reported"].unique())
names.extend(df["reporter"].unique())

# validate all names
valid_names = [
await functions.to_jagex_name(name)
for name in names
if await functions.is_valid_rsn(name)
]
# logger.debug(f"Valid names: {len(valid_names)}")
# logger.debug(f"{valid_names=}")

# Get IDs for all unique valid names
data = await sql_select_players(valid_names)
# logger.debug(f"Found players before insert: {len(data)}")
# logger.debug(f"{data=}")

# Create entries for players that do not yet exist in Players table
existing_names = [d["name"] for d in data]
# logger.debug(f"{existing_names=}")

new_names = set([name for name in valid_names]).difference(existing_names)
# logger.debug(f"{new_names=}")

# Get new player id's
if new_names:
param = [
{"name": name, "normalized_name": await functions.to_jagex_name(name)}
for name in new_names
]
await functions.batch_function(sql_insert_player, param)
players = await sql_select_players(new_names)
# logger.debug(f"Found players after insert: {len(players)=}, {len(new_names)=}")
data.extend(players)

# Insert detections into Reports table with user ids
# add reported & reporter id
df_names = pd.DataFrame(data)

if len(df) == 0:
logger.warning(
{"message": "empty dataframe, before merge", "detections": detections}
)
return
# logger.info({"message": f"Received: {len(df)} from: {df['reporter'].unique()}"})

# # Normalize names
# df["reporter"] = df["reporter"].apply(
# lambda name: name.lower().replace("_", " ").replace("-", " ").strip()
# )
# df["reported"] = df["reported"].apply(
# lambda name: name.lower().replace("_", " ").replace("-", " ").strip()
# )

# # Get a list of unique reported names and reporter name
# names = list(df["reported"].unique())
# names.extend(df["reporter"].unique())

# # validate all names
# valid_names = [
# await functions.to_jagex_name(name)
# for name in names
# if await functions.is_valid_rsn(name)
# ]
# # logger.debug(f"Valid names: {len(valid_names)}")
# # logger.debug(f"{valid_names=}")

# # Get IDs for all unique valid names
# data = await sql_select_players(valid_names)
# # logger.debug(f"Found players before insert: {len(data)}")
# # logger.debug(f"{data=}")

# # Create entries for players that do not yet exist in Players table
# existing_names = [d["name"] for d in data]
# # logger.debug(f"{existing_names=}")

# new_names = set([name for name in valid_names]).difference(existing_names)
# # logger.debug(f"{new_names=}")

# # Get new player id's
# if new_names:
# param = [
# {"name": name, "normalized_name": await functions.to_jagex_name(name)}
# for name in new_names
# ]
# await functions.batch_function(sql_insert_player, param)
# players = await sql_select_players(new_names)
# # logger.debug(f"Found players after insert: {len(players)=}, {len(new_names)=}")
# data.extend(players)

# # Insert detections into Reports table with user ids
# # add reported & reporter id
# df_names = pd.DataFrame(data)

# if len(df) == 0:
# logger.warning(
# {"message": "empty dataframe, before merge", "detections": detections}
# )
# return

if len(df_names) == 0:
logger.warning(
{"message": "empty dataframe names, before merge", "detections": detections}
)
return
# if len(df_names) == 0:
# logger.warning(
# {"message": "empty dataframe names, before merge", "detections": detections}
# )
# return

df = df.merge(df_names, left_on="reported", right_on="name")
# df = df.merge(df_names, left_on="reported", right_on="name")

if len(df) == 0:
logger.warning(
{"message": "empty dataframe, after merge", "detections": detections}
)
return
# if len(df) == 0:
# logger.warning(
# {"message": "empty dataframe, after merge", "detections": detections}
# )
# return

reporter = df["reporter"].unique()
# reporter = df["reporter"].unique()

if len(reporter) != 1:
logger.warning({"message": "No reporter", "detections": detections})
return
# if len(reporter) != 1:
# logger.warning({"message": "No reporter", "detections": detections})
# return

reporter_id = df_names.query(f"name == {reporter}")["id"].to_list()
# reporter_id = df_names.query(f"name == {reporter}")["id"].to_list()

if len(reporter_id) == 0:
logger.warning({"message": "No reporter in df_names", "detections": detections})
return
# if len(reporter_id) == 0:
# logger.warning({"message": "No reporter in df_names", "detections": detections})
# return

asyncio.create_task(insert_active_reporter(reporter[0]))
# asyncio.create_task(insert_active_reporter(reporter[0]))

# if reporter_id[0] == 657248:
# logger.debug({"message": "Temporary ignoring anonymous reporter"})
# return
# # if reporter_id[0] == 657248:
# # logger.debug({"message": "Temporary ignoring anonymous reporter"})
# # return

df["reporter_id"] = reporter_id[0]
# df["reporter_id"] = reporter_id[0]

if manual_detect:
df["manual_detect"] = manual_detect
# if manual_detect:
# df["manual_detect"] = manual_detect

# Parse data to param
data = df.to_dict("records")
param = [await parse_detection(d) for d in data]
# # Parse data to param
# data = df.to_dict("records")
# param = [await parse_detection(d) for d in data]

# Parse query
await functions.batch_function(sql_insert_report, param)
return {"detail": "ok"}
# # Parse query
# await functions.batch_function(sql_insert_report, param)
# return {"detail": "ok"}


@router.get("/report/count", tags=["Report"])
Expand Down
Loading