From 9748305c7a32e7e9a3bbe357baf8c05689719080 Mon Sep 17 00:00:00 2001
From: Tom van der Woerdt
Date: Thu, 3 Sep 2020 17:08:56 +0200
Subject: [PATCH] Export request latencies as histograms

This adds support for histogram-style metrics instead of using summaries.
It means we can sum at the cluster level and present the latency users
actually experience, instead of looking at it per node. The current version
limits the range of histogram buckets to between 0.1ms and 60s, to avoid
exporting huge numbers of buckets that are likely empty. Further patches
could narrow the range further, for example by using a 1.44x bucket
increment instead of the 1.2x increment, or by making the ranges
configurable.
---
 .../exporter/CassandraMetricsUtilities.java   | 15 +++++
 .../exporter/CollectorFunctions.java          | 55 +++++++++++++++++++
 .../cassandra/exporter/FactoriesSupplier.java | 42 ++++++++------
 .../cassandra/exporter/SamplingCounting.java  |  2 +
 4 files changed, 98 insertions(+), 16 deletions(-)

diff --git a/common/src/main/java/com/zegelin/cassandra/exporter/CassandraMetricsUtilities.java b/common/src/main/java/com/zegelin/cassandra/exporter/CassandraMetricsUtilities.java
index 2dca131..fe1bd95 100644
--- a/common/src/main/java/com/zegelin/cassandra/exporter/CassandraMetricsUtilities.java
+++ b/common/src/main/java/com/zegelin/cassandra/exporter/CassandraMetricsUtilities.java
@@ -87,6 +87,11 @@ public Iterable<Interval> getIntervals() {
                     new Interval(Interval.Quantile.P_99_9, (float) timer.get999thPercentile() * durationFactor)
             );
         }
+
+        @Override
+        public long[] getValues() {
+            return timer.values();
+        }
     };
 }
@@ -108,6 +113,11 @@ public Iterable<Interval> getIntervals() {
                     new Interval(Interval.Quantile.P_99_9, (float) histogram.get999thPercentile())
             );
         }
+
+        @Override
+        public long[] getValues() {
+            return histogram.values();
+        }
     };
 }
@@ -125,6 +135,11 @@ public Iterable<Interval> getIntervals() {
             return Interval.asIntervals(Interval.Quantile.STANDARD_PERCENTILES,
                     q -> (float) snapshot.getValue(q.value));
         }
+
+        @Override
+        public long[] getValues() {
+            return metric.getSnapshot().getValues();
+        }
     };
 }
diff --git a/common/src/main/java/com/zegelin/cassandra/exporter/CollectorFunctions.java b/common/src/main/java/com/zegelin/cassandra/exporter/CollectorFunctions.java
index 6455bee..4298153 100644
--- a/common/src/main/java/com/zegelin/cassandra/exporter/CollectorFunctions.java
+++ b/common/src/main/java/com/zegelin/cassandra/exporter/CollectorFunctions.java
@@ -10,6 +10,7 @@
 import org.apache.cassandra.metrics.CassandraMetricsRegistry.JmxMeterMBean;
 import org.apache.cassandra.utils.EstimatedHistogram;
 
+import java.util.ArrayList;
 import java.util.stream.Stream;
 
 public final class CollectorFunctions {
@@ -176,4 +177,58 @@ protected static CollectorFunction<SamplingCounting> samplingAndCountingAsSummar
     public static CollectorFunction<SamplingCounting> samplingAndCountingAsSummary() {
         return samplingAndCountingAsSummary(FloatFloatFunction.identity());
     }
+
+    /**
+     * Collect a {@link SamplingCounting} as a Prometheus histogram.
+     */
+    protected static CollectorFunction<SamplingCounting> samplingAndCountingAsHistogram(final FloatFloatFunction bucketScaleFunction) {
+        // Set some limits on the range so we don't export all 170 buckets
+        float bucketMin = 0.0001f; // 0.1ms
+        float bucketMax = 60.0f; // 60sec
+
+        // Avoid recomputing the buckets frequently. Cassandra uses ~170 buckets
+        float[] cachedBuckets = newBucketOffsets(200, bucketScaleFunction);
+
+        return group -> {
+            final Stream<HistogramMetricFamily.Histogram> histogramStream = group.labeledObjects().entrySet().stream()
+                    .map(e -> {
+                        long[] values = e.getValue().getValues();
+                        float[] buckets = values.length <= cachedBuckets.length
+                                ? cachedBuckets
+                                : newBucketOffsets(values.length, bucketScaleFunction);
+
+                        float sum = 0;
+                        long count = 0;
+                        ArrayList<Interval> intervals = new ArrayList<>();
+                        assert values[values.length-1] == 0;
+
+                        for (int i = 0; i < values.length; i++) {
+                            if (values[i] != 0) {
+                                sum += buckets[i] * values[i];
+                                count += values[i];
+                            }
+                            if (buckets[i] >= bucketMin && buckets[i] <= bucketMax) {
+                                intervals.add(new Interval(new Interval.Quantile(buckets[i]), count));
+                            }
+                        }
+
+                        return new HistogramMetricFamily.Histogram(e.getKey(), sum, count, intervals);
+                    });
+
+            return Stream.of(new HistogramMetricFamily(group.name(), group.help(), histogramStream));
+        };
+    }
+
+    public static CollectorFunction<SamplingCounting> samplingAndCountingAsHistogram() {
+        return samplingAndCountingAsHistogram(FloatFloatFunction.identity());
+    }
+
+    private static float[] newBucketOffsets(int size, final FloatFloatFunction bucketScaleFunction) {
+        long[] rawOffsets = EstimatedHistogram.newOffsets(size, false);
+        float[] adjustedOffsets = new float[size];
+        for (int i = 0; i < size; i++) {
+            adjustedOffsets[i] = bucketScaleFunction.apply(rawOffsets[i]);
+        }
+        return adjustedOffsets;
+    }
 }
diff --git a/common/src/main/java/com/zegelin/cassandra/exporter/FactoriesSupplier.java b/common/src/main/java/com/zegelin/cassandra/exporter/FactoriesSupplier.java
index 2d88503..1aeee22 100644
--- a/common/src/main/java/com/zegelin/cassandra/exporter/FactoriesSupplier.java
+++ b/common/src/main/java/com/zegelin/cassandra/exporter/FactoriesSupplier.java
@@ -6,7 +6,6 @@
 import com.zegelin.cassandra.exporter.cli.HarvesterOptions;
 import com.zegelin.cassandra.exporter.collector.CachingCollector;
 import com.zegelin.cassandra.exporter.collector.FailureDetectorMBeanMetricFamilyCollector;
-import com.zegelin.cassandra.exporter.collector.LatencyMetricGroupSummaryCollector;
 import com.zegelin.cassandra.exporter.collector.StorageServiceMBeanMetricFamilyCollector;
 import com.zegelin.cassandra.exporter.collector.dynamic.FunctionalMetricFamilyCollector;
 import com.zegelin.cassandra.exporter.collector.jvm.*;
@@ -502,6 +501,15 @@ private static FactoryBuilder.CollectorConstructor timerAsSummaryCollectorConstr
         };
     }
 
+    private static FactoryBuilder.CollectorConstructor timerAsHistogramCollectorConstructor() {
+        return (name, help, labels, mBean) -> {
+            final NamedObject<SamplingCounting> samplingCountingNamedObject = CassandraMetricsUtilities.jmxTimerMBeanAsSamplingCounting(mBean);
+
+            return new FunctionalMetricFamilyCollector<>(name, help, ImmutableMap.of(labels, samplingCountingNamedObject),
+                    samplingAndCountingAsHistogram(MetricValueConversionFunctions::nanosecondsToSeconds));
+        };
+    }
+
     private static FactoryBuilder.CollectorConstructor histogramAsSummaryCollectorConstructor() {
         return (name, help, labels, mBean) -> {
             final NamedObject<SamplingCounting> samplingCountingNamedObject = CassandraMetricsUtilities.jmxHistogramAsSamplingCounting(mBean);
@@ -510,6 +518,15 @@ private static FactoryBuilder.CollectorConstructor histogramAsSummaryCollectorCo
         };
     }
 
+    private static FactoryBuilder.CollectorConstructor histogramAsHistogramCollectorConstructor() {
+        return (name, help, labels, mBean) -> {
+            final NamedObject<SamplingCounting> samplingCountingNamedObject = CassandraMetricsUtilities.jmxHistogramAsSamplingCounting(mBean);
+
+            return new FunctionalMetricFamilyCollector<>(name, help, ImmutableMap.of(labels, samplingCountingNamedObject),
+                    samplingAndCountingAsHistogram(MetricValueConversionFunctions::nanosecondsToSeconds));
+        };
+    }
+
     private static FactoryBuilder.CollectorConstructor functionalCollectorConstructor(final FunctionalMetricFamilyCollector.CollectorFunction function) {
         return (final String name, final String help, final Labels labels, final NamedObject mBean) ->
                 new FunctionalMetricFamilyCollector<>(name, help, ImmutableMap.of(labels, mBean.cast()), function);
@@ -590,8 +607,7 @@ public List<Factory> get() {
         builder.add(clientRequestMetricFactory(functionalCollectorConstructor(meterAsCounter()), "Unavailables", "unavailable_exceptions_total", "Total number of UnavailableExceptions thrown (since server start)."));
         builder.add(clientRequestMetricFactory(functionalCollectorConstructor(meterAsCounter()), "Failures", "failures_total", "Total number of failed requests (since server start)."));
 
-        builder.add(clientRequestMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "Latency", "latency_seconds", "Request latency."));
-        builder.add(clientRequestMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "TotalLatency", "latency_seconds", "Total request duration."));
+        builder.add(clientRequestMetricFactory(timerAsHistogramCollectorConstructor(), "Latency", "latency_seconds", "Request latency."));
     }
 
@@ -706,15 +722,12 @@ public List<Factory> get() {
         builder.addAll(tableMetricFactory(functionalCollectorConstructor(histogramGaugeAsSummary()), "EstimatedColumnCountHistogram", "estimated_columns", null));
         builder.addAll(tableMetricFactory(histogramAsSummaryCollectorConstructor(), "SSTablesPerReadHistogram", "sstables_per_read", null));
 
-//
-        builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "ReadLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "read")));
-        builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "ReadTotalLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "read")));
-        builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "RangeLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "range_read")));
-        builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "RangeTotalLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "range_read")));
+        builder.addAll(tableMetricFactory(timerAsHistogramCollectorConstructor(), "ReadLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "read")));
+
+        builder.addAll(tableMetricFactory(timerAsHistogramCollectorConstructor(), "RangeLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "range_read")));
 
-        builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "WriteLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "write")));
-        builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "WriteTotalLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "write")));
+        builder.addAll(tableMetricFactory(timerAsHistogramCollectorConstructor(), "WriteLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "write")));
 
         builder.addAll(tableMetricFactory(TABLE_SCOPE, functionalCollectorConstructor(counterAsGauge()), "PendingFlushes", "pending_flushes", null));
         builder.addAll(tableMetricFactory(KEYSPACE_NODE_SCOPE, functionalCollectorConstructor(numericGaugeAsGauge()), "PendingFlushes", "pending_flushes", null));
@@ -766,14 +779,11 @@ public List<Factory> get() {
         builder.addAll(tableMetricFactory(TABLE_SCOPE, functionalCollectorConstructor(counterAsGauge()), "RowCacheMiss", "row_cache_misses", null, ImmutableMap.of("miss_type", "miss")));
         builder.addAll(tableMetricFactory(KEYSPACE_NODE_SCOPE, functionalCollectorConstructor(numericGaugeAsGauge()), "RowCacheMiss", "row_cache_misses", null, ImmutableMap.of("miss_type", "miss")));
 
-        builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "CasPrepareLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_prepare")));
-        builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "CasPrepareTotalLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_prepare")));
+        builder.addAll(tableMetricFactory(timerAsHistogramCollectorConstructor(), "CasPrepareLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_prepare")));
 
-        builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "CasProposeLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_propose")));
-        builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "CasProposeTotalLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_propose")));
+        builder.addAll(tableMetricFactory(timerAsHistogramCollectorConstructor(), "CasProposeLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_propose")));
 
-        builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "CasCommitLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_commit")));
-        builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "CasCommitTotalLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_commit")));
+        builder.addAll(tableMetricFactory(timerAsHistogramCollectorConstructor(), "CasCommitLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_commit")));
 
         builder.addAll(tableMetricFactory(functionalCollectorConstructor(numericGaugeAsGauge(MetricValueConversionFunctions::percentToRatio)), "PercentRepaired", "repaired_ratio", null));
diff --git a/common/src/main/java/com/zegelin/cassandra/exporter/SamplingCounting.java b/common/src/main/java/com/zegelin/cassandra/exporter/SamplingCounting.java
index 89c886d..cee55e0 100644
--- a/common/src/main/java/com/zegelin/cassandra/exporter/SamplingCounting.java
+++ b/common/src/main/java/com/zegelin/cassandra/exporter/SamplingCounting.java
@@ -13,4 +13,6 @@ public interface SamplingCounting {
     long getCount();
 
     Iterable<Interval> getIntervals();
+
+    long[] getValues();
 }