Skip to content

Commit

Permalink
Upload multiple filters
Browse files Browse the repository at this point in the history
  • Loading branch information
KevinMind committed Nov 8, 2024
1 parent dcec5cc commit f30bf73
Show file tree
Hide file tree
Showing 11 changed files with 430 additions and 189 deletions.
99 changes: 53 additions & 46 deletions src/olympia/blocklist/cron.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from typing import List

import waffle
from django_statsd.clients import statsd
Expand All @@ -13,7 +14,7 @@

from .mlbf import MLBF
from .models import Block, BlocklistSubmission, BlockType
from .tasks import cleanup_old_files, process_blocklistsubmission, upload_filter
from .tasks import process_blocklistsubmission, upload_filter
from .utils import datetime_to_ts


Expand All @@ -28,8 +29,8 @@ def get_last_generation_time():
return get_config(MLBF_TIME_CONFIG_KEY, None, json_value=True)


def get_base_generation_time():
return get_config(MLBF_BASE_ID_CONFIG_KEY, None, json_value=True)
def get_base_generation_time(block_type: BlockType):
    """Return the stored timestamp of the base filter for *block_type*.

    Returns None when no base filter generation time has been recorded.
    NOTE(review): assumes MLBF_BASE_ID_CONFIG_KEY is callable and yields a
    per-block-type config key — confirm against its definition.
    """
    config_key = MLBF_BASE_ID_CONFIG_KEY(block_type)
    return get_config(config_key, None, json_value=True)


def get_blocklist_last_modified_time():
Expand Down Expand Up @@ -68,72 +69,78 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
# This timestamp represents the last time the MLBF was generated and uploaded.
# It could have been a base filter or a stash.
last_generation_time = get_last_generation_time()
# This timestamp represents the point in time when
# the base filter was generated and uploaded.
base_generation_time = get_base_generation_time()

mlbf = MLBF.generate_from_db(generation_time)

base_filter = (
MLBF.load_from_storage(base_generation_time)
if base_generation_time is not None
else None
)

previous_filter = (
# Only load previous filter if there is a timestamp to use
# and that timestamp is not the same as the base_filter
MLBF.load_from_storage(last_generation_time)
if last_generation_time is not None
and (base_filter is None or base_filter.created_at != last_generation_time)
else base_filter
else None
)

changes_count = mlbf.blocks_changed_since_previous(
BlockType.BLOCKED, previous_filter
)
statsd.incr(
'blocklist.cron.upload_mlbf_to_remote_settings.blocked_changed', changes_count
base_filters_to_update: List[BlockType] = []
update_stash = False

# Determine which base filters need to be re-uploaded
# and whether the stash needs to be updated
for block_type in BlockType:
base_generation_time = get_base_generation_time(block_type)
base_filter = (
MLBF.load_from_storage(base_generation_time)
if base_generation_time is not None
else None
)

update_filter = (
force_base
or base_filter is None
or mlbf.blocks_changed_since_previous(block_type, base_filter)
> BASE_REPLACE_THRESHOLD
)

if update_filter:
base_filters_to_update.append(block_type)
elif (
mlbf.blocks_changed_since_previous(
block_type, previous_filter or base_filter
)
> 0
):
update_stash = True

previous_filter_is_stale = (
previous_filter is not None
and previous_filter.created_at < get_blocklist_last_modified_time()
)

need_update = (
force_base
or base_filter is None
or (
previous_filter is not None
and previous_filter.created_at < get_blocklist_last_modified_time()
)
or changes_count > 0
len(base_filters_to_update) > 0 or update_stash or previous_filter_is_stale
)
if not need_update:
log.info('No new/modified/deleted Blocks in database; skipping MLBF generation')
return
return mlbf.delete()

statsd.incr(
'blocklist.cron.upload_mlbf_to_remote_settings.blocked_count',
len(mlbf.data.blocked_items),
)
statsd.incr(
'blocklist.cron.upload_mlbf_to_remote_settings.soft_blocked_count',
len(mlbf.data.soft_blocked_items),
)
statsd.incr(
'blocklist.cron.upload_mlbf_to_remote_settings.not_blocked_count',
len(mlbf.data.not_blocked_items),
)

make_base_filter = (
force_base
or base_filter is None
or previous_filter is None
or mlbf.blocks_changed_since_previous(BlockType.BLOCKED, base_filter)
> BASE_REPLACE_THRESHOLD
)
mlbf.generate_and_write_stash(previous_filter) if update_stash else None

if make_base_filter:
mlbf.generate_and_write_filter()
else:
mlbf.generate_and_write_stash(previous_filter)
for block_type in base_filters_to_update:
mlbf.generate_and_write_filter(block_type)

upload_filter.delay(generation_time, is_base=make_base_filter)

if base_filter:
cleanup_old_files.delay(base_filter_id=base_filter.created_at)
upload_filter.delay(
generation_time,
filters_to_update=[key.name for key in base_filters_to_update],
update_stash=update_stash,
)


def process_blocklistsubmissions():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import olympia.core.logger
from olympia.blocklist.mlbf import MLBF
from olympia.blocklist.models import BlockType


log = olympia.core.logger.getLogger('z.amo.blocklist')
Expand All @@ -29,6 +30,11 @@ def add_arguments(self, parser):
'the database',
default=None,
)
parser.add_argument(
'--block-type',
help='Block type to export',
default=None,
)

def load_json(self, json_path):
with open(json_path) as json_file:
Expand All @@ -38,6 +44,7 @@ def load_json(self, json_path):
def handle(self, *args, **options):
log.debug('Exporting blocklist to file')
mlbf = MLBF.generate_from_db(options.get('id'))
block_type = BlockType[options.get('block_type')]

if options.get('block_guids_input'):
mlbf.blocked_items = list(
Expand All @@ -52,4 +59,4 @@ def handle(self, *args, **options):
)
)

mlbf.generate_and_write_filter()
mlbf.generate_and_write_filter(block_type)
44 changes: 29 additions & 15 deletions src/olympia/blocklist/mlbf.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,15 @@ class BaseMLBFLoader:
def __init__(self, storage: SafeStorage):
self.storage = storage

def data_type_key(self, key: MLBFDataType) -> str:
return key.name.lower()

@cached_property
def _raw(self):
"""
raw serializable data for the given MLBFLoader.
"""
return {self.data_type_key(key): self[key] for key in MLBFDataType}
return {MLBF.data_type_key(key): self[key] for key in MLBFDataType}

def __getitem__(self, key: MLBFDataType) -> List[str]:
return getattr(self, f'{self.data_type_key(key)}_items')
return getattr(self, f'{MLBF.data_type_key(key)}_items')

@cached_property
def _cache_path(self):
Expand All @@ -114,15 +111,15 @@ def __init__(self, storage: SafeStorage):

@cached_property
def blocked_items(self) -> List[str]:
return self._data.get(self.data_type_key(MLBFDataType.BLOCKED))
return self._data.get(MLBF.data_type_key(MLBFDataType.BLOCKED))

@cached_property
def soft_blocked_items(self) -> List[str]:
return self._data.get(self.data_type_key(MLBFDataType.SOFT_BLOCKED))
return self._data.get(MLBF.data_type_key(MLBFDataType.SOFT_BLOCKED))

@cached_property
def not_blocked_items(self) -> List[str]:
return self._data.get(self.data_type_key(MLBFDataType.NOT_BLOCKED))
return self._data.get(MLBF.data_type_key(MLBFDataType.NOT_BLOCKED))


class MLBFDataBaseLoader(BaseMLBFLoader):
Expand Down Expand Up @@ -197,6 +194,10 @@ def __init__(
)
self.data: BaseMLBFLoader = data_class(storage=self.storage)

@classmethod
def data_type_key(cls, key: MLBFDataType) -> str:
    """Canonical serialization key for *key*: the enum member name, lowercased."""
    member_name = key.name
    return member_name.lower()

@classmethod
def hash_filter_inputs(cls, input_list):
"""Returns a list"""
Expand All @@ -205,32 +206,45 @@ def hash_filter_inputs(cls, input_list):
for (guid, version) in input_list
]

@property
def filter_path(self):
return self.storage.path('filter')
def filter_path(self, block_type: BlockType):
    """Storage path of the bloom filter file for *block_type*.

    The file name embeds the lowercased block type, e.g. ``filter-blocked``.
    """
    file_name = f'filter-{self.data_type_key(block_type)}'
    return self.storage.path(file_name)

@property
def stash_path(self):
    """Storage path of the JSON stash file for this MLBF generation."""
    stash_file = 'stash.json'
    return self.storage.path(stash_file)

def generate_and_write_filter(self):
def delete(self):
    """Remove this MLBF's backing storage directory, if it exists.

    A missing directory is not an error; the method simply does nothing.
    """
    base_dir = self.storage.base_location
    if not self.storage.exists(base_dir):
        return
    self.storage.rm_stored_dir(base_dir)

def generate_and_write_filter(self, block_type: BlockType):
stats = {}

blocked = self.data[block_type]

not_blocked = [
self.data[_block_type]
for _block_type in BlockType
if _block_type != block_type
]

bloomfilter = generate_mlbf(
stats=stats,
blocked=self.data.blocked_items,
not_blocked=self.data.not_blocked_items,
blocked=blocked,
not_blocked=not_blocked,
)

# write bloomfilter
mlbf_path = self.filter_path
mlbf_path = self.filter_path(block_type)
with self.storage.open(mlbf_path, 'wb') as filter_file:
log.info(f'Writing to file {mlbf_path}')
bloomfilter.tofile(filter_file)
stats['mlbf_filesize'] = os.stat(mlbf_path).st_size

log.info(json.dumps(stats))

return bloomfilter

def generate_diffs(
self, previous_mlbf: 'MLBF' = None
) -> Dict[BlockType, Tuple[List[str], List[str], int]]:
Expand Down
Loading

0 comments on commit f30bf73

Please sign in to comment.