Skip to content

Commit

Permalink
Change
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Dec 12, 2024
1 parent ae27f21 commit 83b8836
Showing 1 changed file with 135 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,154 @@

package org.elasticsearch.index.engine;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.threadpool.ThreadPool;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.util.SameThreadExecutorService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Executor;

public class ElasticsearchThreadPoolMergeScheduler extends AbstractThreadPoolMergeScheduler {
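/**
* An extension to {@link AbstractThreadPoolMergeScheduler} that provides tracking of merge times, total
* and current merges. NOTE(review): this comment previously named {@link ConcurrentMergeScheduler} as the
* class being extended, which does not match the class declaration - confirm the intended wording.
*/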
public class ElasticsearchThreadPoolMergeScheduler extends AbstractThreadPoolMergeScheduler implements ElasticsearchMergeScheduler {

// Shard-scoped logger, created in the constructor via Loggers.getLogger(getClass(), shardId).
protected final Logger logger;
// Raw settings obtained from the IndexSettings handed to the constructor.
private final Settings indexSettings;
// Shard id handed to the constructor; also used to build the logger.
private final ShardId shardId;

// Notified when a merge starts and finishes in doMerge; backs onGoingMerges() and stats().
private final MergeTracking mergeTracking;
// Merge scheduler configuration consulted by refreshConfig().
private final MergeSchedulerConfig config;
// Executor handed out by getIntraMergeExecutor(...) and shut down in close().
private final SameThreadExecutorService sameThreadExecutorService = new SameThreadExecutorService();

/**
 * Builds the merge scheduler for one shard.
 *
 * @param shardId       shard this scheduler belongs to; also used to create the shard-scoped logger
 * @param indexSettings source of the raw settings and of the merge scheduler configuration
 */
ElasticsearchThreadPoolMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
    this.shardId = shardId;
    this.logger = Loggers.getLogger(getClass(), shardId);
    this.indexSettings = indexSettings.getSettings();
    this.config = indexSettings.getMergeSchedulerConfig();
    // The tracker is given a callback that reports the current IO rate limit while
    // auto-throttling is switched on, and positive infinity otherwise.
    this.mergeTracking = new MergeTracking(logger, () -> {
        if (indexSettings.getMergeSchedulerConfig().isAutoThrottle()) {
            return getIORateLimitMBPerSec();
        }
        return Double.POSITIVE_INFINITY;
    });
    // Apply the configured merge/thread limits and throttling state right away.
    refreshConfig();
}

/**
* Returns the set of on-going merges as reported by the merge tracker.
*/
@Override
public Set<OnGoingMerge> onGoingMerges() {
// Pure delegation: the tracker is updated from doMerge (mergeStarted / mergeFinished).
return mergeTracking.onGoingMerges();
}

/**
* We're currently only interested in messages with this prefix: {@code message(String)} forwards a message to
* this class's logger at trace level only when the message starts with it.
*/
private static final String MERGE_THREAD_MESSAGE_PREFIX = "merge thread";

public ElasticsearchThreadPoolMergeScheduler(ThreadPool threadPool) {
@Override
// Overridden until investigation in https://github.com/apache/lucene/pull/13475 is complete.
// Always hands back this scheduler's SameThreadExecutorService, regardless of the merge passed in.
public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
return sameThreadExecutorService;
}

@Override
// Overridden until investigation in https://github.com/apache/lucene/pull/13475 is complete.
// Closes the parent scheduler first and then shuts down the executor returned by getIntraMergeExecutor.
public void close() throws IOException {
super.close();
// Only reached when super.close() returns normally; if it throws, the executor is left as is.
sameThreadExecutorService.shutdown();
}

/**
* Placeholder for handing a merge task over for execution. The body is currently empty, so the
* given task is not dispatched anywhere by this method.
*
* @param newMergeTask the merge task that should eventually be dispatched
*/
@Override
protected void dispatchMerge(MergeTask newMergeTask) throws IOException {
// TODO: IMPLEMENT AND DISPATCH TO THREAD POOL.
}

/**
 * Overridden to route specific MergeThread messages to our logger.
 *
 * @return {@code true} whenever trace logging is enabled for this scheduler's logger; otherwise
 *         whatever the parent scheduler reports
 */
@Override
protected boolean verbose() {
    // Short-circuits: the parent is only consulted when trace logging is off.
    return logger.isTraceEnabled() || super.verbose();
}

/**
 * Overridden to route specific MergeThread messages to our logger.
 *
 * @param message the scheduler message; it is always passed on to the parent afterwards
 */
@Override
protected void message(String message) {
    // The trace check comes first so the prefix test is skipped entirely when trace is off.
    if (logger.isTraceEnabled()) {
        if (message.startsWith(MERGE_THREAD_MESSAGE_PREFIX)) {
            logger.trace("{}", message);
        }
    }
    super.message(message);
}

/**
 * Runs a merge through the parent scheduler while recording it with the merge tracker and
 * invoking the {@code beforeMerge} / {@code afterMerge} callbacks around it.
 *
 * @param mergeSource source the merge was obtained from, passed through to the parent
 * @param merge       the merge to execute
 * @throws IOException if the parent's merge execution fails
 */
@Override
protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
    final long startedAtNanos = System.nanoTime();
    final OnGoingMerge tracked = new OnGoingMerge(merge);
    mergeTracking.mergeStarted(tracked);
    try {
        beforeMerge(tracked);
        super.doMerge(mergeSource, merge);
    } finally {
        // Always report the merge as finished (with its elapsed time) before the after-callback runs.
        final long elapsedMillis = TimeValue.nsecToMSec(System.nanoTime() - startedAtNanos);
        mergeTracking.mergeFinished(merge, tracked, elapsedMillis);
        afterMerge(tracked);
    }
}

/**
* A callback allowing for custom logic before an actual merge starts. It is invoked from
* {@code doMerge} after the merge has been reported to the merge tracker as started and before
* the merge is delegated to the parent scheduler. The default implementation does nothing.
*
* @param merge the merge that is about to run
*/
protected void beforeMerge(OnGoingMerge merge) {}

/**
* A callback allowing for custom logic after an actual merge has finished. It is invoked from the
* {@code finally} block of {@code doMerge}, once the merge has been reported to the merge tracker
* as finished. The default implementation does nothing.
*
* @param merge the merge that has just finished
*/
protected void afterMerge(OnGoingMerge merge) {}

/**
* Returns this very instance instead of creating a copy.
*/
@Override
public MergeScheduler clone() {
// Lucene IW makes a clone internally but since we hold on to this instance
// the clone will just be the identity.
return this;
}

/**
* Never stalls: unconditionally returns {@code true} without inspecting the merge source.
* NOTE(review): {@code true} presumably tells the caller to carry on with the merge - confirm
* against the parent class's contract for this method.
*/
@Override
protected boolean maybeStall(MergeSource mergeSource) {
// Don't stall here, because we do our own index throttling (in InternalEngine.IndexThrottle) when merges can't keep up
return true;
}

/**
* Returns the merge statistics as reported by the merge tracker.
*/
@Override
public MergeStats stats() {
// Pure delegation: the tracker accumulates its data from the doMerge callbacks.
return mergeTracking.stats();
}

/**
 * Brings this scheduler in line with the merge scheduler configuration: updates the maximum
 * merge and thread counts when either differs, and switches auto IO throttling on or off when
 * its current state does not match the configured one.
 */
@Override
public void refreshConfig() {
    final boolean mergeCountInSync = this.getMaxMergeCount() == config.getMaxMergeCount();
    if (mergeCountInSync == false || this.getMaxThreadCount() != config.getMaxThreadCount()) {
        this.setMaxMergesAndThreads(config.getMaxMergeCount(), config.getMaxThreadCount());
    }

    // A finite rate limit is taken to mean that auto IO throttling is currently active.
    final boolean throttleActive = getIORateLimitMBPerSec() != Double.POSITIVE_INFINITY;
    final boolean throttleWanted = config.isAutoThrottle();
    if (throttleWanted != throttleActive) {
        if (throttleWanted) {
            enableAutoIOThrottle();
        } else {
            disableAutoIOThrottle();
        }
    }
}

@Override
protected synchronized String getMergeTaskName(MergeSource mergeSource, MergePolicy.OneMerge merge) {
// Implement similar to get merge thread name
return super.getMergeTaskName(mergeSource, merge);
public MergeScheduler getMergeScheduler() {
// This object is itself the merge scheduler, so it simply hands back its own instance.
return this;
}
}

0 comments on commit 83b8836

Please sign in to comment.