Skip to content

Commit

Permalink
MINIFICPP-2488 Group metrics in Prometheus metric families by name
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Nov 7, 2024
1 parent d011670 commit e584f87
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 49 deletions.
4 changes: 2 additions & 2 deletions cmake/Prometheus.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ set(PC ${Bash_EXECUTABLE} -c "set -x &&\

FetchContent_Declare(
prometheus-cpp
URL "https://github.com/jupp0r/prometheus-cpp/archive/refs/tags/v1.0.1.tar.gz"
URL_HASH "SHA256=593e028d401d3298eada804d252bc38d8cab3ea1c9e88bcd72095281f85e6d16"
URL "https://github.com/jupp0r/prometheus-cpp/archive/refs/tags/v1.3.0.tar.gz"
URL_HASH "SHA256=ac6e958405a29fbbea9db70b00fa3c420e16ad32e1baf941ab233ba031dd72ee"
PATCH_COMMAND "${PC}"
)

Expand Down
3 changes: 3 additions & 0 deletions docker/test/integration/cluster/DockerTestCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds):
def wait_for_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
return self.prometheus_checker.wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)

def verify_all_metric_types_are_defined_once(self):
return self.prometheus_checker.verify_all_metric_types_are_defined_once()

def check_minifi_log_matches_regex(self, regex, timeout_seconds=60, count=1):
for container_name in self.container_store.get_container_names("minifi-cpp"):
line_found = self.wait_for_app_logs_regex(container_name, regex, timeout_seconds, count)
Expand Down
16 changes: 16 additions & 0 deletions docker/test/integration/cluster/checkers/PrometheusChecker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
from prometheus_api_client import PrometheusConnect
from utils import wait_for

Expand Down Expand Up @@ -113,3 +114,18 @@ def verify_metric_larger_than_zero(self, metric_name, metric_class, labels={}):

def verify_metrics_larger_than_zero(self, metric_names, metric_class, labels={}):
return all((self.verify_metric_larger_than_zero(metric_name, metric_class, labels) for metric_name in metric_names))

def verify_all_metric_types_are_defined_once(self):
response = requests.get("http://127.0.0.1:9936/metrics")
if response.status_code < 200 or response.status_code >= 300:
return False

metric_types = set()
for line in response.text.split("\n"):
if line.startswith("# TYPE"):
metric_type = line.split(" ")[2]
if metric_type in metric_types:
return False
metric_types.add(metric_type)

return True
5 changes: 5 additions & 0 deletions docker/test/integration/cluster/containers/MinifiContainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,16 @@ def deploy(self):
if self.options.use_flow_config_from_url:
self.command = ["/bin/sh", "-c", "rm " + MinifiContainer.MINIFI_ROOT + "/conf/config.yml && ./bin/minifi.sh run"]

ports = {}
if self.options.enable_prometheus or self.options.enable_prometheus_with_ssl:
ports = {'9936/tcp': 9936}

self.client.containers.run(
image,
detach=True,
name=self.name,
network=self.network.name,
entrypoint=self.command,
ports=ports,
volumes=self.vols)
logging.info('Added container \'%s\'', self.name)
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ def check_metric_class_on_prometheus(self, metric_class, timeout_seconds):
def check_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
assert self.cluster.wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name) or self.cluster.log_app_output()

def check_all_prometheus_metric_types_are_defined_once(self):
assert self.cluster.verify_all_metric_types_are_defined_once() or self.cluster.log_app_output()

def check_if_peak_memory_usage_exceeded(self, minimum_peak_memory_usage: int, timeout_seconds: int) -> None:
assert self.cluster.wait_for_peak_memory_usage_to_exceed(minimum_peak_memory_usage, timeout_seconds) or self.cluster.log_app_output()

Expand Down
1 change: 1 addition & 0 deletions docker/test/integration/features/prometheus.feature
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Feature: MiNiFi can publish metrics to Prometheus server
And "FlowInformation" is published to the Prometheus server in less than 60 seconds
And "DeviceInfoNode" is published to the Prometheus server in less than 60 seconds
And "AgentStatus" is published to the Prometheus server in less than 60 seconds
And all Prometheus metric types are only defined once

Scenario: Published metrics are scraped by Prometheus server through SSL connection
Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/input"
Expand Down
5 changes: 5 additions & 0 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,11 @@ def step_impl(context, metric_class, timeout_seconds, processor_name):
context.test.check_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)


@then("all Prometheus metric types are only defined once")
def step_impl(context):
context.test.check_all_prometheus_metric_types_are_defined_once()


@then("Elasticsearch is empty")
def step_impl(context):
context.test.check_empty_elastic(context.test.get_container_name_with_postfix("elasticsearch"))
Expand Down
25 changes: 12 additions & 13 deletions extensions/prometheus/PrometheusMetricsPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,22 @@ PrometheusExposerConfig PrometheusMetricsPublisher::readExposerConfig() const {
void PrometheusMetricsPublisher::clearMetricNodes() {
std::lock_guard<std::mutex> lock(registered_metrics_mutex_);
logger_->log_debug("Clearing all metric nodes.");
for (const auto& collection : gauge_collections_) {
exposer_->removeMetric(collection);
if (gauge_collection_) {
exposer_->removeMetric(gauge_collection_);
}
gauge_collections_.clear();
gauge_collection_.reset();
}

void PrometheusMetricsPublisher::loadMetricNodes() {
logger_->log_debug("Loading all metric nodes.");
std::lock_guard<std::mutex> lock(registered_metrics_mutex_);
auto nodes = getMetricNodes();

for (const auto& metric_node : nodes) {
logger_->log_debug("Registering metric node '{}'", metric_node->getName());
gauge_collections_.push_back(std::make_shared<PublishedMetricGaugeCollection>(metric_node, agent_identifier_));
exposer_->registerMetric(gauge_collections_.back());
}
gauge_collection_ = std::make_shared<PublishedMetricGaugeCollection>(getMetricProviders(), agent_identifier_);
exposer_->registerMetric(gauge_collection_);
}

std::vector<state::response::SharedResponseNode> PrometheusMetricsPublisher::getMetricNodes() const {
std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> PrometheusMetricsPublisher::getMetricProviders() const {
gsl_Expects(response_node_loader_ && configuration_);
std::vector<state::response::SharedResponseNode> nodes;
std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> nodes;
auto metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_prometheus_metrics_publisher_metrics);
if (!metric_classes_str || metric_classes_str->empty()) {
metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_metrics);
Expand All @@ -96,7 +92,10 @@ std::vector<state::response::SharedResponseNode> PrometheusMetricsPublisher::get
logger_->log_warn("Metric class '{}' could not be loaded.", clazz);
continue;
}
nodes.insert(nodes.end(), response_nodes.begin(), response_nodes.end());
for (const auto& response_node : response_nodes) {
logger_->log_info("Loading metric node '{}'", response_node->getName());
nodes.push_back(response_node);
}
}
}
return nodes;
Expand Down
4 changes: 2 additions & 2 deletions extensions/prometheus/PrometheusMetricsPublisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ class PrometheusMetricsPublisher : public state::MetricsPublisher {

private:
PrometheusExposerConfig readExposerConfig() const;
std::vector<state::response::SharedResponseNode> getMetricNodes() const;
std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> getMetricProviders() const;
void loadAgentIdentifier();

std::mutex registered_metrics_mutex_;
std::vector<std::shared_ptr<PublishedMetricGaugeCollection>> gauge_collections_;
std::shared_ptr<PublishedMetricGaugeCollection> gauge_collection_;
std::unique_ptr<MetricsExposer> exposer_;
std::string agent_identifier_;
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<PrometheusMetricsPublisher>::getLogger()};
Expand Down
36 changes: 22 additions & 14 deletions extensions/prometheus/PublishedMetricGaugeCollection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,34 @@

namespace org::apache::nifi::minifi::extensions::prometheus {

PublishedMetricGaugeCollection::PublishedMetricGaugeCollection(std::shared_ptr<state::PublishedMetricProvider> metric, std::string agent_identifier)
: metric_{std::move(metric)},
PublishedMetricGaugeCollection::PublishedMetricGaugeCollection(std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>>&& metric_providers, std::string agent_identifier)
: metric_providers_{std::move(metric_providers)},
agent_identifier_(std::move(agent_identifier)) {
}

std::vector<::prometheus::MetricFamily> PublishedMetricGaugeCollection::Collect() const {
std::vector<::prometheus::MetricFamily> collection;
for (const auto& metric : metric_->calculateMetrics()) {
::prometheus::ClientMetric client_metric;
client_metric.label = ranges::views::transform(metric.labels, [](auto&& kvp) { return ::prometheus::ClientMetric::Label{kvp.first, kvp.second}; })
| ranges::to<std::vector<::prometheus::ClientMetric::Label>>;
client_metric.label.push_back(::prometheus::ClientMetric::Label{"agent_identifier", agent_identifier_});
client_metric.gauge = ::prometheus::ClientMetric::Gauge{metric.value};
collection.push_back({
.name = "minifi_" + metric.name,
.help = "",
.type = ::prometheus::MetricType::Gauge,
.metric = { std::move(client_metric) }
});
for (const auto& metric_provider : metric_providers_) {
for (const auto& metric : metric_provider->calculateMetrics()) {
::prometheus::ClientMetric client_metric;
client_metric.label = ranges::views::transform(metric.labels, [](auto&& kvp) { return ::prometheus::ClientMetric::Label{kvp.first, kvp.second}; })
| ranges::to<std::vector<::prometheus::ClientMetric::Label>>;
client_metric.label.push_back(::prometheus::ClientMetric::Label{"agent_identifier", agent_identifier_});
client_metric.gauge = ::prometheus::ClientMetric::Gauge{metric.value};
auto existing_metric = std::find_if(collection.begin(), collection.end(), [&](const auto& metric_family) { return metric_family.name == "minifi_" + metric.name; });
if (existing_metric != collection.end()) {
existing_metric->metric.push_back(std::move(client_metric));
} else {
collection.push_back({
.name = "minifi_" + metric.name,
.help = "",
.type = ::prometheus::MetricType::Gauge,
.metric = { std::move(client_metric) }
});
}
}
}

return collection;
}

Expand Down
6 changes: 4 additions & 2 deletions extensions/prometheus/PublishedMetricGaugeCollection.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@
#include "state/PublishedMetricProvider.h"
#include "prometheus/collectable.h"
#include "prometheus/metric_family.h"
#include "utils/gsl.h"
#include "core/logging/LoggerConfiguration.h"

namespace org::apache::nifi::minifi::extensions::prometheus {

class PublishedMetricGaugeCollection : public ::prometheus::Collectable {
public:
explicit PublishedMetricGaugeCollection(std::shared_ptr<state::PublishedMetricProvider> metric, std::string agent_identifier);
explicit PublishedMetricGaugeCollection(std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>>&& metric_providers, std::string agent_identifier);
std::vector<::prometheus::MetricFamily> Collect() const override;

private:
std::shared_ptr<state::PublishedMetricProvider> metric_;
std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> metric_providers_;
std::string agent_identifier_;
};

Expand Down
30 changes: 14 additions & 16 deletions extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ namespace org::apache::nifi::minifi::extensions::prometheus::test {

class DummyMetricsExposer : public MetricsExposer {
public:
void registerMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) override {
metrics_.push_back(metric);
void registerMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metrics) override {
metrics_ = metrics;
}

void removeMetric(const std::shared_ptr<PublishedMetricGaugeCollection>&) override {
}

[[nodiscard]] std::vector<std::shared_ptr<PublishedMetricGaugeCollection>> getMetrics() const {
[[nodiscard]] std::shared_ptr<PublishedMetricGaugeCollection> getMetrics() const {
return metrics_;
}

private:
std::vector<std::shared_ptr<PublishedMetricGaugeCollection>> metrics_;
std::shared_ptr<PublishedMetricGaugeCollection> metrics_;
};

class PrometheusPublisherTestFixture {
Expand Down Expand Up @@ -114,18 +114,16 @@ TEST_CASE_METHOD(PrometheusPublisherTestFixtureWithDummyExposer, "Test adding me
publisher_->loadMetricNodes();
auto stored_metrics = exposer_->getMetrics();
std::vector<std::string> valid_metrics_without_flow = {"QueueMetrics", "RepositoryMetrics", "DeviceInfoNode", "FlowInformation", "AgentInformation"};
REQUIRE(stored_metrics.size() == valid_metrics_without_flow.size());
for (const auto& stored_metric : stored_metrics) {
auto collection = stored_metric->Collect();
for (const auto& metric_family : collection) {
for (const auto& prometheus_metric : metric_family.metric) {
auto metric_class_label_it = ranges::find_if(prometheus_metric.label, [](const auto& label) { return label.name == "metric_class"; });
REQUIRE(metric_class_label_it != ranges::end(prometheus_metric.label));
REQUIRE(ranges::contains(valid_metrics_without_flow, metric_class_label_it->value));
auto agent_identifier_label_it = ranges::find_if(prometheus_metric.label, [](const auto& label) { return label.name == "agent_identifier"; });
REQUIRE(agent_identifier_label_it != ranges::end(prometheus_metric.label));
REQUIRE(agent_identifier_label_it->value == "AgentId-1");
}
REQUIRE(stored_metrics);
auto collection = stored_metrics->Collect();
for (const auto& metric_family : collection) {
for (const auto& prometheus_metric : metric_family.metric) {
auto metric_class_label_it = ranges::find_if(prometheus_metric.label, [](const auto& label) { return label.name == "metric_class"; });
REQUIRE(metric_class_label_it != ranges::end(prometheus_metric.label));
REQUIRE(ranges::contains(valid_metrics_without_flow, metric_class_label_it->value));
auto agent_identifier_label_it = ranges::find_if(prometheus_metric.label, [](const auto& label) { return label.name == "agent_identifier"; });
REQUIRE(agent_identifier_label_it != ranges::end(prometheus_metric.label));
REQUIRE(agent_identifier_label_it->value == "AgentId-1");
}
}
}
Expand Down

0 comments on commit e584f87

Please sign in to comment.