Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for concurrent modification exception on removing table #78

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<name>Cassandra Exporter Common</name>

<properties>
<version.picocli>3.6.1</version.picocli>
<version.picocli>3.9.5</version.picocli>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private static class FactoryBuilder {
interface Modifier {
/**
* @param keyPropertyList Map of MBean ObjectName key properties and their values.
* @param labels The current map of labels to be provided to the collector constructor.
* @param labels The current map of labels to be provided to the collector constructor.
* @return true to continue building the collector, false to abort.
*/
boolean modify(final Map<String, String> keyPropertyList, final Map<String, String> labels);
Expand Down Expand Up @@ -72,7 +72,7 @@ FactoryBuilder withLabelMaker(final LabelMaker labelMaker) {
return this.withModifier((keyPropertyList, labels) -> {
labels.putAll(labelMaker.apply(keyPropertyList));
return true;
});
});
}

FactoryBuilder withHelp(final String help) {
Expand Down Expand Up @@ -116,6 +116,7 @@ public interface CollectorConstructor {
private final Set<TableLabels> tableLabels;
private final Set<String> excludedKeyspaces;
private final Map<TableMetricScope, TableMetricScope.Filter> tableMetricScopeFilters;
private final Set<Harvester.Exclusion> exclusions;


public FactoriesSupplier(final MetadataFactory metadataFactory, final HarvesterOptions options) {
Expand All @@ -129,6 +130,7 @@ public FactoriesSupplier(final MetadataFactory metadataFactory, final HarvesterO
.put(TableMetricScope.KEYSPACE, options.keyspaceMetricsFilter)
.put(TableMetricScope.TABLE, options.tableMetricsFilter)
.build();
this.exclusions = options.exclusions;
}


Expand Down Expand Up @@ -516,14 +518,12 @@ private static <T> FactoryBuilder.CollectorConstructor functionalCollectorConstr
}




private Factory cache(final Factory delegate, final long duration, final TimeUnit unit) {
return CachingCollector.cache(delegate, duration, unit);
}

private Iterator<Factory> cache(final Iterator<Factory> delegates, final long duration, final TimeUnit unit) {
return Iterators.transform(delegates, delegate -> CachingCollector.cache(delegate, duration, unit));
return Iterators.transform(delegates, delegate -> CachingCollector.cache(delegate, duration, unit));
}


Expand All @@ -532,10 +532,10 @@ public List<Factory> get() {
final ImmutableList.Builder<Factory> builder = ImmutableList.builder();

builder.add(FailureDetectorMBeanMetricFamilyCollector.factory(metadataFactory));
builder.add(cache(StorageServiceMBeanMetricFamilyCollector.factory(metadataFactory, excludedKeyspaces), 5, TimeUnit.MINUTES));
builder.add(cache(StorageServiceMBeanMetricFamilyCollector.factory(metadataFactory, excludedKeyspaces, exclusions), 5, TimeUnit.MINUTES));

builder.add(MemoryPoolMXBeanMetricFamilyCollector.FACTORY);
builder.add(GarbageCollectorMXBeanMetricFamilyCollector.FACTORY);
builder.add(GarbageCollectorMXBeanMetricFamilyCollector.factory(exclusions));
builder.add(BufferPoolMXBeanMetricFamilyCollector.FACTORY);
builder.add(cache(OperatingSystemMXBeanMetricFamilyCollector.FACTORY, 5, TimeUnit.MINUTES));
builder.add(ThreadMXBeanMetricFamilyCollector.factory(perThreadTimingEnabled));
Expand Down Expand Up @@ -635,7 +635,7 @@ public List<Factory> get() {

// org.apache.cassandra.metrics.CompactionMetrics
{
builder.add(compactionMetric(functionalCollectorConstructor(counterAsCounter()),"BytesCompacted", "bytes_compacted_total", "Total number of bytes compacted (since server start)."));
builder.add(compactionMetric(functionalCollectorConstructor(counterAsCounter()), "BytesCompacted", "bytes_compacted_total", "Total number of bytes compacted (since server start)."));
builder.add(compactionMetric(functionalCollectorConstructor(numericGaugeAsCounter()), "CompletedTasks", "completed_tasks_total", "Total number of completed compaction tasks (since server start)."));
// "PendingTasks" ignored -- it's an aggregate of the table-level metrics (see the table metric "PendingCompactions")
builder.add(compactionMetric(functionalCollectorConstructor(meterAsCounter()), "TotalCompactionsCompleted", "completed_total", "Total number of compactions (since server start)."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.zegelin.jmx.NamedObject;
import com.zegelin.cassandra.exporter.cli.HarvesterOptions;
import com.zegelin.prometheus.domain.CounterMetricFamily;
import com.zegelin.prometheus.domain.Interval.Quantile;
import com.zegelin.prometheus.domain.Labels;
import com.zegelin.prometheus.domain.MetricFamily;
import com.zegelin.prometheus.domain.NumericMetric;
Expand Down Expand Up @@ -132,6 +133,7 @@ public boolean equals(final Object o) {

private final boolean collectorTimingEnabled;
private final Map<String, Stopwatch> collectionTimes = new ConcurrentHashMap<>();
private final Set<Quantile> excludedHistoQuantiles;

private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
.setNameFormat("cassandra-exporter-harvester-defer-%d")
Expand All @@ -145,6 +147,11 @@ protected Harvester(final MetadataFactory metadataFactory, final HarvesterOption
this.exclusions = options.exclusions;
this.enabledGlobalLabels = options.globalLabels;
this.collectorTimingEnabled = options.collectorTimingEnabled;
this.excludedHistoQuantiles = options.excludedHistoQuantiles;
}

public Set<Quantile> getExcludedHistoQuantiles() {
return excludedHistoQuantiles;
}

protected void addCollectorFactory(final MBeanGroupMetricFamilyCollector.Factory factory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableSet;
import com.zegelin.netty.Floats;
import com.zegelin.prometheus.domain.Interval.Quantile;
import com.zegelin.cassandra.exporter.FactoriesSupplier;
import com.zegelin.cassandra.exporter.Harvester;
import picocli.CommandLine;
Expand Down Expand Up @@ -152,4 +153,27 @@ public void setExcludeSystemTables(final boolean excludeSystemTables) {

excludedKeyspaces.addAll(CASSANDRA_SYSTEM_KEYSPACES);
}

public final Set<Quantile> excludedHistoQuantiles = new HashSet<>();
@Option(names = {"--exclude-from-histogram"}, paramLabel = "EXCLUSION", arity = "1..*",
description = "Select which quantiles to exclude from histogram metrics. The specified quantiles are excluded from all histogram/summary metrics" +
"Valid options are: P_50, P_75, P_95, P_98, P_99, P_99_9" +
"'P_50' (Quantile .5), " +
"'P_75' (Quantile .75), " +
"'P_95' (Quantile .95), " +
"'P_98' (Quantile .98). " +
"'P_99' (Quantile .99). " +
"'P_99_9' (Quantile .999). " +
"The default is to include all quantiles. "
)
void setExcludeFromHistogram(final Set<String> values) {
values.forEach( e -> {
Quantile q = Quantile.ALL_PERCENTILES.get(e);
if(q == null) {
throw new IllegalArgumentException(String.format("The specified exlusion quantile '%s' is invalid, value values are '%s'", e, Quantile.ALL_PERCENTILES.keySet()));
}
excludedHistoQuantiles.add(q);
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.zegelin.cassandra.exporter.Harvester;
import com.zegelin.cassandra.exporter.MBeanGroupMetricFamilyCollector;
import com.zegelin.cassandra.exporter.MetadataFactory;
import com.zegelin.prometheus.domain.GaugeMetricFamily;
Expand All @@ -26,13 +27,14 @@

public class StorageServiceMBeanMetricFamilyCollector extends MBeanGroupMetricFamilyCollector {
private static final Logger logger = LoggerFactory.getLogger(StorageServiceMBeanMetricFamilyCollector.class);
private final Set<Harvester.Exclusion> exclusions;

public static Factory factory(final MetadataFactory metadataFactory, final Set<String> excludedKeyspaces) {
public static Factory factory(final MetadataFactory metadataFactory, final Set<String> excludedKeyspaces, final Set<Harvester.Exclusion> exclusions) {
return mBean -> {
if (!STORAGE_SERVICE_MBEAN_NAME.apply(mBean.name))
return null;

return new StorageServiceMBeanMetricFamilyCollector((StorageServiceMBean) mBean.object, metadataFactory, excludedKeyspaces);
return new StorageServiceMBeanMetricFamilyCollector((StorageServiceMBean) mBean.object, metadataFactory, excludedKeyspaces, exclusions);
};
}

Expand All @@ -45,10 +47,11 @@ public static Factory factory(final MetadataFactory metadataFactory, final Set<S


private StorageServiceMBeanMetricFamilyCollector(final StorageServiceMBean storageServiceMBean,
final MetadataFactory metadataFactory, final Set<String> excludedKeyspaces) {
final MetadataFactory metadataFactory, final Set<String> excludedKeyspaces, final Set<Harvester.Exclusion> exclusions) {
this.storageServiceMBean = storageServiceMBean;
this.metadataFactory = metadataFactory;
this.excludedKeyspaces = excludedKeyspaces;
this.exclusions=exclusions;

// determine the set of FileStores (i.e., mountpoints) for the Cassandra data/CL/cache directories
// (which can be done once -- changing directories requires a server restart)
Expand Down Expand Up @@ -144,6 +147,6 @@ public Stream<MetricFamily> collect() {
metricFamilyStreamBuilder.add(new GaugeMetricFamily("cassandra_storage_filesystem_unallocated_bytes", null, fileStoreUnallocatedSpaceMetrics.build()));
}

return metricFamilyStreamBuilder.build();
return metricFamilyStreamBuilder.build().filter(mf -> exclusions.contains(mf.name));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.ImmutableMap;
import com.sun.management.GcInfo;
import com.zegelin.cassandra.exporter.Harvester;
import com.zegelin.jmx.ObjectNames;
import com.zegelin.cassandra.exporter.MBeanGroupMetricFamilyCollector;
import com.zegelin.prometheus.domain.*;
Expand All @@ -11,28 +12,33 @@
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

import static com.zegelin.cassandra.exporter.MetricValueConversionFunctions.millisecondsToSeconds;
import static com.zegelin.cassandra.exporter.MetricValueConversionFunctions.neg1ToNaN;

public class GarbageCollectorMXBeanMetricFamilyCollector extends MBeanGroupMetricFamilyCollector {
private static final ObjectName GARBAGE_COLLECTOR_MXBEAN_NAME_PATTERN = ObjectNames.create(ManagementFactory.GARBAGE_COLLECTOR_MXBEAN_DOMAIN_TYPE + ",*");
private final Set<Harvester.Exclusion> exclusions;

public static final Factory FACTORY = mBean -> {
if (!GARBAGE_COLLECTOR_MXBEAN_NAME_PATTERN.apply(mBean.name))
return null;
public static Factory factory(final Set<Harvester.Exclusion> exclusions) {
return mBean -> {
if (!GARBAGE_COLLECTOR_MXBEAN_NAME_PATTERN.apply(mBean.name))
return null;

final GarbageCollectorMXBean garbageCollectorMXBean = (GarbageCollectorMXBean) mBean.object;
final GarbageCollectorMXBean garbageCollectorMXBean = (GarbageCollectorMXBean) mBean.object;

final Labels collectorLabels = Labels.of("collector", garbageCollectorMXBean.getName());
final Labels collectorLabels = Labels.of("collector", garbageCollectorMXBean.getName());

return new GarbageCollectorMXBeanMetricFamilyCollector(ImmutableMap.of(collectorLabels, garbageCollectorMXBean));
};
return new GarbageCollectorMXBeanMetricFamilyCollector(ImmutableMap.of(collectorLabels, garbageCollectorMXBean), exclusions);
};
}

private final Map<Labels, GarbageCollectorMXBean> labeledGarbageCollectorMXBeans;

private GarbageCollectorMXBeanMetricFamilyCollector(final Map<Labels, GarbageCollectorMXBean> labeledGarbageCollectorMXBeans) {
private GarbageCollectorMXBeanMetricFamilyCollector(final Map<Labels, GarbageCollectorMXBean> labeledGarbageCollectorMXBeans, Set<Harvester.Exclusion> exclusions) {
this.exclusions = exclusions;
this.labeledGarbageCollectorMXBeans = labeledGarbageCollectorMXBeans;
}

Expand All @@ -46,10 +52,12 @@ public MBeanGroupMetricFamilyCollector merge(final MBeanGroupMetricFamilyCollect

final Map<Labels, GarbageCollectorMXBean> labeledGarbageCollectorMXBeans = new HashMap<>(this.labeledGarbageCollectorMXBeans);
for (final Map.Entry<Labels, GarbageCollectorMXBean> entry : other.labeledGarbageCollectorMXBeans.entrySet()) {
labeledGarbageCollectorMXBeans.merge(entry.getKey(), entry.getValue(), (o1, o2) -> {throw new IllegalStateException(String.format("Object %s and %s cannot be merged, yet their labels are the same.", o1, o2));});
labeledGarbageCollectorMXBeans.merge(entry.getKey(), entry.getValue(), (o1, o2) -> {
throw new IllegalStateException(String.format("Object %s and %s cannot be merged, yet their labels are the same.", o1, o2));
});
}

return new GarbageCollectorMXBeanMetricFamilyCollector(labeledGarbageCollectorMXBeans);
return new GarbageCollectorMXBeanMetricFamilyCollector(labeledGarbageCollectorMXBeans, exclusions);
}

@Override
Expand All @@ -73,11 +81,10 @@ public Stream<MetricFamily> collect() {
}
}
}

return Stream.of(
new CounterMetricFamily("cassandra_jvm_gc_collection_count", "Total number of collections that have occurred (since JVM start).", collectionCountMetrics.build()),
new CounterMetricFamily("cassandra_jvm_gc_estimated_collection_duration_seconds_total", "Estimated cumulative collection elapsed time (since JVM start).", collectionDurationTotalSecondsMetrics.build()),
new GaugeMetricFamily("cassandra_jvm_gc_last_collection_duration_seconds", "Last collection duration.", lastGCDurationSecondsMetrics.build())
);
final Stream.Builder<MetricFamily> metricFamilyStreamBuilder = Stream.builder();
metricFamilyStreamBuilder.add(new CounterMetricFamily("cassandra_jvm_gc_collection_count", "Total number of collections that have occurred (since JVM start).", collectionCountMetrics.build()));
metricFamilyStreamBuilder.add(new CounterMetricFamily("cassandra_jvm_gc_estimated_collection_duration_seconds_total", "Estimated cumulative collection elapsed time (since JVM start).", collectionDurationTotalSecondsMetrics.build()));
metricFamilyStreamBuilder.add(new GaugeMetricFamily("cassandra_jvm_gc_last_collection_duration_seconds", "Last collection duration.", lastGCDurationSecondsMetrics.build()));
return metricFamilyStreamBuilder.build().filter(mf -> exclusions.contains(mf.name));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ private ChannelFuture sendMetrics(final ChannelHandlerContext ctx, final FullHtt
lastWriteFuture = ctx.writeAndFlush(response);

if (request.getMethod() == HttpMethod.GET) {
ReadableByteChannel byteChannel = new FormattedByteChannel(new TextFormatExposition(metricFamilyStream, timestamp, globalLabels, includeHelp));
ReadableByteChannel byteChannel = new FormattedByteChannel(new TextFormatExposition(metricFamilyStream, timestamp, globalLabels, includeHelp, harvester.getExcludedHistoQuantiles()));
lastWriteFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedNioStream(byteChannel, FormattedByteChannel.MAX_CHUNK_SIZE)));
}

Expand Down
12 changes: 10 additions & 2 deletions common/src/main/java/com/zegelin/prometheus/domain/Interval.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.zegelin.prometheus.domain;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.zegelin.function.FloatFloatFunction;

import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/*
A Summary quanitle or Histogram bucket and associated value.
Expand All @@ -23,6 +23,14 @@ public static class Quantile {

public static final Set<Quantile> STANDARD_PERCENTILES = ImmutableSet.of(P_50, P_75, P_95, P_98, P_99, P_99_9);
public static final Quantile POSITIVE_INFINITY = q(Float.POSITIVE_INFINITY);
public static final Map<String,Quantile> ALL_PERCENTILES = new ImmutableMap.Builder<String,Quantile>()
.put("P_50",P_50)
.put("P_75",P_75)
.put("P_95",P_95)
.put("P_98",P_98)
.put("P_99",P_99)
.put("P_99_9",P_99_9)
.build();

public final float value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import com.google.common.base.Stopwatch;
import com.zegelin.netty.Resources;
import com.zegelin.prometheus.domain.*;
import com.zegelin.prometheus.domain.Interval.Quantile;
import com.zegelin.prometheus.exposition.ExpositionSink;
import com.zegelin.prometheus.exposition.FormattedExposition;
import io.netty.buffer.ByteBuf;

import java.time.Instant;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Stream;

public class TextFormatExposition implements FormattedExposition {
Expand All @@ -35,13 +37,14 @@ private enum State {
private int metricCount = 0;

private final Stopwatch stopwatch = Stopwatch.createUnstarted();


public TextFormatExposition(final Stream<MetricFamily> metricFamilies, final Instant timestamp, final Labels globalLabels, final boolean includeHelp) {
private final Set<Quantile> excludedHistoQuantiles;
public TextFormatExposition(final Stream<MetricFamily> metricFamilies, final Instant timestamp, final Labels globalLabels, final boolean includeHelp, final Set<Quantile> excludedHistoQuantiles) {
this.metricFamiliesIterator = metricFamilies.iterator();
this.timestamp = timestamp;
this.globalLabels = globalLabels;
this.includeHelp = includeHelp;
this.excludedHistoQuantiles = excludedHistoQuantiles;
}

@Override
Expand Down Expand Up @@ -70,7 +73,7 @@ public void nextSlice(final ExpositionSink<?> chunkBuffer) {

final MetricFamily<?> metricFamily = metricFamiliesIterator.next();

metricFamilyWriter = new TextFormatMetricFamilyWriter(timestamp, globalLabels, includeHelp, metricFamily);
metricFamilyWriter = new TextFormatMetricFamilyWriter(timestamp, globalLabels, includeHelp, metricFamily, excludedHistoQuantiles);

metricFamilyWriter.writeFamilyHeader(chunkBuffer);

Expand Down
Loading