Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RHINENG-7833] - Send notification newly-stale hosts #2123

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ COPY run_command.sh run_command.sh
COPY run.py run.py
COPY system_profile_validator.py system_profile_validator.py
COPY inv_migration_runner.py inv_migration_runner.py
COPY stale_host_notification.py stale_host_notification.py
COPY app_migrations/ app_migrations/
COPY jobs/ jobs/

ENV PIP_NO_CACHE_DIR=1
ENV PIPENV_CLEAR=1
Expand Down
3 changes: 3 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from app.common import get_build_version
from app.culling import days_to_seconds
from app.culling import hours_to_seconds
from app.environment import RuntimeEnvironment
from app.logging import get_logger

Expand Down Expand Up @@ -312,6 +313,8 @@ def __init__(self, runtime_environment):
"IMMUTABLE_TIME_TO_DELETE_SECONDS", days_to_seconds(730)
)

self.stale_validation_window_seconds = int(os.getenv("STALE_VALIDATION_WINDOW_SECONDS", hours_to_seconds(1)))

self.host_delete_chunk_size = int(os.getenv("HOST_DELETE_CHUNK_SIZE", "1000"))
self.script_chunk_size = int(os.getenv("SCRIPT_CHUNK_SIZE", "500"))
self.export_svc_batch_size = int(os.getenv("EXPORT_SVC_BATCH_SIZE", "500"))
Expand Down
20 changes: 17 additions & 3 deletions app/culling.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ def culled_timestamp(self, stale_timestamp, culled_seconds):


class Conditions:
def __init__(self, staleness, host_type):
def __init__(self, staleness, host_type, config):
self.now = datetime.now(timezone.utc)
self.host_type = host_type
self.config = config

self.staleness_host_type = {
None: {
Expand Down Expand Up @@ -70,6 +71,9 @@ def culled(self):
def not_culled(self):
return self._culled_timestamp(), None

def stale_in(self):
    """Return the (window_start, window_end) timestamp pair bounding hosts
    that became stale within the last validation window.

    window_end is the regular stale cutoff; window_start is that cutoff
    pushed back by the configured validation window (see
    _stale_in_last_hour).
    """
    return self._stale_in_last_hour(), self._stale_timestamp()

def _stale_timestamp(self):
    """Timestamp before which a host of self.host_type counts as stale.

    Computed as self.now minus the "stale" offset configured for this
    host type in self.staleness_host_type.
    """
    offset = timedelta(seconds=self.staleness_host_type[self.host_type]["stale"])
    return self.now - offset
Expand All @@ -82,6 +86,11 @@ def _culled_timestamp(self):
offset = timedelta(seconds=self.staleness_host_type[self.host_type]["culled"])
return self.now - offset

def _stale_in_last_hour(self):
    """Lower bound of the window in which hosts newly became stale.

    Subtracts the configured validation window
    (config.stale_validation_window_seconds, default one hour) from the
    stale cutoff timestamp.

    NOTE(review): this dereferences self.config, but
    staleness_to_conditions constructs Conditions with config=None by
    default — confirm every caller that requests the "stale_in" state
    passes a real config, otherwise this raises AttributeError.
    """
    stale = self._stale_timestamp()
    offset = timedelta(seconds=self.config.stale_validation_window_seconds)
    return stale - offset

@staticmethod
def find_host_state(stale_timestamp, stale_warning_timestamp):
now = datetime.now(timezone.utc)
Expand All @@ -93,12 +102,17 @@ def find_host_state(stale_timestamp, stale_warning_timestamp):
return "stale warning"


def staleness_to_conditions(staleness, staleness_states, host_type, timestamp_filter_func):
condition = Conditions(staleness, host_type)
def staleness_to_conditions(staleness, staleness_states, host_type, timestamp_filter_func, config=None):
    """Build filter expressions for the requested staleness states.

    For every state in staleness_states except "unknown", the matching
    method on a Conditions instance is called to obtain a timestamp pair,
    which is handed to timestamp_filter_func together with host_type.
    Returns a lazy generator of the resulting filter expressions.

    NOTE(review): config defaults to None; the "stale_in" state reads
    config.stale_validation_window_seconds, so callers requesting that
    state must supply a config.
    """
    conditions = Conditions(staleness, host_type, config)
    return (
        timestamp_filter_func(*getattr(conditions, state)(), host_type=host_type)
        for state in staleness_states
        if state != "unknown"
    )


def days_to_seconds(n_days: int) -> int:
    """Convert a number of days to the equivalent number of seconds."""
    seconds_per_day = 24 * 60 * 60
    return n_days * seconds_per_day


def hours_to_seconds(n_hours: int) -> int:
    """Convert a number of hours to the equivalent number of seconds."""
    seconds_per_hour = 60 * 60
    return n_hours * seconds_per_hour
5 changes: 5 additions & 0 deletions app/instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,3 +352,8 @@ def log_patch_staleness_succeeded(logger, staleness_id):

def log_create_staleness_failed(logger, org_id):
    """Log that creating a staleness record failed for the given org_id."""
    message = "Failed to create staleness for account with org_id %s"
    logger.info(message, org_id)


# stale host notification
def log_host_stale_notification_succeeded(logger, host_id, control_rule):
    """Log that a staleness notification was sent for the given host.

    The triggering rule is attached as structured log context under the
    "access_rule" key.

    NOTE(review): the parameter is named control_rule but is logged under
    the key "access_rule" — confirm the key name is intentional.
    """
    extra_context = {"access_rule": control_rule}
    logger.info("Sent Notification for stale host: %s", host_id, extra=extra_context)
83 changes: 83 additions & 0 deletions deploy/clowdapp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,78 @@ objects:
requests:
cpu: ${CPU_REQUEST_REAPER}
memory: ${MEMORY_REQUEST_REAPER}
- name: stale-host-notification
schedule: '@hourly'
concurrencyPolicy: "Forbid"
suspend: ${{STALE_HOST_NOTIFICATION_SUSPEND}}
restartPolicy: Never
podSpec:
image: ${IMAGE}:${IMAGE_TAG}
args: ["./stale_host_notification.py"]
env:
- name: INVENTORY_LOG_LEVEL
value: ${LOG_LEVEL}
- name: INVENTORY_DB_SSL_MODE
value: ${INVENTORY_DB_SSL_MODE}
- name: INVENTORY_DB_SSL_CERT
value: ${INVENTORY_DB_SSL_CERT}
- name: KAFKA_BOOTSTRAP_SERVERS
value: ${KAFKA_BOOTSTRAP_HOST}:${KAFKA_BOOTSTRAP_PORT}
- name: PAYLOAD_TRACKER_KAFKA_TOPIC
value: ${PAYLOAD_TRACKER_KAFKA_TOPIC}
- name: PAYLOAD_TRACKER_SERVICE_NAME
value: inventory-mq-service
- name: PAYLOAD_TRACKER_ENABLED
value: 'true'
- name: PROMETHEUS_PUSHGATEWAY
value: ${PROMETHEUS_PUSHGATEWAY}
- name: KAFKA_EVENT_TOPIC
value: ${KAFKA_EVENT_TOPIC}
- name: KAFKA_NOTIFICATION_TOPIC
value: ${KAFKA_NOTIFICATION_TOPIC}
- name: KAFKA_PRODUCER_ACKS
value: ${KAFKA_PRODUCER_ACKS}
- name: KAFKA_PRODUCER_RETRIES
value: ${KAFKA_PRODUCER_RETRIES}
- name: KAFKA_PRODUCER_RETRY_BACKOFF_MS
value: ${KAFKA_PRODUCER_RETRY_BACKOFF_MS}
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: KAFKA_SECURITY_PROTOCOL
value: ${KAFKA_SECURITY_PROTOCOL}
- name: KAFKA_SASL_MECHANISM
value: ${KAFKA_SASL_MECHANISM}
- name: CLOWDER_ENABLED
value: "true"
- name: INVENTORY_DB_SCHEMA
value: "${INVENTORY_DB_SCHEMA}"
- name: INVENTORY_API_CACHE_TIMEOUT_SECONDS
value: "${INVENTORY_API_CACHE_TIMEOUT_SECONDS}"
- name: INVENTORY_API_CACHE_TYPE
value: "${INVENTORY_API_CACHE_TYPE}"
- name: INVENTORY_CACHE_INSIGHTS_CLIENT_SYSTEM_TIMEOUT_SEC
value: "${INVENTORY_CACHE_INSIGHTS_CLIENT_SYSTEM_TIMEOUT_SEC}"
- name: INVENTORY_CACHE_THREAD_POOL_MAX_WORKERS
value: "${INVENTORY_CACHE_THREAD_POOL_MAX_WORKERS}"
- name: UNLEASH_URL
value: ${UNLEASH_URL}
- name: UNLEASH_TOKEN
valueFrom:
secretKeyRef:
name: ${UNLEASH_SECRET_NAME}
key: CLIENT_ACCESS_TOKEN
optional: true
- name: BYPASS_UNLEASH
value: ${BYPASS_UNLEASH}
resources:
limits:
cpu: ${CPU_LIMIT_STALE_HOST_NOTIFICAION}
memory: ${MEMORY_LIMIT_STALE_HOST_NOTIFICAION}
requests:
cpu: ${CPU_REQUEST_STALE_HOST_NOTIFICAION}
memory: ${MEMORY_REQUEST_STALE_HOST_NOTIFICAION}
- name: syndicator
schedule: ${SYNDICATOR_CRON_SCHEDULE}
concurrencyPolicy: "Forbid"
Expand Down Expand Up @@ -1377,6 +1449,15 @@ parameters:
- name: MEMORY_LIMIT_REAPER
value: 512Mi

- name: CPU_REQUEST_STALE_HOST_NOTIFICAION
value: 250m
- name: CPU_LIMIT_STALE_HOST_NOTIFICAION
value: 500m
- name: MEMORY_REQUEST_STALE_HOST_NOTIFICAION
value: 256Mi
- name: MEMORY_LIMIT_STALE_HOST_NOTIFICAION
value: 512Mi

- name: CPU_REQUEST_SP_VALIDATOR
value: 250m
- name: CPU_LIMIT_SP_VALIDATOR
Expand Down Expand Up @@ -1553,6 +1634,8 @@ parameters:
value: 'true'
- name: REAPER_SUSPEND
value: 'true'
- name: STALE_HOST_NOTIFICATION_SUSPEND
value: 'true'
- name: SYNDICATOR_SUSPEND
value: 'false'
- name: SYNDICATOR_CRON_SCHEDULE
Expand Down
2 changes: 2 additions & 0 deletions dev.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ COPY migrations/ migrations/
COPY swagger/ swagger/
COPY tests/ tests/
COPY utils/ utils/
COPY jobs/ jobs/
COPY Makefile Makefile
COPY gunicorn.conf.py gunicorn.conf.py
COPY host_reaper.py host_reaper.py
Expand All @@ -31,6 +32,7 @@ COPY run_gunicorn.py run_gunicorn.py
COPY run_command.sh run_command.sh
COPY run.py run.py
COPY system_profile_validator.py system_profile_validator.py
COPY stale_host_notification.py stale_host_notification.py
RUN chown -R 1001:0 ./
USER 1001

Expand Down
110 changes: 10 additions & 100 deletions host_reaper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,19 @@
import sys
from functools import partial

from prometheus_client import CollectorRegistry
from prometheus_client import push_to_gateway
from sqlalchemy import and_
from sqlalchemy import create_engine
from sqlalchemy import or_
from sqlalchemy.orm import sessionmaker

from api.cache import init_cache
from app import create_app
from app.auth.identity import create_mock_identity_with_org_id
from app.config import Config
from app.environment import RuntimeEnvironment
from app.logging import get_logger
from app.logging import threadctx
from app.models import Host
from app.models import Staleness
from app.queue.event_producer import EventProducer
from app.queue.metrics import event_producer_failure
from app.queue.metrics import event_producer_success
from app.queue.metrics import event_serialization_time
from lib.handlers import ShutdownHandler
from lib.handlers import register_shutdown
from jobs.common import excepthook
from jobs.common import find_hosts_in_state
from jobs.common import main
from lib.host_delete import delete_hosts
from lib.host_repository import find_hosts_by_staleness_reaper
from lib.host_repository import find_hosts_sys_default_staleness
from lib.metrics import delete_host_count
from lib.metrics import delete_host_processing_time
from lib.metrics import host_reaper_fail_count
Expand All @@ -46,65 +34,10 @@
RUNTIME_ENVIRONMENT = RuntimeEnvironment.JOB


def _init_config():
config = Config(RUNTIME_ENVIRONMENT)
config.log_configuration()
return config


def _init_db(config):
engine = create_engine(config.db_uri)
return sessionmaker(bind=engine)


def _prometheus_job(namespace):
return f"{PROMETHEUS_JOB}-{namespace}" if namespace else PROMETHEUS_JOB


def _excepthook(logger, type, value, traceback):
logger.exception("Host reaper failed", exc_info=value)


def filter_culled_hosts_using_custom_staleness(logger, session):
staleness_objects = session.query(Staleness).all()
org_ids = []

query_filters = []
for staleness_obj in staleness_objects:
# Validate which host types for a given org_id never get deleted
logger.debug(f"Looking for hosts from org_id {staleness_obj.org_id} that use custom staleness")
org_ids.append(staleness_obj.org_id)
identity = create_mock_identity_with_org_id(staleness_obj.org_id)
query_filters.append(
and_(
(Host.org_id == staleness_obj.org_id),
find_hosts_by_staleness_reaper(["culled"], identity),
)
)
return query_filters, org_ids


def filter_culled_hosts_using_sys_default_staleness(logger, org_ids):
# Use the hosts_ids_list to exclude hosts that were found with custom staleness
logger.debug("Looking for hosts that use system default staleness")
return and_(~Host.org_id.in_(org_ids), find_hosts_sys_default_staleness(["culled"]))


def find_hosts_to_delete(logger, session):
# Find all host ids that are using custom staleness
query_filters, org_ids = filter_culled_hosts_using_custom_staleness(logger, session)

# Find all host ids that are not using custom staleness,
# excluding the hosts for the org_ids that use custom staleness
query_filters.append(filter_culled_hosts_using_sys_default_staleness(logger, org_ids))

return query_filters


@host_reaper_fail_count.count_exceptions()
def run(config, logger, session, event_producer, notification_event_producer, shutdown_handler, application):
with application.app.app_context():
filter_hosts_to_delete = find_hosts_to_delete(logger, session)
filter_hosts_to_delete = find_hosts_in_state(logger, session, ["culled"])

query = session.query(Host).filter(or_(False, *filter_hosts_to_delete))
hosts_processed = config.host_delete_chunk_size
Expand All @@ -129,36 +62,13 @@ def run(config, logger, session, event_producer, notification_event_producer, sh
deletions_remaining -= hosts_processed


def main(logger):
config = _init_config()
application = create_app(RUNTIME_ENVIRONMENT)
init_cache(config, application)

registry = CollectorRegistry()
for metric in COLLECTED_METRICS:
registry.register(metric)
job = _prometheus_job(config.kubernetes_namespace)
prometheus_shutdown = partial(push_to_gateway, config.prometheus_pushgateway, job, registry)
register_shutdown(prometheus_shutdown, "Pushing metrics")

Session = _init_db(config)
session = Session()
register_shutdown(session.get_bind().dispose, "Closing database")

event_producer = EventProducer(config, config.event_topic)
register_shutdown(event_producer.close, "Closing producer")

notification_event_producer = EventProducer(config, config.notification_topic)
register_shutdown(notification_event_producer.close, "Closing notification producer")

shutdown_handler = ShutdownHandler()
shutdown_handler.register()
run(config, logger, session, event_producer, notification_event_producer, shutdown_handler, application)


if __name__ == "__main__":
logger = get_logger(LOGGER_NAME)
sys.excepthook = partial(_excepthook, logger)
job_type = "Host reaper"
sys.excepthook = partial(excepthook, logger, job_type)

threadctx.request_id = None
main(logger)
config, logger, session, event_producer, notification_event_producer, shutdown_handler, application = main(
logger, COLLECTED_METRICS, PROMETHEUS_JOB
)
run(config, logger, session, event_producer, notification_event_producer, shutdown_handler, application)
Empty file added jobs/__init__.py
Empty file.
Loading