From f30bf73c39e9402c58e60660f237e7c6629b0f88 Mon Sep 17 00:00:00 2001 From: Kevin Meinhardt Date: Thu, 7 Nov 2024 12:18:21 +0100 Subject: [PATCH] Upload multiple filters --- src/olympia/blocklist/cron.py | 99 +++++----- .../management/commands/export_blocklist.py | 9 +- src/olympia/blocklist/mlbf.py | 44 +++-- src/olympia/blocklist/tasks.py | 86 +++++--- src/olympia/blocklist/tests/test_commands.py | 10 +- src/olympia/blocklist/tests/test_cron.py | 111 +++++++---- src/olympia/blocklist/tests/test_mlbf.py | 46 ++++- src/olympia/blocklist/tests/test_tasks.py | 184 ++++++++++++------ src/olympia/constants/blocklist.py | 9 +- src/olympia/lib/remote_settings.py | 8 + src/olympia/lib/tests/test_remote_settings.py | 13 ++ 11 files changed, 430 insertions(+), 189 deletions(-) diff --git a/src/olympia/blocklist/cron.py b/src/olympia/blocklist/cron.py index 29399e83c68f..8452642abab6 100644 --- a/src/olympia/blocklist/cron.py +++ b/src/olympia/blocklist/cron.py @@ -1,4 +1,5 @@ from datetime import datetime +from typing import List import waffle from django_statsd.clients import statsd @@ -13,7 +14,7 @@ from .mlbf import MLBF from .models import Block, BlocklistSubmission, BlockType -from .tasks import cleanup_old_files, process_blocklistsubmission, upload_filter +from .tasks import process_blocklistsubmission, upload_filter from .utils import datetime_to_ts @@ -28,8 +29,8 @@ def get_last_generation_time(): return get_config(MLBF_TIME_CONFIG_KEY, None, json_value=True) -def get_base_generation_time(): - return get_config(MLBF_BASE_ID_CONFIG_KEY, None, json_value=True) +def get_base_generation_time(block_type: BlockType): + return get_config(MLBF_BASE_ID_CONFIG_KEY(block_type), None, json_value=True) def get_blocklist_last_modified_time(): @@ -68,72 +69,78 @@ def _upload_mlbf_to_remote_settings(*, force_base=False): # This timestamp represents the last time the MLBF was generated and uploaded. # It could have been a base filter or a stash. last_generation_time = get_last_generation_time() - # This timestamp represents the point in time when - # the base filter was generated and uploaded. - base_generation_time = get_base_generation_time() - mlbf = MLBF.generate_from_db(generation_time) - - base_filter = ( - MLBF.load_from_storage(base_generation_time) - if base_generation_time is not None - else None - ) - previous_filter = ( - # Only load previoous filter if there is a timestamp to use - # and that timestamp is not the same as the base_filter MLBF.load_from_storage(last_generation_time) if last_generation_time is not None - and (base_filter is None or base_filter.created_at != last_generation_time) - else base_filter + else None ) - changes_count = mlbf.blocks_changed_since_previous( - BlockType.BLOCKED, previous_filter - ) - statsd.incr( - 'blocklist.cron.upload_mlbf_to_remote_settings.blocked_changed', changes_count + base_filters_to_update: List[BlockType] = [] + update_stash = False + + # Determine which base filters need to be re uploaded + # and whether the stash needs to be updated + for block_type in BlockType: + base_generation_time = get_base_generation_time(block_type) + base_filter = ( + MLBF.load_from_storage(base_generation_time) + if base_generation_time is not None + else None + ) + + update_filter = ( + force_base + or base_filter is None + or mlbf.blocks_changed_since_previous(block_type, base_filter) + > BASE_REPLACE_THRESHOLD + ) + + if update_filter: + base_filters_to_update.append(block_type) + elif ( + mlbf.blocks_changed_since_previous( + block_type, previous_filter or base_filter + ) + > 0 + ): + update_stash = True + + previous_filter_is_stale = ( + previous_filter is not None + and previous_filter.created_at < get_blocklist_last_modified_time() ) + need_update = ( - force_base - or base_filter is None - or ( - previous_filter is not None - and previous_filter.created_at < get_blocklist_last_modified_time() - ) - or changes_count > 0 + len(base_filters_to_update) > 0 or update_stash or previous_filter_is_stale ) if not need_update: log.info('No new/modified/deleted Blocks in database; skipping MLBF generation') - return + return mlbf.delete() statsd.incr( 'blocklist.cron.upload_mlbf_to_remote_settings.blocked_count', len(mlbf.data.blocked_items), ) + statsd.incr( + 'blocklist.cron.upload_mlbf_to_remote_settings.soft_blocked_count', + len(mlbf.data.soft_blocked_items), + ) statsd.incr( 'blocklist.cron.upload_mlbf_to_remote_settings.not_blocked_count', len(mlbf.data.not_blocked_items), ) - make_base_filter = ( - force_base - or base_filter is None - or previous_filter is None - or mlbf.blocks_changed_since_previous(BlockType.BLOCKED, base_filter) - > BASE_REPLACE_THRESHOLD - ) + mlbf.generate_and_write_stash(previous_filter) if update_stash else None - if make_base_filter: - mlbf.generate_and_write_filter() - else: - mlbf.generate_and_write_stash(previous_filter) + for block_type in base_filters_to_update: + mlbf.generate_and_write_filter(block_type) - upload_filter.delay(generation_time, is_base=make_base_filter) - - if base_filter: - cleanup_old_files.delay(base_filter_id=base_filter.created_at) + upload_filter.delay( + generation_time, + filters_to_update=[key.name for key in base_filters_to_update], + update_stash=update_stash, + ) def process_blocklistsubmissions(): diff --git a/src/olympia/blocklist/management/commands/export_blocklist.py b/src/olympia/blocklist/management/commands/export_blocklist.py index 1846ac92246f..85402295034e 100644 --- a/src/olympia/blocklist/management/commands/export_blocklist.py +++ b/src/olympia/blocklist/management/commands/export_blocklist.py @@ -4,6 +4,7 @@ import olympia.core.logger from olympia.blocklist.mlbf import MLBF +from olympia.blocklist.models import BlockType log = olympia.core.logger.getLogger('z.amo.blocklist') @@ -29,6 +30,11 @@ def add_arguments(self, parser): 'the database', default=None, ) + parser.add_argument( + '--block-type', + help='Block type to export', + default=None, + ) def load_json(self, json_path): with open(json_path) as json_file: @@ -38,6 +44,7 @@ def load_json(self, json_path): def handle(self, *args, **options): log.debug('Exporting blocklist to file') mlbf = MLBF.generate_from_db(options.get('id')) + block_type = BlockType[options.get('block_type')] if options.get('block_guids_input'): mlbf.blocked_items = list( @@ -52,4 +59,4 @@ def handle(self, *args, **options): ) ) - mlbf.generate_and_write_filter() + mlbf.generate_and_write_filter(block_type) diff --git a/src/olympia/blocklist/mlbf.py b/src/olympia/blocklist/mlbf.py index b5e7bbbfad40..e89b3253daca 100644 --- a/src/olympia/blocklist/mlbf.py +++ b/src/olympia/blocklist/mlbf.py @@ -76,18 +76,15 @@ class BaseMLBFLoader: def __init__(self, storage: SafeStorage): self.storage = storage - def data_type_key(self, key: MLBFDataType) -> str: - return key.name.lower() - @cached_property def _raw(self): """ raw serializable data for the given MLBFLoader. """ - return {self.data_type_key(key): self[key] for key in MLBFDataType} + return {MLBF.data_type_key(key): self[key] for key in MLBFDataType} def __getitem__(self, key: MLBFDataType) -> List[str]: - return getattr(self, f'{self.data_type_key(key)}_items') + return getattr(self, f'{MLBF.data_type_key(key)}_items') @cached_property def _cache_path(self): @@ -114,15 +111,15 @@ def __init__(self, storage: SafeStorage): @cached_property def blocked_items(self) -> List[str]: - return self._data.get(self.data_type_key(MLBFDataType.BLOCKED)) + return self._data.get(MLBF.data_type_key(MLBFDataType.BLOCKED)) @cached_property def soft_blocked_items(self) -> List[str]: - return self._data.get(self.data_type_key(MLBFDataType.SOFT_BLOCKED)) + return self._data.get(MLBF.data_type_key(MLBFDataType.SOFT_BLOCKED)) @cached_property def not_blocked_items(self) -> List[str]: - return self._data.get(self.data_type_key(MLBFDataType.NOT_BLOCKED)) + return self._data.get(MLBF.data_type_key(MLBFDataType.NOT_BLOCKED)) class MLBFDataBaseLoader(BaseMLBFLoader): @@ -197,6 +194,10 @@ def __init__( ) self.data: BaseMLBFLoader = data_class(storage=self.storage) + @classmethod + def data_type_key(cls, key: MLBFDataType) -> str: + return key.name.lower() + @classmethod def hash_filter_inputs(cls, input_list): """Returns a list""" @@ -205,25 +206,36 @@ def hash_filter_inputs(cls, input_list): for (guid, version) in input_list ] - @property - def filter_path(self): - return self.storage.path('filter') + def filter_path(self, block_type: BlockType): + return self.storage.path(f'filter-{self.data_type_key(block_type)}') @property def stash_path(self): return self.storage.path('stash.json') - def generate_and_write_filter(self): + def delete(self): + if self.storage.exists(self.storage.base_location): + self.storage.rm_stored_dir(self.storage.base_location) + + def generate_and_write_filter(self, block_type: BlockType): stats = {} + blocked = self.data[block_type] + + not_blocked = [ + self.data[_block_type] + for _block_type in BlockType + if _block_type != block_type + ] + bloomfilter = generate_mlbf( stats=stats, - blocked=self.data.blocked_items, - not_blocked=self.data.not_blocked_items, + blocked=blocked, + not_blocked=not_blocked, ) # write bloomfilter - mlbf_path = self.filter_path + mlbf_path = self.filter_path(block_type) with self.storage.open(mlbf_path, 'wb') as filter_file: log.info(f'Writing to file {mlbf_path}') bloomfilter.tofile(filter_file) @@ -231,6 +243,8 @@ def generate_and_write_filter(self): log.info(json.dumps(stats)) + return bloomfilter + def generate_diffs( self, previous_mlbf: 'MLBF' = None ) -> Dict[BlockType, Tuple[List[str], List[str], int]]: diff --git a/src/olympia/blocklist/tasks.py b/src/olympia/blocklist/tasks.py index 713f3f2332d6..681f7bcc88e7 100644 --- a/src/olympia/blocklist/tasks.py +++ b/src/olympia/blocklist/tasks.py @@ -2,6 +2,7 @@ import os import re from datetime import datetime, timedelta +from typing import List from django.conf import settings from django.contrib.admin.models import CHANGE, LogEntry @@ -21,10 +22,10 @@ REMOTE_SETTINGS_COLLECTION_MLBF, ) from olympia.lib.remote_settings import RemoteSettings -from olympia.zadmin.models import set_config +from olympia.zadmin.models import get_config, set_config from .mlbf import MLBF -from .models import BlocklistSubmission +from .models import BlocklistSubmission, BlockType from .utils import ( datetime_to_ts, ) @@ -88,28 +89,17 @@ def monitor_remote_settings(): @task -def upload_filter(generation_time, is_base=True): +def upload_filter(generation_time, filter_list=None, upload_stash=False): + filter_list: List[BlockType] = ( + [] if filter_list is None else [BlockType[filter] for filter in filter_list] + ) bucket = settings.REMOTE_SETTINGS_WRITER_BUCKET server = RemoteSettings( bucket, REMOTE_SETTINGS_COLLECTION_MLBF, sign_off_needed=False ) mlbf = MLBF.load_from_storage(generation_time, error_on_missing=True) - if is_base: - # clear the collection for the base - we want to be the only filter - server.delete_all_records() - statsd.incr('blocklist.tasks.upload_filter.reset_collection') - # Then the bloomfilter - data = { - 'key_format': MLBF.KEY_FORMAT, - 'generation_time': generation_time, - 'attachment_type': BLOCKLIST_RECORD_MLBF_BASE, - } - with mlbf.storage.open(mlbf.filter_path, 'rb') as filter_file: - attachment = ('filter.bin', filter_file, 'application/octet-stream') - server.publish_attachment(data, attachment) - statsd.incr('blocklist.tasks.upload_filter.upload_mlbf') - statsd.incr('blocklist.tasks.upload_filter.upload_mlbf.base') - else: + + if upload_stash: with mlbf.storage.open(mlbf.stash_path, 'r') as stash_file: stash_data = json.load(stash_file) # If we have a stash, write that @@ -121,14 +111,64 @@ def upload_filter(generation_time, is_base=True): server.publish_record(stash_upload_data) statsd.incr('blocklist.tasks.upload_filter.upload_stash') + oldest_base_filter_id = None + + if len(filter_list) > 0: + for block_type in filter_list: + data = { + 'key_format': MLBF.KEY_FORMAT, + 'generation_time': generation_time, + 'attachment_type': BLOCKLIST_RECORD_MLBF_BASE, + } + with mlbf.storage.open(mlbf.filter_path(block_type), 'rb') as filter_file: + file_name = os.path.basename(filter_file.name) + attachment = ( + f'{file_name}.bin', + filter_file, + 'application/octet-stream', + ) + server.publish_attachment(data, attachment) + base_filter_id = get_config( + MLBF_BASE_ID_CONFIG_KEY(block_type), json_value=True + ) + + if base_filter_id is not None: + if oldest_base_filter_id is None: + oldest_base_filter_id = base_filter_id + else: + oldest_base_filter_id = min(oldest_base_filter_id, base_filter_id) + + statsd.incr('blocklist.tasks.upload_filter.upload_mlbf') + statsd.incr('blocklist.tasks.upload_filter.upload_mlbf.base') + + if oldest_base_filter_id is not None: + for record in server.records(): + record_time = ( + record['stash_time'] + if 'stash_time' in record + else record['generation_time'] + ) + if record_time < oldest_base_filter_id: + server.delete_record(record['id']) + + cleanup_old_files.delay(oldest_base_filter_id) + statsd.incr('blocklist.tasks.upload_filter.reset_collection') + server.complete_session() set_config(MLBF_TIME_CONFIG_KEY, generation_time, json_value=True) - if is_base: - set_config(MLBF_BASE_ID_CONFIG_KEY, generation_time, json_value=True) + + for block_type in filter_list: + set_config( + MLBF_BASE_ID_CONFIG_KEY(block_type), generation_time, json_value=True + ) @task -def cleanup_old_files(*, base_filter_id): +def cleanup_old_files(base_filter_id=None): + if base_filter_id is None: + log.info('No base filter id provided, skipping cleanup') + return + log.info('Starting clean up of old MLBF folders...') six_months_ago = datetime_to_ts(datetime.now() - timedelta(weeks=26)) base_filter_ts = int(base_filter_id) @@ -141,7 +181,7 @@ def cleanup_old_files(*, base_filter_id): continue dir_ts = int(dir) dir_as_date = datetime.fromtimestamp(dir_ts / 1000) - # delete if >6 months old and 6 months old and > base_filter_id if dir_ts > six_months_ago: log.info('Skipping %s because < 6 months old (%s)', dir, dir_as_date) elif dir_ts > base_filter_ts: diff --git a/src/olympia/blocklist/tests/test_commands.py b/src/olympia/blocklist/tests/test_commands.py index 3df8df064b6a..c33d9fc44482 100644 --- a/src/olympia/blocklist/tests/test_commands.py +++ b/src/olympia/blocklist/tests/test_commands.py @@ -9,6 +9,7 @@ version_factory, ) from olympia.blocklist.mlbf import MLBF +from olympia.blocklist.models import BlockType class TestExportBlocklist(TestCase): @@ -36,6 +37,11 @@ def test_command(self): updated_by=user, ) - call_command('export_blocklist', '1') + call_command('export_blocklist', '1', '--block-type', BlockType.BLOCKED.name) mlbf = MLBF.load_from_storage(1) - assert mlbf.storage.exists(mlbf.filter_path) + assert mlbf.storage.exists(mlbf.filter_path(BlockType.BLOCKED)) + call_command( + 'export_blocklist', '1', '--block-type', BlockType.SOFT_BLOCKED.name + ) + mlbf = MLBF.load_from_storage(1) + assert mlbf.storage.exists(mlbf.filter_path(BlockType.SOFT_BLOCKED)) diff --git a/src/olympia/blocklist/tests/test_cron.py b/src/olympia/blocklist/tests/test_cron.py index 63e0e7caf7f2..6570cb62220a 100644 --- a/src/olympia/blocklist/tests/test_cron.py +++ b/src/olympia/blocklist/tests/test_cron.py @@ -45,7 +45,6 @@ def setUp(self): self.mocks: dict[str, mock.Mock] = {} for mock_name in ( 'olympia.blocklist.cron.statsd.incr', - 'olympia.blocklist.cron.cleanup_old_files.delay', 'olympia.blocklist.cron.upload_filter.delay', 'olympia.blocklist.cron.get_generation_time', 'olympia.blocklist.cron.get_last_generation_time', @@ -98,9 +97,10 @@ def test_skip_update_unless_force_base(self): assert self.mocks['olympia.blocklist.cron.upload_filter.delay'].called - # Check that a filter was created on the second attempt + # Check that both filters were created on the second attempt mlbf = MLBF.load_from_storage(self.current_time) - assert mlbf.storage.exists(mlbf.filter_path) + assert mlbf.storage.exists(mlbf.filter_path(BlockType.BLOCKED)) + assert mlbf.storage.exists(mlbf.filter_path(BlockType.SOFT_BLOCKED)) assert not mlbf.storage.exists(mlbf.stash_path) def test_skip_update_unless_no_base_mlbf(self): @@ -123,10 +123,13 @@ def test_missing_last_filter_uses_base_filter(self): When there is a base filter and no last filter, fallback to using the base filter """ - self._block_version(is_signed=True) - # Re-created the last filter created after the new block + block_version = self._block_version(is_signed=True) + + # Re-create the last filter so we ensure + # the block is already processed comparing to previous MLBF.generate_from_db(self.last_time) + assert datetime_to_ts(block_version.modified) < self.last_time # We skip the update at this point because the new last filter already # accounted for the new block. upload_mlbf_to_remote_settings(force_base=False) @@ -138,13 +141,13 @@ def test_missing_last_filter_uses_base_filter(self): 'olympia.blocklist.cron.get_last_generation_time' ].return_value = None upload_mlbf_to_remote_settings(force_base=False) - assert self.mocks['olympia.blocklist.cron.upload_filter.delay'].called assert ( mock.call( - 'blocklist.cron.upload_mlbf_to_remote_settings.blocked_changed', 1 + self.current_time, + filters_to_update=[], + update_stash=True, ) - in self.mocks['olympia.blocklist.cron.statsd.incr'].call_args_list - ) + ) in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list def test_skip_update_unless_recent_modified_blocks(self): """ @@ -212,20 +215,38 @@ def test_upload_stash_unless_force_base(self): We expect to upload a stash, unless the force_base is true, in which case we upload a new filter. """ - force_base = False self._block_version(is_signed=True) - upload_mlbf_to_remote_settings(force_base=force_base) + upload_mlbf_to_remote_settings(force_base=False) assert self.mocks[ 'olympia.blocklist.cron.upload_filter.delay' ].call_args_list == [ mock.call( self.current_time, - is_base=force_base, + filters_to_update=[], + update_stash=True, ) ] + assert ( + mock.call( + self.current_time, + filters_to_update=[], + update_stash=True, + ) + ) in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list + mlbf = MLBF.load_from_storage(self.current_time) - assert mlbf.storage.exists(mlbf.filter_path) == force_base - assert mlbf.storage.exists(mlbf.stash_path) != force_base + assert not mlbf.storage.exists(mlbf.filter_path(BlockType.BLOCKED)) + assert mlbf.storage.exists(mlbf.stash_path) + + upload_mlbf_to_remote_settings(force_base=True) + assert mlbf.storage.exists(mlbf.filter_path(BlockType.BLOCKED)) + assert ( + mock.call( + self.current_time, + filters_to_update=[BlockType.BLOCKED.name, BlockType.SOFT_BLOCKED.name], + update_stash=False, + ) + ) in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list def test_upload_stash_unless_missing_base_filter(self): """ @@ -238,11 +259,13 @@ def test_upload_stash_unless_missing_base_filter(self): ].call_args_list == [ mock.call( self.current_time, - is_base=False, + filters_to_update=[], + update_stash=True, ) ] mlbf = MLBF.load_from_storage(self.current_time) - assert not mlbf.storage.exists(mlbf.filter_path) + assert not mlbf.storage.exists(mlbf.filter_path(BlockType.BLOCKED)) + assert not mlbf.storage.exists(mlbf.filter_path(BlockType.SOFT_BLOCKED)) assert mlbf.storage.exists(mlbf.stash_path) self.mocks[ @@ -252,11 +275,13 @@ def test_upload_stash_unless_missing_base_filter(self): assert ( mock.call( self.current_time, - is_base=True, + filters_to_update=[BlockType.BLOCKED.name, BlockType.SOFT_BLOCKED.name], + update_stash=False, ) in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list ) - assert mlbf.storage.exists(mlbf.filter_path) + assert mlbf.storage.exists(mlbf.filter_path(BlockType.BLOCKED)) + assert mlbf.storage.exists(mlbf.filter_path(BlockType.SOFT_BLOCKED)) @mock.patch('olympia.blocklist.cron.BASE_REPLACE_THRESHOLD', 1) def test_upload_stash_unless_enough_changes(self): @@ -271,7 +296,8 @@ def test_upload_stash_unless_enough_changes(self): ].call_args_list == [ mock.call( self.current_time, - is_base=False, + filters_to_update=[], + update_stash=True, ) ] mlbf = MLBF.load_from_storage(self.current_time) @@ -288,30 +314,42 @@ def test_upload_stash_unless_enough_changes(self): assert ( mock.call( self.current_time, - is_base=True, + filters_to_update=[BlockType.BLOCKED.name], + update_stash=False, ) in self.mocks['olympia.blocklist.cron.upload_filter.delay'].call_args_list ) new_mlbf = MLBF.load_from_storage(self.current_time) - assert new_mlbf.storage.exists(new_mlbf.filter_path) + assert new_mlbf.storage.exists(new_mlbf.filter_path(BlockType.BLOCKED)) assert not new_mlbf.storage.exists(new_mlbf.stash_path) - def test_cleanup_old_files(self): + @mock.patch('olympia.blocklist.cron.BASE_REPLACE_THRESHOLD', 1) + def test_upload_stash_even_if_filter_is_updated(self): """ - Cleanup old files only if a base filter already exists. + If enough changes of one type are made, update the filter, but still upload + a stash if there are changes of other types. """ - upload_mlbf_to_remote_settings(force_base=True) + self._block_version(is_signed=True, block_type=BlockType.BLOCKED) + self._block_version(is_signed=True, block_type=BlockType.BLOCKED) + self._block_version(is_signed=True, block_type=BlockType.SOFT_BLOCKED) + upload_mlbf_to_remote_settings() assert self.mocks[ - 'olympia.blocklist.cron.cleanup_old_files.delay' - ].call_args_list == [mock.call(base_filter_id=self.base_time)] + 'olympia.blocklist.cron.upload_filter.delay' + ].call_args_list == [ + mock.call( + self.current_time, + filters_to_update=[BlockType.BLOCKED.name], + update_stash=True, + ) + ] - self.mocks[ - 'olympia.blocklist.cron.get_base_generation_time' - ].return_value = None - upload_mlbf_to_remote_settings(force_base=True) - assert ( - self.mocks['olympia.blocklist.cron.cleanup_old_files.delay'].call_count == 1 - ) + def test_remove_storage_if_no_update(self): + """ + If there is no update, remove the storage used by the current mlbf. + """ + upload_mlbf_to_remote_settings(force_base=False) + assert not self.mocks['olympia.blocklist.cron.upload_filter.delay'].called + assert MLBF.load_from_storage(self.current_time) is None def test_creates_base_filter_if_base_generation_time_invalid(self): """ @@ -355,9 +393,10 @@ def test_get_last_generation_time(self): assert get_last_generation_time() == 1 def test_get_base_generation_time(self): - assert get_base_generation_time() is None - set_config(MLBF_BASE_ID_CONFIG_KEY, 1) - assert get_base_generation_time() == 1 + for block_type in BlockType: + assert get_base_generation_time(block_type) is None + set_config(MLBF_BASE_ID_CONFIG_KEY(block_type), 1) + assert get_base_generation_time(block_type) == 1 @pytest.mark.django_db diff --git a/src/olympia/blocklist/tests/test_mlbf.py b/src/olympia/blocklist/tests/test_mlbf.py index 03dce0723117..ee0b64696bc3 100644 --- a/src/olympia/blocklist/tests/test_mlbf.py +++ b/src/olympia/blocklist/tests/test_mlbf.py @@ -104,7 +104,7 @@ def test_cache_raw_data(self): loader = self.TestStaticLoader(self.storage) for data_type in MLBFDataType: - assert loader[data_type] == loader._raw[loader.data_type_key(data_type)] + assert loader[data_type] == loader._raw[MLBF.data_type_key(data_type)] # The source of truth should ultimately be the named cached properties # Even though _raw is cached, it should still return @@ -467,6 +467,34 @@ def test_generate_stash_returns_expected_stash(self): ), } + def test_generate_filter_returns_expected_data(self): + addon, block = self._blocked_addon() + not_blocked = self._version(addon) + not_blocked_version = not_blocked.version + hard_blocked = self._block_version( + block, self._version(addon), block_type=BlockType.BLOCKED + ) + hard_blocked_version = hard_blocked.version.version + soft_blocked = self._block_version( + block, self._version(addon), block_type=BlockType.SOFT_BLOCKED + ) + soft_blocked_version = soft_blocked.version.version + mlbf = MLBF.generate_from_db('test') + + mlbf.generate_and_write_filter(BlockType.BLOCKED).verify( + include=MLBF.hash_filter_inputs([(addon.guid, hard_blocked_version)]), + exclude=MLBF.hash_filter_inputs( + [(addon.guid, soft_blocked_version), (addon.guid, not_blocked_version)] + ), + ) + + mlbf.generate_and_write_filter(BlockType.SOFT_BLOCKED).verify( + include=MLBF.hash_filter_inputs([(addon.guid, soft_blocked_version)]), + exclude=MLBF.hash_filter_inputs( + [(addon.guid, hard_blocked_version), (addon.guid, not_blocked_version)] + ), + ) + def test_changed_count_returns_expected_count(self): addon, block = self._blocked_addon() self._block_version(block, self._version(addon), block_type=BlockType.BLOCKED) @@ -520,16 +548,19 @@ def test_changed_count_returns_expected_count(self): == 1 ) + def _test_not_raises_if_versions_blocked(self, block_type: BlockType): + mlbf = MLBF.generate_from_db('test') + self._blocked_addon(file_kw={'is_signed': True}, block_type=block_type) + assert mlbf.data[block_type] == [] + mlbf.generate_and_write_filter(block_type) + def test_generate_filter_not_raises_if_all_versions_unblocked(self): """ When we create a bloom filter where all versions fall into the "not filtered" category This can create invalid error rates because the error rate depends on these numbers being non-zero. """ - mlbf = MLBF.generate_from_db('test') - self._blocked_addon(file_kw={'is_signed': True}) - assert mlbf.data.blocked_items == [] - mlbf.generate_and_write_filter() + self._test_not_raises_if_versions_blocked(BlockType.BLOCKED) def test_generate_filter_not_raises_if_all_versions_blocked(self): """ @@ -537,10 +568,7 @@ def test_generate_filter_not_raises_if_all_versions_blocked(self): the "not filtered" category This can create invalid error rates because the error rate depends on these numbers being non-zero. """ - mlbf = MLBF.generate_from_db('test') - self._blocked_addon(file_kw={'is_signed': False}) - assert mlbf.data.not_blocked_items == [] - mlbf.generate_and_write_filter() + self._test_not_raises_if_versions_blocked(BlockType.SOFT_BLOCKED) def test_duplicate_guid_is_blocked(self): """ diff --git a/src/olympia/blocklist/tests/test_tasks.py b/src/olympia/blocklist/tests/test_tasks.py index 40e924621a8e..048455c38759 100644 --- a/src/olympia/blocklist/tests/test_tasks.py +++ b/src/olympia/blocklist/tests/test_tasks.py @@ -1,6 +1,7 @@ import json import os from datetime import datetime, timedelta +from typing import Dict, List from unittest import TestCase, mock from django.conf import settings @@ -18,6 +19,7 @@ ) from olympia.blocklist.mlbf import MLBF from olympia.constants.blocklist import MLBF_BASE_ID_CONFIG_KEY, MLBF_TIME_CONFIG_KEY +from olympia.zadmin.models import set_config from ..models import BlocklistSubmission, BlockType, BlockVersion from ..tasks import ( @@ -104,12 +106,14 @@ def setUp(self): prefix = 'olympia.blocklist.tasks.' self.mocks = { - 'delete_all_records': f'{prefix}RemoteSettings.delete_all_records', + 'records': f'{prefix}RemoteSettings.records', + 'delete_record': f'{prefix}RemoteSettings.delete_record', 'publish_attachment': f'{prefix}RemoteSettings.publish_attachment', 'publish_record': f'{prefix}RemoteSettings.publish_record', 'complete_session': f'{prefix}RemoteSettings.complete_session', 'set_config': f'{prefix}set_config', 'statsd.incr': f'{prefix}statsd.incr', + 'cleanup_old_files.delay': f'{prefix}cleanup_old_files.delay', } for mock_name, mock_path in self.mocks.items(): patcher = mock.patch(mock_path) @@ -129,43 +133,124 @@ def _block_version( block=block, version=version, block_type=block_type ) - def test_upload_base_filter(self): + def _test_upload_base_filter(self, *block_types: BlockType): self._block_version(is_signed=True) mlbf = MLBF.generate_from_db(self.generation_time) - mlbf.generate_and_write_filter() + for block_type in block_types: + mlbf.generate_and_write_filter(block_type) - upload_filter.delay(self.generation_time, is_base=True) + upload_filter.delay( + self.generation_time, + filter_list=[block_type.name for block_type in block_types], + ) - assert self.mocks['delete_all_records'].called - with mlbf.storage.open(mlbf.filter_path, 'rb') as filter_file: - actual_data, actual_attchment = self.mocks[ - 'publish_attachment' - ].call_args_list[0][0] + assert not self.mocks['delete_record'].called + actual_files = [ + mock[0][1][1].name + for mock in self.mocks['publish_attachment'].call_args_list + ] - assert actual_data == { - 'key_format': MLBF.KEY_FORMAT, - 'generation_time': self.generation_time, - 'attachment_type': BLOCKLIST_RECORD_MLBF_BASE, - } - name, file, content_type = actual_attchment - assert name == 'filter.bin' - assert file.name == filter_file.name - assert content_type == 'application/octet-stream' - - assert all( - call in self.mocks['statsd.incr'].call_args_list - for call in [ - mock.call('blocklist.tasks.upload_filter.reset_collection'), - mock.call('blocklist.tasks.upload_filter.upload_mlbf.base'), - mock.call('blocklist.tasks.upload_filter.upload_mlbf'), - ] - ) + for block_type in block_types: + with mlbf.storage.open(mlbf.filter_path(block_type), 'rb') as filter_file: + assert filter_file.name in actual_files + expected_call = mock.call( + { + 'key_format': MLBF.KEY_FORMAT, + 'generation_time': self.generation_time, + 'attachment_type': BLOCKLIST_RECORD_MLBF_BASE, + }, + ( + f'filter-{MLBF.data_type_key(block_type)}.bin', + mock.ANY, + 'application/octet-stream', + ), + ) + assert expected_call in self.mocks['publish_attachment'].call_args_list assert self.mocks['complete_session'].called - assert self.mocks['set_config'].call_args_list == [ - mock.call(MLBF_TIME_CONFIG_KEY, self.generation_time, json_value=True), - mock.call(MLBF_BASE_ID_CONFIG_KEY, self.generation_time, json_value=True), - ] + assert ( + mock.call(MLBF_TIME_CONFIG_KEY, self.generation_time, json_value=True) + ) in self.mocks['set_config'].call_args_list + + for block_type in block_types: + assert ( + mock.call( + MLBF_BASE_ID_CONFIG_KEY(block_type), + self.generation_time, + json_value=True, + ) + ) in self.mocks['set_config'].call_args_list + + def test_upload_blocked_filter(self): + self._test_upload_base_filter(BlockType.BLOCKED) + + def test_upload_soft_blocked_filter(self): + self._test_upload_base_filter(BlockType.SOFT_BLOCKED) + + def test_upload_soft_and_blocked_filter(self): + self._test_upload_base_filter(BlockType.BLOCKED, BlockType.SOFT_BLOCKED) + + def _test_cleanup_old_records( + self, + filter_list: Dict[BlockType, int], + records: List[Dict[str, int]], + expected_calls: List[any], + ): + self._block_version(is_signed=True) + mlbf = MLBF.generate_from_db(self.generation_time) + for block_type, base_id in filter_list.items(): + mlbf.generate_and_write_filter(block_type) + set_config(MLBF_BASE_ID_CONFIG_KEY(block_type), base_id, json_value=True) + + self.mocks['records'].return_value = records + upload_filter.delay( + self.generation_time, + filter_list=[block_type.name for block_type in filter_list], + ) + + assert self.mocks['delete_record'].call_args_list == expected_calls + + if len(filter_list.values()) > 0: + self.mocks['cleanup_old_files.delay'].assert_called_with( + min(filter_list.values()) + ) + self.mocks['statsd.incr'].assert_called_with( + 'blocklist.tasks.upload_filter.reset_collection' + ) + + def test_skip_cleanup_when_no_filters(self): + self._test_cleanup_old_records( + filter_list={}, + records=[{'id': '0', 'generation_time': self.generation_time}], + expected_calls=[], + ) + + def test_cleanup_old_records(self): + self._test_cleanup_old_records( + filter_list={ + BlockType.BLOCKED: self.generation_time, + }, + records=[ + {'id': '0', 'generation_time': self.generation_time - 1}, + {'id': '1', 'generation_time': self.generation_time}, + {'id': '2', 'generation_time': self.generation_time + 1}, + ], + expected_calls=[mock.call('0')], + ) + + def test_cleanup_oldest_records(self): + self._test_cleanup_old_records( + filter_list={ + BlockType.BLOCKED: self.generation_time + 2, + BlockType.SOFT_BLOCKED: self.generation_time + 1, + }, + records=[ + {'id': '0', 'generation_time': self.generation_time - 1}, + {'id': '1', 'generation_time': self.generation_time}, + {'id': '2', 'generation_time': self.generation_time + 1}, + ], + expected_calls=[mock.call('0'), mock.call('1')], + ) def test_upload_stashed_filter(self): old_mlbf = MLBF.generate_from_db(self.generation_time - 1) @@ -173,9 +258,9 @@ def test_upload_stashed_filter(self): mlbf = MLBF.generate_from_db(self.generation_time) mlbf.generate_and_write_stash(old_mlbf) - upload_filter.delay(self.generation_time, is_base=False) + upload_filter.delay(self.generation_time, upload_stash=True) - assert not self.mocks['delete_all_records'].called + assert not self.mocks['delete_record'].called with mlbf.storage.open(mlbf.stash_path, 'rb') as stash_file: actual_stash = self.mocks['publish_record'].call_args_list[0][0][0] stash_data = json.load(stash_file) @@ -205,31 +290,18 @@ def test_upload_stashed_filter(self): def test_raises_when_no_filter_exists(self): with self.assertRaises(FileNotFoundError): - upload_filter.delay(self.generation_time) + upload_filter.delay( + self.generation_time, filter_list=[BlockType.BLOCKED.name] + ) def test_raises_when_no_stash_exists(self): with self.assertRaises(FileNotFoundError): - upload_filter.delay(self.generation_time) - - def test_default_is_base_is_true(self): - MLBF.generate_from_db(self.generation_time).generate_and_write_filter() - upload_filter.delay(self.generation_time) - assert self.mocks['delete_all_records'].called - - def test_raises_missing_stash(self): - mlbf = MLBF.generate_from_db(self.generation_time) - mlbf.generate_and_write_filter() - - with self.assertRaises(FileNotFoundError): - upload_filter.delay(self.generation_time, is_base=False) + upload_filter.delay(self.generation_time, upload_stash=True) + def test_default_is_no_op(self): + MLBF.generate_from_db(self.generation_time).generate_and_write_filter( + BlockType.BLOCKED + ) upload_filter.delay(self.generation_time) - - def test_raises_missing_filter(self): - mlbf = MLBF.generate_from_db(self.generation_time) - mlbf.generate_and_write_stash(mlbf) - - with self.assertRaises(FileNotFoundError): - upload_filter.delay(self.generation_time, is_base=True) - - upload_filter.delay(self.generation_time, is_base=False) + assert not self.mocks['delete_record'].called + assert not self.mocks['publish_record'].called diff --git a/src/olympia/constants/blocklist.py b/src/olympia/constants/blocklist.py index 8cda2e5c9b11..779004bf03da 100644 --- a/src/olympia/constants/blocklist.py +++ b/src/olympia/constants/blocklist.py @@ -1,8 +1,15 @@ # How many guids should there be in the stashes before we make a new base. +from olympia.blocklist.models import BlockType + + BASE_REPLACE_THRESHOLD = 5_000 # Config keys used to track recent mlbf ids MLBF_TIME_CONFIG_KEY = 'blocklist_mlbf_generation_time' -MLBF_BASE_ID_CONFIG_KEY = 'blocklist_mlbf_base_id' + + +def MLBF_BASE_ID_CONFIG_KEY(block_type: BlockType): + return f'blocklist_mlbf_base_id_{block_type.name.lower()}' + REMOTE_SETTINGS_COLLECTION_MLBF = 'addons-bloomfilters' diff --git a/src/olympia/lib/remote_settings.py b/src/olympia/lib/remote_settings.py index fdb08a6694dd..4778baa4be20 100644 --- a/src/olympia/lib/remote_settings.py +++ b/src/olympia/lib/remote_settings.py @@ -105,6 +105,14 @@ def publish_attachment(self, data, attachment, legacy_id=None): self._changes = True return response.json().get('data', {}) + def records(self): + url = ( + f'{settings.REMOTE_SETTINGS_WRITER_URL}buckets/{self.bucket}/' + f'collections/{self.collection}/records' + ) + response = requests.get(url, headers=self.headers) + return response.json().get('data', []) + def delete_record(self, legacy_id): url = ( f'{settings.REMOTE_SETTINGS_WRITER_URL}buckets/{self.bucket}/' diff --git a/src/olympia/lib/tests/test_remote_settings.py b/src/olympia/lib/tests/test_remote_settings.py index 45012f38112d..9b677ba319a3 100644 --- a/src/olympia/lib/tests/test_remote_settings.py +++ b/src/olympia/lib/tests/test_remote_settings.py @@ -21,6 +21,19 @@ def test_bucket_not_altered(self): server = RemoteSettings('foo', 'baa') assert server.bucket == 'foo' + def test_records(self): + server = RemoteSettings('foo', 'baa') + + for data in [{'id': 'an-id'}], []: + responses.add( + responses.GET, + settings.REMOTE_SETTINGS_WRITER_URL + + 'buckets/foo/collections/baa/records', + content_type='application/json', + json={'data': data}, + ) + assert server.records() == data + def test_publish_record(self): server = RemoteSettings('foo', 'baa') server._setup_done = True