Commit

system culling M1
https://projects.engineering.redhat.com/browse/RHCLOUD-2575

* Save stale_timestamp and reporter

Added stale_timestamp and reporter fields to the add_hosts operation.
If one of the fields is present, the other one must be filled in too.
The stale_timestamp gets updated if (see the sketch after this list)

* it’s either not set at all,
* or the reporter is the same and the new timestamp is later,
* or the reporter is different and the new timestamp is earlier.
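
A minimal sketch of the rule as a standalone predicate (a hypothetical helper for illustration only; the real logic lives in Host._update_stale_timestamp in the diff below):

    def should_update_stale_timestamp(current_timestamp, current_reporter, new_timestamp, new_reporter):
        # Both new fields must be present for any update to happen.
        if not (new_timestamp and new_reporter):
            return False
        # Not set at all yet: accept the first timestamp unconditionally.
        if current_timestamp is None and current_reporter is None:
            return True
        if new_reporter == current_reporter:
            # The same reporter may only push the timestamp forward (or keep it).
            return new_timestamp >= current_timestamp
        # A different reporter may only pull the timestamp back (or keep it).
        return new_timestamp <= current_timestamp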

* Unit test stale timestamp update

Added unit tests for the Host._update_stale_timestamp method. It
contains some rather complex logic, which is now covered by tests.
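
A sketch of what one such test can look like (hypothetical test name, canonical facts and reporter value; the actual suite covers every branch of the rule):

    from datetime import datetime, timedelta, timezone

    from app.models import Host

    def test_update_stale_timestamp_same_reporter_moves_forward():
        old = datetime.now(timezone.utc)
        new = old + timedelta(hours=1)
        host = Host({"fqdn": "test.host"}, stale_timestamp=old, reporter="some-reporter")
        # Same reporter, later timestamp: the update must be accepted.
        host._update_stale_timestamp(new, "some-reporter")
        assert host.stale_timestamp == new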

* Fix unit test

Fixed failing host deserialization unit tests. The tests didn’t consider
the new Host constructor fields. Added a new test for their default values.

* Add basic culling fields to response

Added the stale_timestamp and reporter fields to the serialized Host –
to the create and get host output.

* Test new fields in response

Added test checking that the new stale_timestamp and reporter fields are
present in all responses: create, update, get all and get by ID.

* Add computed timestamps to responses.

Add stale_warning_timestamp and culled_timestamp to the API responses.
These are computed from the host’s stale_timestamp. Added explicit
tests for the timestamps.
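
The computation itself is just a fixed day offset added to the host’s stale_timestamp. A sketch with the default offsets of 7 and 14 days (made configurable later in this commit):

    from datetime import datetime, timedelta, timezone

    stale_timestamp = datetime(2019, 11, 22, tzinfo=timezone.utc)
    stale_warning_timestamp = stale_timestamp + timedelta(days=7)   # 2019-11-29
    culled_timestamp = stale_timestamp + timedelta(days=14)         # 2019-12-06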

* added stale_timestamp as input, and stale_warning_timestamp and culled_timestamp as output, to the MQ schemas

* Support filtering by staleness

Added a new staleness parameter to get_host_list and get_host_by_id. It
supports fresh, stale, stale_warning, culled and unknown states.
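
A hypothetical client call (the URL, identity header value and query-string encoding are assumptions, not part of this commit):

    import requests

    # Ask only for hosts that are currently fresh or stale,
    # leaving out stale_warning and unknown ones.
    response = requests.get(
        "https://host.example.com/api/inventory/v1/hosts",
        params={"staleness": "fresh,stale"},  # exact encoding depends on the API spec
        headers={"x-rh-identity": "<base64-encoded identity>"},
    )
    hosts = response.json()["results"]  # alongside "total" and "count" in the paginated payload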

* add reporter label to ingress metrics

so that we can easily categorize the success/failure/error metrics by reporter

* Added some tests for MQ and the new stale functionality

* actually committing the new test file this time

* fixed failing test

* merged master in

* made comment match the function

* Make culling timestamps configurable

Added application configuration values for the stale_warning and culled
timestamp offsets. They are in days and are loaded from the
environment. This required adding the application configuration to the
Flask app configuration object and passing it around for
serialization.
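
The values are read straight from the environment, with names and defaults as in app/config.py below:

    import os

    # Days after stale_timestamp when a host starts warning, then gets culled.
    stale_warning_offset_days = int(os.environ.get("CULLING_STALE_WARNING_OFFSET_DAYS", "7"))
    culled_offset_days = int(os.environ.get("CULLING_CULLED_OFFSET_DAYS", "14"))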

* Test culling timestamps configuration

Added tests for the new configuration values. They verify that the
stale warning and culled timestamps are computed correctly using the
values in the configuration object.

* Test culling configuration

* Added default values and environment variables to the Config class test
  case.
* Added a new test case verifying that our configuration object is
  stored in the Flask App configuration dictionary.

* fixed the failing test for real this time. Promise

* fixed issue where the MQ tests fail when run alone

* Don’t expose the culled state

The culled state is now only internal. It is no longer possible to get
the culled hosts by any means. Added a regression test verifying that
this state is not supported.

* Use OpenAPI default for staleness

Use OpenAPI standard schema definition of default values for the new
staleness fields. Removed the Python constant and default values.

* finished the empty tests (the ones that can be)

* add reference to a jira issue

* Fix merge conflicts

* Decouple config from serialization

Extracted the culling timestamp computation logic to a separate module.
As a result, the serialization code no longer depends on the whole
Config object.
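
A usage sketch of the extracted helper (see app/culling.py below; the timestamp value is made up):

    from datetime import datetime, timezone

    from app.culling import StalenessOffset

    offset = StalenessOffset(stale_warning_offset_days=7, culled_offset_days=14)
    stale = datetime(2019, 11, 22, tzinfo=timezone.utc)
    offset.stale_warning_timestamp(stale)  # 2019-11-29 00:00:00+00:00
    offset.culled_timestamp(stale)         # 2019-12-06 00:00:00+00:00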

* use generated stale_timestamp in sample payloads
Glutexo authored and jharting committed Nov 22, 2019
1 parent ec17174 commit e5d30e7
Showing 17 changed files with 1,091 additions and 24 deletions.
34 changes: 31 additions & 3 deletions api/host.py
@@ -1,4 +1,6 @@
import uuid
from datetime import datetime
from datetime import timezone
from enum import Enum

import flask
@@ -12,6 +14,7 @@
from api import metrics
from app import db
from app import events
from app import staleness_offset
from app.auth import current_identity
from app.exceptions import InventoryException
from app.exceptions import ValidationException
@@ -98,7 +101,7 @@ def _add_host(input_host):
"host",
)

-    return add_host(input_host, update_system_profile=False)
+    return add_host(input_host, staleness_offset(), update_system_profile=False)


def find_hosts_by_tag(account_number, string_tags, query):
@@ -112,6 +115,24 @@ def find_hosts_by_tag(account_number, string_tags, query):
return query.filter(Host.tags.contains(tags_to_find))


def find_hosts_by_staleness(states, query):
    staleness_offset_ = staleness_offset()
    stale_timestamp = staleness_offset_.stale_timestamp(Host.stale_timestamp)
    stale_warning_timestamp = staleness_offset_.stale_warning_timestamp(Host.stale_timestamp)
    culled_timestamp = staleness_offset_.culled_timestamp(Host.stale_timestamp)

    null = None  # spelled through a variable so the SQL IS NULL comparison doesn’t trip the linter’s == None check
    now = datetime.now(timezone.utc)

    condition_map = {
        "fresh": stale_timestamp > now,
        "stale": sqlalchemy.and_(Host.stale_timestamp <= now, stale_warning_timestamp > now),
        "stale_warning": sqlalchemy.and_(stale_warning_timestamp <= now, culled_timestamp > now),
        "unknown": Host.stale_timestamp == null,
    }
    return query.filter(sqlalchemy.or_(condition_map[state] for state in states))


@api_operation
@metrics.api_request_time.time()
def get_host_list(
@@ -124,6 +145,7 @@
per_page=100,
order_by=None,
order_how=None,
staleness=None,
):
if fqdn:
query = find_hosts_by_canonical_facts(current_identity.account_number, {"fqdn": fqdn})
@@ -140,6 +162,9 @@
# add tag filtering to the query
query = find_hosts_by_tag(current_identity.account_number, tags, query)

if staleness:
query = find_hosts_by_staleness(staleness, query)

try:
order_by = _params_to_order_by(order_by, order_how)
except ValueError as e:
@@ -186,7 +211,7 @@ def _params_to_order_by(order_by=None, order_how=None):


def _build_paginated_host_list_response(total, page, per_page, host_list):
-    json_host_list = [serialize_host(host) for host in host_list]
+    json_host_list = [serialize_host(host, staleness_offset()) for host in host_list]
json_output = {
"total": total,
"count": len(host_list),
@@ -274,9 +299,12 @@ def delete_by_id(host_id_list):

@api_operation
@metrics.api_request_time.time()
-def get_host_by_id(host_id_list, page=1, per_page=100, order_by=None, order_how=None):
+def get_host_by_id(host_id_list, page=1, per_page=100, order_by=None, order_how=None, staleness=None):
query = _get_host_list_by_id_list(current_identity.account_number, host_id_list)

if staleness:
query = find_hosts_by_staleness(staleness, query)

try:
order_by = _params_to_order_by(order_by, order_how)
except ValueError as e:
8 changes: 8 additions & 0 deletions app/__init__.py
@@ -1,13 +1,15 @@
import connexion
import yaml
from connexion.resolver import RestyResolver
from flask import current_app
from flask import jsonify
from flask import request
from prometheus_flask_exporter import PrometheusMetrics

from api.mgmt import monitoring_blueprint
from app import payload_tracker
from app.config import Config
from app.culling import StalenessOffset
from app.exceptions import InventoryException
from app.logging import configure_logging
from app.logging import get_logger
@@ -28,6 +30,10 @@ def render_exception(exception):
return response


def staleness_offset():
    return StalenessOffset.from_config(current_app.config["INVENTORY_CONFIG"])


def create_app(config_name, start_tasks=False, start_payload_tracker=False):
connexion_options = {"swagger_ui": True}

@@ -68,6 +74,8 @@ def create_app(config_name, start_tasks=False, start_payload_tracker=False):
flask_app.config["SQLALCHEMY_POOL_SIZE"] = app_config.db_pool_size
flask_app.config["SQLALCHEMY_POOL_TIMEOUT"] = app_config.db_pool_timeout

flask_app.config["INVENTORY_CONFIG"] = app_config

db.init_app(flask_app)

flask_app.register_blueprint(monitoring_blueprint, url_prefix=app_config.mgmt_url_path_prefix)
3 changes: 3 additions & 0 deletions app/config.py
@@ -43,6 +43,9 @@ def __init__(self):
payload_tracker_enabled = os.environ.get("PAYLOAD_TRACKER_ENABLED", "true")
self.payload_tracker_enabled = payload_tracker_enabled.lower() == "true"

self.culling_stale_warning_offset_days = int(os.environ.get("CULLING_STALE_WARNING_OFFSET_DAYS", "7"))
self.culling_culled_offset_days = int(os.environ.get("CULLING_CULLED_OFFSET_DAYS", "14"))

def _build_base_url_path(self):
app_name = os.getenv("APP_NAME", "inventory")
path_prefix = os.getenv("PATH_PREFIX", "api")
24 changes: 24 additions & 0 deletions app/culling.py
@@ -0,0 +1,24 @@
from collections import namedtuple
from datetime import timedelta


__all__ = ("StalenessOffset",)


class StalenessOffset(namedtuple("StalenessOffset", ("stale_warning_offset_days", "culled_offset_days"))):
    @classmethod
    def from_config(cls, config):
        return cls(config.culling_stale_warning_offset_days, config.culling_culled_offset_days)

    @staticmethod
    def _add_days(timestamp, days):
        return timestamp + timedelta(days=days)

    def stale_timestamp(self, stale_timestamp):
        return self._add_days(stale_timestamp, 0)

    def stale_warning_timestamp(self, stale_timestamp):
        return self._add_days(stale_timestamp, self.stale_warning_offset_days)

    def culled_timestamp(self, stale_timestamp):
        return self._add_days(stale_timestamp, self.culled_offset_days)
26 changes: 26 additions & 0 deletions app/models.py
@@ -86,13 +86,20 @@ def __init__(
facts=None,
tags=None,
system_profile_facts=None,
stale_timestamp=None,
reporter=None,
):

if not canonical_facts:
raise InventoryException(
title="Invalid request", detail="At least one of the canonical fact fields must be present."
)

if (not stale_timestamp and reporter) or (stale_timestamp and not reporter):
raise InventoryException(
title="Invalid request", detail="Both stale_timestamp and reporter fields must be present."
)

self.canonical_facts = canonical_facts

if display_name:
@@ -105,6 +112,8 @@
self.facts = facts
self.tags = tags
self.system_profile_facts = system_profile_facts or {}
self.stale_timestamp = stale_timestamp
self.reporter = reporter

def save(self):
db.session.add(self)
@@ -123,6 +132,8 @@ def update(self, input_host, update_system_profile=False):
if update_system_profile:
self._update_system_profile(input_host.system_profile_facts)

self._update_stale_timestamp(input_host.stale_timestamp, input_host.reporter)

def patch(self, patch_data):
logger.debug("patching host (id=%s) with data: %s", self.id, patch_data)

@@ -169,6 +180,19 @@ def update_facts(self, facts_dict):
for input_namespace, input_facts in facts_dict.items():
self.replace_facts_in_namespace(input_namespace, input_facts)

    def _update_stale_timestamp(self, stale_timestamp, reporter):
        if (
            stale_timestamp
            and reporter
            and (
                (not self.reporter and not self.stale_timestamp)
                or (reporter == self.reporter and stale_timestamp >= self.stale_timestamp)
                or (reporter != self.reporter and stale_timestamp <= self.stale_timestamp)
            )
        ):
            self.stale_timestamp = stale_timestamp
            self.reporter = reporter

def replace_facts_in_namespace(self, namespace, facts_dict):
self.facts[namespace] = facts_dict
orm.attributes.flag_modified(self, "facts")
@@ -304,6 +328,8 @@ class HostSchema(Schema):
facts = fields.List(fields.Nested(FactsSchema))
tags = fields.List(fields.Nested(TagsSchema))
system_profile = fields.Nested(SystemProfileSchema)
stale_timestamp = fields.DateTime(timezone=True)
reporter = fields.Str(validate=validate.Length(min=1, max=255))

@validates("ip_addresses")
def validate_ip_addresses(self, ip_address_list):
4 changes: 4 additions & 0 deletions app/queue/egress.py
@@ -75,6 +75,10 @@ class HostSchema(Schema):
# FIXME:
created = fields.Str()
updated = fields.Str()
stale_timestamp = fields.Str()
stale_warning_timestamp = fields.Str()
culled_timestamp = fields.Str()
reporter = fields.Str()


class HostEvent(Schema):
12 changes: 8 additions & 4 deletions app/queue/ingress.py
@@ -4,6 +4,7 @@
from marshmallow import Schema
from marshmallow import ValidationError

from app import staleness_offset
from app.exceptions import InventoryException
from app.logging import get_logger
from app.logging import threadctx
@@ -49,6 +50,7 @@ def parse_operation_message(message):
metrics.ingress_message_parsing_failure.inc()
raise

logger.info("parsed_message: %s", parsed_operation)
return parsed_operation


@@ -62,18 +64,20 @@ def add_host(host_data):
try:
logger.info("Attempting to add host...")
input_host = deserialize_host(host_data)
-        (output_host, add_results) = host_repository.add_host(input_host)
-        metrics.add_host_success.labels(add_results.name).inc()  # created vs updated
+        (output_host, add_results) = host_repository.add_host(input_host, staleness_offset())
+        metrics.add_host_success.labels(
+            add_results.name, host_data.get("reporter", "null")
+        ).inc()  # created vs updated
logger.info("Host added") # This definitely needs to be more specific (added vs updated?)
payload_tracker_processing_ctx.inventory_id = output_host["id"]
return (output_host, add_results)
except InventoryException:
logger.exception("Error adding host ", extra={"host": host_data})
metrics.add_host_failure.labels("InventoryException").inc()
metrics.add_host_failure.labels("InventoryException", host_data.get("reporter", "null")).inc()
raise
except Exception:
logger.exception("Error while adding host", extra={"host": host_data})
metrics.add_host_failure.labels("Exception").inc()
metrics.add_host_failure.labels("Exception", host_data.get("reporter", "null")).inc()
raise


6 changes: 4 additions & 2 deletions app/queue/metrics.py
@@ -11,9 +11,11 @@
"inventory_ingress_message_parsing_failures", "Total amount of failures parsing ingress messages"
)
add_host_success = Counter(
"inventory_ingress_add_host_successes", "Total amount of successfully added hosts", ["result"]
"inventory_ingress_add_host_successes", "Total amount of successfully added hosts", ["result", "reporter"]
)
-add_host_failure = Counter("inventory_ingress_add_host_failures", "Total amount of failures adding hosts", ["cause"])
+add_host_failure = Counter(
+    "inventory_ingress_add_host_failures", "Total amount of failures adding hosts", ["cause", "reporter"]
+)
ingress_message_handler_success = Counter(
"inventory_ingress_message_handler_successes",
"Total amount of successfully handled messages from the ingress queue",
17 changes: 16 additions & 1 deletion app/serialization.py
@@ -42,17 +42,32 @@ def deserialize_host(raw_data):
facts,
tags,
validated_data.get("system_profile", {}),
validated_data.get("stale_timestamp"),
validated_data.get("reporter"),
)


-def serialize_host(host):
+def serialize_host(host, staleness_offset):
    if host.stale_timestamp:
        stale_timestamp = staleness_offset.stale_timestamp(host.stale_timestamp)
        stale_warning_timestamp = staleness_offset.stale_warning_timestamp(host.stale_timestamp)
        culled_timestamp = staleness_offset.culled_timestamp(host.stale_timestamp)
    else:
        stale_timestamp = None
        stale_warning_timestamp = None
        culled_timestamp = None

    return {
        **serialize_canonical_facts(host.canonical_facts),
        "id": _serialize_uuid(host.id),
        "account": host.account,
        "display_name": host.display_name,
        "ansible_host": host.ansible_host,
        "facts": _serialize_facts(host.facts),
        "reporter": host.reporter,
        "stale_timestamp": stale_timestamp and _serialize_datetime(stale_timestamp),
        "stale_warning_timestamp": stale_timestamp and _serialize_datetime(stale_warning_timestamp),
        "culled_timestamp": stale_timestamp and _serialize_datetime(culled_timestamp),
        # without astimezone(timezone.utc) the isoformat() method does not include the timezone offset
        # even though ISO 8601 requires it
        "created": _serialize_datetime(host.created_on),
16 changes: 16 additions & 0 deletions app/utils.py
@@ -140,6 +140,22 @@ def ansible_host(self):
def ansible_host(self, ansible_host):
self.__data["ansible_host"] = ansible_host

    @property
    def stale_timestamp(self):
        return self.__data.get("stale_timestamp", None)

    @stale_timestamp.setter
    def stale_timestamp(self, stale_timestamp):
        self.__data["stale_timestamp"] = stale_timestamp

    @property
    def reporter(self):
        return self.__data.get("reporter", None)

    @reporter.setter
    def reporter(self, reporter):
        self.__data["reporter"] = reporter

def to_json(self):
return json.dumps(self.__data)

3 changes: 2 additions & 1 deletion host_dumper.py
@@ -3,6 +3,7 @@
import pprint

from app import create_app
from app import staleness_offset
from app.models import Host
from app.serialization import serialize_host

@@ -41,7 +42,7 @@
elif args.account_number:
query_results = Host.query.filter(Host.account == args.account_number).all()

-json_host_list = [serialize_host(host) for host in query_results]
+json_host_list = [serialize_host(host, staleness_offset()) for host in query_results]

if args.no_pp:
print(json_host_list)