Export request latencies as histograms
This adds support for histogram-style metrics instead of summaries. Histograms can be
summed at the cluster level, so we can present the latency users actually experience
rather than looking at it per node.

The current version limits the range of histogram buckets to between 0.1 ms and 60 s,
to avoid exporting a huge number of buckets that are likely empty. Further patches could
narrow this, for example by using a 1.44x bucket increment instead of the 1.2x increment,
or by making the ranges configurable.
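For a rough sense of the trade-off between the two increments, here is a small illustrative
Java sketch. It is not how the exporter computes buckets (the real offsets come from
Cassandra's EstimatedHistogram.newOffsets(), so these figures are approximate); it just
estimates how many geometric buckets span the 0.1 ms to 60 s range for a given growth factor.

// Approximate bucket counts for the exported range under different growth factors.
// Illustration only: the exporter does not compute its buckets this way.
public final class BucketCountSketch {
    static long bucketsFor(double min, double max, double growthFactor) {
        return (long) Math.ceil(Math.log(max / min) / Math.log(growthFactor));
    }

    public static void main(String[] args) {
        System.out.println(bucketsFor(0.0001, 60.0, 1.2));  // ~73 buckets between 0.1 ms and 60 s
        System.out.println(bucketsFor(0.0001, 60.0, 1.44)); // ~37 buckets
    }
}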
TvdW committed Sep 10, 2020
1 parent 2767af9 commit 9748305
Showing 4 changed files with 98 additions and 16 deletions.
@@ -87,6 +87,11 @@ public Iterable<Interval> getIntervals() {
                    new Interval(Interval.Quantile.P_99_9, (float) timer.get999thPercentile() * durationFactor)
            );
        }

        @Override
        public long[] getValues() {
            return timer.values();
        }
    };
}

@@ -108,6 +113,11 @@ public Iterable<Interval> getIntervals() {
                    new Interval(Interval.Quantile.P_99_9, (float) histogram.get999thPercentile())
            );
        }

        @Override
        public long[] getValues() {
            return histogram.values();
        }
    };
}

@@ -125,6 +135,11 @@ public Iterable<Interval> getIntervals() {

            return Interval.asIntervals(Interval.Quantile.STANDARD_PERCENTILES, q -> (float) snapshot.getValue(q.value));
        }

        @Override
        public long[] getValues() {
            return metric.getSnapshot().getValues();
        }
    };
}

@@ -10,6 +10,7 @@
import org.apache.cassandra.metrics.CassandraMetricsRegistry.JmxMeterMBean;
import org.apache.cassandra.utils.EstimatedHistogram;

import java.util.ArrayList;
import java.util.stream.Stream;

public final class CollectorFunctions {
@@ -176,4 +177,58 @@ protected static CollectorFunction<SamplingCounting> samplingAndCountingAsSummary
    public static CollectorFunction<SamplingCounting> samplingAndCountingAsSummary() {
        return samplingAndCountingAsSummary(FloatFloatFunction.identity());
    }

    /**
     * Collect a {@link SamplingCounting} as a Prometheus histogram.
     */
    protected static CollectorFunction<SamplingCounting> samplingAndCountingAsHistogram(final FloatFloatFunction bucketScaleFunction) {
        // Set some limits on the range so we don't export all 170 buckets
        float bucketMin = 0.0001f; // 0.1ms
        float bucketMax = 60.0f; // 60sec

        // Avoid recomputing the buckets frequently. Cassandra uses ~170 buckets
        float[] cachedBuckets = newBucketOffsets(200, bucketScaleFunction);

        return group -> {
            final Stream<HistogramMetricFamily.Histogram> histogramStream = group.labeledObjects().entrySet().stream()
                    .map(e -> {
                        long[] values = e.getValue().getValues();
                        float[] buckets = values.length <= cachedBuckets.length
                                ? cachedBuckets
                                : newBucketOffsets(values.length, bucketScaleFunction);

                        float sum = 0;
                        long count = 0;
                        ArrayList<Interval> intervals = new ArrayList<>();
                        // The conversion assumes the last bucket is empty.
                        assert values[values.length - 1] == 0;

                        for (int i = 0; i < values.length; i++) {
                            if (values[i] != 0) {
                                // Approximate the sum by attributing each observation to its bucket's upper bound.
                                sum += buckets[i] * values[i];
                                count += values[i];
                            }
                            // Prometheus histogram buckets are cumulative: record the running count
                            // at every bucket boundary that falls inside the exported range.
                            if (buckets[i] >= bucketMin && buckets[i] <= bucketMax) {
                                intervals.add(new Interval(new Interval.Quantile(buckets[i]), count));
                            }
                        }

                        return new HistogramMetricFamily.Histogram(e.getKey(), sum, count, intervals);
                    });

            return Stream.of(new HistogramMetricFamily(group.name(), group.help(), histogramStream));
        };
    }

    public static CollectorFunction<SamplingCounting> samplingAndCountingAsHistogram() {
        return samplingAndCountingAsHistogram(FloatFloatFunction.identity());
    }

    private static float[] newBucketOffsets(int size, final FloatFloatFunction bucketScaleFunction) {
        long[] rawOffsets = EstimatedHistogram.newOffsets(size, false);
        float[] adjustedOffsets = new float[size];
        for (int i = 0; i < size; i++) {
            adjustedOffsets[i] = bucketScaleFunction.apply(rawOffsets[i]);
        }
        return adjustedOffsets;
    }
}
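As a reading aid, the following standalone sketch mirrors the conversion loop above using
made-up offsets and counts. It illustrates the cumulative-bucket idea only and is not part
of the exporter: the real inputs come from EstimatedHistogram.newOffsets() and
SamplingCounting.getValues(), and the real code additionally clamps boundaries to the
0.1 ms to 60 s range.

// Toy data: bucket upper bounds (in seconds) and the number of observations per bucket.
public final class CumulativeBucketSketch {
    public static void main(String[] args) {
        float[] offsets = {0.001f, 0.01f, 0.1f, 1.0f};
        long[] counts = {5, 20, 3, 0};

        float sum = 0;
        long cumulative = 0;
        for (int i = 0; i < counts.length; i++) {
            sum += offsets[i] * counts[i];   // approximate each observation by its bucket's upper bound
            cumulative += counts[i];         // Prometheus buckets are cumulative ("le" semantics)
            System.out.printf("le=%s bucket=%d%n", offsets[i], cumulative);
        }
        // Prints cumulative counts 5, 25, 28, 28 and then a sum of roughly 0.505 over 28 observations.
        System.out.printf("sum=%s count=%d%n", sum, cumulative);
    }
}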
@@ -6,7 +6,6 @@
import com.zegelin.cassandra.exporter.cli.HarvesterOptions;
import com.zegelin.cassandra.exporter.collector.CachingCollector;
import com.zegelin.cassandra.exporter.collector.FailureDetectorMBeanMetricFamilyCollector;
import com.zegelin.cassandra.exporter.collector.LatencyMetricGroupSummaryCollector;
import com.zegelin.cassandra.exporter.collector.StorageServiceMBeanMetricFamilyCollector;
import com.zegelin.cassandra.exporter.collector.dynamic.FunctionalMetricFamilyCollector;
import com.zegelin.cassandra.exporter.collector.jvm.*;
@@ -502,6 +501,15 @@ private static FactoryBuilder.CollectorConstructor timerAsSummaryCollectorConstructor
        };
    }

    private static FactoryBuilder.CollectorConstructor timerAsHistogramCollectorConstructor() {
        return (name, help, labels, mBean) -> {
            final NamedObject<SamplingCounting> samplingCountingNamedObject = CassandraMetricsUtilities.jmxTimerMBeanAsSamplingCounting(mBean);

            return new FunctionalMetricFamilyCollector<>(name, help, ImmutableMap.of(labels, samplingCountingNamedObject),
                    samplingAndCountingAsHistogram(MetricValueConversionFunctions::nanosecondsToSeconds));
        };
    }

    private static FactoryBuilder.CollectorConstructor histogramAsSummaryCollectorConstructor() {
        return (name, help, labels, mBean) -> {
            final NamedObject<SamplingCounting> samplingCountingNamedObject = CassandraMetricsUtilities.jmxHistogramAsSamplingCounting(mBean);
@@ -510,6 +518,15 @@ private static FactoryBuilder.CollectorConstructor histogramAsSummaryCollectorConstructor
        };
    }

    private static FactoryBuilder.CollectorConstructor histogramAsHistogramCollectorConstructor() {
        return (name, help, labels, mBean) -> {
            final NamedObject<SamplingCounting> samplingCountingNamedObject = CassandraMetricsUtilities.jmxHistogramAsSamplingCounting(mBean);

            return new FunctionalMetricFamilyCollector<>(name, help, ImmutableMap.of(labels, samplingCountingNamedObject),
                    samplingAndCountingAsHistogram(MetricValueConversionFunctions::nanosecondsToSeconds));
        };
    }

    private static <T> FactoryBuilder.CollectorConstructor functionalCollectorConstructor(final FunctionalMetricFamilyCollector.CollectorFunction<T> function) {
        return (final String name, final String help, final Labels labels, final NamedObject<?> mBean) ->
                new FunctionalMetricFamilyCollector<>(name, help, ImmutableMap.of(labels, mBean.<T>cast()), function);
@@ -590,8 +607,7 @@ public List<Factory> get() {
builder.add(clientRequestMetricFactory(functionalCollectorConstructor(meterAsCounter()), "Unavailables", "unavailable_exceptions_total", "Total number of UnavailableExceptions thrown (since server start)."));
builder.add(clientRequestMetricFactory(functionalCollectorConstructor(meterAsCounter()), "Failures", "failures_total", "Total number of failed requests (since server start)."));

builder.add(clientRequestMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "Latency", "latency_seconds", "Request latency."));
builder.add(clientRequestMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "TotalLatency", "latency_seconds", "Total request duration."));
builder.add(clientRequestMetricFactory(timerAsHistogramCollectorConstructor(), "Latency", "latency_seconds", "Request latency."));
}


@@ -706,15 +722,12 @@ public List<Factory> get() {
builder.addAll(tableMetricFactory(functionalCollectorConstructor(histogramGaugeAsSummary()), "EstimatedColumnCountHistogram", "estimated_columns", null));

builder.addAll(tableMetricFactory(histogramAsSummaryCollectorConstructor(), "SSTablesPerReadHistogram", "sstables_per_read", null));
//
builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "ReadLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "read")));
builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "ReadTotalLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "read")));

builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "RangeLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "range_read")));
builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "RangeTotalLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "range_read")));
builder.addAll(tableMetricFactory(timerAsHistogramCollectorConstructor(), "ReadLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "read")));

builder.addAll(tableMetricFactory(timerAsHistogramCollectorConstructor(), "RangeLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "range_read")));

builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "WriteLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "write")));
builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "WriteTotalLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "write")));
builder.addAll(tableMetricFactory(timerAsHistogramCollectorConstructor(), "WriteLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "write")));

builder.addAll(tableMetricFactory(TABLE_SCOPE, functionalCollectorConstructor(counterAsGauge()), "PendingFlushes", "pending_flushes", null));
builder.addAll(tableMetricFactory(KEYSPACE_NODE_SCOPE, functionalCollectorConstructor(numericGaugeAsGauge()), "PendingFlushes", "pending_flushes", null));
@@ -766,14 +779,11 @@ public List<Factory> get() {
builder.addAll(tableMetricFactory(TABLE_SCOPE, functionalCollectorConstructor(counterAsGauge()), "RowCacheMiss", "row_cache_misses", null, ImmutableMap.of("miss_type", "miss")));
builder.addAll(tableMetricFactory(KEYSPACE_NODE_SCOPE, functionalCollectorConstructor(numericGaugeAsGauge()), "RowCacheMiss", "row_cache_misses", null, ImmutableMap.of("miss_type", "miss")));

builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "CasPrepareLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_prepare")));
builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "CasPrepareTotalLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_prepare")));
builder.addAll(tableMetricFactory(timerAsHistogramCollectorConstructor(), "CasPrepareLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_prepare")));

builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "CasProposeLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_propose")));
builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "CasProposeTotalLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_propose")));
builder.addAll(tableMetricFactory(timerAsHistogramCollectorConstructor(), "CasProposeLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_propose")));

builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "CasCommitLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_commit")));
builder.addAll(tableMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "CasCommitTotalLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_commit")));
builder.addAll(tableMetricFactory(timerAsHistogramCollectorConstructor(), "CasCommitLatency", "operation_latency_seconds", null, ImmutableMap.of("operation", "cas_commit")));

builder.addAll(tableMetricFactory(functionalCollectorConstructor(numericGaugeAsGauge(MetricValueConversionFunctions::percentToRatio)), "PercentRepaired", "repaired_ratio", null));

@@ -13,4 +13,6 @@ public interface SamplingCounting {
    long getCount();

    Iterable<Interval> getIntervals();

    long[] getValues();
}
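To make the new contract concrete, here is a minimal, hypothetical implementation of the
extended interface backed directly by an EstimatedHistogram. It is only a sketch: the
exporter's real adapters live in CassandraMetricsUtilities (see the first file above), and
the class and method names below are illustrative.

import org.apache.cassandra.utils.EstimatedHistogram;

// Hypothetical adapter, for illustration only: the per-bucket counts returned by
// getValues() are parallel to the offsets from EstimatedHistogram.newOffsets(),
// which is what samplingAndCountingAsHistogram expects.
final class EstimatedHistogramSamplingCounting implements SamplingCounting {
    private final EstimatedHistogram histogram = new EstimatedHistogram();

    void record(long value) {
        histogram.add(value);
    }

    @Override
    public long getCount() {
        return histogram.count();
    }

    @Override
    public Iterable<Interval> getIntervals() {
        // The summary path derives percentile intervals from the histogram; omitted in this sketch.
        throw new UnsupportedOperationException("not needed for this sketch");
    }

    @Override
    public long[] getValues() {
        return histogram.getBuckets(false); // raw per-bucket counts; the last entry is the overflow bucket
    }
}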
