From 0732487add8aedb2fb93b1b808fddff2086c2d10 Mon Sep 17 00:00:00 2001 From: Kiran Jonnalagadda Date: Fri, 3 Jun 2016 17:49:17 +0530 Subject: [PATCH] Cache session active_at in Redis for periodic job processor --- README.md | 2 +- hasjob/models/user.py | 59 ++++++++++++++++++++++++++++++++++++++++++- manage.py | 10 +++----- 3 files changed, 63 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 337078a64..4dcbb76d2 100644 --- a/README.md +++ b/README.md @@ -135,7 +135,7 @@ Some functionality in Hasjob requires the presence of a sub-board named `www`. C Hasjob requires some tasks to be run in periodic background jobs. These can be called from cron. Use `crontab -e` as the user account running Hasjob and add: - */10 * * * * cd /path/to/hasjob; python manage.py periodic sessions -e dev + */5 * * * * cd /path/to/hasjob; python manage.py periodic sessions -e dev */5 * * * * cd /path/to/hasjob; python manage.py periodic impressions -e dev Switch from `dev` to `production` in a production environment. diff --git a/hasjob/models/user.py b/hasjob/models/user.py index a584c28b1..1ebb8d730 100644 --- a/hasjob/models/user.py +++ b/hasjob/models/user.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- +from time import mktime from datetime import datetime, timedelta +from uuid import UUID from flask import request from flask.ext.lastuser.sqlalchemy import UserBase2 from sqlalchemy_utils.types import UUIDType @@ -8,6 +10,7 @@ from coaster.sqlalchemy import JsonDict from baseframe import _, cache from . import db, BaseMixin +from .. import redis_store __all__ = ['User', 'UserActiveAt', 'AnonUser', 'EventSessionBase', 'EventSession', 'UserEventBase', 'UserEvent'] @@ -185,9 +188,63 @@ def get_session(cls, uuid, user=None, anon_user=None): ues.user = user ues.anon_user = anon_user db.session.add(ues) - ues.active_at = datetime.utcnow() + # ues.active_at = datetime.utcnow() + session_set_active_at(ues.uuid) return ues + @classmethod + def get(cls, uuid): + if isinstance(uuid, basestring): + uuid = UUID(uuid) + return cls.query.filter_by(uuid=uuid).one_or_none() + + @classmethod + def all(cls, uuids): + uuids = [u if isinstance(u, basestring) else UUID(u) for u in uuids] + return cls.query.filter(cls.uuid.in_(uuids)) + + @classmethod + def close_all_inactive(cls): + cls.query.filter(cls.ended_at == None, # NOQA + cls.active_at < (datetime.utcnow() - timedelta(minutes=30))).update( + {cls.ended_at: cls.active_at}) + + +# These should be class or instance methods +def session_set_active_at(sessionid): + sessionid = unicode(sessionid) # In case this is a UUID + now = datetime.utcnow() + active_at = repr(mktime(now.timetuple()) + now.microsecond / 1e6) # repr has more precision than unicode in Py2 + redis_store.hset('hasjob/session_active_at', sessionid, active_at) + + +def session_get_active_at(sessionid=None): + sessionid = unicode(sessionid) + if sessionid: + active_at = redis_store.hget('hasjob/session_active_at', sessionid) + if active_at: + return datetime.fromtimestamp(float(active_at)) + else: + result = redis_store.hgetall('hasjob/session_active_at') + for sessionid in result: + result[sessionid] = datetime.fromtimestamp(float(result[sessionid])) + return result + + +def session_save_active_at(sessionid=None): + if sessionid: + active_at = session_get_active_at(sessionid) + if active_at: + session = EventSession.get(sessionid) + session.active_at = active_at + db.session.commit() + else: + queue = session.get_active_at() # Returns a dict of sessionid (as UUID string) to active_at (as datetime) + sessions = EventSession.all(queue.keys()) + for session in sessions: + session.active_at = queue[unicode(session.uuid)] + db.session.commit() + class UserEventBase(object): @classmethod diff --git a/manage.py b/manage.py index 5bf4295f4..d7864f992 100755 --- a/manage.py +++ b/manage.py @@ -7,21 +7,19 @@ import hasjob.forms as forms import hasjob.views as views from hasjob.models import db +from hasjob.models.user import EventSession, session_save_active_at from hasjob import app, init_for -from datetime import datetime, timedelta periodic = Manager(usage="Periodic tasks from cron (with recommended intervals)") @periodic.option('-e', '--env', default='dev', help="runtime env [default 'dev']") def sessions(env): - """Sweep user sessions to close all inactive sessions (10m)""" + """Update activity timestamps and close inactive sessions (5m)""" manager.init_for(env) - es = models.EventSession + session_save_active_at() # Close all sessions that have been inactive for >= 30 minutes - es.query.filter(es.ended_at == None, # NOQA - es.active_at < (datetime.utcnow() - timedelta(minutes=30))).update( - {es.ended_at: es.active_at}) + EventSession.close_all_inactive() db.session.commit()