Skip to content

Commit

Permalink
Fix race condition in indexer timer tasks (#1003)
Browse files Browse the repository at this point in the history
Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
bryanlb and bryanlb authored Jul 25, 2024
1 parent dabee28 commit 0c773f5
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.analysis.Analyzer;
Expand Down Expand Up @@ -55,7 +56,11 @@ public class LuceneIndexStoreImpl implements LogStore {
private final SearcherManager searcherManager;
private final DocumentBuilder documentBuilder;
private final FSDirectory indexDirectory;
private final Timer timer;

private final ScheduledExecutorService scheduledCommit =
Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService scheduledRefresh =
Executors.newSingleThreadScheduledExecutor();
private final SnapshotDeletionPolicy snapshotDeletionPolicy;
private Optional<IndexWriter> indexWriter;

Expand Down Expand Up @@ -123,25 +128,29 @@ public LuceneIndexStoreImpl(
indexWriter = Optional.of(new IndexWriter(indexDirectory, indexWriterConfig));
this.searcherManager = new SearcherManager(indexWriter.get(), false, false, null);

timer = new Timer(true);
timer.schedule(
new TimerTask() {
@Override
public void run() {
scheduledCommit.scheduleWithFixedDelay(
() -> {
try {
commit();
} catch (Exception e) {
LOG.error("Error running scheduled commit", e);
}
},
config.commitDuration.toMillis(),
config.commitDuration.toMillis());
timer.schedule(
new TimerTask() {
@Override
public void run() {
config.commitDuration.toMillis(),
TimeUnit.MILLISECONDS);

scheduledRefresh.scheduleWithFixedDelay(
() -> {
try {
refresh();
} catch (Exception e) {
LOG.error("Error running scheduled commit", e);
}
},
config.refreshDuration.toMillis(),
config.refreshDuration.toMillis());
config.refreshDuration.toMillis(),
TimeUnit.MILLISECONDS);

// Initialize stats counters
messagesReceivedCounter = registry.counter(MESSAGES_RECEIVED_COUNTER);
Expand Down Expand Up @@ -364,16 +373,28 @@ public void releaseIndexCommit(IndexCommit indexCommit) {
* ensure that the data is already committed before close.
*/
@Override
public void close() {
public void close() throws IOException {
LOG.info("Closing index {}", id);
scheduledCommit.close();
scheduledRefresh.close();
try {
if (!scheduledCommit.awaitTermination(30, TimeUnit.SECONDS)) {
LOG.error("Timed out waiting for scheduled commit to close");
}
if (!scheduledRefresh.awaitTermination(30, TimeUnit.SECONDS)) {
LOG.error("Timed out waiting for scheduled refresh to close");
}
} catch (InterruptedException e) {
throw new IOException(e);
}

indexWriterLock.lock();
try {
if (indexWriter.isEmpty()) {
// Closable.close() requires this be idempotent, so silently exit instead of throwing an
// exception
return;
}

timer.cancel();
try {
indexWriter.get().close();
} catch (IllegalStateException | IOException | NoSuchElementException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public class SuppressExceptionsOnClosedWriter {
public SuppressExceptionsOnClosedWriter() throws IOException {}

@Test
public void testSearcherOnclosedWriter() {
public void testSearcherOnclosedWriter() throws IOException {
addMessages(testLogStore.logStore, 1, 100, true);
testLogStore.logStore.close();
testLogStore.logStore = null;
Expand Down

0 comments on commit 0c773f5

Please sign in to comment.