diff --git a/src/server/src/main/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtl.java b/src/server/src/main/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtl.java index c614a1621..8f217d4e6 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtl.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtl.java @@ -27,9 +27,12 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ReplicaLockManagerWithTtl { + private static final Logger LOG = LoggerFactory.getLogger(ReplicaLockManagerWithTtl.class); private final ConcurrentHashMap replicaLocks = new ConcurrentHashMap<>(); private final ConcurrentHashMap> repairRunToSegmentLocks = new ConcurrentHashMap<>(); private final Lock lock = new ReentrantLock(); @@ -58,6 +61,7 @@ public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set currentTime && lockInfo.runId.equals(runId)); if (anyReplicaLocked) { + LOG.debug("One of the replicas is already locked by another segment for runId: {}", runId); return false; // Replica is locked by another runId and not expired } @@ -110,16 +114,21 @@ public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set getReplicaLockKey(replica, runId)) - .map(replicaLocks::get) - .filter(lockInfo -> lockInfo != null && lockInfo.runId.equals(runId)) - .forEach(lockInfo -> replicaLocks.remove(getReplicaLockKey(lockInfo.runId.toString(), runId))); + .forEach(replica -> LOG.debug("releasing lock for replica: {}", replica)); + replicas.stream() + .map(replica -> getReplicaLockKey(replica, runId)) + .forEach(replicaLocks::remove); + + LOG.debug("Locked replicas after release: {}", replicaLocks.keySet()); // Remove the segmentId from the runId mapping Set segments = repairRunToSegmentLocks.get(runId); if (segments != null) { segments.remove(segmentId); if (segments.isEmpty()) { repairRunToSegmentLocks.remove(runId); + } else { + repairRunToSegmentLocks.put(runId, segments); } } return true; diff --git a/src/server/src/test/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtlTest.java b/src/server/src/test/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtlTest.java index 08a5e12be..33ee695dd 100644 --- a/src/server/src/test/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtlTest.java +++ b/src/server/src/test/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtlTest.java @@ -72,7 +72,11 @@ public void testRenewRunningRepairsForNodesNotLocked() { @Test public void testReleaseRunningRepairsForNodes() { assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + // Same replicas can't be locked twice + assertFalse(replicaLockManager.lockRunningRepairsForNodes(runId, UUID.randomUUID(), replicas)); assertTrue(replicaLockManager.releaseRunningRepairsForNodes(runId, segmentId, replicas)); + // After unlocking, we can lock again + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, UUID.randomUUID(), replicas)); } @Test