diff --git a/cmake/Prometheus.cmake b/cmake/Prometheus.cmake
index 10be31eb01..ef3e33231f 100644
--- a/cmake/Prometheus.cmake
+++ b/cmake/Prometheus.cmake
@@ -28,8 +28,8 @@ set(PC ${Bash_EXECUTABLE} -c "set -x &&\
 
 FetchContent_Declare(
     prometheus-cpp
-    URL "https://github.com/jupp0r/prometheus-cpp/archive/refs/tags/v1.0.1.tar.gz"
-    URL_HASH "SHA256=593e028d401d3298eada804d252bc38d8cab3ea1c9e88bcd72095281f85e6d16"
+    URL "https://github.com/jupp0r/prometheus-cpp/archive/refs/tags/v1.3.0.tar.gz"
+    URL_HASH "SHA256=ac6e958405a29fbbea9db70b00fa3c420e16ad32e1baf941ab233ba031dd72ee"
     PATCH_COMMAND "${PC}"
 )
 
diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py
index 153ffce265..92e554fb0a 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -278,6 +278,9 @@ def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds):
     def wait_for_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
         return self.prometheus_checker.wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)
 
+    def verify_all_metric_types_are_defined_once(self):
+        return self.prometheus_checker.verify_all_metric_types_are_defined_once()
+
     def check_minifi_log_matches_regex(self, regex, timeout_seconds=60, count=1):
         for container_name in self.container_store.get_container_names("minifi-cpp"):
             line_found = self.wait_for_app_logs_regex(container_name, regex, timeout_seconds, count)
diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py b/docker/test/integration/cluster/checkers/PrometheusChecker.py
index 050310717f..8c12bc25fd 100644
--- a/docker/test/integration/cluster/checkers/PrometheusChecker.py
+++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import requests
 from prometheus_api_client import PrometheusConnect
 from utils import wait_for
 
@@ -113,3 +114,18 @@ def verify_metric_larger_than_zero(self, metric_name, metric_class, labels={}):
 
     def verify_metrics_larger_than_zero(self, metric_names, metric_class, labels={}):
         return all((self.verify_metric_larger_than_zero(metric_name, metric_class, labels) for metric_name in metric_names))
+
+    def verify_all_metric_types_are_defined_once(self):
+        response = requests.get("http://127.0.0.1:9936/metrics")
+        if response.status_code < 200 or response.status_code >= 300:
+            return False
+
+        metric_types = set()
+        for line in response.text.split("\n"):
+            if line.startswith("# TYPE"):
+                metric_type = line.split(" ")[2]
+                if metric_type in metric_types:
+                    return False
+                metric_types.add(metric_type)
+
+        return True
diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py
index 770f42df02..8d0894446d 100644
--- a/docker/test/integration/cluster/containers/MinifiContainer.py
+++ b/docker/test/integration/cluster/containers/MinifiContainer.py
@@ -191,11 +191,16 @@ def deploy(self):
         if self.options.use_flow_config_from_url:
             self.command = ["/bin/sh", "-c", "rm " + MinifiContainer.MINIFI_ROOT + "/conf/config.yml && ./bin/minifi.sh run"]
 
+        ports = {}
+        if self.options.enable_prometheus or self.options.enable_prometheus_with_ssl:
+            ports = {'9936/tcp': 9936}
+
         self.client.containers.run(
             image,
             detach=True,
             name=self.name,
             network=self.network.name,
             entrypoint=self.command,
+            ports=ports,
             volumes=self.vols)
         logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index 2eef2f22db..16a30c7ee5 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -327,6 +327,9 @@ def check_metric_class_on_prometheus(self, metric_class, timeout_seconds):
     def check_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
         assert self.cluster.wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name) or self.cluster.log_app_output()
 
+    def check_all_prometheus_metric_types_are_defined_once(self):
+        assert self.cluster.verify_all_metric_types_are_defined_once() or self.cluster.log_app_output()
+
     def check_if_peak_memory_usage_exceeded(self, minimum_peak_memory_usage: int, timeout_seconds: int) -> None:
         assert self.cluster.wait_for_peak_memory_usage_to_exceed(minimum_peak_memory_usage, timeout_seconds) or self.cluster.log_app_output()
 
diff --git a/docker/test/integration/features/prometheus.feature b/docker/test/integration/features/prometheus.feature
index 0b54ccd26b..f336990b58 100644
--- a/docker/test/integration/features/prometheus.feature
+++ b/docker/test/integration/features/prometheus.feature
@@ -34,6 +34,7 @@ Feature: MiNiFi can publish metrics to Prometheus server
     And "FlowInformation" is published to the Prometheus server in less than 60 seconds
     And "DeviceInfoNode" is published to the Prometheus server in less than 60 seconds
     And "AgentStatus" is published to the Prometheus server in less than 60 seconds
+    And all Prometheus metric types are only defined once
 
   Scenario: Published metrics are scraped by Prometheus server through SSL connection
     Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/input"
"/tmp/input" diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 92dd64548b..b292377328 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -1042,6 +1042,11 @@ def step_impl(context, metric_class, timeout_seconds, processor_name): context.test.check_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name) +@then("all Prometheus metric types are only defined once") +def step_impl(context): + context.test.check_all_prometheus_metric_types_are_defined_once() + + @then("Elasticsearch is empty") def step_impl(context): context.test.check_empty_elastic(context.test.get_container_name_with_postfix("elasticsearch")) diff --git a/extensions/prometheus/PrometheusMetricsPublisher.cpp b/extensions/prometheus/PrometheusMetricsPublisher.cpp index 05dc9d7aaa..36f2391660 100644 --- a/extensions/prometheus/PrometheusMetricsPublisher.cpp +++ b/extensions/prometheus/PrometheusMetricsPublisher.cpp @@ -64,26 +64,22 @@ PrometheusExposerConfig PrometheusMetricsPublisher::readExposerConfig() const { void PrometheusMetricsPublisher::clearMetricNodes() { std::lock_guard lock(registered_metrics_mutex_); logger_->log_debug("Clearing all metric nodes."); - for (const auto& collection : gauge_collections_) { - exposer_->removeMetric(collection); + if (gauge_collection_) { + exposer_->removeMetric(gauge_collection_); } - gauge_collections_.clear(); + gauge_collection_.reset(); } void PrometheusMetricsPublisher::loadMetricNodes() { + logger_->log_debug("Loading all metric nodes."); std::lock_guard lock(registered_metrics_mutex_); - auto nodes = getMetricNodes(); - - for (const auto& metric_node : nodes) { - logger_->log_debug("Registering metric node '{}'", metric_node->getName()); - gauge_collections_.push_back(std::make_shared(metric_node, agent_identifier_)); - exposer_->registerMetric(gauge_collections_.back()); - } + gauge_collection_ = std::make_shared(getMetricProviders(), agent_identifier_); + exposer_->registerMetric(gauge_collection_); } -std::vector PrometheusMetricsPublisher::getMetricNodes() const { +std::vector>> PrometheusMetricsPublisher::getMetricProviders() const { gsl_Expects(response_node_loader_ && configuration_); - std::vector nodes; + std::vector>> nodes; auto metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_prometheus_metrics_publisher_metrics); if (!metric_classes_str || metric_classes_str->empty()) { metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_metrics); @@ -96,7 +92,10 @@ std::vector PrometheusMetricsPublisher::get logger_->log_warn("Metric class '{}' could not be loaded.", clazz); continue; } - nodes.insert(nodes.end(), response_nodes.begin(), response_nodes.end()); + for (const auto& response_node : response_nodes) { + logger_->log_info("Loading metric node '{}'", response_node->getName()); + nodes.push_back(response_node); + } } } return nodes; diff --git a/extensions/prometheus/PrometheusMetricsPublisher.h b/extensions/prometheus/PrometheusMetricsPublisher.h index 3dd5f81082..64a4fb4105 100644 --- a/extensions/prometheus/PrometheusMetricsPublisher.h +++ b/extensions/prometheus/PrometheusMetricsPublisher.h @@ -43,11 +43,11 @@ class PrometheusMetricsPublisher : public state::MetricsPublisher { private: PrometheusExposerConfig readExposerConfig() const; - std::vector getMetricNodes() const; + std::vector>> getMetricProviders() const; void loadAgentIdentifier(); 
   std::mutex registered_metrics_mutex_;
-  std::vector<std::shared_ptr<PublishedMetricGaugeCollection>> gauge_collections_;
+  std::shared_ptr<PublishedMetricGaugeCollection> gauge_collection_;
   std::unique_ptr<MetricsExposer> exposer_;
   std::string agent_identifier_;
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory::getLogger<PrometheusMetricsPublisher>()};
diff --git a/extensions/prometheus/PublishedMetricGaugeCollection.cpp b/extensions/prometheus/PublishedMetricGaugeCollection.cpp
index 81726749ba..0469b6dbd0 100644
--- a/extensions/prometheus/PublishedMetricGaugeCollection.cpp
+++ b/extensions/prometheus/PublishedMetricGaugeCollection.cpp
@@ -26,26 +26,34 @@
 
 namespace org::apache::nifi::minifi::extensions::prometheus {
 
-PublishedMetricGaugeCollection::PublishedMetricGaugeCollection(std::shared_ptr<state::PublishedMetricProvider> metric, std::string agent_identifier)
-  : metric_{std::move(metric)},
+PublishedMetricGaugeCollection::PublishedMetricGaugeCollection(std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>>&& metric_providers, std::string agent_identifier)
+  : metric_providers_{std::move(metric_providers)},
     agent_identifier_(std::move(agent_identifier)) {
 }
 
 std::vector<::prometheus::MetricFamily> PublishedMetricGaugeCollection::Collect() const {
   std::vector<::prometheus::MetricFamily> collection;
-  for (const auto& metric : metric_->calculateMetrics()) {
-    ::prometheus::ClientMetric client_metric;
-    client_metric.label = ranges::views::transform(metric.labels, [](auto&& kvp) { return ::prometheus::ClientMetric::Label{kvp.first, kvp.second}; })
-      | ranges::to<std::vector<::prometheus::ClientMetric::Label>>;
-    client_metric.label.push_back(::prometheus::ClientMetric::Label{"agent_identifier", agent_identifier_});
-    client_metric.gauge = ::prometheus::ClientMetric::Gauge{metric.value};
-    collection.push_back({
-      .name = "minifi_" + metric.name,
-      .help = "",
-      .type = ::prometheus::MetricType::Gauge,
-      .metric = { std::move(client_metric) }
-    });
+  for (const auto& metric_provider : metric_providers_) {
+    for (const auto& metric : metric_provider->calculateMetrics()) {
+      ::prometheus::ClientMetric client_metric;
+      client_metric.label = ranges::views::transform(metric.labels, [](auto&& kvp) { return ::prometheus::ClientMetric::Label{kvp.first, kvp.second}; })
+        | ranges::to<std::vector<::prometheus::ClientMetric::Label>>;
+      client_metric.label.push_back(::prometheus::ClientMetric::Label{"agent_identifier", agent_identifier_});
+      client_metric.gauge = ::prometheus::ClientMetric::Gauge{metric.value};
+      auto existing_metric = std::find_if(collection.begin(), collection.end(), [&](const auto& metric_family) { return metric_family.name == "minifi_" + metric.name; });
+      if (existing_metric != collection.end()) {
+        existing_metric->metric.push_back(std::move(client_metric));
+      } else {
+        collection.push_back({
+          .name = "minifi_" + metric.name,
+          .help = "",
+          .type = ::prometheus::MetricType::Gauge,
+          .metric = { std::move(client_metric) }
+        });
+      }
+    }
   }
+
   return collection;
 }
 
diff --git a/extensions/prometheus/PublishedMetricGaugeCollection.h b/extensions/prometheus/PublishedMetricGaugeCollection.h
index 0858e499fc..c86f53e571 100644
--- a/extensions/prometheus/PublishedMetricGaugeCollection.h
+++ b/extensions/prometheus/PublishedMetricGaugeCollection.h
@@ -23,16 +23,18 @@
 #include "state/PublishedMetricProvider.h"
 #include "prometheus/collectable.h"
 #include "prometheus/metric_family.h"
+#include "utils/gsl.h"
+#include "core/logging/LoggerConfiguration.h"
 
 namespace org::apache::nifi::minifi::extensions::prometheus {
 
 class PublishedMetricGaugeCollection : public ::prometheus::Collectable {
  public:
-  explicit PublishedMetricGaugeCollection(std::shared_ptr<state::PublishedMetricProvider> metric, std::string agent_identifier);
+  explicit PublishedMetricGaugeCollection(std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>>&& metric_providers, std::string agent_identifier);
 
   std::vector<::prometheus::MetricFamily> Collect() const override;
 
  private:
-  std::shared_ptr<state::PublishedMetricProvider> metric_;
+  std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> metric_providers_;
   std::string agent_identifier_;
 };
diff --git a/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp b/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
index 377befea0e..878ef888a0 100644
--- a/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
+++ b/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
@@ -31,19 +31,19 @@ namespace org::apache::nifi::minifi::extensions::prometheus::test {
 
 class DummyMetricsExposer : public MetricsExposer {
  public:
-  void registerMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) override {
-    metrics_.push_back(metric);
+  void registerMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metrics) override {
+    metrics_ = metrics;
   }
 
   void removeMetric(const std::shared_ptr<PublishedMetricGaugeCollection>&) override {
   }
 
-  [[nodiscard]] std::vector<std::shared_ptr<PublishedMetricGaugeCollection>> getMetrics() const {
+  [[nodiscard]] std::shared_ptr<PublishedMetricGaugeCollection> getMetrics() const {
     return metrics_;
   }
 
  private:
-  std::vector<std::shared_ptr<PublishedMetricGaugeCollection>> metrics_;
+  std::shared_ptr<PublishedMetricGaugeCollection> metrics_;
 };
 
 class PrometheusPublisherTestFixture {
@@ -114,18 +114,16 @@ TEST_CASE_METHOD(PrometheusPublisherTestFixtureWithDummyExposer, "Test adding me
   publisher_->loadMetricNodes();
   auto stored_metrics = exposer_->getMetrics();
   std::vector<std::string> valid_metrics_without_flow = {"QueueMetrics", "RepositoryMetrics", "DeviceInfoNode", "FlowInformation", "AgentInformation"};
-  REQUIRE(stored_metrics.size() == valid_metrics_without_flow.size());
-  for (const auto& stored_metric : stored_metrics) {
-    auto collection = stored_metric->Collect();
-    for (const auto& metric_family : collection) {
-      for (const auto& prometheus_metric : metric_family.metric) {
-        auto metric_class_label_it = ranges::find_if(prometheus_metric.label, [](const auto& label) { return label.name == "metric_class"; });
-        REQUIRE(metric_class_label_it != ranges::end(prometheus_metric.label));
-        REQUIRE(ranges::contains(valid_metrics_without_flow, metric_class_label_it->value));
-        auto agent_identifier_label_it = ranges::find_if(prometheus_metric.label, [](const auto& label) { return label.name == "agent_identifier"; });
-        REQUIRE(agent_identifier_label_it != ranges::end(prometheus_metric.label));
-        REQUIRE(agent_identifier_label_it->value == "AgentId-1");
-      }
+  REQUIRE(stored_metrics);
+  auto collection = stored_metrics->Collect();
+  for (const auto& metric_family : collection) {
+    for (const auto& prometheus_metric : metric_family.metric) {
+      auto metric_class_label_it = ranges::find_if(prometheus_metric.label, [](const auto& label) { return label.name == "metric_class"; });
+      REQUIRE(metric_class_label_it != ranges::end(prometheus_metric.label));
+      REQUIRE(ranges::contains(valid_metrics_without_flow, metric_class_label_it->value));
+      auto agent_identifier_label_it = ranges::find_if(prometheus_metric.label, [](const auto& label) { return label.name == "agent_identifier"; });
+      REQUIRE(agent_identifier_label_it != ranges::end(prometheus_metric.label));
+      REQUIRE(agent_identifier_label_it->value == "AgentId-1");
     }
   }
 }