Feature/store user traffic #84

Merged
8 commits merged on Nov 7, 2023
1 change: 1 addition & 0 deletions Pipfile
@@ -28,5 +28,6 @@ docker="6.1.2"
pandas = "2.1.1"
numpy = "1.26.0"
xlsxwriter = "3.1.7"
pybloom-live="4.0.0"
[requires]
python_version = "3.11"
12 changes: 8 additions & 4 deletions app/__init__.py
@@ -1,6 +1,8 @@
import os
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.api import stats
from .config import config

from .api import members, auth, events, admin, mail, jobs
@@ -27,17 +29,19 @@ def create_app():
allow_headers=["*"],
)

# Fetch config object
env = os.getenv('API_ENV', 'default')
app.config = config[env]

# Routers
app.include_router(members.router, prefix="/api/member", tags=['members'])
app.include_router(auth.router, prefix="/api/auth", tags=['auth'])
app.include_router(events.router, prefix="/api/event", tags=['event'])
app.include_router(admin.router, prefix="/api/admin", tags=['admin'])
app.include_router(mail.router, prefix="/api/mail", tags=['mail'])
app.include_router(jobs.router, prefix="/api/jobs", tags=['job'])

# hidden from the generated API docs in production
app.include_router(stats.router, prefix="/api/stats", tags=['stats'], include_in_schema=env != 'production')
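# include_in_schema only hides these routes from the generated OpenAPI docs;
# the endpoints stay reachable, so access control still comes from each
# route's own authorize/authorize_admin dependencies.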

setup_db(app)
# Set tokens to expire at "exp"
221 changes: 221 additions & 0 deletions app/api/stats.py
@@ -0,0 +1,221 @@
import math
import logging
from datetime import datetime, timedelta
from typing import Optional
from uuid import UUID
from app.api.utils import find_object_title_from_path
from fastapi import APIRouter, Depends, HTTPException, Request, BackgroundTasks, Response
from app.auth_helpers import authorize, authorize_admin
from app.db import get_database
import pickle as pkl
from app.utils.date import Interval, add_continous_datapoints_to_log, format_date
from pybloom_live import ScalableBloomFilter

from app.models import AccessTokenPayload, PageVisit


router = APIRouter()

# groups all timestamps by day, but can be extended to group by an arbitrary interval
@router.get('/unique-visit')
def get_number_of_unique_visitors(request: Request, start: Optional[str] = None, end:Optional[str] = None, token: AccessTokenPayload = Depends(authorize_admin)):
''' Get unique visitors within an optional interval \n
start: if no start date is provided, the first recorded timestamp is used \n
end: if no end date is provided, the current date is used
'''
db = get_database(request)
datetime_format = "%Y-%m-%d %H:%M:%S"
interval = Interval.day
if end is None:
    end_date = datetime.now()
else:
    end_date = format_date(end, datetime_format)

match_query = {"timestamp": {"$lte": end_date}}
# format for the aggregate query, i.e. the interval visits are grouped into
query_format = "%Y-%m-%d"
start_date = None
if start:
    start_date = format_date(start, datetime_format)
    start_query = {"timestamp": {"$gte": start_date}}
    # extend match_query with the start bound
    match_query = {"$and": [start_query, match_query]}

    if start_date >= end_date:
        raise HTTPException(400, "Start date cannot be later than end date")
    date_range = math.ceil((end_date - start_date).total_seconds() / 3600)
    # a range of at most 48 hours is grouped per hour instead of per day
    if date_range <= 48:
        interval = Interval.hour
        query_format = f"{query_format}T%H"

pipeline = [
{"$match": match_query},
{"$group": {
# could only use $dateToString in _id field
"_id": {"$dateToString": {"format": query_format, "date": "$timestamp"}},
"count" : {"$sum": 1},
}},
{"$sort": {"_id": 1}},
{"$project": {
"date": "$_id",
"count": 1,
"_id": 0,
}}
]

res = db.uniqueVisitLog.aggregate(pipeline)
visits = list(res)
if not len(visits):
return visits

if not start_date:
# if start date is not provided the first timestamp is used as start date
start_date = format_date(visits[0]["date"], query_format)
return add_continous_datapoints_to_log(visits, start_date, end_date, interval)
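# Illustrative response shape (hypothetical values), after the $project stage
# and the gap-filling done by add_continous_datapoints_to_log:
#   [{"date": "2023-11-01", "count": 12}, {"date": "2023-11-02", "count": 0}, ...]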

def register_new_visit(db, user_id):
member = db.members.find_one({'id': UUID(user_id)})
if not member:
logging.error(f"404 - User with {user_id} could not be found")
return
today = datetime.today().strftime('%Y-%m-%d')
stats = db.uniqueFilter.find_one({'entry_date': today})
update_dict = {}
# check whether a bloom filter has already been stored for today
if not stats:
update_dict = {
"createdAt": datetime.utcnow(),
"entry_date": today,
}
# adds a fresh bloomfilter
bf = ScalableBloomFilter(initial_capacity=1000, error_rate=0.001, mode=ScalableBloomFilter.SMALL_SET_GROWTH)
else:
# load the existing filter into memory
bf = pkl.loads(stats["bloom_filter"])

# bf.add returns True if the key already exists in the filter and False on a successful insert
exists = bf.add(user_id)
if exists:
return

# store a datetime object instead of its string representation since no formatting is needed
# 26 bytes per entry (timestamp and _id)
ts = datetime.now()
res = db.uniqueVisitLog.insert_one({"timestamp": ts})
if not res:
logging.error(f"could not insert timestamp: {ts}")
return

update_dict.update({"bloom_filter": pkl.dumps(bf)})

# upsert=True to create the document if none is found for this date
res = db.uniqueFilter.update_one({'entry_date': today}, {"$set": update_dict}, upsert=True)
if not res:
logging.error(f"could not update bloom_filter with: {today}, {update_dict}")

@router.post('/unique-visit')
async def add_unique_member_visit(request: Request, background_task: BackgroundTasks, token: AccessTokenPayload = Depends(authorize)):
''' Endpoint for tracking unique visitors per day. Uses background tasks since we want to return 200 OK immediately;
the endpoint should have minimal impact on the user, and any errors are only logged.
'''
db = get_database(request)
background_task.add_task(register_new_visit, db, token.user_id)
return Response(status_code=200)

def register_page_visit(db, page):
ts = datetime.now()
res = db.pageVisitLog.insert_one({"timestamp": ts, "metaData": page, "visits": 1})
if not res:
logging.error(f"could not insert visit on page: {page}")

# builds on top of the react-router-dom location.pathname for identifying the page
@router.post('/page-visit')
async def add_page_visit(request: Request, payload: PageVisit, background_task: BackgroundTasks):
db = get_database(request)
background_task.add_task(register_page_visit, db, payload.page)
return Response(status_code=200)
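# Illustrative request (hypothetical path), matching the PageVisit model's
# "page" field used above:
#   POST /api/stats/page-visit  {"page": "/events"}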

# gets the number of visits for a page between start and end
# if end is not specified it is set to datetime.now()
@router.get('/page-visits')
def get_page_visits(request: Request, page: str, start: Optional[str] = None, end: Optional[str] = None, token: AccessTokenPayload = Depends(authorize_admin)):
db = get_database(request)
datetime_format = "%Y-%m-%dT%H:%M:%S"
search_conditions = []

if end is None:
    end_date = datetime.now()
else:
    end_date = format_date(end, datetime_format)

end_query = {"timestamp": {"$lte": end_date}}
search_conditions.append(end_query)
start_date = None

if start:
    start_date = format_date(start, datetime_format)
    start_query = {"timestamp": {"$gte": start_date}}
    search_conditions.append(start_query)

    if start_date >= end_date:
        raise HTTPException(400, "Start date cannot be later than end date")
search_conditions.append({"metaData": page})
pipeline = [
    {"$match": {"$and": search_conditions}},
{"$group": {
# could only use $dateToString in _id field
"_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$timestamp"}},
"count" : {"$sum": 1},
}},
{"$sort": {"_id": 1}},
{"$project": {
"date": "$_id",
"count": 1,
"_id": 0,
}}
]

res = db.pageVisitLog.aggregate(pipeline)
visits = list(res)

if not len(visits):
return visits

if not start_date:
# if start date is not provided the first timestamp is used as start date
start_date = format_date(visits[0]["date"], "%Y-%m-%d")

return add_continous_datapoints_to_log(visits, start_date, end_date, Interval.day)

@router.get('/most_visited_pages_last_month')
def get_most_visited_page(request: Request, token: AccessTokenPayload = Depends(authorize_admin)):
db = get_database(request)
now = datetime.now()
start_date = now - timedelta(weeks=4)
pipeline = [
{"$match": {"$and": [{"timestamp": {"$lte": now}}, {"timestamp": {"$gte": start_date}}]}},
{"$group": {
# group by the page path stored in metaData
"_id": "$metaData",
"count" : {"$sum": 1},
}},
{"$sort": {"count": -1}},
{"$limit": 5},
{"$project":{
"_id": 0,
"path": "$_id",
"count": "$count"
}}
]
res = db.pageVisitLog.aggregate(pipeline)
pages = list(res)

for page in pages:
path = page["path"]
title = find_object_title_from_path(path, db)
if not title:
title = path
page.update({"title": title})
return pages
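# Illustrative result (hypothetical values): the five most visited paths, with
# titles resolved where the path contains a known event or job id:
#   [{"path": "/events", "count": 321, "title": "/events"},
#    {"path": "/event/<uuid>", "count": 120, "title": "<event title>"}, ...]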
40 changes: 40 additions & 0 deletions app/api/utils.py
@@ -1,8 +1,10 @@
from typing import Dict
from fastapi import HTTPException
from uuid import UUID
from datetime import datetime

from pymongo import UpdateOne
from pymongo.collection import Collection

from app.models import EventDB

@@ -95,3 +97,41 @@ async def penalize(db, uid: UUID):

if not res:
raise HTTPException(500)

def get_uuid(id: str):
try:
return UUID(id)
except ValueError:
return None

# get event title from path, can be extended to retrieve other fields
def get_event_title_from_path(db, path):
    id = get_uuid(path)
    if not id:
        return None
    event = db.events.find_one({ 'eid': id })
    return event["title"] if event else None

# get job title from path, can be extended to retrieve other fields
def get_job_title_from_path(db, path):
    id = get_uuid(path)
    if not id:
        return None
    job = db.jobs.find_one({ 'id': id })
    return job["title"] if job else None

# function for finding objects for the paths containing uuid
# such as events and jobs
def find_object_title_from_path(path: str, db: Collection):
    path_to_db = {
        "event": get_event_title_from_path,
        "jobs": get_job_title_from_path,
    }
    sub_paths = path.split("/")
    for i, sub_path in enumerate(sub_paths):
        if sub_path not in path_to_db:
            continue
        # the id must follow the matched segment; bail out if it is the last one
        if i == len(sub_paths) - 1:
            return None
        return path_to_db[sub_path](db, sub_paths[i + 1])
    return None
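# Example (hypothetical id): for a path like "/event/4f1a…/registrations", the
# "event" segment selects get_event_title_from_path, which receives the
# following "4f1a…" segment and resolves it to the event's title via get_uuid.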
24 changes: 20 additions & 4 deletions app/db.py
@@ -1,5 +1,5 @@
from pymongo.database import Database
from app.config import config
from pymongo import MongoClient
from fastapi import Request

@@ -22,22 +22,38 @@ def get_qr_path(request: Request) -> str:
def get_export_path(request: Request) -> str:
return request.app.export_path

def setup_stats_collections(app):
    visits_collection_name = "uniqueVisitLog"
    if visits_collection_name not in app.db.list_collection_names():
        # create a timeseries collection for logging unique users
        app.db.create_collection(visits_collection_name, timeseries={"timeField": "timestamp"})

    # set up a timeseries collection for logging page visits
    page_visits_collection_name = "pageVisitLog"
    if page_visits_collection_name not in app.db.list_collection_names():
        # create a timeseries collection for logging page visits
        app.db.create_collection(page_visits_collection_name, timeseries={"timeField": "timestamp", "metaField": "metaData"})

    # bloom_filter documents are removed after 24 hours as they are not used once the day is over
    app.db.uniqueFilter.create_index("createdAt", expireAfterSeconds=24*60*60)
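# MongoDB's TTL monitor applies the expireAfterSeconds index in a background
# sweep (roughly once a minute), so expired bloom filter documents may linger
# briefly past the 24-hour mark before being deleted.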

def setup_db(app):
app.db = MongoClient(app.config.MONGO_URI, uuidRepresentation="standard")[
app.config.MONGO_DBNAME]
file_storage_path = "db/file_storage"
app.image_path = f'{file_storage_path}/event_images'
app.jobImage_path = f'{file_storage_path}/job_images'
app.export_path = f'{file_storage_path}/event_exports'

# set up all collections needed for tracking user activity
setup_stats_collections(app)

# Expire reset password codes after 10 minutes
app.db.passwordResets.create_index("createdAt", expireAfterSeconds=60 * 10)
app.qr_path = f'{file_storage_path}/qr'
if app.config.MONGO_DBNAME == 'test':
app.image_path = 'db/test_event_images'
return


def get_test_db():
test_config = config['test']