Skip to content

Commit

Permalink
Fix bug in replica lock release in the mem store
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Dejanovski authored and Alexander Dejanovski committed Dec 18, 2024
1 parent 664236b commit 4245a9e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicaLockManagerWithTtl {

private static final Logger LOG = LoggerFactory.getLogger(ReplicaLockManagerWithTtl.class);
private final ConcurrentHashMap<String, LockInfo> replicaLocks = new ConcurrentHashMap<>();
private final ConcurrentHashMap<UUID, Set<UUID>> repairRunToSegmentLocks = new ConcurrentHashMap<>();
private final Lock lock = new ReentrantLock();
Expand Down Expand Up @@ -58,6 +61,7 @@ public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String
&& lockInfo.expirationTime > currentTime && lockInfo.runId.equals(runId));

if (anyReplicaLocked) {
LOG.debug("One of the replicas is already locked by another segment for runId: {}", runId);
return false; // Replica is locked by another runId and not expired
}

Expand Down Expand Up @@ -110,16 +114,21 @@ public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set<Str
// Remove the lock for replicas
replicas.stream()
.map(replica -> getReplicaLockKey(replica, runId))
.map(replicaLocks::get)
.filter(lockInfo -> lockInfo != null && lockInfo.runId.equals(runId))
.forEach(lockInfo -> replicaLocks.remove(getReplicaLockKey(lockInfo.runId.toString(), runId)));
.forEach(replica -> LOG.debug("releasing lock for replica: {}", replica));

replicas.stream()
.map(replica -> getReplicaLockKey(replica, runId))
.forEach(replicaLocks::remove);

LOG.debug("Locked replicas after release: {}", replicaLocks.keySet());
// Remove the segmentId from the runId mapping
Set<UUID> segments = repairRunToSegmentLocks.get(runId);
if (segments != null) {
segments.remove(segmentId);
if (segments.isEmpty()) {
repairRunToSegmentLocks.remove(runId);
} else {
repairRunToSegmentLocks.put(runId, segments);
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ public void testRenewRunningRepairsForNodesNotLocked() {
@Test
public void testReleaseRunningRepairsForNodes() {
assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas));
// Same replicas can't be locked twice
assertFalse(replicaLockManager.lockRunningRepairsForNodes(runId, UUID.randomUUID(), replicas));
assertTrue(replicaLockManager.releaseRunningRepairsForNodes(runId, segmentId, replicas));
// After unlocking, we can lock again
assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, UUID.randomUUID(), replicas));
}

@Test
Expand Down

0 comments on commit 4245a9e

Please sign in to comment.