From f201ff6c68538fe4042bd7890e8597f146a266ee Mon Sep 17 00:00:00 2001
From: Joao Paulo Ramos <jramos@redhat.com>
Date: Thu, 12 Dec 2024 18:25:28 -0300
Subject: [PATCH 1/9] feat: create environment variable

---
 app/config.py  |  3 +++
 app/culling.py | 13 ++++++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/app/config.py b/app/config.py
index c9abb867c..79855a39b 100644
--- a/app/config.py
+++ b/app/config.py
@@ -6,6 +6,7 @@
 from app.common import get_build_version
 from app.culling import days_to_seconds
+from app.culling import hours_to_seconds
 from app.environment import RuntimeEnvironment
 from app.logging import get_logger
@@ -313,6 +314,8 @@ def __init__(self, runtime_environment):
             "IMMUTABLE_TIME_TO_DELETE_SECONDS", days_to_seconds(730)
         )

+        self.stale_validation_window_seconds = int(os.getenv("STALE_VALIDATION_WINDOW_SECONDS", hours_to_seconds(1)))
+
         self.host_delete_chunk_size = int(os.getenv("HOST_DELETE_CHUNK_SIZE", "1000"))
         self.script_chunk_size = int(os.getenv("SCRIPT_CHUNK_SIZE", "500"))
         self.export_svc_batch_size = int(os.getenv("EXPORT_SVC_BATCH_SIZE", "500"))
diff --git a/app/culling.py b/app/culling.py
index 7f8b1f170..7a26b91b9 100644
--- a/app/culling.py
+++ b/app/culling.py
@@ -38,9 +38,10 @@ def culled_timestamp(self, stale_timestamp, culled_seconds):


 class Conditions:
-    def __init__(self, staleness, host_type):
+    def __init__(self, staleness, host_type, config):
         self.now = datetime.now(timezone.utc)
         self.host_type = host_type
+        self.config = config

         self.staleness_host_type = {
             None: {
@@ -82,6 +83,11 @@ def _culled_timestamp(self):
         offset = timedelta(seconds=self.staleness_host_type[self.host_type]["culled"])
         return self.now - offset

+    def _stale_in_last_hour(self):
+        stale = self._stale_timestamp()
+        offset = timedelta(seconds=self.config.stale_validation_window_seconds)
+        return stale - offset
+
     @staticmethod
     def find_host_state(stale_timestamp, stale_warning_timestamp):
         now = datetime.now(timezone.utc)
@@ -96,3 +102,8 @@
 def days_to_seconds(n_days: int) -> int:
     factor = 86400
     return n_days * factor
+
+
+def hours_to_seconds(n_hours: int) -> int:
+    factor = 3600
+    return n_hours * factor

From 223c4c8111c31466bc9f7c67da612532e324cc2d Mon Sep 17 00:00:00 2001
From: Joao Paulo Ramos <jramos@redhat.com>
Date: Thu, 12 Dec 2024 18:27:32 -0300
Subject: [PATCH 2/9] feat: create host stale filtering logic

---
 app/culling.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/app/culling.py b/app/culling.py
index 7a26b91b9..a860b3495 100644
--- a/app/culling.py
+++ b/app/culling.py
@@ -71,6 +71,9 @@ def culled(self):
     def not_culled(self):
         return self._culled_timestamp(), None

+    def stale_in(self):
+        return self._stale_in_last_hour(), self._stale_timestamp()
+
     def _stale_timestamp(self):
         offset = timedelta(seconds=self.staleness_host_type[self.host_type]["stale"])
         return self.now - offset
@@ -99,6 +102,12 @@ def find_host_state(stale_timestamp, stale_warning_timestamp):
         return "stale warning"


+def staleness_to_conditions(staleness, staleness_states, host_type, timestamp_filter_func, config=None):
+    condition = Conditions(staleness, host_type, config)
+    filtered_states = (state for state in staleness_states if state != "unknown")
+    return (timestamp_filter_func(*getattr(condition, state)(), host_type=host_type) for state in filtered_states)
+
+
 def days_to_seconds(n_days: int) -> int:
     factor = 86400
     return n_days * factor
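
Before moving on, a stand-alone sketch of the arithmetic these two patches introduce may help. It is only an illustration: Config is faked with a SimpleNamespace, and the 29-hour time-to-stale is an invented example value, not a project default.

    from datetime import datetime, timedelta, timezone
    from types import SimpleNamespace

    # Stand-in for app.config.Config; only the new setting matters here.
    config = SimpleNamespace(stale_validation_window_seconds=3600)  # hours_to_seconds(1)

    now = datetime.now(timezone.utc)
    time_to_stale = 104400  # example conventional_time_to_stale (29 hours)

    stale = now - timedelta(seconds=time_to_stale)  # Conditions._stale_timestamp()
    window_start = stale - timedelta(seconds=config.stale_validation_window_seconds)  # _stale_in_last_hour()

    # stale_in() yields (window_start, stale): hosts whose last-seen timestamp
    # falls in this one-hour band became stale since the previous hourly run.
    print(window_start, stale)

Run once per hour, consecutive windows tile the timeline, so each host should match exactly one run.
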
From 6fcb194cddffa60acf2b354ccca741e02068eab2 Mon Sep 17 00:00:00 2001
From: Joao Paulo Ramos <jramos@redhat.com>
Date: Thu, 12 Dec 2024 18:33:52 -0300
Subject: [PATCH 3/9] refactor: refactor host-reaper

---
 host_reaper.py         | 111 ++++-------------------------------------
 jobs/__init__.py       |   0
 jobs/common.py         | 106 +++++++++++++++++++++++++++++++++++++++
 lib/host_repository.py |  13 +++--
 4 files changed, 125 insertions(+), 105 deletions(-)
 create mode 100644 jobs/__init__.py
 create mode 100644 jobs/common.py

diff --git a/host_reaper.py b/host_reaper.py
index 03b569e47..87bc48bbd 100755
--- a/host_reaper.py
+++ b/host_reaper.py
@@ -1,32 +1,19 @@
-#!/usr/bin/python
 import sys
 from functools import partial

-from prometheus_client import CollectorRegistry
-from prometheus_client import push_to_gateway
-from sqlalchemy import and_
-from sqlalchemy import create_engine
 from sqlalchemy import or_
-from sqlalchemy.orm import sessionmaker

-from api.cache import init_cache
-from app import create_app
-from app.auth.identity import create_mock_identity_with_org_id
-from app.config import Config
 from app.environment import RuntimeEnvironment
 from app.logging import get_logger
 from app.logging import threadctx
 from app.models import Host
-from app.models import Staleness
-from app.queue.event_producer import EventProducer
 from app.queue.metrics import event_producer_failure
 from app.queue.metrics import event_producer_success
 from app.queue.metrics import event_serialization_time
-from lib.handlers import ShutdownHandler
-from lib.handlers import register_shutdown
+from jobs.common import excepthook
+from jobs.common import find_hosts_in_state
+from jobs.common import main
 from lib.host_delete import delete_hosts
-from lib.host_repository import find_hosts_by_staleness_reaper
-from lib.host_repository import find_hosts_sys_default_staleness
 from lib.metrics import delete_host_count
 from lib.metrics import delete_host_processing_time
 from lib.metrics import host_reaper_fail_count
@@ -46,65 +33,10 @@
 RUNTIME_ENVIRONMENT = RuntimeEnvironment.JOB


-def _init_config():
-    config = Config(RUNTIME_ENVIRONMENT)
-    config.log_configuration()
-    return config
-
-
-def _init_db(config):
-    engine = create_engine(config.db_uri)
-    return sessionmaker(bind=engine)
-
-
-def _prometheus_job(namespace):
-    return f"{PROMETHEUS_JOB}-{namespace}" if namespace else PROMETHEUS_JOB
-
-
-def _excepthook(logger, type, value, traceback):
-    logger.exception("Host reaper failed", exc_info=value)
-
-
-def filter_culled_hosts_using_custom_staleness(logger, session):
-    staleness_objects = session.query(Staleness).all()
-    org_ids = []
-
-    query_filters = []
-    for staleness_obj in staleness_objects:
-        # Validate which host types for a given org_id never get deleted
-        logger.debug(f"Looking for hosts from org_id {staleness_obj.org_id} that use custom staleness")
-        org_ids.append(staleness_obj.org_id)
-        identity = create_mock_identity_with_org_id(staleness_obj.org_id)
-        query_filters.append(
-            and_(
-                (Host.org_id == staleness_obj.org_id),
-                find_hosts_by_staleness_reaper(["culled"], identity),
-            )
-        )
-    return query_filters, org_ids
-
-
-def filter_culled_hosts_using_sys_default_staleness(logger, org_ids):
-    # Use the hosts_ids_list to exclude hosts that were found with custom staleness
-    logger.debug("Looking for hosts that use system default staleness")
-    return and_(~Host.org_id.in_(org_ids), find_hosts_sys_default_staleness(["culled"]))
-
-
-def find_hosts_to_delete(logger, session):
-    # Find all host ids that are using custom staleness
-    query_filters, org_ids = filter_culled_hosts_using_custom_staleness(logger, session)
-
-    # Find all host ids that are not using custom staleness,
-    # excluding the hosts for the org_ids that use custom staleness
-    query_filters.append(filter_culled_hosts_using_sys_default_staleness(logger, org_ids))
-
-    return query_filters
-
-
 @host_reaper_fail_count.count_exceptions()
 def run(config, logger, session, event_producer, notification_event_producer, shutdown_handler, application):
     with application.app.app_context():
-        filter_hosts_to_delete = find_hosts_to_delete(logger, session)
+        filter_hosts_to_delete = find_hosts_in_state(logger, session, ["culled"])
         query = session.query(Host).filter(or_(False, *filter_hosts_to_delete))

         hosts_processed = config.host_delete_chunk_size
@@ -129,36 +61,13 @@ def run(config, logger, session, event_producer, notification_event_producer, sh
         deletions_remaining -= hosts_processed


-def main(logger):
-    config = _init_config()
-    application = create_app(RUNTIME_ENVIRONMENT)
-    init_cache(config, application)
-
-    registry = CollectorRegistry()
-    for metric in COLLECTED_METRICS:
-        registry.register(metric)
-    job = _prometheus_job(config.kubernetes_namespace)
-    prometheus_shutdown = partial(push_to_gateway, config.prometheus_pushgateway, job, registry)
-    register_shutdown(prometheus_shutdown, "Pushing metrics")
-
-    Session = _init_db(config)
-    session = Session()
-    register_shutdown(session.get_bind().dispose, "Closing database")
-
-    event_producer = EventProducer(config, config.event_topic)
-    register_shutdown(event_producer.close, "Closing producer")
-
-    notification_event_producer = EventProducer(config, config.notification_topic)
-    register_shutdown(notification_event_producer.close, "Closing notification producer")
-
-    shutdown_handler = ShutdownHandler()
-    shutdown_handler.register()
-    run(config, logger, session, event_producer, notification_event_producer, shutdown_handler, application)
-
-
 if __name__ == "__main__":
     logger = get_logger(LOGGER_NAME)
-    sys.excepthook = partial(_excepthook, logger)
+    job_type = "Host reaper"
+    sys.excepthook = partial(excepthook, logger, job_type)

     threadctx.request_id = None
-    main(logger)
+    config, logger, session, event_producer, notification_event_producer, shutdown_handler, application = main(
+        logger, COLLECTED_METRICS, PROMETHEUS_JOB
+    )
+    run(config, logger, session, event_producer, notification_event_producer, shutdown_handler, application)
diff --git a/jobs/__init__.py b/jobs/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/jobs/common.py b/jobs/common.py
new file mode 100644
index 000000000..87086de91
--- /dev/null
+++ b/jobs/common.py
@@ -0,0 +1,106 @@
+from functools import partial
+
+from prometheus_client import CollectorRegistry
+from prometheus_client import push_to_gateway
+from sqlalchemy import ColumnElement
+from sqlalchemy import and_
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+
+from api.cache import init_cache
+from app import create_app
+from app.auth.identity import create_mock_identity_with_org_id
+from app.config import Config
+from app.environment import RuntimeEnvironment
+from app.models import Host
+from app.models import Staleness
+from app.queue.event_producer import EventProducer
+from lib.handlers import ShutdownHandler
+from lib.handlers import register_shutdown
+from lib.host_repository import find_hosts_by_staleness_job
+from lib.host_repository import find_hosts_sys_default_staleness
+
+RUNTIME_ENVIRONMENT = RuntimeEnvironment.JOB
+
+
+def init_config():
+    config = Config(RUNTIME_ENVIRONMENT)
+    config.log_configuration()
+    return config
+
+
+def init_db(config):
+    engine = create_engine(config.db_uri)
+    return sessionmaker(bind=engine)
+
+
+def prometheus_job(namespace, prometheus_job):
+    return f"{prometheus_job}-{namespace}" if namespace else prometheus_job
+
+
+def excepthook(logger, job_type, value, traceback):
+    logger.exception("%s failed", job_type, exc_info=value)
+
+
+def filter_hosts_in_state_using_custom_staleness(logger, session, state: list, config=None):
+    staleness_objects = session.query(Staleness).all()
+    org_ids = []
+
+    query_filters = []
+    for staleness_obj in staleness_objects:
+        # Validate which host types for a given org_id never get deleted
+        logger.debug(f"Looking for hosts from org_id {staleness_obj.org_id} that use custom staleness")
+        org_ids.append(staleness_obj.org_id)
+        identity = create_mock_identity_with_org_id(staleness_obj.org_id)
+        query_filters.append(
+            and_(
+                (Host.org_id == staleness_obj.org_id),
+                find_hosts_by_staleness_job(state, identity, config),
+            )
+        )
+    return query_filters, org_ids
+
+
+def filter_hosts_in_state_using_sys_default_staleness(logger, org_ids, state: list, config=None) -> ColumnElement:
+    # Use the hosts_ids_list to exclude hosts that were found with custom staleness
+    logger.debug("Looking for hosts that use system default staleness")
+    return and_(~Host.org_id.in_(org_ids), find_hosts_sys_default_staleness(state, config))
+
+
+def find_hosts_in_state(logger, session, state: list, config=None):
+    # Find all host ids that are using custom staleness
+    query_filters, org_ids = filter_hosts_in_state_using_custom_staleness(logger, session, state, config)
+
+    # Find all host ids that are not using custom staleness,
+    # excluding the hosts for the org_ids that use custom staleness
+    query_filters.append(filter_hosts_in_state_using_sys_default_staleness(logger, org_ids, state, config))
+
+    return query_filters
+
+
+def main(logger, collected_metrics, prometheus_job_name):
+    config = init_config()
+    application = create_app(RUNTIME_ENVIRONMENT)
+    init_cache(config, application)
+
+    registry = CollectorRegistry()
+    for metric in collected_metrics:
+        registry.register(metric)
+    job = prometheus_job(config.kubernetes_namespace, prometheus_job_name)
+    prometheus_shutdown = partial(push_to_gateway, config.prometheus_pushgateway, job, registry)
+    register_shutdown(prometheus_shutdown, "Pushing metrics")
+
+    Session = init_db(config)
+    session = Session()
+    register_shutdown(session.get_bind().dispose, "Closing database")
+
+    event_producer = EventProducer(config, config.event_topic)
+    register_shutdown(event_producer.close, "Closing producer")
+
+    notification_event_producer = EventProducer(config, config.notification_topic)
+    register_shutdown(notification_event_producer.close, "Closing notification producer")
+
+    shutdown_handler = ShutdownHandler()
+    shutdown_handler.register()
+    # run(config, logger, session, event_producer, notification_event_producer, shutdown_handler, application)
+    return config, logger, session, event_producer, notification_event_producer, shutdown_handler, application
diff --git a/lib/host_repository.py b/lib/host_repository.py
index 58bf5f21e..fb102eb08 100644
--- a/lib/host_repository.py
+++ b/lib/host_repository.py
@@ -210,22 +210,27 @@ def find_hosts_by_staleness(staleness_types, query, identity):
     return query.filter(or_(False, *staleness_conditions))


-def find_hosts_by_staleness_reaper(staleness_types, identity):
+def find_hosts_by_staleness_job(staleness_types, identity, config=None):
     logger.debug("find_hosts_by_staleness(%s)", staleness_types)
     staleness_obj = serialize_staleness_to_dict(get_staleness_obj(identity=identity))
     staleness_conditions = [
-        or_(False, *staleness_to_conditions(staleness_obj, staleness_types, host_type, stale_timestamp_filter))
+        or_(False, *staleness_to_conditions(staleness_obj, staleness_types, host_type, stale_timestamp_filter, config))
         for host_type in HOST_TYPES
     ]

     return or_(False, *staleness_conditions)


-def find_hosts_sys_default_staleness(staleness_types):
+def find_hosts_sys_default_staleness(staleness_types, config=None):
     logger.debug("find hosts with system default staleness")
     sys_default_staleness = serialize_staleness_to_dict(get_sys_default_staleness())
     staleness_conditions = [
-        or_(False, *staleness_to_conditions(sys_default_staleness, staleness_types, host_type, stale_timestamp_filter))
+        or_(
+            False,
+            *staleness_to_conditions(
+                sys_default_staleness, staleness_types, host_type, stale_timestamp_filter, config
+            ),
+        )
         for host_type in HOST_TYPES
     ]
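
The shape of what find_hosts_in_state() returns — a list of per-org predicates that the caller folds into one WHERE clause — can be sketched in isolation. The Host model below is a toy stand-in for app.models.Host, and the modified_on column and cutoff are invented for the example:

    from datetime import datetime, timedelta, timezone

    from sqlalchemy import DateTime, String, and_, or_, select
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


    class Base(DeclarativeBase):
        pass


    class Host(Base):  # toy stand-in for app.models.Host
        __tablename__ = "hosts"
        id: Mapped[str] = mapped_column(String, primary_key=True)
        org_id: Mapped[str] = mapped_column(String)
        modified_on: Mapped[datetime] = mapped_column(DateTime(timezone=True))


    cutoff = datetime.now(timezone.utc) - timedelta(days=14)
    query_filters = [
        # one and_() per org that has custom staleness...
        and_(Host.org_id == "org-1", Host.modified_on < cutoff),
        # ...plus a final catch-all for every org on the system default
        and_(~Host.org_id.in_(["org-1"]), Host.modified_on < cutoff),
    ]

    # or_(False, *filters) folds the list into a single predicate and stays
    # valid even when the list is empty (it degenerates to FALSE: no rows).
    print(select(Host).where(or_(False, *query_filters)))
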
From dc088cf58561c87dea7b133ba2c4fa8e0b6d902a Mon Sep 17 00:00:00 2001
From: Joao Paulo Ramos <jramos@redhat.com>
Date: Thu, 12 Dec 2024 18:34:34 -0300
Subject: [PATCH 4/9] feat: implement stale host notification

---
 api/filtering/db_filters.py |   6 ++
 app/culling.py              |  17 +-----
 app/host_stale.py           |  41 +++++++++++++
 app/instrumentation.py      |   5 ++
 host_reaper.py              |   1 +
 jobs/common.py              |  14 ++---
 lib/host_repository.py      |  43 +++++++++++--
 stale_host_notification.py  | 118 ++++++++++++++++++++++++++++++++++++
 8 files changed, 216 insertions(+), 29 deletions(-)
 create mode 100644 app/host_stale.py
 create mode 100644 stale_host_notification.py

diff --git a/api/filtering/db_filters.py b/api/filtering/db_filters.py
index 516765cf3..a7f72c837 100644
--- a/api/filtering/db_filters.py
+++ b/api/filtering/db_filters.py
@@ -21,6 +21,7 @@
 from app.config import HOST_TYPES
 from app.culling import Conditions
 from app.exceptions import ValidationException
+from app.host_stale import HostStale
 from app.logging import get_logger
 from app.models import OLD_TO_NEW_REPORTER_MAP
 from app.models import Group
@@ -195,6 +196,11 @@ def _timestamp_and_host_type_filter(condition, state):
     return (_timestamp_and_host_type_filter(condition, state) for state in filtered_states)


+def find_stale_host_in_window(staleness, host_type, timestamp_filter_func):
+    host_stale = HostStale(staleness, host_type)
+    return (timestamp_filter_func(*host_stale.stale_in_window()),)
+
+
 def _registered_with_filter(registered_with: list[str], host_type_filter: set[str | None]) -> list:
     _query_filter: list = []
     if not registered_with:
diff --git a/app/culling.py b/app/culling.py
index a860b3495..df3fd6bc9 100644
--- a/app/culling.py
+++ b/app/culling.py
@@ -38,10 +38,9 @@ def culled_timestamp(self, stale_timestamp, culled_seconds):


 class Conditions:
-    def __init__(self, staleness, host_type, config):
+    def __init__(self, staleness, host_type):
         self.now = datetime.now(timezone.utc)
         self.host_type = host_type
-        self.config = config

         self.staleness_host_type = {
             None: {
@@ -71,9 +70,6 @@ def culled(self):
     def not_culled(self):
         return self._culled_timestamp(), None

-    def stale_in(self):
-        return self._stale_in_last_hour(), self._stale_timestamp()
-
     def _stale_timestamp(self):
         offset = timedelta(seconds=self.staleness_host_type[self.host_type]["stale"])
         return self.now - offset
@@ -86,11 +82,6 @@ def _culled_timestamp(self):
         offset = timedelta(seconds=self.staleness_host_type[self.host_type]["culled"])
         return self.now - offset

-    def _stale_in_last_hour(self):
-        stale = self._stale_timestamp()
-        offset = timedelta(seconds=self.config.stale_validation_window_seconds)
-        return stale - offset
-
     @staticmethod
     def find_host_state(stale_timestamp, stale_warning_timestamp):
         now = datetime.now(timezone.utc)
@@ -102,12 +93,6 @@ def find_host_state(stale_timestamp, stale_warning_timestamp):
         return "stale warning"


-def staleness_to_conditions(staleness, staleness_states, host_type, timestamp_filter_func, config=None):
-    condition = Conditions(staleness, host_type, config)
-    filtered_states = (state for state in staleness_states if state != "unknown")
-    return (timestamp_filter_func(*getattr(condition, state)(), host_type=host_type) for state in filtered_states)
-
-
 def days_to_seconds(n_days: int) -> int:
     factor = 86400
     return n_days * factor
diff --git a/app/host_stale.py b/app/host_stale.py
new file mode 100644
index 000000000..3e9f5c6c1
--- /dev/null
+++ b/app/host_stale.py
@@ -0,0 +1,41 @@
+from datetime import datetime
+from datetime import timedelta
+from datetime import timezone
+
+from app.config import Config
+from app.environment import RuntimeEnvironment
+
+RUNTIME_ENVIRONMENT = RuntimeEnvironment.JOB
+
+
+class HostStale:
+    def __init__(self, staleness, host_type):
+        self.now = datetime.now(timezone.utc)
+        self.staleness = staleness
+        self.host_type = host_type
+        self.config = Config(RUNTIME_ENVIRONMENT)
+
+        self.staleness_host_type = {
+            None: {
+                "stale": staleness["conventional_time_to_stale"],
+                "warning": staleness["conventional_time_to_stale_warning"],
+                "culled": staleness["conventional_time_to_delete"],
+            },
+            "edge": {
+                "stale": staleness["immutable_time_to_stale"],
+                "warning": staleness["immutable_time_to_stale_warning"],
+                "culled": staleness["immutable_time_to_delete"],
+            },
+        }
+
+    def _stale_timestamp(self):
+        offset = timedelta(seconds=self.staleness_host_type[self.host_type]["stale"])
+        return self.now - offset
+
+    def _stale_in_last_seconds(self):  # default to 3600s / 1h
+        stale = self._stale_timestamp()
+        offset = timedelta(seconds=self.config.stale_validation_window_seconds)
+        return stale - offset
+
+    def stale_in_window(self):
+        return self._stale_in_last_seconds(), self._stale_timestamp()
diff --git a/app/instrumentation.py b/app/instrumentation.py
index cd267d2ba..f616b5e06 100644
--- a/app/instrumentation.py
+++ b/app/instrumentation.py
@@ -352,3 +352,8 @@ def log_patch_staleness_succeeded(logger, staleness_id):

 def log_create_staleness_failed(logger, org_id):
     logger.info("Failed to create staleness for account with org_id %s", org_id)
+
+
+# stale host notification
+def log_host_stale_notification_succeeded(logger, host_id, control_rule):
+    logger.info("Sent Notification for stale host: %s", host_id, extra={"access_rule": control_rule})
diff --git a/host_reaper.py b/host_reaper.py
index 87bc48bbd..004f59616 100755
--- a/host_reaper.py
+++ b/host_reaper.py
@@ -1,3 +1,4 @@
+#!/usr/bin/python
 import sys
 from functools import partial

diff --git a/jobs/common.py b/jobs/common.py
index 87086de91..c7c432c1b 100644
--- a/jobs/common.py
+++ b/jobs/common.py
@@ -42,7 +42,7 @@ def excepthook(logger, job_type, value, traceback):
     logger.exception("%s failed", job_type, exc_info=value)


-def filter_hosts_in_state_using_custom_staleness(logger, session, state: list, config=None):
+def filter_hosts_in_state_using_custom_staleness(logger, session, state: list):
     staleness_objects = session.query(Staleness).all()
     org_ids = []

@@ -55,25 +55,25 @@ def filter_hosts_in_state_using_custom_staleness(logger, session, state: list, c
         query_filters.append(
             and_(
                 (Host.org_id == staleness_obj.org_id),
-                find_hosts_by_staleness_job(state, identity, config),
+                find_hosts_by_staleness_job(state, identity),
             )
         )
     return query_filters, org_ids


-def filter_hosts_in_state_using_sys_default_staleness(logger, org_ids, state: list, config=None) -> ColumnElement:
+def filter_hosts_in_state_using_sys_default_staleness(logger, org_ids, state: list) -> ColumnElement:
     # Use the hosts_ids_list to exclude hosts that were found with custom staleness
     logger.debug("Looking for hosts that use system default staleness")
-    return and_(~Host.org_id.in_(org_ids), find_hosts_sys_default_staleness(state, config))
+    return and_(~Host.org_id.in_(org_ids), find_hosts_sys_default_staleness(state))


-def find_hosts_in_state(logger, session, state: list, config=None):
+def find_hosts_in_state(logger, session, state: list):
     # Find all host ids that are using custom staleness
-    query_filters, org_ids = filter_hosts_in_state_using_custom_staleness(logger, session, state, config)
+    query_filters, org_ids = filter_hosts_in_state_using_custom_staleness(logger, session, state)

     # Find all host ids that are not using custom staleness,
     # excluding the hosts for the org_ids that use custom staleness
-    query_filters.append(filter_hosts_in_state_using_sys_default_staleness(logger, org_ids, state, config))
+    query_filters.append(filter_hosts_in_state_using_sys_default_staleness(logger, org_ids, state))

     return query_filters
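
The host_type dispatch inside HostStale is the part that is easiest to misread, so here is a reduced sketch of just that selection logic. The threshold values are invented for the example:

    from datetime import datetime, timedelta, timezone

    staleness = {  # values invented for the example
        "conventional_time_to_stale": 104400,  # ~29 hours
        "immutable_time_to_stale": 172800,     # 2 days
    }

    # host_type=None means a conventional host; "edge" hosts use the
    # immutable_* thresholds — the same mapping HostStale.__init__ builds.
    staleness_host_type = {
        None: {"stale": staleness["conventional_time_to_stale"]},
        "edge": {"stale": staleness["immutable_time_to_stale"]},
    }

    for host_type in (None, "edge"):
        stale = datetime.now(timezone.utc) - timedelta(seconds=staleness_host_type[host_type]["stale"])
        print(host_type, stale)
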
diff --git a/lib/host_repository.py b/lib/host_repository.py
index fb102eb08..38a44c503 100644
--- a/lib/host_repository.py
+++ b/lib/host_repository.py
@@ -5,6 +5,7 @@
 from sqlalchemy import not_
 from sqlalchemy import or_

+from api.filtering.db_filters import find_stale_host_in_window
 from api.filtering.db_filters import stale_timestamp_filter
 from api.filtering.db_filters import staleness_to_conditions
 from api.filtering.db_filters import update_query_for_owner_id
@@ -210,26 +211,56 @@ def find_hosts_by_staleness(staleness_types, query, identity):
     return query.filter(or_(False, *staleness_conditions))


-def find_hosts_by_staleness_job(staleness_types, identity, config=None):
+def find_hosts_by_staleness_job(staleness_types, identity):
     logger.debug("find_hosts_by_staleness(%s)", staleness_types)
     staleness_obj = serialize_staleness_to_dict(get_staleness_obj(identity=identity))
     staleness_conditions = [
-        or_(False, *staleness_to_conditions(staleness_obj, staleness_types, host_type, stale_timestamp_filter, config))
+        or_(
+            False,
+            *staleness_to_conditions(staleness_obj, staleness_types, host_type, stale_timestamp_filter),
+        )
+        for host_type in HOST_TYPES
+    ]
+
+    return or_(False, *staleness_conditions)
+
+
+def find_stale_hosts(identity):
+    logger.debug("finding stale hosts")
+
+    staleness_obj = serialize_staleness_to_dict(get_staleness_obj(identity=identity))
+    staleness_conditions = [
+        or_(
+            False,
+            *find_stale_host_in_window(staleness_obj, host_type, stale_timestamp_filter),
+        )
+        for host_type in HOST_TYPES
+    ]
+
+    return or_(False, *staleness_conditions)
+
+
+def find_stale_host_sys_default_staleness():
+    logger.debug("find hosts with system default staleness")
+    sys_default_staleness = serialize_staleness_to_dict(get_sys_default_staleness())
+    staleness_conditions = [
+        or_(
+            False,
+            *find_stale_host_in_window(sys_default_staleness, host_type, stale_timestamp_filter),
+        )
         for host_type in HOST_TYPES
     ]

     return or_(False, *staleness_conditions)


-def find_hosts_sys_default_staleness(staleness_types, config=None):
+def find_hosts_sys_default_staleness(staleness_types):
     logger.debug("find hosts with system default staleness")
     sys_default_staleness = serialize_staleness_to_dict(get_sys_default_staleness())
     staleness_conditions = [
         or_(
             False,
-            *staleness_to_conditions(
-                sys_default_staleness, staleness_types, host_type, stale_timestamp_filter, config
-            ),
+            *staleness_to_conditions(sys_default_staleness, staleness_types, host_type, stale_timestamp_filter),
         )
         for host_type in HOST_TYPES
     ]
diff --git a/stale_host_notification.py b/stale_host_notification.py
new file mode 100644
index 000000000..da4dbe43e
--- /dev/null
+++ b/stale_host_notification.py
@@ -0,0 +1,118 @@
+#!/usr/bin/python
+import sys
+from functools import partial
+
+from sqlalchemy import ColumnElement
+from sqlalchemy import and_
+from sqlalchemy import or_
+
+from app.auth.identity import create_mock_identity_with_org_id
+from app.auth.identity import to_auth_header
+from app.instrumentation import log_host_stale_notification_succeeded
+from app.logging import get_logger
+from app.logging import threadctx
+from app.models import Host
+from app.models import Staleness
+from app.queue.host_mq import OperationResult
+from app.queue.notifications import NotificationType
+from app.queue.notifications import send_notification
+from jobs.common import excepthook
+from jobs.common import main
+from lib.host_repository import find_stale_host_sys_default_staleness
+from lib.host_repository import find_stale_hosts
+from lib.metrics import stale_host_notification_count
+from lib.metrics import stale_host_notification_fail_count
+from lib.metrics import stale_host_notification_processing_time
+
+LOGGER_NAME = "stale_host_notification"
+PROMETHEUS_JOB = "inventory-stale-host-notification"
+COLLECTED_METRICS = (
+    # add metric
+)
+
+
+def _create_host_operation_result(host, identity, logger):
+    return OperationResult(
+        host,
+        {"b64_identity": to_auth_header(identity)} if identity else None,
+        None,
+        None,
+        None,
+        partial(log_host_stale_notification_succeeded, logger, host.id, control_rule="HOST_STALE_NOTIFICATION"),
+    )
+
+
+def _find_stale_hosts_using_custom_staleness(logger, session) -> ColumnElement:
+    staleness_objects = session.query(Staleness).all()
+    org_ids = []
+
+    query_filters = []
+    for staleness_obj in staleness_objects:
+        # Validate which host types for a given org_id never get deleted
+        logger.debug(f"Looking for hosts from org_id {staleness_obj.org_id} that use custom staleness")
+        org_ids.append(staleness_obj.org_id)
+        identity = create_mock_identity_with_org_id(staleness_obj.org_id)
+        query_filters.append(
+            and_(
+                (Host.org_id == staleness_obj.org_id),
+                find_stale_hosts(identity),
+            )
+        )
+    return query_filters, org_ids
+
+
+def _find_stale_hosts_using_sys_default_staleness(logger, org_ids) -> ColumnElement:
+    # Use the hosts_ids_list to exclude hosts that were found with custom staleness
+    logger.debug("Looking for hosts that use system default staleness")
+    return and_(~Host.org_id.in_(org_ids), find_stale_host_sys_default_staleness())
+
+
+def _find_stale_hosts(logger, session):
+    # Find all host ids that are using custom staleness
+    query_filters, org_ids = _find_stale_hosts_using_custom_staleness(logger, session)
+
+    # Find all host ids that are not using custom staleness,
+    # excluding the hosts for the org_ids that use custom staleness
+    query_filters.append(_find_stale_hosts_using_sys_default_staleness(logger, org_ids))
+
+    return query_filters
+
+
+@stale_host_notification_fail_count.count_exceptions()
+def run(logger, session, notification_event_producer, shutdown_handler, application):
+    with application.app.app_context(), stale_host_notification_processing_time.time():
+        filter_stale_hosts = _find_stale_hosts(logger, session)
+
+        query = session.query(Host).filter(or_(False, *filter_stale_hosts))
+        stale_hosts = query.all()
+        if len(stale_hosts) > 0:
+            logger.info("%s hosts found as stale")
+            for host in stale_hosts:
+                identity = create_mock_identity_with_org_id(host.org_id)
+                result = _create_host_operation_result(host, identity, logger)
+                try:
+                    send_notification(
+                        notification_event_producer, NotificationType.system_became_stale, vars(result.host_row)
+                    )
+
+                    # TODO: Persist the timestamp of the success run into a new table
+
+                    stale_host_notification_count.inc()
+                    result.success_logger()
+
+                except Exception:
+                    logger.error("Error when sending notification")
+        else:
+            logger.info("No hosts found as stale")
+
+
+if __name__ == "__main__":
+    logger = get_logger(LOGGER_NAME)
+    job_type = "Stale host notification"
+    sys.excepthook = partial(excepthook, logger, job_type)
+
+    threadctx.request_id = None
+    _, logger, session, _, notification_event_producer, shutdown_handler, application = main(
+        logger, COLLECTED_METRICS, PROMETHEUS_JOB
+    )
+    run(logger, session, notification_event_producer, shutdown_handler, application)
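
One detail worth calling out in run() above: each OperationResult carries a pre-bound success logger, so the "sent" log line can only fire after send_notification() has actually returned. A reduced sketch of that functools.partial pattern — the host object here is a stand-in namespace, not a real Host row:

    import logging
    from functools import partial
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("stale_host_notification")
    host = SimpleNamespace(id="example-host-id")  # stand-in for a Host row


    def log_host_stale_notification_succeeded(logger, host_id, control_rule):
        logger.info("Sent Notification for stale host: %s", host_id, extra={"access_rule": control_rule})


    # All arguments are baked in up front; run() just calls result.success_logger()
    # once the notification has been produced.
    success_logger = partial(
        log_host_stale_notification_succeeded, logger, host.id, control_rule="HOST_STALE_NOTIFICATION"
    )
    success_logger()
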
From bfaaf036ff776bb14500701dfa3699a9f4648dae Mon Sep 17 00:00:00 2001
From: Joao Paulo Ramos <jramos@redhat.com>
Date: Thu, 12 Dec 2024 18:35:11 -0300
Subject: [PATCH 5/9] tests: Add tests

---
 tests/helpers/mq_utils.py      | 21 ++++++++++
 tests/test_custom_staleness.py |  9 +++++
 tests/test_notifications.py    | 70 ++++++++++++++++++++++++++++++++++
 3 files changed, 100 insertions(+)

diff --git a/tests/helpers/mq_utils.py b/tests/helpers/mq_utils.py
index 5cb0ef82b..27856ba57 100644
--- a/tests/helpers/mq_utils.py
+++ b/tests/helpers/mq_utils.py
@@ -183,6 +183,27 @@ def assert_delete_notification_is_valid(notification_event_producer, host):
     assert host.canonical_facts.get("insights_id") == event["events"][0]["payload"]["insights_id"]


+def assert_stale_notification_is_valid(notification_event_producer, host):
+    event = json.loads(notification_event_producer.event)
+
+    assert isinstance(event, dict)
+
+    expected_keys = {
+        "timestamp",
+        "event_type",
+        "org_id",
+        "application",
+        "bundle",
+        "context",
+        "events",
+    }
+    assert set(event.keys()) == expected_keys
+
+    assert event["event_type"] == "system-became-stale"
+
+    assert host.canonical_facts.get("insights_id") == event["events"][0]["payload"]["insights_id"]
+
+
 def assert_patch_event_is_valid(
     host,
     event_producer,
diff --git a/tests/test_custom_staleness.py b/tests/test_custom_staleness.py
index 197a975b7..f05f396cc 100644
--- a/tests/test_custom_staleness.py
+++ b/tests/test_custom_staleness.py
@@ -43,6 +43,15 @@
     "immutable_time_to_delete": 15552000,
 }

+CUSTOM_STALENESS_HOST_BECAME_STALE = {
+    "conventional_time_to_stale": 1,
+    "conventional_time_to_stale_warning": 604800,
+    "conventional_time_to_delete": 1209600,
+    "immutable_time_to_stale": 1,
+    "immutable_time_to_stale_warning": 10368000,
+    "immutable_time_to_delete": 15552000,
+}
+

 def test_delete_only_immutable_hosts(
     flask_app,
diff --git a/tests/test_notifications.py b/tests/test_notifications.py
index 71afe4d37..618c75d77 100644
--- a/tests/test_notifications.py
+++ b/tests/test_notifications.py
@@ -1,12 +1,23 @@
 import json
+from datetime import datetime
+from datetime import timedelta
+from unittest import mock
+from unittest.mock import patch

 import pytest

 from app.exceptions import ValidationException
+from app.logging import threadctx
+from app.models import db
+from stale_host_notification import run as run_stale_host_notification
+from tests.helpers.db_utils import minimal_db_host
+from tests.helpers.mq_utils import assert_stale_notification_is_valid
 from tests.helpers.mq_utils import assert_system_registered_notification_is_valid
 from tests.helpers.test_utils import SYSTEM_IDENTITY
 from tests.helpers.test_utils import generate_uuid
 from tests.helpers.test_utils import minimal_host
+from tests.test_custom_staleness import CUSTOM_STALENESS_HOST_BECAME_STALE
+from tests.test_custom_staleness import CUSTOM_STALENESS_NO_HOSTS_TO_DELETE

 OWNER_ID = SYSTEM_IDENTITY["system"]["cn"]

@@ -68,6 +79,65 @@ def test_add_host_fail(mq_create_or_update_host, notification_event_producer_moc


 # System Became Stale
+def test_host_became_stale(
+    notification_event_producer_mock,
+    db_create_staleness_culling,
+    flask_app,
+    db_create_host,
+    db_get_host,
+):
+    db_create_staleness_culling(**CUSTOM_STALENESS_HOST_BECAME_STALE)
+
+    with patch("app.models.datetime") as models_datetime, patch("app.host_stale.datetime") as culling_datetime:
+        models_datetime.now.return_value = datetime.now() - timedelta(minutes=5)
+        culling_datetime.now.return_value = datetime.now()
+
+        host = minimal_db_host(reporter="some reporter")
+        created_host = db_create_host(host=host)
+        assert db_get_host(created_host.id)
+
+        threadctx.request_id = None
+        run_stale_host_notification(
+            mock.Mock(),
+            db.session,
+            notification_event_producer=notification_event_producer_mock,
+            shutdown_handler=mock.Mock(**{"shut_down.return_value": False}),
+            application=flask_app,
+        )
+
+        assert_stale_notification_is_valid(
+            notification_event_producer=notification_event_producer_mock, host=created_host
+        )
+
+
+def test_host_did_not_become_stale(
+    notification_event_producer_mock,
+    db_create_staleness_culling,
+    flask_app,
+    db_create_host,
+    db_get_host,
+):
+    db_create_staleness_culling(**CUSTOM_STALENESS_NO_HOSTS_TO_DELETE)
+
+    with patch("app.models.datetime") as models_datetime, patch("app.host_stale.datetime") as culling_datetime:
+        models_datetime.now.return_value = datetime.now() - timedelta(minutes=5)
+        culling_datetime.now.return_value = datetime.now()
+
+        host = minimal_db_host(reporter="some reporter")
+        created_host = db_create_host(host=host)
+        assert db_get_host(created_host.id)
+
+        threadctx.request_id = None
+        run_stale_host_notification(
+            mock.Mock(),
+            db.session,
+            notification_event_producer=notification_event_producer_mock,
+            shutdown_handler=mock.Mock(**{"shut_down.return_value": False}),
+            application=flask_app,
+        )
+
+        assert notification_event_producer_mock.event is None


 # System Deleted
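
The two-clock trick in these tests deserves a note: they pin the app.models clock and the app.host_stale clock separately, so a host can be written "in the past" while staleness is evaluated "now". In isolation the pattern looks like the sketch below (module paths as the tests use them; this only runs inside the repo's test environment):

    from datetime import datetime, timedelta
    from unittest.mock import patch

    with (
        patch("app.models.datetime") as models_datetime,       # clock used when the host row is written
        patch("app.host_stale.datetime") as culling_datetime,  # clock used when staleness is evaluated
    ):
        models_datetime.now.return_value = datetime.now() - timedelta(minutes=5)
        culling_datetime.now.return_value = datetime.now()
        # With conventional_time_to_stale=1s, a host created "five minutes ago"
        # is already ~5 minutes past stale — inside the one-hour window.
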
From e2e9b1a149e1527571f38a0e9089e863175f8bb1 Mon Sep 17 00:00:00 2001
From: Joao Paulo Ramos <jramos@redhat.com>
Date: Mon, 16 Dec 2024 19:51:34 -0300
Subject: [PATCH 6/9] feat: Add job to Clowdapp

---
 deploy/clowdapp.yml | 69 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 69 insertions(+)

diff --git a/deploy/clowdapp.yml b/deploy/clowdapp.yml
index d0092b5b5..99a953ef9 100644
--- a/deploy/clowdapp.yml
+++ b/deploy/clowdapp.yml
@@ -932,6 +932,64 @@ objects:
           requests:
             cpu: ${CPU_REQUEST_REAPER}
             memory: ${MEMORY_REQUEST_REAPER}
+    - name: stale-host-notification
+      schedule: '@hourly'
+      concurrencyPolicy: "Forbid"
+      suspend: ${{STALE_HOST_NOTIFICATION_SUSPEND}}
+      restartPolicy: Never
+      podSpec:
+        image: ${IMAGE}:${IMAGE_TAG}
+        args: ["./stale_host_notification.py"]
+        env:
+          - name: INVENTORY_LOG_LEVEL
+            value: ${LOG_LEVEL}
+          - name: INVENTORY_DB_SSL_MODE
+            value: ${INVENTORY_DB_SSL_MODE}
+          - name: INVENTORY_DB_SSL_CERT
+            value: ${INVENTORY_DB_SSL_CERT}
+          - name: KAFKA_BOOTSTRAP_SERVERS
+            value: ${KAFKA_BOOTSTRAP_HOST}:${KAFKA_BOOTSTRAP_PORT}
+          - name: PROMETHEUS_PUSHGATEWAY
+            value: ${PROMETHEUS_PUSHGATEWAY}
+          - name: KAFKA_EVENT_TOPIC
+            value: ${KAFKA_EVENT_TOPIC}
+          - name: KAFKA_NOTIFICATION_TOPIC
+            value: ${KAFKA_NOTIFICATION_TOPIC}
+          - name: KAFKA_PRODUCER_ACKS
+            value: ${KAFKA_PRODUCER_ACKS}
+          - name: KAFKA_PRODUCER_RETRIES
+            value: ${KAFKA_PRODUCER_RETRIES}
+          - name: KAFKA_PRODUCER_RETRY_BACKOFF_MS
+            value: ${KAFKA_PRODUCER_RETRY_BACKOFF_MS}
+          - name: NAMESPACE
+            valueFrom:
+              fieldRef:
+                fieldPath: metadata.namespace
+          - name: KAFKA_SECURITY_PROTOCOL
+            value: ${KAFKA_SECURITY_PROTOCOL}
+          - name: KAFKA_SASL_MECHANISM
+            value: ${KAFKA_SASL_MECHANISM}
+          - name: CLOWDER_ENABLED
+            value: "true"
+          - name: INVENTORY_DB_SCHEMA
+            value: "${INVENTORY_DB_SCHEMA}"
+          - name: UNLEASH_URL
+            value: ${UNLEASH_URL}
+          - name: UNLEASH_TOKEN
+            valueFrom:
+              secretKeyRef:
+                name: ${UNLEASH_SECRET_NAME}
+                key: CLIENT_ACCESS_TOKEN
+                optional: true
+          - name: BYPASS_UNLEASH
+            value: ${BYPASS_UNLEASH}
+        resources:
+          limits:
+            cpu: ${CPU_LIMIT_STALE_HOST_NOTIFICATION}
+            memory: ${MEMORY_LIMIT_STALE_HOST_NOTIFICATION}
+          requests:
+            cpu: ${CPU_REQUEST_STALE_HOST_NOTIFICATION}
+            memory: ${MEMORY_REQUEST_STALE_HOST_NOTIFICATION}
     - name: syndicator
       schedule: ${SYNDICATOR_CRON_SCHEDULE}
       concurrencyPolicy: "Forbid"
@@ -1384,6 +1442,15 @@ parameters:
 - name: MEMORY_LIMIT_REAPER
   value: 512Mi

+- name: CPU_REQUEST_STALE_HOST_NOTIFICATION
+  value: 250m
+- name: CPU_LIMIT_STALE_HOST_NOTIFICATION
+  value: 500m
+- name: MEMORY_REQUEST_STALE_HOST_NOTIFICATION
+  value: 256Mi
+- name: MEMORY_LIMIT_STALE_HOST_NOTIFICATION
+  value: 512Mi
+
 - name: CPU_REQUEST_SP_VALIDATOR
   value: 250m
 - name: CPU_LIMIT_SP_VALIDATOR
@@ -1560,6 +1627,8 @@
   value: 'true'
 - name: REAPER_SUSPEND
   value: 'true'
+- name: STALE_HOST_NOTIFICATION_SUSPEND
+  value: 'true'
 - name: SYNDICATOR_SUSPEND
   value: 'false'
 - name: SYNDICATOR_CRON_SCHEDULE
From 0f35c64c5a199b350aafb13ee4fe478ef93517ca Mon Sep 17 00:00:00 2001
From: Joao Paulo Ramos <jramos@redhat.com>
Date: Tue, 17 Dec 2024 11:16:50 -0300
Subject: [PATCH 7/9] feat: Update Dockerfile

---
 Dockerfile                 | 2 ++
 dev.dockerfile             | 2 ++
 jobs/__init__.py           | 0
 jobs/common.py             | 0
 stale_host_notification.py | 0
 5 files changed, 4 insertions(+)
 mode change 100644 => 100755 jobs/__init__.py
 mode change 100644 => 100755 jobs/common.py
 mode change 100644 => 100755 stale_host_notification.py

diff --git a/Dockerfile b/Dockerfile
index 7d9943dfd..802eb6300 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -42,7 +42,9 @@ COPY run_command.sh run_command.sh
 COPY run.py run.py
 COPY system_profile_validator.py system_profile_validator.py
 COPY inv_migration_runner.py inv_migration_runner.py
+COPY stale_host_notification.py stale_host_notification.py
 COPY app_migrations/ app_migrations/
+COPY jobs/ jobs/

 ENV PIP_NO_CACHE_DIR=1
 ENV PIPENV_CLEAR=1
diff --git a/dev.dockerfile b/dev.dockerfile
index 69a60cfb0..88c8859da 100644
--- a/dev.dockerfile
+++ b/dev.dockerfile
@@ -14,6 +14,7 @@ COPY migrations/ migrations/
 COPY swagger/ swagger/
 COPY tests/ tests/
 COPY utils/ utils/
+COPY jobs/ jobs/
 COPY Makefile Makefile
 COPY gunicorn.conf.py gunicorn.conf.py
 COPY host_reaper.py host_reaper.py
@@ -31,6 +32,7 @@ COPY run_gunicorn.py run_gunicorn.py
 COPY run_command.sh run_command.sh
 COPY run.py run.py
 COPY system_profile_validator.py system_profile_validator.py
+COPY stale_host_notification.py stale_host_notification.py

 RUN chown -R 1001:0 ./
 USER 1001
diff --git a/jobs/__init__.py b/jobs/__init__.py
old mode 100644
new mode 100755
diff --git a/jobs/common.py b/jobs/common.py
old mode 100644
new mode 100755
diff --git a/stale_host_notification.py b/stale_host_notification.py
old mode 100644
new mode 100755
From bddad3b49b033051db96ed5f9de56db958426cdc Mon Sep 17 00:00:00 2001
From: Joao Paulo Ramos <jramos@redhat.com>
Date: Tue, 17 Dec 2024 17:03:12 -0300
Subject: [PATCH 8/9] feat: improve logging and add metrics

---
 lib/metrics.py             | 11 +++++++++++
 stale_host_notification.py |  6 ++++--
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/lib/metrics.py b/lib/metrics.py
index 70654b334..b80057ce9 100644
--- a/lib/metrics.py
+++ b/lib/metrics.py
@@ -52,3 +52,14 @@
     "inventory_new_export_seconds", "Time spent to create a new host export report"
 )
 create_export_count = Counter("inventory_create_export", "The total amount of host exports created")
+
+# Stale Host Notification
+stale_host_notification_count = Counter(
+    "inventory_stale_host_notification_count", "The total amount of stale hosts notified"
+)
+stale_host_notification_processing_time = Summary(
+    "inventory_stale_host_notification_processing_time", "Time spent notifying stale hosts from the database"
+)
+stale_host_notification_fail_count = Counter(
+    "inventory_stale_host_notification_fail_count", "The total amount of Stale Host Notification failures."
+)
diff --git a/stale_host_notification.py b/stale_host_notification.py
index da4dbe43e..89c8083e1 100755
--- a/stale_host_notification.py
+++ b/stale_host_notification.py
@@ -27,7 +27,9 @@
 LOGGER_NAME = "stale_host_notification"
 PROMETHEUS_JOB = "inventory-stale-host-notification"
 COLLECTED_METRICS = (
-    # add metric
+    stale_host_notification_count,
+    stale_host_notification_processing_time,
+    stale_host_notification_fail_count,
 )
@@ -86,7 +88,7 @@ def run(logger, session, notification_event_producer, shutdown_handler, applicat
         query = session.query(Host).filter(or_(False, *filter_stale_hosts))
         stale_hosts = query.all()
         if len(stale_hosts) > 0:
-            logger.info("%s hosts found as stale")
+            logger.info("%s hosts found as stale", len(stale_hosts))
             for host in stale_hosts:
                 identity = create_mock_identity_with_org_id(host.org_id)
                 result = _create_host_operation_result(host, identity, logger)
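
For context on how these collectors leave the process: the job registers them in its own CollectorRegistry and pushes once at shutdown. A minimal end-to-end sketch of that flow — the localhost:9091 endpoint is an assumption for local runs and matches the dev pushgateway added in the next patch:

    from prometheus_client import CollectorRegistry, Counter, Summary, push_to_gateway

    registry = CollectorRegistry()
    notified = Counter(
        "inventory_stale_host_notification_count", "The total amount of stale hosts notified", registry=registry
    )
    duration = Summary(
        "inventory_stale_host_notification_processing_time",
        "Time spent notifying stale hosts from the database",
        registry=registry,
    )

    with duration.time():  # what the processing_time context manager records in run()
        notified.inc()

    # One push at the end of the run, keyed by job name, mirroring jobs.common.main()
    push_to_gateway("localhost:9091", job="inventory-stale-host-notification", registry=registry)
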
From 40db8f68db6f04ef87830787441558c5ec1ac0c5 Mon Sep 17 00:00:00 2001
From: Joao Paulo Ramos <jramos@redhat.com>
Date: Fri, 24 Jan 2025 15:03:32 -0300
Subject: [PATCH 9/9] feat: Add Prometheus pushgateway container

---
 dev.yml | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/dev.yml b/dev.yml
index 42d33b571..d7cf3a418 100644
--- a/dev.yml
+++ b/dev.yml
@@ -107,6 +107,12 @@ services:
       db:
         condition: service_healthy

+  prometheus-gateway:
+    container_name: prometheus_gateway
+    image: prom/pushgateway
+    ports:
+      - 9091:9091
+
   minio:
     container_name: minio
     image: docker.io/minio/minio
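
With the container above running (docker compose -f dev.yml up), a pushed run can be verified straight from the gateway. A quick check, assuming the default port mapping:

    from urllib.request import urlopen

    # The pushgateway re-exposes everything it has received on /metrics.
    metrics = urlopen("http://localhost:9091/metrics").read().decode()
    print("\n".join(line for line in metrics.splitlines() if "stale_host_notification" in line))
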