Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache session active_at in Redis for periodic job processor #340

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ Some functionality in Hasjob requires the presence of a sub-board named `www`. C

Hasjob requires some tasks to be run in periodic background jobs. These can be called from cron. Use `crontab -e` as the user account running Hasjob and add:

*/10 * * * * cd /path/to/hasjob; python manage.py periodic sessions -e dev
*/5 * * * * cd /path/to/hasjob; python manage.py periodic sessions -e dev
*/5 * * * * cd /path/to/hasjob; python manage.py periodic impressions -e dev

Switch from `dev` to `production` in a production environment.
Expand Down
59 changes: 58 additions & 1 deletion hasjob/models/user.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# -*- coding: utf-8 -*-

from time import mktime
from datetime import datetime, timedelta
from uuid import UUID
from flask import request
from flask.ext.lastuser.sqlalchemy import UserBase2
from sqlalchemy_utils.types import UUIDType
from coaster.utils import unicode_http_header, uuid1mc
from coaster.sqlalchemy import JsonDict
from baseframe import _, cache
from . import db, BaseMixin
from .. import redis_store

__all__ = ['User', 'UserActiveAt', 'AnonUser', 'EventSessionBase', 'EventSession', 'UserEventBase', 'UserEvent']

Expand Down Expand Up @@ -185,9 +188,63 @@ def get_session(cls, uuid, user=None, anon_user=None):
ues.user = user
ues.anon_user = anon_user
db.session.add(ues)
ues.active_at = datetime.utcnow()
# ues.active_at = datetime.utcnow()
session_set_active_at(ues.uuid)
return ues

@classmethod
def get(cls, uuid):
if isinstance(uuid, basestring):
uuid = UUID(uuid)
return cls.query.filter_by(uuid=uuid).one_or_none()

@classmethod
def all(cls, uuids):
uuids = [u if isinstance(u, basestring) else UUID(u) for u in uuids]
return cls.query.filter(cls.uuid.in_(uuids))

@classmethod
def close_all_inactive(cls):
cls.query.filter(cls.ended_at == None, # NOQA
cls.active_at < (datetime.utcnow() - timedelta(minutes=30))).update(
{cls.ended_at: cls.active_at})


# These should be class or instance methods
def session_set_active_at(sessionid):
sessionid = unicode(sessionid) # In case this is a UUID
now = datetime.utcnow()
active_at = repr(mktime(now.timetuple()) + now.microsecond / 1e6) # repr has more precision than unicode in Py2
redis_store.hset('hasjob/session_active_at', sessionid, active_at)


def session_get_active_at(sessionid=None):
sessionid = unicode(sessionid)
if sessionid:
active_at = redis_store.hget('hasjob/session_active_at', sessionid)
if active_at:
return datetime.fromtimestamp(float(active_at))
else:
result = redis_store.hgetall('hasjob/session_active_at')
for sessionid in result:
result[sessionid] = datetime.fromtimestamp(float(result[sessionid]))
return result


def session_save_active_at(sessionid=None):
if sessionid:
active_at = session_get_active_at(sessionid)
if active_at:
session = EventSession.get(sessionid)
session.active_at = active_at
db.session.commit()
else:
queue = session.get_active_at() # Returns a dict of sessionid (as UUID string) to active_at (as datetime)
sessions = EventSession.all(queue.keys())
for session in sessions:
session.active_at = queue[unicode(session.uuid)]
db.session.commit()


class UserEventBase(object):
@classmethod
Expand Down
10 changes: 4 additions & 6 deletions manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@
import hasjob.forms as forms
import hasjob.views as views
from hasjob.models import db
from hasjob.models.user import EventSession, session_save_active_at
from hasjob import app, init_for
from datetime import datetime, timedelta

periodic = Manager(usage="Periodic tasks from cron (with recommended intervals)")


@periodic.option('-e', '--env', default='dev', help="runtime env [default 'dev']")
def sessions(env):
"""Sweep user sessions to close all inactive sessions (10m)"""
"""Update activity timestamps and close inactive sessions (5m)"""
manager.init_for(env)
es = models.EventSession
session_save_active_at()
# Close all sessions that have been inactive for >= 30 minutes
es.query.filter(es.ended_at == None, # NOQA
es.active_at < (datetime.utcnow() - timedelta(minutes=30))).update(
{es.ended_at: es.active_at})
EventSession.close_all_inactive()
db.session.commit()


Expand Down