Skip to content

Commit

Permalink
proxy all reports to public api
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all authored May 22, 2024
1 parent 7fd1105 commit f6b0ea6
Showing 1 changed file with 115 additions and 111 deletions.
226 changes: 115 additions & 111 deletions src/api/v1/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,134 +244,138 @@ async def insert_report(
"""
Inserts detections into to the plugin database.
"""
if random.randint(1, 10) == 1:
asyncio.create_task(insert_report_v2(detections))
# remove duplicates
df = pd.DataFrame([d.dict() for d in detections])
df.drop_duplicates(subset=["reporter", "reported", "region_id"], inplace=True)

# data validation, there can only be one reporter, and it is unrealistic to send more then 5k reports.
if len(df) > 5000 or df["reporter"].nunique() > 1:
logger.warning({"message": "Too many reports."})
return
asyncio.create_task(insert_report_v2(detections))
return {"detail": "ok"}

# data validation, checks for correct timing
now = int(time.time())
now_upper = int(now + 3600)
now_lower = int(now - 25200)

df_time = df.ts
mask = (df_time > now_upper) | (df_time < now_lower)
if len(df_time[mask].values) > 0:
logger.warning(
{
"message": "Data contains out of bounds time",
"reporter": df["reporter"].unique(),
"time": df_time[mask].values[0],
}
)
return
# if random.randint(1, 10) == 1:
# asyncio.create_task(insert_report_v2(detections))

# # remove duplicates
# df = pd.DataFrame([d.dict() for d in detections])
# df.drop_duplicates(subset=["reporter", "reported", "region_id"], inplace=True)

logger.info({"message": f"Received: {len(df)} from: {df['reporter'].unique()}"})
# # data validation, there can only be one reporter, and it is unrealistic to send more then 5k reports.
# if len(df) > 5000 or df["reporter"].nunique() > 1:
# logger.warning({"message": "Too many reports."})
# return

# Normalize names
df["reporter"] = df["reporter"].apply(
lambda name: name.lower().replace("_", " ").replace("-", " ").strip()
)
df["reported"] = df["reported"].apply(
lambda name: name.lower().replace("_", " ").replace("-", " ").strip()
)
# # data validation, checks for correct timing
# now = int(time.time())
# now_upper = int(now + 3600)
# now_lower = int(now - 25200)

# df_time = df.ts
# mask = (df_time > now_upper) | (df_time < now_lower)
# if len(df_time[mask].values) > 0:
# logger.warning(
# {
# "message": "Data contains out of bounds time",
# "reporter": df["reporter"].unique(),
# "time": df_time[mask].values[0],
# }
# )
# return

# Get a list of unqiue reported names and reporter name
names = list(df["reported"].unique())
names.extend(df["reporter"].unique())

# validate all names
valid_names = [
await functions.to_jagex_name(name)
for name in names
if await functions.is_valid_rsn(name)
]
# logger.debug(f"Valid names: {len(valid_names)}")
# logger.debug(f"{valid_names=}")

# Get IDs for all unique valid names
data = await sql_select_players(valid_names)
# logger.debug(f"Found players before insert: {len(data)}")
# logger.debug(f"{data=}")

# Create entries for players that do not yet exist in Players table
existing_names = [d["name"] for d in data]
# logger.debug(f"{existing_names=}")

new_names = set([name for name in valid_names]).difference(existing_names)
# logger.debug(f"{new_names=}")

# Get new player id's
if new_names:
param = [
{"name": name, "normalized_name": await functions.to_jagex_name(name)}
for name in new_names
]
await functions.batch_function(sql_insert_player, param)
players = await sql_select_players(new_names)
# logger.debug(f"Found players after insert: {len(players)=}, {len(new_names)=}")
data.extend(players)

# Insert detections into Reports table with user ids
# add reported & reporter id
df_names = pd.DataFrame(data)

if len(df) == 0:
logger.warning(
{"message": "empty dataframe, before merge", "detections": detections}
)
return
# logger.info({"message": f"Received: {len(df)} from: {df['reporter'].unique()}"})

# # Normalize names
# df["reporter"] = df["reporter"].apply(
# lambda name: name.lower().replace("_", " ").replace("-", " ").strip()
# )
# df["reported"] = df["reported"].apply(
# lambda name: name.lower().replace("_", " ").replace("-", " ").strip()
# )

# # Get a list of unqiue reported names and reporter name
# names = list(df["reported"].unique())
# names.extend(df["reporter"].unique())

# # validate all names
# valid_names = [
# await functions.to_jagex_name(name)
# for name in names
# if await functions.is_valid_rsn(name)
# ]
# # logger.debug(f"Valid names: {len(valid_names)}")
# # logger.debug(f"{valid_names=}")

# # Get IDs for all unique valid names
# data = await sql_select_players(valid_names)
# # logger.debug(f"Found players before insert: {len(data)}")
# # logger.debug(f"{data=}")

# # Create entries for players that do not yet exist in Players table
# existing_names = [d["name"] for d in data]
# # logger.debug(f"{existing_names=}")

# new_names = set([name for name in valid_names]).difference(existing_names)
# # logger.debug(f"{new_names=}")

# # Get new player id's
# if new_names:
# param = [
# {"name": name, "normalized_name": await functions.to_jagex_name(name)}
# for name in new_names
# ]
# await functions.batch_function(sql_insert_player, param)
# players = await sql_select_players(new_names)
# # logger.debug(f"Found players after insert: {len(players)=}, {len(new_names)=}")
# data.extend(players)

# # Insert detections into Reports table with user ids
# # add reported & reporter id
# df_names = pd.DataFrame(data)

# if len(df) == 0:
# logger.warning(
# {"message": "empty dataframe, before merge", "detections": detections}
# )
# return

if len(df_names) == 0:
logger.warning(
{"message": "empty dataframe names, before merge", "detections": detections}
)
return
# if len(df_names) == 0:
# logger.warning(
# {"message": "empty dataframe names, before merge", "detections": detections}
# )
# return

df = df.merge(df_names, left_on="reported", right_on="name")
# df = df.merge(df_names, left_on="reported", right_on="name")

if len(df) == 0:
logger.warning(
{"message": "empty dataframe, after merge", "detections": detections}
)
return
# if len(df) == 0:
# logger.warning(
# {"message": "empty dataframe, after merge", "detections": detections}
# )
# return

reporter = df["reporter"].unique()
# reporter = df["reporter"].unique()

if len(reporter) != 1:
logger.warning({"message": "No reporter", "detections": detections})
return
# if len(reporter) != 1:
# logger.warning({"message": "No reporter", "detections": detections})
# return

reporter_id = df_names.query(f"name == {reporter}")["id"].to_list()
# reporter_id = df_names.query(f"name == {reporter}")["id"].to_list()

if len(reporter_id) == 0:
logger.warning({"message": "No reporter in df_names", "detections": detections})
return
# if len(reporter_id) == 0:
# logger.warning({"message": "No reporter in df_names", "detections": detections})
# return

asyncio.create_task(insert_active_reporter(reporter[0]))
# asyncio.create_task(insert_active_reporter(reporter[0]))

# if reporter_id[0] == 657248:
# logger.debug({"message": "Temporary ignoring anonymous reporter"})
# return
# # if reporter_id[0] == 657248:
# # logger.debug({"message": "Temporary ignoring anonymous reporter"})
# # return

df["reporter_id"] = reporter_id[0]
# df["reporter_id"] = reporter_id[0]

if manual_detect:
df["manual_detect"] = manual_detect
# if manual_detect:
# df["manual_detect"] = manual_detect

# Parse data to param
data = df.to_dict("records")
param = [await parse_detection(d) for d in data]
# # Parse data to param
# data = df.to_dict("records")
# param = [await parse_detection(d) for d in data]

# Parse query
await functions.batch_function(sql_insert_report, param)
return {"detail": "ok"}
# # Parse query
# await functions.batch_function(sql_insert_report, param)
# return {"detail": "ok"}


@router.get("/report/count", tags=["Report"])
Expand Down

0 comments on commit f6b0ea6

Please sign in to comment.