diff --git a/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java b/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java index fdf5df234..63f461eb2 100644 --- a/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java +++ b/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java @@ -308,9 +308,7 @@ private void abortSegmentsWithNoLeaderNonIncremental(RepairRun repairRun, Collec if (context.storage instanceof IDistributedStorage || !repairRunners.containsKey(repairRun.getId())) { // When multiple Reapers are in use, we can get stuck segments when one instance is rebooted // Any segment in RUNNING or STARTED state but with no leader should be killed - Set leaders = context.storage instanceof IDistributedStorage - ? ((IDistributedStorage) context.storage).getLockedSegmentsForRun(repairRun.getId()) - : Collections.emptySet(); + Set leaders = context.storage.getLockedSegmentsForRun(repairRun.getId()); Collection orphanedSegments = runningSegments .stream() diff --git a/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java b/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java index 68bf551e8..5bd30fe99 100644 --- a/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java +++ b/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java @@ -869,10 +869,8 @@ private boolean takeLead(RepairSegment segment) { ? ((IDistributedStorage) context.storage).takeLead(leaderElectionId) : true; } else { - result = context.storage instanceof IDistributedStorage - ? ((IDistributedStorage) context.storage).lockRunningRepairsForNodes(this.repairRunner.getRepairRunId(), - segment.getId(), segment.getReplicas().keySet()) - : true; + result = context.storage.lockRunningRepairsForNodes(this.repairRunner.getRepairRunId(), + segment.getId(), segment.getReplicas().keySet()); } if (!result) { context.metricRegistry.counter(MetricRegistry.name(SegmentRunner.class, "takeLead", "failed")).inc(); @@ -895,10 +893,8 @@ private boolean renewLead(RepairSegment segment) { } return result; } else { - boolean resultLock2 = context.storage instanceof IDistributedStorage - ? ((IDistributedStorage) context.storage).renewRunningRepairsForNodes(this.repairRunner.getRepairRunId(), - segment.getId(), segment.getReplicas().keySet()) - : true; + boolean resultLock2 = context.storage.renewRunningRepairsForNodes(this.repairRunner.getRepairRunId(), + segment.getId(), segment.getReplicas().keySet()); if (!resultLock2) { context.metricRegistry.counter(MetricRegistry.name(SegmentRunner.class, "renewLead", "failed")).inc(); releaseLead(segment); @@ -912,13 +908,14 @@ private boolean renewLead(RepairSegment segment) { private void releaseLead(RepairSegment segment) { try (Timer.Context cx = context.metricRegistry.timer(MetricRegistry.name(SegmentRunner.class, "releaseLead")).time()) { - if (context.storage instanceof IDistributedStorage) { - if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) { + + if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) { + if (context.storage instanceof IDistributedStorage) { ((IDistributedStorage) context.storage).releaseLead(leaderElectionId); - } else { - ((IDistributedStorage) context.storage).releaseRunningRepairsForNodes(this.repairRunner.getRepairRunId(), - segment.getId(), segment.getReplicas().keySet()); } + } else { + context.storage.releaseRunningRepairsForNodes(this.repairRunner.getRepairRunId(), + segment.getId(), segment.getReplicas().keySet()); } } } diff --git a/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java b/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java index 298bf825b..fc48dd6b9 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java @@ -41,7 +41,6 @@ import io.cassandrareaper.storage.operations.IOperationsDao; import java.util.List; -import java.util.Set; import java.util.UUID; @@ -62,23 +61,6 @@ public interface IDistributedStorage extends IDistributedMetrics { void releaseLead(UUID leaderId); - boolean lockRunningRepairsForNodes( - UUID repairId, - UUID segmentId, - Set replicas); - - boolean renewRunningRepairsForNodes( - UUID repairId, - UUID segmentId, - Set replicas); - - boolean releaseRunningRepairsForNodes( - UUID repairId, - UUID segmentId, - Set replicas); - - Set getLockedSegmentsForRun(UUID runId); - int countRunningReapers(); List getRunningReapers(); diff --git a/src/server/src/main/java/io/cassandrareaper/storage/IStorageDao.java b/src/server/src/main/java/io/cassandrareaper/storage/IStorageDao.java index 98e879371..7157ae995 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/IStorageDao.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/IStorageDao.java @@ -26,6 +26,9 @@ import io.cassandrareaper.storage.repairunit.IRepairUnitDao; import io.cassandrareaper.storage.snapshot.ISnapshotDao; +import java.util.Set; +import java.util.UUID; + import io.dropwizard.lifecycle.Managed; /** @@ -34,6 +37,23 @@ public interface IStorageDao extends Managed, IMetricsDao { + boolean lockRunningRepairsForNodes( + UUID repairId, + UUID segmentId, + Set replicas); + + boolean renewRunningRepairsForNodes( + UUID repairId, + UUID segmentId, + Set replicas); + + boolean releaseRunningRepairsForNodes( + UUID repairId, + UUID segmentId, + Set replicas); + + Set getLockedSegmentsForRun(UUID runId); + boolean isStorageConnected(); IEventsDao getEventsDao(); diff --git a/src/server/src/main/java/io/cassandrareaper/storage/MemoryStorageFacade.java b/src/server/src/main/java/io/cassandrareaper/storage/MemoryStorageFacade.java index f5b775fd4..84facac48 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/MemoryStorageFacade.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/MemoryStorageFacade.java @@ -29,6 +29,7 @@ import io.cassandrareaper.storage.events.IEventsDao; import io.cassandrareaper.storage.events.MemoryEventsDao; import io.cassandrareaper.storage.memory.MemoryStorageRoot; +import io.cassandrareaper.storage.memory.ReplicaLockManagerWithTtl; import io.cassandrareaper.storage.metrics.MemoryMetricsDao; import io.cassandrareaper.storage.repairrun.IRepairRunDao; import io.cassandrareaper.storage.repairrun.MemoryRepairRunDao; @@ -46,9 +47,11 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import com.google.common.io.Files; import org.eclipse.serializer.persistence.types.PersistenceFieldEvaluator; import org.eclipse.store.storage.embedded.types.EmbeddedStorage; import org.eclipse.store.storage.embedded.types.EmbeddedStorageManager; @@ -61,8 +64,9 @@ */ public final class MemoryStorageFacade implements IStorageDao { + // Default time to live of leads taken on a segment + private static final long DEFAULT_LEAD_TTL = 90_000; private static final Logger LOG = LoggerFactory.getLogger(MemoryStorageFacade.class); - /** Field evaluator to find transient attributes. This is needed to deal with persisting Guava collections objects * that sometimes use the transient keyword for some of their implementation's backing stores**/ private static final PersistenceFieldEvaluator TRANSIENT_FIELD_EVALUATOR = @@ -85,8 +89,9 @@ public final class MemoryStorageFacade implements IStorageDao { ); private final MemorySnapshotDao memSnapshotDao = new MemorySnapshotDao(); private final MemoryMetricsDao memMetricsDao = new MemoryMetricsDao(); + private final ReplicaLockManagerWithTtl replicaLockManagerWithTtl; - public MemoryStorageFacade(String persistenceStoragePath) { + public MemoryStorageFacade(String persistenceStoragePath, long leadTime) { LOG.info("Using memory storage backend. Persistence storage path: {}", persistenceStoragePath); this.embeddedStorage = EmbeddedStorage.Foundation(Paths.get(persistenceStoragePath)) .onConnectionFoundation( @@ -103,10 +108,19 @@ public MemoryStorageFacade(String persistenceStoragePath) { LOG.info("Loading existing data from persistence storage"); this.memoryStorageRoot = (MemoryStorageRoot) this.embeddedStorage.root(); } + this.replicaLockManagerWithTtl = new ReplicaLockManagerWithTtl(leadTime); } public MemoryStorageFacade() { - this("/tmp/" + UUID.randomUUID().toString()); + this(Files.createTempDir().getAbsolutePath(), DEFAULT_LEAD_TTL); + } + + public MemoryStorageFacade(String persistenceStoragePath) { + this(persistenceStoragePath, DEFAULT_LEAD_TTL); + } + + public MemoryStorageFacade(long leadTime) { + this(Files.createTempDir().getAbsolutePath(), leadTime); } @Override @@ -296,4 +310,25 @@ public Collection getRepairSegmentsByRunId(UUID runId) { public Map getSubscriptionsById() { return this.memoryStorageRoot.getSubscriptionsById(); } + + @Override + public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { + return replicaLockManagerWithTtl.lockRunningRepairsForNodes(runId, segmentId, replicas); + } + + @Override + public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { + return replicaLockManagerWithTtl.renewRunningRepairsForNodes(runId, segmentId, replicas); + } + + @Override + public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { + LOG.info("Releasing locks for runId: {}, segmentId: {}, replicas: {}", runId, segmentId, replicas); + return replicaLockManagerWithTtl.releaseRunningRepairsForNodes(runId, segmentId, replicas); + } + + @Override + public Set getLockedSegmentsForRun(UUID runId) { + return replicaLockManagerWithTtl.getLockedSegmentsForRun(runId); + } } diff --git a/src/server/src/main/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtl.java b/src/server/src/main/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtl.java new file mode 100644 index 000000000..c614a1621 --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtl.java @@ -0,0 +1,172 @@ +/* + * Copyright 2024-2024 DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.storage.memory; + +import java.util.Collections; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; + +public class ReplicaLockManagerWithTtl { + + private final ConcurrentHashMap replicaLocks = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> repairRunToSegmentLocks = new ConcurrentHashMap<>(); + private final Lock lock = new ReentrantLock(); + + private final long ttlMilliSeconds; + + public ReplicaLockManagerWithTtl(long ttlMilliSeconds) { + this.ttlMilliSeconds = ttlMilliSeconds; + // Schedule cleanup of expired locks + ScheduledExecutorService lockCleanupScheduler = Executors.newScheduledThreadPool(1); + lockCleanupScheduler.scheduleAtFixedRate(this::cleanupExpiredLocks, 1, 1, TimeUnit.SECONDS); + } + + private String getReplicaLockKey(String replica, UUID runId) { + return replica + runId; + } + + public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { + lock.lock(); + try { + long currentTime = System.currentTimeMillis(); + // Check if any replica is already locked by another runId + boolean anyReplicaLocked = replicas.stream() + .map(replica -> replicaLocks.get(getReplicaLockKey(replica, runId))) + .anyMatch(lockInfo -> lockInfo != null + && lockInfo.expirationTime > currentTime && lockInfo.runId.equals(runId)); + + if (anyReplicaLocked) { + return false; // Replica is locked by another runId and not expired + } + + // Lock the replicas for the given runId and segmentId + long expirationTime = currentTime + ttlMilliSeconds; + replicas.forEach(replica -> + replicaLocks.put(getReplicaLockKey(replica, runId), new LockInfo(runId, expirationTime)) + ); + + // Update runId to segmentId mapping + repairRunToSegmentLocks.computeIfAbsent(runId, k -> ConcurrentHashMap.newKeySet()).add(segmentId); + return true; + } finally { + lock.unlock(); + } + } + + public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { + lock.lock(); + try { + long currentTime = System.currentTimeMillis(); + + // Check if all replicas are already locked by this runId + boolean allReplicasLocked = replicas.stream() + .map(replica -> replicaLocks.get(getReplicaLockKey(replica, runId))) + .allMatch(lockInfo -> lockInfo != null && lockInfo.runId.equals(runId) + && lockInfo.expirationTime > currentTime); + + if (!allReplicasLocked) { + return false; // Some replica is not validly locked by this runId + } + + // Renew the lock by extending the expiration time + long newExpirationTime = currentTime + ttlMilliSeconds; + replicas.forEach(replica -> + replicaLocks.put(getReplicaLockKey(replica, runId), new LockInfo(runId, newExpirationTime)) + ); + + // Ensure the segmentId is linked to the runId + repairRunToSegmentLocks.computeIfAbsent(runId, k -> ConcurrentHashMap.newKeySet()).add(segmentId); + return true; + } finally { + lock.unlock(); + } + } + + public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { + lock.lock(); + try { + // Remove the lock for replicas + replicas.stream() + .map(replica -> getReplicaLockKey(replica, runId)) + .map(replicaLocks::get) + .filter(lockInfo -> lockInfo != null && lockInfo.runId.equals(runId)) + .forEach(lockInfo -> replicaLocks.remove(getReplicaLockKey(lockInfo.runId.toString(), runId))); + + // Remove the segmentId from the runId mapping + Set segments = repairRunToSegmentLocks.get(runId); + if (segments != null) { + segments.remove(segmentId); + if (segments.isEmpty()) { + repairRunToSegmentLocks.remove(runId); + } + } + return true; + } finally { + lock.unlock(); + } + } + + public Set getLockedSegmentsForRun(UUID runId) { + return repairRunToSegmentLocks.getOrDefault(runId, Collections.emptySet()); + } + + @VisibleForTesting + public void cleanupExpiredLocks() { + lock.lock(); + try { + long currentTime = System.currentTimeMillis(); + + // Remove expired locks from replicaLocks + replicaLocks.entrySet().removeIf(entry -> entry.getValue().expirationTime <= currentTime); + + // Clean up runToSegmentLocks by removing segments with no active replicas + repairRunToSegmentLocks.entrySet().removeIf(entry -> { + UUID runId = entry.getKey(); + Set segments = entry.getValue(); + + // Retain only active segments + segments.removeIf(segmentId -> { + boolean active = replicaLocks.values().stream() + .anyMatch(info -> info.runId.equals(runId)); + return !active; + }); + return segments.isEmpty(); + }); + } finally { + lock.unlock(); + } + } + + // Class to store lock information + private static class LockInfo { + UUID runId; + long expirationTime; + + LockInfo(UUID runId, long expirationTime) { + this.runId = runId; + this.expirationTime = expirationTime; + } + } +} \ No newline at end of file diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java index d1cf856b8..2f8eec8dd 100644 --- a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java +++ b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java @@ -562,6 +562,7 @@ public void a_new_daily_repair_schedule_is_added_for_the_last_added_cluster_and_ params.put("intensity", "0.9"); params.put("scheduleDaysBetween", "1"); params.put("scheduleTriggerTime", DateTime.now().plusSeconds(1).toString()); + params.put("segmentCountPerNode", "1"); ReaperTestJettyRunner runner = RUNNERS.get(RAND.nextInt(RUNNERS.size())); Response response = runner.callReaper("POST", "/repair_schedule", Optional.of(params)); int responseStatus = response.getStatus(); diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperMetricsIT.java b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperMetricsIT.java index 9e2ddcbed..dd4db2995 100644 --- a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperMetricsIT.java +++ b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperMetricsIT.java @@ -17,6 +17,13 @@ package io.cassandrareaper.acceptance; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.stream.Stream; + import cucumber.api.CucumberOptions; import cucumber.api.junit.Cucumber; import org.junit.AfterClass; @@ -25,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + @RunWith(Cucumber.class) @CucumberOptions( features = { @@ -46,6 +54,8 @@ public static void setUp() throws Exception { "setting up testing Reaper runner with {} seed hosts defined and memory storage", TestContext.TEST_CLUSTER_SEED_HOSTS.size()); + // We now have persistence in the memory store, so we need to clean up the storage folder before starting the tests + deleteFolderContents("/tmp/reaper/storage/"); runner = new ReaperTestJettyRunner(MEMORY_CONFIG_FILE); BasicSteps.addReaperRunner(runner); } @@ -56,4 +66,22 @@ public static void tearDown() { runner.runnerInstance.after(); } + public static void deleteFolderContents(String folderPath) throws IOException { + // Check if the path exists + Path path = Paths.get(folderPath); + if (!Files.exists(path)) { + return; + } + try (Stream walk = Files.walk(path)) { + walk.sorted(Comparator.reverseOrder()) + .forEach(p -> { + try { + Files.delete(p); + } catch (IOException e) { + throw new RuntimeException("Failed to delete " + p, e); + } + }); + } + } + } diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java index 412191e9b..f8992240d 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java @@ -26,7 +26,6 @@ import io.cassandrareaper.core.RepairUnit; import io.cassandrareaper.core.Segment; import io.cassandrareaper.management.ClusterFacade; -import io.cassandrareaper.storage.IDistributedStorage; import io.cassandrareaper.storage.IStorageDao; import io.cassandrareaper.storage.cassandra.CassandraStorageFacade; import io.cassandrareaper.storage.cluster.IClusterDao; @@ -144,7 +143,7 @@ public void abortRunningSegmentWithNoLeader() throws ReaperException, Interrupte Mockito.doNothing().when(context.repairManager).abortSegments(any(), any()); Mockito.doReturn(run).when(context.repairManager).startRepairRun(run); - when(((IDistributedStorage) context.storage).getLockedSegmentsForRun(any())).thenReturn(Collections.emptySet()); + when(context.storage.getLockedSegmentsForRun(any())).thenReturn(Collections.emptySet()); IRepairUnitDao mockedRepairUnitDao = mock(IRepairUnitDao.class); Mockito.when(((CassandraStorageFacade) context.storage).getRepairUnitDao()).thenReturn(mockedRepairUnitDao); Mockito.when(mockedRepairUnitDao.getRepairUnit(any(UUID.class))).thenReturn(cf); @@ -238,7 +237,7 @@ public void doNotAbortRunningSegmentWithLeader() throws ReaperException, Interru Mockito.when(((CassandraStorageFacade) context.storage).getRepairUnitDao()).thenReturn(mockedRepairUnitDao); Mockito.when(mockedRepairUnitDao.getRepairUnit(any(UUID.class))).thenReturn(cf); - when(((IDistributedStorage) context.storage).getLockedSegmentsForRun(any())).thenReturn( + when(context.storage.getLockedSegmentsForRun(any())).thenReturn( new HashSet(Arrays.asList(segment.getId()))); context.repairManager.resumeRunningRepairRuns(); diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java index afbd073f8..577b6520b 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java @@ -83,6 +83,7 @@ public final class RepairRunnerHangingTest { + private static final long LEAD_TTL = 1000L; private static final Logger LOG = LoggerFactory.getLogger(RepairRunnerHangingTest.class); private static final Set TABLES = ImmutableSet.of("table1"); private static final List THREE_TOKENS = Lists.newArrayList( @@ -242,7 +243,7 @@ public void testHangingRepair() throws InterruptedException, ReaperException, JM final double intensity = 0.5f; final int repairThreadCount = 1; final int segmentTimeout = 1; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); storage.getClusterDao().addCluster(cluster); RepairUnit cf = storage.getRepairUnitDao().addRepairUnit( RepairUnit.builder() @@ -397,7 +398,7 @@ public void testHangingRepairNewApi() throws InterruptedException, ReaperExcepti final double intensity = 0.5f; final int repairThreadCount = 1; final int segmentTimeout = 1; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); storage.getClusterDao().addCluster(cluster); DateTimeUtils.setCurrentMillisFixed(timeRun); RepairUnit cf = storage.getRepairUnitDao().addRepairUnit( @@ -553,7 +554,7 @@ public void testDontFailRepairAfterTopologyChangeIncrementalRepair() throws Inte final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java index b7f16e250..91789d398 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java @@ -99,6 +99,7 @@ import static org.mockito.Mockito.when; public final class RepairRunnerTest { + private static final long LEAD_TTL = 100L; private static final Set TABLES = ImmutableSet.of("table1"); private static final Duration POLL_INTERVAL = Duration.TWO_SECONDS; private static final List THREE_TOKENS = Lists.newArrayList( @@ -223,7 +224,7 @@ public void testResumeRepair() throws InterruptedException, ReaperException, Mal final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); @@ -333,8 +334,10 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId)); context.repairManager.resumeRunningRepairRuns(); - Thread.sleep(1000); - assertEquals(RepairRun.RunState.DONE, storage.getRepairRunDao().getRepairRun(runId).get().getRunState()); + + await().with().pollInterval(POLL_INTERVAL).atMost(30, SECONDS).until(() -> { + return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState(); + }); } @Test(expected = ConditionTimeoutException.class) @@ -353,7 +356,7 @@ public void testTooManyPendingCompactions() final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); @@ -546,7 +549,7 @@ public void testDontFailRepairAfterTopologyChange() throws InterruptedException, final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); @@ -667,7 +670,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept context.repairManager.resumeRunningRepairRuns(); // The repair run should succeed despite the topology change. - await().with().atMost(20, TimeUnit.SECONDS).until(() -> { + await().with().atMost(120, TimeUnit.SECONDS).until(() -> { return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState(); }); } @@ -687,7 +690,7 @@ public void testSubrangeIncrementalRepair() throws InterruptedException, ReaperE final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); @@ -802,7 +805,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId)); context.repairManager.resumeRunningRepairRuns(); - await().with().atMost(20, TimeUnit.SECONDS).until(() -> { + await().with().atMost(120, TimeUnit.SECONDS).until(() -> { return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState(); }); } @@ -962,7 +965,7 @@ public void getNodeMetricsInLocalDcAvailabilityForLocalDcNodeTest() throws Excep final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); diff --git a/src/server/src/test/java/io/cassandrareaper/service/SegmentRunnerTest.java b/src/server/src/test/java/io/cassandrareaper/service/SegmentRunnerTest.java index c4cdbff11..a533df458 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/SegmentRunnerTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/SegmentRunnerTest.java @@ -193,6 +193,7 @@ public JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -348,6 +349,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -496,6 +498,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -639,6 +642,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -782,6 +786,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -927,6 +932,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -1073,6 +1079,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -1205,6 +1212,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -1300,6 +1308,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); diff --git a/src/server/src/test/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtlTest.java b/src/server/src/test/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtlTest.java new file mode 100644 index 000000000..08a5e12be --- /dev/null +++ b/src/server/src/test/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtlTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2014-2017 Spotify AB + * Copyright 2016-2019 The Last Pickle Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.storage.memory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class ReplicaLockManagerWithTtlTest { + + private ReplicaLockManagerWithTtl replicaLockManager; + private UUID runId; + private UUID segmentId; + private Set replicas; + private Set replicasOverlap; + + @BeforeEach + public void setUp() { + replicaLockManager = new ReplicaLockManagerWithTtl(1000); + runId = UUID.randomUUID(); + segmentId = UUID.randomUUID(); + replicas = new HashSet<>(Arrays.asList("replica1", "replica2", "replica3")); + replicasOverlap = new HashSet<>(Arrays.asList("replica4", "replica2", "replica5")); + } + + @Test + public void testLockRunningRepairsForNodes() { + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + } + + @Test + public void testLockRunningRepairsForNodesAlreadyLocked() { + UUID anotherRunId = UUID.randomUUID(); + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + assertFalse(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + } + + @Test + public void testRenewRunningRepairsForNodes() { + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + assertTrue(replicaLockManager.renewRunningRepairsForNodes(runId, segmentId, replicas)); + } + + @Test + public void testRenewRunningRepairsForNodesNotLocked() { + assertFalse(replicaLockManager.renewRunningRepairsForNodes(runId, segmentId, replicas)); + } + + @Test + public void testReleaseRunningRepairsForNodes() { + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + assertTrue(replicaLockManager.releaseRunningRepairsForNodes(runId, segmentId, replicas)); + } + + @Test + public void testGetLockedSegmentsForRun() { + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + Set lockedSegments = replicaLockManager.getLockedSegmentsForRun(runId); + assertTrue(lockedSegments.contains(segmentId)); + } + + @Test + public void testCleanupExpiredLocks() throws InterruptedException { + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + // We can lock the replica for a different run id + assertTrue(replicaLockManager.lockRunningRepairsForNodes(UUID.randomUUID(), UUID.randomUUID(), replicas)); + // The following lock should fail because overlapping replicas are already locked + assertFalse(replicaLockManager.lockRunningRepairsForNodes(runId, UUID.randomUUID(), replicasOverlap)); + Thread.sleep(1000); // Wait for TTL to expire + replicaLockManager.cleanupExpiredLocks(); + // The following lock should succeed as the lock expired + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicasOverlap)); + } +} \ No newline at end of file diff --git a/src/server/src/test/resources/cassandra-reaper-metrics-test.yaml b/src/server/src/test/resources/cassandra-reaper-metrics-test.yaml index e8ed291f1..ecedb61fe 100644 --- a/src/server/src/test/resources/cassandra-reaper-metrics-test.yaml +++ b/src/server/src/test/resources/cassandra-reaper-metrics-test.yaml @@ -28,7 +28,7 @@ enableDynamicSeedList: false jmxConnectionTimeoutInSeconds: 10 datacenterAvailability: LOCAL percentRepairedCheckIntervalMinutes: 1 -repairManagerSchedulingIntervalSeconds: 0 +repairManagerSchedulingIntervalSeconds: 1 logging: level: WARN