From f02e66008095b80aefe6816667d7bc1d5b8c9ab1 Mon Sep 17 00:00:00 2001 From: Andrew Dunham Date: Fri, 27 May 2016 15:54:29 -0700 Subject: [PATCH 1/5] Check for and reload rules when we get a log line Essentially, when the management frontend updates a rule, we need to ensure that our background tasks will reload it properly. In a perfect world, what we'd do is have the frontend simply notify the backend that it needs to reload, which it then does. Historically, that's what we've done: we had a `reload_rules` task that would force the manager to load rules from the database. However, this doesn't work for a combination of 2 reasons: 1. A Celery task, by default, goes to a single worker 2. Broadcast tasks do not work when using the Redis broker in Celery (see https://github.com/celery/celery/issues/2638) As such, there's no good way to have the Flask frontend notify all workers that they need to update their rules. The solution is to have the worker(s) reload all rules every time they get a new log line. This does put no small amount of load on the DB, but we implement a somewhat stupid caching mechanism that at least prevents us from re-fetching all the rules every time. As a long-term solution, we should figure out a way to either get broadcast tasks working in Celery, or use another notification method. --- doorman/extensions.py | 61 ++++++++- doorman/manage/views.py | 10 +- doorman/models.py | 4 +- doorman/tasks.py | 6 - ...f01adbe31_add_updated_at_column_to_rule.py | 28 ++++ tests/test_functional.py | 127 +++++++++--------- 6 files changed, 157 insertions(+), 79 deletions(-) create mode 100644 migrations/versions/c17f01adbe31_add_updated_at_column_to_rule.py diff --git a/doorman/extensions.py b/doorman/extensions.py index a437f78..228a89a 100644 --- a/doorman/extensions.py +++ b/doorman/extensions.py @@ -1,4 +1,6 @@ # -*- coding: utf-8 -*- +import time +from hashlib import sha256 from collections import defaultdict from celery import Celery @@ -49,8 +51,8 @@ def handle_result(self, data, **kwargs): class RuleManager(object): def __init__(self, app=None): - self.loaded_rules = False self.network = None + self.rules_hash = None if app is not None: self.init_app(app) @@ -84,13 +86,57 @@ def load_alerters(self): self.alerters[name] = klass(config) + def should_reload_rules(self): + """ + Checks if we need to reload the set of rules. Essentially, since + creating a rules network might be expensive, we want to avoid reloading + for every incoming log line. We get around this by storing a hash of + the current rules, and querying if the rules in the database have + differed from this hash on every new log entry. If they have, we then + re-build our network. + """ + from doorman.models import Rule + from sqlalchemy.exc import SQLAlchemyError + + if self.rules_hash is None: + return True + + limited_rules = [] + with self.app.app_context(): + try: + limited_rules = (Rule.query + .with_entities(Rule.id, Rule.updated_at) + .order_by(Rule.id).all()) + except SQLAlchemyError: + # Ignore DB errors when testing + if self.app.config['TESTING']: + return False + else: + raise + + # Feed everything into a hash + rules_hash = self.make_rules_hash(limited_rules) + return rules_hash != self.rules_hash + + def make_rules_hash(self, rules): + hash = sha256() + for rule in rules: + key = '{id} {updated_at}'.format( + id=rule.id, + updated_at=str(rule.updated_at) + ) + hash.update(key) + return hash.hexdigest() + def load_rules(self): """ Load rules from the database. """ from doorman.rules import Network from doorman.models import Rule from sqlalchemy.exc import SQLAlchemyError - self.rules = [] + if not self.should_reload_rules(): + return + with self.app.app_context(): try: all_rules = list(Rule.query.all()) @@ -111,16 +157,19 @@ def load_rules(self): # Create the rule. self.network.parse_query(rule.conditions, alerters=rule.alerters, rule_id=rule.id) + # Recalculate the rules hash and save it. + # Note: we do this here, and not in should_reload_rules, because it's + # possible that we've reloaded a rule in between the two functions, and + # thus we accidentally don't reload when we should. + self.rules_hash = self.make_rules_hash(all_rules) + def handle_log_entry(self, entry, node): """ The actual entrypoint for handling input log entries. """ from doorman.models import Rule from doorman.rules import RuleMatch from doorman.utils import extract_results - # Need to lazy-load rules - if not self.loaded_rules: - self.load_rules() - self.loaded_rules = True + self.load_rules() to_trigger = [] for name, action, columns, timestamp in extract_results(entry): diff --git a/doorman/manage/views.py b/doorman/manage/views.py index 7c6c6bb..35b7932 100644 --- a/doorman/manage/views.py +++ b/doorman/manage/views.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import json +import datetime as dt from flask import Blueprint, current_app, flash, jsonify, redirect, render_template, request, url_for from flask_login import login_required @@ -23,7 +24,6 @@ DistributedQuery, DistributedQueryTask, FilePath, Node, Pack, Query, Tag, Rule, StatusLog ) -from doorman.tasks import reload_rules from doorman.utils import ( create_query_pack_from_upload, flash_errors, get_paginate_options ) @@ -469,9 +469,9 @@ def add_rule(): rule = Rule(name=form.name.data, alerters=form.alerters.data, description=form.description.data, - conditions=form.conditions.data) + conditions=form.conditions.data, + updated_at=dt.datetime.utcnow()) rule.save() - reload_rules.delay() return redirect(url_for('manage.rule', rule_id=rule.id)) @@ -489,8 +489,8 @@ def rule(rule_id): rule = rule.update(name=form.name.data, alerters=form.alerters.data, description=form.description.data, - conditions=form.conditions.data) - reload_rules.delay() + conditions=form.conditions.data, + updated_at=dt.datetime.utcnow()) return redirect(url_for('manage.rule', rule_id=rule.id)) form = UpdateRuleForm(request.form, obj=rule) diff --git a/doorman/models.py b/doorman/models.py index a4a3d4f..a11af40 100644 --- a/doorman/models.py +++ b/doorman/models.py @@ -445,12 +445,14 @@ class Rule(SurrogatePK, Model): alerters = Column(ARRAY(db.String), nullable=False) description = Column(db.String, nullable=True) conditions = Column(JSONB) + updated_at = Column(db.DateTime, nullable=False, default=dt.datetime.utcnow) - def __init__(self, name, alerters, description=None, conditions=None): + def __init__(self, name, alerters, description=None, conditions=None, updated_at=None): self.name = name self.description = description self.alerters = alerters self.conditions = conditions + self.updated_at = updated_at class User(UserMixin, SurrogatePK, Model): diff --git a/doorman/tasks.py b/doorman/tasks.py index 3a75a95..c018aac 100644 --- a/doorman/tasks.py +++ b/doorman/tasks.py @@ -12,12 +12,6 @@ def analyze_result(result, node): return -@celery.task() -def reload_rules(): - current_app.rule_manager.load_rules() - return - - @celery.task() def example_task(one, two): print('Adding {0} and {1}'.format(one, two)) diff --git a/migrations/versions/c17f01adbe31_add_updated_at_column_to_rule.py b/migrations/versions/c17f01adbe31_add_updated_at_column_to_rule.py new file mode 100644 index 0000000..cb7f54c --- /dev/null +++ b/migrations/versions/c17f01adbe31_add_updated_at_column_to_rule.py @@ -0,0 +1,28 @@ +"""Add "updated_at" column to Rule + +Revision ID: c17f01adbe31 +Revises: b50c705fea80 +Create Date: 2016-05-27 15:51:58.168840 + +""" + +# revision identifiers, used by Alembic. +revision = 'c17f01adbe31' +down_revision = 'b50c705fea80' + +from alembic import op +import sqlalchemy as sa +import doorman.database + + +def upgrade(): + ### commands auto generated by Alembic - please adjust! ### + op.add_column('rule', + sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.func.now())) + ### end Alembic commands ### + + +def downgrade(): + ### commands auto generated by Alembic - please adjust! ### + op.drop_column('rule', 'updated_at') + ### end Alembic commands ### diff --git a/tests/test_functional.py b/tests/test_functional.py index 399bf9d..34cb613 100644 --- a/tests/test_functional.py +++ b/tests/test_functional.py @@ -980,20 +980,6 @@ class TestCreateTag: class TestAddRule: - def test_will_reload_rules(self, node, app, testapp): - from doorman.tasks import reload_rules - - with mock.patch.object(reload_rules, 'delay', return_value=None) as mock_delay: - resp = testapp.post(url_for('manage.add_rule'), { - 'name': 'Test Rule', - 'type': 'blacklist', - 'action': 'both', - 'alerters': 'debug', - 'config': '{"field_name": "foo", "blacklist": []}', - }) - - assert mock_delay.called - def test_supports_custom_operators(self, node, app, testapp): # Add a rule to the application rule = """ @@ -1047,60 +1033,79 @@ def test_supports_custom_operators(self, node, app, testapp): class TestUpdateRule: + pass - def test_will_reload_rules(self, db, node, app, testapp): - from doorman.tasks import reload_rules - rule_conds = { - "condition": "AND", - "rules": [ - { - "id": "query_name", - "field": "query_name", - "type": "string", - "input": "text", - "operator": "equal", - "value": "foo", - }, - ], +class TestRuleManager: + def test_will_load_rules_on_each_call(self, app, db): + """ + Verifies that each call to handle_log_entry will result in a call to load_rules. + """ + from doorman.rules import Network + + mgr = app.rule_manager + now = dt.datetime.utcnow() + + with mock.patch.object(mgr, 'load_rules', wraps=lambda: []) as mock_load_rules: + with mock.patch.object(mgr, 'network', wraps=Network()) as mock_network: + for i in range(0, 2): + mgr.handle_log_entry({ + 'data': [ + { + "diffResults": { + "added": [{'op': 'added'}], + "removed": "", + }, + "name": "fake", + "hostIdentifier": "hostname.local", + "calendarTime": "%s %s" % (now.ctime(), "UTC"), + "unixTime": now.strftime('%s') + } + ] + }, {'host_identifier': 'yes'}) + + assert mock_load_rules.call_count == 2 + + def test_will_reload_when_changed(self, app, db): + from doorman.models import Rule + + mgr = app.rule_manager + dummy_rule = { + "id": "query_name", + "field": "query_name", + "type": "string", + "input": "text", + "operator": "equal", + "value": "dummy-query", } - r = Rule( - name='Test-Rule', - alerters=['debug'], - description='A test rule', - conditions=rule_conds + # Insert a first rule. + rule = Rule( + name='foo', + alerters=[], + conditions={'condition': 'AND', 'rules': [dummy_rule]} ) - db.session.add(r) + db.session.add(rule) + db.session.commit() + + # Verify that we will reload these rules + assert mgr.should_reload_rules() is True + + # Actually load them + mgr.load_rules() + + # Verify that (with no changes made), we should NOT reload. + assert mgr.should_reload_rules() is False + + # Make a change to a rule. + rule.update( + conditions={'condition': 'OR', 'rules': [dummy_rule]}, + updated_at=dt.datetime.utcnow()) + db.session.add(rule) db.session.commit() - # Manually reload the rules here, and verify that we have the right - # rule in our list - app.rule_manager.load_rules() - assert len(app.rule_manager.network.conditions) == 2 - - condition_classes = [x.__class__.__name__ for x in app.rule_manager.network.conditions.values()] - assert sorted(condition_classes) == ['AndCondition', 'EqualCondition'] - - # Fake wrapper that just calls reload - def real_reload(*args, **kwargs): - app.rule_manager.load_rules() - - # Update the rule - rule_conds['condition'] = 'OR' - with mock.patch.object(reload_rules, 'delay', wraps=real_reload) as mock_delay: - resp = testapp.post(url_for('manage.rule', rule_id=r.id), { - 'name': 'Test-Rule', - 'alerters': 'debug', - 'conditions': json.dumps(rule_conds), - }) - - assert mock_delay.called - - # Trigger a manual reload again, and verify that it's been updated - app.rule_manager.load_rules() - condition_classes = [x.__class__.__name__ for x in app.rule_manager.network.conditions.values()] - assert sorted(condition_classes) == ['EqualCondition', 'OrCondition'] + # Verify that we will now reload + assert mgr.should_reload_rules() is True class TestRuleEndToEnd: From 05242c5847e2046db597182985ff39676a5c4db0 Mon Sep 17 00:00:00 2001 From: Andrew Dunham Date: Fri, 27 May 2016 16:05:14 -0700 Subject: [PATCH 2/5] Fix indentation --- tests/test_functional.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_functional.py b/tests/test_functional.py index 34cb613..53a7ace 100644 --- a/tests/test_functional.py +++ b/tests/test_functional.py @@ -1046,7 +1046,7 @@ def test_will_load_rules_on_each_call(self, app, db): mgr = app.rule_manager now = dt.datetime.utcnow() - with mock.patch.object(mgr, 'load_rules', wraps=lambda: []) as mock_load_rules: + with mock.patch.object(mgr, 'load_rules', wraps=lambda: []) as mock_load_rules: with mock.patch.object(mgr, 'network', wraps=Network()) as mock_network: for i in range(0, 2): mgr.handle_log_entry({ From 6300a718a71fd78b66400e5aebc762d7cfab8ee1 Mon Sep 17 00:00:00 2001 From: Andrew Dunham Date: Fri, 27 May 2016 16:09:17 -0700 Subject: [PATCH 3/5] Fix hashing on Python 3 --- doorman/extensions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doorman/extensions.py b/doorman/extensions.py index 228a89a..ded2ecf 100644 --- a/doorman/extensions.py +++ b/doorman/extensions.py @@ -125,7 +125,7 @@ def make_rules_hash(self, rules): id=rule.id, updated_at=str(rule.updated_at) ) - hash.update(key) + hash.update(key.encode('utf-8')) return hash.hexdigest() def load_rules(self): From 997976a91e38880ee92b760bbc1a1c0d43cf47dd Mon Sep 17 00:00:00 2001 From: Andrew Dunham Date: Fri, 27 May 2016 18:53:11 -0700 Subject: [PATCH 4/5] Remove silly hash stuff, just store greatest updated_at --- doorman/extensions.py | 47 +++++-------------- ...f01adbe31_add_updated_at_column_to_rule.py | 3 ++ 2 files changed, 14 insertions(+), 36 deletions(-) diff --git a/doorman/extensions.py b/doorman/extensions.py index ded2ecf..4b6c130 100644 --- a/doorman/extensions.py +++ b/doorman/extensions.py @@ -1,6 +1,4 @@ # -*- coding: utf-8 -*- -import time -from hashlib import sha256 from collections import defaultdict from celery import Celery @@ -52,7 +50,7 @@ def handle_result(self, data, **kwargs): class RuleManager(object): def __init__(self, app=None): self.network = None - self.rules_hash = None + self.last_update = None if app is not None: self.init_app(app) @@ -87,46 +85,23 @@ def load_alerters(self): self.alerters[name] = klass(config) def should_reload_rules(self): - """ - Checks if we need to reload the set of rules. Essentially, since - creating a rules network might be expensive, we want to avoid reloading - for every incoming log line. We get around this by storing a hash of - the current rules, and querying if the rules in the database have - differed from this hash on every new log entry. If they have, we then - re-build our network. - """ + """ Checks if we need to reload the set of rules. """ from doorman.models import Rule - from sqlalchemy.exc import SQLAlchemyError - - if self.rules_hash is None: - return True - limited_rules = [] with self.app.app_context(): try: - limited_rules = (Rule.query - .with_entities(Rule.id, Rule.updated_at) - .order_by(Rule.id).all()) + last_update = Rule.query.order_by(Rule.updated_at.desc()).limit(1).first() except SQLAlchemyError: # Ignore DB errors when testing - if self.app.config['TESTING']: - return False - else: + if not self.app.config['TESTING']: raise - # Feed everything into a hash - rules_hash = self.make_rules_hash(limited_rules) - return rules_hash != self.rules_hash + return False - def make_rules_hash(self, rules): - hash = sha256() - for rule in rules: - key = '{id} {updated_at}'.format( - id=rule.id, - updated_at=str(rule.updated_at) - ) - hash.update(key.encode('utf-8')) - return hash.hexdigest() + if self.last_update < last_update: + return True + + return False def load_rules(self): """ Load rules from the database. """ @@ -157,11 +132,11 @@ def load_rules(self): # Create the rule. self.network.parse_query(rule.conditions, alerters=rule.alerters, rule_id=rule.id) - # Recalculate the rules hash and save it. + # Save the last updated date # Note: we do this here, and not in should_reload_rules, because it's # possible that we've reloaded a rule in between the two functions, and # thus we accidentally don't reload when we should. - self.rules_hash = self.make_rules_hash(all_rules) + self.last_update = max(all_rules, key=lambda r: r.updated_at) def handle_log_entry(self, entry, node): """ The actual entrypoint for handling input log entries. """ diff --git a/migrations/versions/c17f01adbe31_add_updated_at_column_to_rule.py b/migrations/versions/c17f01adbe31_add_updated_at_column_to_rule.py index cb7f54c..e531d30 100644 --- a/migrations/versions/c17f01adbe31_add_updated_at_column_to_rule.py +++ b/migrations/versions/c17f01adbe31_add_updated_at_column_to_rule.py @@ -21,8 +21,11 @@ def upgrade(): sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.func.now())) ### end Alembic commands ### + op.create_index('idx__rule__updated_at', 'rule', ['updated_at']) + def downgrade(): + op.drop_index('idx__rule__updated_at') ### commands auto generated by Alembic - please adjust! ### op.drop_column('rule', 'updated_at') ### end Alembic commands ### From 1f55db5d0eaec2969f4dc950390566c540577867 Mon Sep 17 00:00:00 2001 From: Andrew Dunham Date: Fri, 27 May 2016 19:05:09 -0700 Subject: [PATCH 5/5] Simplify rules manager a bunch Since we're already in an app context, we don't need to re-enter when loading rules or alerters. --- doorman/extensions.py | 45 ++++++++++++++-------------------------- tests/test_functional.py | 8 +++++-- 2 files changed, 21 insertions(+), 32 deletions(-) diff --git a/doorman/extensions.py b/doorman/extensions.py index 4b6c130..9cbef23 100644 --- a/doorman/extensions.py +++ b/doorman/extensions.py @@ -70,35 +70,28 @@ def load_alerters(self): alerters = self.app.config.get('DOORMAN_ALERTER_PLUGINS', {}) self.alerters = {} - with self.app.app_context(): - for name, (plugin, config) in alerters.items(): - package, classname = plugin.rsplit('.', 1) - module = import_module(package) - klass = getattr(module, classname, None) + for name, (plugin, config) in alerters.items(): + package, classname = plugin.rsplit('.', 1) + module = import_module(package) + klass = getattr(module, classname, None) - if klass is None: - raise ValueError('Could not find a class named "{0}" in package "{1}"'.format(classname, package)) + if klass is None: + raise ValueError('Could not find a class named "{0}" in package "{1}"'.format(classname, package)) - if not issubclass(klass, AbstractAlerterPlugin): - raise ValueError('{0} is not a subclass of AbstractAlerterPlugin'.format(name)) + if not issubclass(klass, AbstractAlerterPlugin): + raise ValueError('{0} is not a subclass of AbstractAlerterPlugin'.format(name)) - self.alerters[name] = klass(config) + self.alerters[name] = klass(config) def should_reload_rules(self): """ Checks if we need to reload the set of rules. """ from doorman.models import Rule - with self.app.app_context(): - try: - last_update = Rule.query.order_by(Rule.updated_at.desc()).limit(1).first() - except SQLAlchemyError: - # Ignore DB errors when testing - if not self.app.config['TESTING']: - raise - - return False + if self.last_update is None: + return True - if self.last_update < last_update: + newest_rule = Rule.query.order_by(Rule.updated_at.desc()).limit(1).first() + if self.last_update < newest_rule.updated_at: return True return False @@ -112,15 +105,7 @@ def load_rules(self): if not self.should_reload_rules(): return - with self.app.app_context(): - try: - all_rules = list(Rule.query.all()) - except SQLAlchemyError: - # Ignore DB errors when testing - if self.app.config['TESTING']: - all_rules = [] - else: - raise + all_rules = list(Rule.query.all()) self.network = Network() for rule in all_rules: @@ -136,7 +121,7 @@ def load_rules(self): # Note: we do this here, and not in should_reload_rules, because it's # possible that we've reloaded a rule in between the two functions, and # thus we accidentally don't reload when we should. - self.last_update = max(all_rules, key=lambda r: r.updated_at) + self.last_update = max(r.updated_at for r in all_rules) def handle_log_entry(self, entry, node): """ The actual entrypoint for handling input log entries. """ diff --git a/tests/test_functional.py b/tests/test_functional.py index 53a7ace..58c48c3 100644 --- a/tests/test_functional.py +++ b/tests/test_functional.py @@ -1079,11 +1079,15 @@ def test_will_reload_when_changed(self, app, db): "value": "dummy-query", } + now = dt.datetime.utcnow() + next = now + dt.timedelta(minutes=5) + # Insert a first rule. rule = Rule( name='foo', alerters=[], - conditions={'condition': 'AND', 'rules': [dummy_rule]} + conditions={'condition': 'AND', 'rules': [dummy_rule]}, + updated_at=now ) db.session.add(rule) db.session.commit() @@ -1100,7 +1104,7 @@ def test_will_reload_when_changed(self, app, db): # Make a change to a rule. rule.update( conditions={'condition': 'OR', 'rules': [dummy_rule]}, - updated_at=dt.datetime.utcnow()) + updated_at=next) db.session.add(rule) db.session.commit()