Skip to content

Commit

Permalink
Replace Semaphore with ReleasableLock for consistency
Browse files Browse the repository at this point in the history
Signed-off-by: Yujin Ahn <[email protected]>
  • Loading branch information
ahnyujin committed Feb 9, 2025
1 parent b82ed5a commit 8cf56b4
Showing 1 changed file with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
Expand Down Expand Up @@ -91,8 +92,8 @@ public class RemoteFsTranslog extends Translog {
protected final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS);

// These permits exist to allow any inflight background triggered upload.
private static final int SYNC_PERMIT = 1;
private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT);
private static final ReentrantLock reentrantLock = new ReentrantLock();
private static final ReleasableLock syncPermit = new ReleasableLock(reentrantLock);
protected final AtomicBoolean pauseSync = new AtomicBoolean(false);
private final boolean isTranslogMetadataEnabled;

Expand Down Expand Up @@ -374,9 +375,9 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
// ReplicationTracker.primaryMode() as true. However, before we perform the `internal:index/shard/replication/segments_sync`
// action which re-downloads the segments and translog on the new primary. We are ensuring 2 things here -
// 1. Using startedPrimarySupplier, we prevent the new primary to do pre-emptive syncs
// 2. Using syncPermits, we prevent syncs at the desired time during primary relocation.
if (startedPrimarySupplier.getAsBoolean() == false || syncPermit.tryAcquire(SYNC_PERMIT) == false) {
logger.debug("skipped uploading translog for {} {} syncPermits={}", primaryTerm, generation, syncPermit.availablePermits());
// 2. Using syncPermit, we prevent syncs at the desired time during primary relocation.
if (startedPrimarySupplier.getAsBoolean() == false || syncPermit.isHeldByCurrentThread() || syncPermit.tryAcquire() == null) {
logger.debug("skipped uploading translog for {} {} syncPermit={}", primaryTerm, generation, availablePermits());
// NO-OP
return false;
}
Expand Down Expand Up @@ -446,7 +447,7 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo, checkpoint.globalCheckpoint)
);
} finally {
syncPermit.release(SYNC_PERMIT);
syncPermit.close();
}

}
Expand Down Expand Up @@ -535,14 +536,13 @@ protected void setMinSeqNoToKeep(long seqNo) {
@Override
protected Releasable drainSync() {
try {
if (syncPermit.tryAcquire(SYNC_PERMIT, 1, TimeUnit.MINUTES)) {
if (syncPermit.tryAcquire(timeValueMinutes(1L)) != null) {
boolean result = pauseSync.compareAndSet(false, true);
assert result && syncPermit.availablePermits() == 0;
assert result && syncPermit.isHeldByCurrentThread();
logger.info("All inflight remote translog syncs finished and further syncs paused");
return Releasables.releaseOnce(() -> {
syncPermit.release(SYNC_PERMIT);
syncPermit.close();
boolean wasSyncPaused = pauseSync.getAndSet(false);
assert syncPermit.availablePermits() == SYNC_PERMIT : "Available permits is " + syncPermit.availablePermits();
assert wasSyncPaused : "RemoteFsTranslog sync was not paused before re-enabling it";
logger.info("Resumed remote translog sync back on relocation failure");
});
Expand Down Expand Up @@ -748,7 +748,7 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi

// Visible for testing
int availablePermits() {
return syncPermit.availablePermits();
return reentrantLock.isLocked() ? 0 : 1;
}

/**
Expand Down

0 comments on commit 8cf56b4

Please sign in to comment.