From 405011da487cb1577bc78d509a567de2ce2a7ff5 Mon Sep 17 00:00:00 2001
From: Andres Beck-Ruiz
Date: Wed, 6 Nov 2024 17:07:14 -0500
Subject: [PATCH 1/3] Resolve CheckStyle errors

---
 .../src/main/docker/cassandra-reaper.yml      |   2 +
 .../ReaperApplicationConfiguration.java       |  22 +
 .../cassandrareaper/service/RepairRunner.java |  89 +-
 .../service/RepairRunnerHangingTest.java      |  27 +-
 .../RepairRunnerTopologyChangeTest.java       | 768 ++++++++++++++++++
 5 files changed, 876 insertions(+), 32 deletions(-)
 create mode 100644 src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTopologyChangeTest.java

diff --git a/src/server/src/main/docker/cassandra-reaper.yml b/src/server/src/main/docker/cassandra-reaper.yml
index 1fde85785..e42262dc6 100644
--- a/src/server/src/main/docker/cassandra-reaper.yml
+++ b/src/server/src/main/docker/cassandra-reaper.yml
@@ -30,6 +30,8 @@ repairManagerSchedulingIntervalSeconds: ${REAPER_REPAIR_MANAGER_SCHEDULING_INTER
 jmxConnectionTimeoutInSeconds: ${REAPER_JMX_CONNECTION_TIMEOUT_IN_SECONDS}
 useAddressTranslator: ${REAPER_USE_ADDRESS_TRANSLATOR}
 maxParallelRepairs: ${REAPER_MAX_PARALLEL_REPAIRS}
+scheduleRetryOnError: ${REAPER_SCHEDULE_RETRY_ON_ERROR:-false}
+scheduleRetryDelay: ${REAPER_SCHEDULE_RETRY_DELAY:-PT1H}
 
 # datacenterAvailability has three possible values: ALL | LOCAL | EACH
 # the correct value to use depends on whether jmx ports to C* nodes in remote datacenters are accessible
diff --git a/src/server/src/main/java/io/cassandrareaper/ReaperApplicationConfiguration.java b/src/server/src/main/java/io/cassandrareaper/ReaperApplicationConfiguration.java
index 705b78f9c..8ac7d64a7 100644
--- a/src/server/src/main/java/io/cassandrareaper/ReaperApplicationConfiguration.java
+++ b/src/server/src/main/java/io/cassandrareaper/ReaperApplicationConfiguration.java
@@ -177,6 +177,12 @@ public final class ReaperApplicationConfiguration extends Configuration {
   @Nullable
   private String persistenceStoragePath;
 
+  @JsonProperty
+  private Boolean scheduleRetryOnError;
+
+  @JsonProperty
+  private Duration scheduleRetryDelay;
+
   public HttpManagement getHttpManagement() {
     return httpManagement;
   }
@@ -533,6 +539,22 @@ public String getPersistenceStoragePath() {
     return persistenceStoragePath;
   }
 
+  public Boolean isScheduleRetryOnError() {
+    return scheduleRetryOnError != null ? scheduleRetryOnError : false;
+  }
+
+  public void setScheduleRetryOnError(Boolean scheduleRetryOnError) {
+    this.scheduleRetryOnError = scheduleRetryOnError;
+  }
+
+  public Duration getScheduleRetryDelay() {
+    return scheduleRetryDelay != null ?
+        scheduleRetryDelay : Duration.ofMinutes(60);
+  }
+
+  public void setScheduleRetryDelay(Duration scheduleRetryDelay) {
+    this.scheduleRetryDelay = scheduleRetryDelay;
+  }
+
   public enum DatacenterAvailability {
     /* We require direct JMX access to all nodes across all datacenters */
     ALL,
diff --git a/src/server/src/main/java/io/cassandrareaper/service/RepairRunner.java b/src/server/src/main/java/io/cassandrareaper/service/RepairRunner.java
index c5f4f84b6..c6ae76b24 100644
--- a/src/server/src/main/java/io/cassandrareaper/service/RepairRunner.java
+++ b/src/server/src/main/java/io/cassandrareaper/service/RepairRunner.java
@@ -372,6 +372,42 @@ private void endRepairRun() {
     }
   }
 
+  private void maybeScheduleRetryOnError() {
+    if (context.config.isScheduleRetryOnError()) {
+      // Check if a schedule exists for this keyspace and cluster before rescheduling
+      Collection<RepairSchedule> schedulesForKeyspace
+          = context.storage.getRepairScheduleDao()
+              .getRepairSchedulesForClusterAndKeyspace(clusterName, repairUnit.getKeyspaceName());
+      List<RepairSchedule> repairSchedules = schedulesForKeyspace.stream()
+          .filter(schedule -> schedule.getRepairUnitId().equals(repairUnit.getId()))
+          .collect(Collectors.toList());
+
+      if (!repairSchedules.isEmpty()) {
+        // Set precondition that only a single schedule should match
+        Preconditions.checkArgument(repairSchedules.size() == 1,
+            String.format("Update for repair run %s and unit %s "
+                    + "should impact a single schedule. %d were found",
+                repairRunId,
+                repairUnit.getId(),
+                repairSchedules.size())
+        );
+        RepairSchedule scheduleToTune = repairSchedules.get(0);
+
+        int minuteRetryDelay = (int) context.config.getScheduleRetryDelay().toMinutes();
+        DateTime nextRepairRun = DateTime.now().plusMinutes(minuteRetryDelay);
+
+        if (nextRepairRun.isBefore(scheduleToTune.getNextActivation())) {
+          LOG.debug("Scheduling next repair run at {} for repair schedule {}", nextRepairRun,
+              scheduleToTune.getId());
+
+          RepairSchedule newSchedule
+              = scheduleToTune.with().nextActivation(nextRepairRun).build(scheduleToTune.getId());
+          context.storage.getRepairScheduleDao().updateRepairSchedule(newSchedule);
+        }
+      }
+    }
+  }
+
   /**
    * Tune segment timeout and number of segments for adaptive schedules.
    * Checks that the run was triggered by an adaptive schedule and gathers info on the run to apply tunings.
@@ -508,6 +544,9 @@ private void startNextSegment() throws ReaperException, InterruptedException {
     } else {
       potentialReplicas.addAll(potentialReplicaMap.keySet());
     }
+    if (potentialReplicas.isEmpty()) {
+      failRepairDueToOutdatedSegment(segment.getId(), segment.getTokenRange());
+    }
     LOG.debug("Potential replicas for segment {}: {}", segment.getId(), potentialReplicas);
     ICassandraManagementProxy coordinator = clusterFacade.connect(cluster, potentialReplicas);
     if (nodesReadyForNewRepair(coordinator, segment, potentialReplicaMap, repairRunId)) {
@@ -684,26 +723,7 @@ private boolean repairSegment(final UUID segmentId, Segment segment, Collection<
       return true;
     }
     if (potentialCoordinators.isEmpty()) {
-      LOG.warn(
-          "Segment #{} is faulty, no potential coordinators for range: {}",
-          segmentId,
-          segment.toString());
-      // This segment has a faulty token range. Abort the entire repair run.
-      synchronized (this) {
-        repairRunDao.updateRepairRun(
-            repairRunDao.getRepairRun(repairRunId).get()
-                .with()
-                .runState(RepairRun.RunState.ERROR)
-                .lastEvent(String.format("No coordinators for range %s", segment))
-                .endTime(DateTime.now())
-                .build(repairRunId));
-
-        context.metricRegistry.counter(
-            MetricRegistry.name(RepairManager.class, "repairDone", RepairRun.RunState.ERROR.toString())).inc();
-
-        killAndCleanupRunner();
-      }
-
+      failRepairDueToOutdatedSegment(segmentId, segment);
       return false;
     }
   } else {
@@ -754,6 +774,35 @@ public void onFailure(Throwable throwable) {
     return true;
   }
 
+  private synchronized void failRepairDueToOutdatedSegment(UUID segmentId, Segment segment) {
+    // This segment has a faulty token range possibly due to an additive change in cluster topology
+    // during repair. Abort the entire repair run.
+    LOG.warn("Segment #{} is faulty, no potential coordinators for range: {}", segmentId,
+        segment.toString());
+    // If the segment has been removed, ignore. Should only happen in tests on backends
+    // that delete repair segments.
+    Optional<RepairRun> repairRun = repairRunDao.getRepairRun(repairRunId);
+    if (repairRun.isPresent()) {
+      try {
+        repairRunDao.updateRepairRun(
+            repairRunDao.getRepairRun(repairRunId).get()
+                .with()
+                .runState(RepairRun.RunState.ERROR)
+                .lastEvent(String.format("No coordinators for range %s", segment))
+                .endTime(DateTime.now())
+                .build(repairRunId));
+
+        context.metricRegistry.counter(
+            MetricRegistry.name(RepairManager.class, "repairDone",
+                RepairRun.RunState.ERROR.toString())).inc();
+
+        maybeScheduleRetryOnError();
+      } finally {
+        killAndCleanupRunner();
+      }
+    }
+  }
+
   private List<String> filterPotentialCoordinatorsByDatacenters(
       Collection<String> datacenters,
       List<String> potentialCoordinators) throws ReaperException {
diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java
index 577b6520b..566052ee1 100644
--- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java
@@ -79,6 +79,8 @@
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public final class RepairRunnerHangingTest {
@@ -246,12 +248,9 @@ public void testHangingRepair() throws InterruptedException, ReaperException, JM
     final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL);
     storage.getClusterDao().addCluster(cluster);
     RepairUnit cf = storage.getRepairUnitDao().addRepairUnit(
-        RepairUnit.builder()
-            .clusterName(cluster.getName())
-            .keyspaceName("reaper")
-            .columnFamilies(cfNames)
-            .incrementalRepair(false)
-            .subrangeIncrementalRepair(false)
+        RepairUnit.builder().clusterName(cluster.getName())
+            .keyspaceName("reaper").columnFamilies(cfNames)
+            .incrementalRepair(false).subrangeIncrementalRepair(false)
             .nodes(nodeSet)
             .datacenters(datacenters)
             .blacklistedTables(blacklistedTables)
@@ -267,7 +266,7 @@
         Collections.singleton(
             RepairSegment.builder(
                 Segment.builder()
-                    .withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("100")))
+                    .withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("50")))
                     .withReplicas(replicas)
                     .build(),
                 cf.getId())));
@@ -383,6 +382,9 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
       }
     });
     assertEquals(RepairRun.RunState.DONE, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
+    verify(jmx, times(2)).triggerRepair(
+        any(), any(), any(), any(), any(), any(), any(), anyInt()
+    );
   }
 
   @Test
@@ -403,10 +405,8 @@ public void testHangingRepairNewApi() throws InterruptedException, ReaperExcepti
     DateTimeUtils.setCurrentMillisFixed(timeRun);
     RepairUnit cf = storage.getRepairUnitDao().addRepairUnit(
         RepairUnit.builder()
-            .clusterName(cluster.getName())
-            .keyspaceName(ksName)
-            .columnFamilies(cfNames)
-            .incrementalRepair(incrementalRepair)
+            .clusterName(cluster.getName()).keyspaceName(ksName)
+            .columnFamilies(cfNames).incrementalRepair(incrementalRepair)
             .subrangeIncrementalRepair(incrementalRepair)
             .nodes(nodeSet)
             .datacenters(datacenters)
@@ -421,7 +421,7 @@
         Collections.singleton(
             RepairSegment.builder(
                 Segment.builder()
-                    .withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("100")))
+                    .withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("50")))
                     .withReplicas(replicas)
                     .build(),
                 cf.getId())));
@@ -535,6 +535,9 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
       }
     });
     assertEquals(RepairRun.RunState.DONE, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
+    verify(jmx, times(2)).triggerRepair(
+        any(), any(), any(), any(), any(), any(), any(), anyInt()
+    );
   }
 
   @Test
diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTopologyChangeTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTopologyChangeTest.java
new file mode 100644
index 000000000..0e62dcb1f
--- /dev/null
+++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTopologyChangeTest.java
@@ -0,0 +1,768 @@
+/*
+ * Copyright 2015-2017 Spotify AB
+ * Copyright 2016-2019 The Last Pickle Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package io.cassandrareaper.service; + +import io.cassandrareaper.AppContext; +import io.cassandrareaper.ReaperApplicationConfiguration; +import io.cassandrareaper.ReaperApplicationConfiguration.AutoSchedulingConfiguration; +import io.cassandrareaper.ReaperException; +import io.cassandrareaper.core.Cluster; +import io.cassandrareaper.core.CompactionStats; +import io.cassandrareaper.core.Node; +import io.cassandrareaper.core.RepairRun; +import io.cassandrareaper.core.RepairSchedule; +import io.cassandrareaper.core.RepairSegment; +import io.cassandrareaper.core.RepairUnit; +import io.cassandrareaper.core.Segment; +import io.cassandrareaper.crypto.NoopCrypotograph; +import io.cassandrareaper.management.ClusterFacade; +import io.cassandrareaper.management.RepairStatusHandler; +import io.cassandrareaper.management.jmx.CassandraManagementProxyTest; +import io.cassandrareaper.management.jmx.JmxCassandraManagementProxy; +import io.cassandrareaper.management.jmx.JmxManagementConnectionFactory; +import io.cassandrareaper.storage.IStorageDao; +import io.cassandrareaper.storage.MemoryStorageFacade; + +import java.io.IOException; +import java.math.BigInteger; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.management.MalformedObjectNameException; +import javax.management.ReflectionException; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.cassandra.locator.EndpointSnitchInfoMBean; +import org.apache.cassandra.repair.RepairParallelism; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.commons.lang3.RandomStringUtils; +import org.joda.time.DateTime; +import org.junit.Test; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public final class RepairRunnerTopologyChangeTest { + + private static final List THREE_TOKENS = Lists.newArrayList( + BigInteger.valueOf(0L), + BigInteger.valueOf(100L), + BigInteger.valueOf(200L)); + + private static final Set TABLES = ImmutableSet.of("table1"); + + private final Cluster cluster = Cluster.builder() + .withName("test_" + RandomStringUtils.randomAlphabetic(12)) + .withSeedHosts(ImmutableSet.of("127.0.0.1")) + .withState(Cluster.State.ACTIVE) + .build(); + + private static Map, List> addRangeToMap( + Map, List> map, + String start, + String end, + String... 
hosts) { + + List range = Lists.newArrayList(start, end); + List endPoints = Lists.newArrayList(hosts); + map.put(range, endPoints); + return map; + } + + private static Map, List> fourNodeClusterAfterBootstrap() { + Map, List> map = new HashMap, List>(); + map = addRangeToMap(map, "0", "50", "127.0.0.1", "127.0.0.4", "127.0.0.2"); + map = addRangeToMap(map, "50", "100", "127.0.0.4", "127.0.0.2", "127.0.0.3"); + map = addRangeToMap(map, "100", "200", "127.0.0.2", "127.0.0.3", "127.0.0.1"); + map = addRangeToMap(map, "200", "0", "127.0.0.3", "127.0.0.1", "127.0.0.4"); + return map; + } + + private static Map, List> twoNodeClusterAfterBootstrap() { + Map, List> map = new HashMap, List>(); + map = addRangeToMap(map, "0", "200", "127.0.0.1", "127.0.0.3"); + map = addRangeToMap(map, "200", "0", "127.0.0.3", "127.0.0.1"); + return map; + } + + private RepairRun addNewRepairRun( + final Map nodeMap, + final double intensity, + final IStorageDao storage, + UUID cf, + UUID hostID + ) { + return storage.getRepairRunDao().addRepairRun( + RepairRun.builder(cluster.getName(), cf) + .intensity(intensity) + .segmentCount(1) + .repairParallelism(RepairParallelism.PARALLEL) + .tables(TABLES), + Lists.newArrayList( + RepairSegment.builder( + Segment.builder() + .withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("100"))) + .withReplicas(nodeMap) + .build(), cf) + .withState(RepairSegment.State.RUNNING) + .withStartTime(DateTime.now()) + .withCoordinatorHost("reaper") + .withHostID(hostID), + RepairSegment.builder( + Segment.builder() + .withTokenRange(new RingRange(new BigInteger("100"), new BigInteger("200"))) + .withReplicas(nodeMap) + .build(), cf) + .withHostID(hostID) + ) + ); + } + + private Map endpointToHostIDMap() { + Map endpointToHostIDMap = new HashMap(); + endpointToHostIDMap.put("127.0.0.1", UUID.randomUUID().toString()); + endpointToHostIDMap.put("127.0.0.2", UUID.randomUUID().toString()); + endpointToHostIDMap.put("127.0.0.3", UUID.randomUUID().toString()); + + return endpointToHostIDMap; + } + + @Test + public void testFailRepairAfterAdditiveChangeInTopology() throws ReaperException, IOException { + final String ksName = "reaper"; + final Set cfNames = Sets.newHashSet("reaper"); + final boolean incrementalRepair = false; + final Set nodeSet = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3"); + final Map nodeMap = ImmutableMap.of( + "127.0.0.1", "dc1", "127.0.0.2", "dc1", "127.0.0.3", "dc1"); + final Set datacenters = Collections.emptySet(); + final Set blacklistedTables = Collections.emptySet(); + final double intensity = 0.5f; + final int repairThreadCount = 1; + final int segmentTimeout = 30; + final List tokens = THREE_TOKENS; + final IStorageDao storage = new MemoryStorageFacade(); + AppContext context = new AppContext(); + context.storage = storage; + context.config = new ReaperApplicationConfiguration(); + context.config.setAutoScheduling(new AutoSchedulingConfiguration()); + context.config.setScheduleRetryOnError(true); + context.config.setScheduleRetryDelay(java.time.Duration.parse("PT1H")); + storage.getClusterDao().addCluster(cluster); + UUID cf = storage.getRepairUnitDao().addRepairUnit( + RepairUnit.builder().clusterName(cluster.getName()) + .keyspaceName(ksName) + .columnFamilies(cfNames) + .incrementalRepair(incrementalRepair) + .subrangeIncrementalRepair(incrementalRepair) + .nodes(nodeSet) + .datacenters(datacenters) + .blacklistedTables(blacklistedTables) + .repairThreadCount(repairThreadCount) + .timeout(segmentTimeout)) + .getId(); + DateTime 
+    DateTime initialActivationDate = DateTime.now();
+    DateTime nextActivationDate = initialActivationDate.plusHours(2);
+    storage.getRepairScheduleDao().addRepairSchedule(RepairSchedule.builder(cf).daysBetween(1).intensity(1)
+        .segmentCountPerNode(64)
+        .nextActivation(nextActivationDate)
+        .repairParallelism(RepairParallelism.DATACENTER_AWARE)
+    );
+    final Map<String, String> endpointToHostIDMap = endpointToHostIDMap();
+    RepairRun run = addNewRepairRun(nodeMap, intensity, storage, cf, null);
+    final UUID runId = run.getId();
+    final UUID segmentId = storage.getRepairSegmentDao().getNextFreeSegments(run.getId()).get(0).getId();
+    assertEquals(storage.getRepairSegmentDao().getRepairSegment(runId, segmentId).get().getState(),
+        RepairSegment.State.NOT_STARTED);
+    final JmxCassandraManagementProxy jmx = CassandraManagementProxyTest.mockJmxProxyImpl();
+    when(jmx.getClusterName()).thenReturn(cluster.getName());
+    when(jmx.isConnectionAlive()).thenReturn(true);
+    when(jmx.getRangeToEndpointMap(anyString())).thenReturn(RepairRunnerTest.threeNodeClusterWithIps());
+    when(jmx.getEndpointToHostId()).thenReturn(endpointToHostIDMap);
+    when(jmx.getTokens()).thenReturn(tokens);
+    EndpointSnitchInfoMBean endpointSnitchInfoMBean = mock(EndpointSnitchInfoMBean.class);
+    when(endpointSnitchInfoMBean.getDatacenter()).thenReturn("dc1");
+    try {
+      when(endpointSnitchInfoMBean.getDatacenter(anyString())).thenReturn("dc1");
+    } catch (UnknownHostException ex) {
+      throw new AssertionError(ex);
+    }
+    context.repairManager = RepairManager.create(
+        context,
+        Executors.newScheduledThreadPool(10),
+        1,
+        TimeUnit.MILLISECONDS,
+        1,
+        context.storage.getRepairRunDao());
+    AtomicInteger repairNumberCounter = new AtomicInteger(1);
+    when(jmx.triggerRepair(any(), any(), any(), any(), any(), any(), any(), anyInt()))
+        .then(
+            (invocation) -> {
+              final int repairNumber = repairNumberCounter.getAndIncrement();
+              new Thread() {
+                @Override
+                public void run() {
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.STARTED),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.SESSION_SUCCESS),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.FINISHED),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                }
+              }.start();
+              return repairNumber;
+            });
+    context.managementConnectionFactory = new JmxManagementConnectionFactory(context, new NoopCrypotograph()) {
+      @Override
+      protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException {
+        return jmx;
+      }
+    };
+    assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
+    storage.getRepairRunDao().updateRepairRun(
+        run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId));
+    when(jmx.getRangeToEndpointMap(anyString())).thenReturn(fourNodeClusterAfterBootstrap());
+    context.repairManager.resumeRunningRepairRuns();
+    // The repair run should fail due to the token ranges for each node becoming smaller, resulting in
+    // the new ranges not completely enclosing every previously calculated segment.
+    await().with().atMost(20, TimeUnit.SECONDS).until(() -> {
+      return RepairRun.RunState.ERROR == storage.getRepairRunDao().getRepairRun(runId).get().getRunState();
+    });
+    RepairSchedule updatedRepairSchedule = storage.getRepairScheduleDao()
+        .getRepairSchedulesForClusterAndKeyspace(cluster.getName(), ksName)
+        .iterator().next();
+    // Ensure that repair schedule has been updated to activate again in one hour with a delta of a minute, given
+    // test execution takes a few seconds. One hour is the default schedule retry delay.
+    assertTrue(updatedRepairSchedule.getNextActivation().isAfter(initialActivationDate.plusHours(1)));
+    assertTrue(updatedRepairSchedule.getNextActivation().isBefore(initialActivationDate.plusHours(1).plusMinutes(1)));
+  }
+
+  @Test
+  public void testAfterAdditiveChangeInTopologyNoErrorWhenRepairScheduleDoesNotExist()
+      throws ReaperException, IOException {
+    final String ksName = "reaper";
+    final Set<String> cfNames = Sets.newHashSet("reaper");
+    final boolean incrementalRepair = false;
+    final Set<String> nodeSet = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
+    final Map<String, String> nodeMap = ImmutableMap.of(
+        "127.0.0.1", "dc1", "127.0.0.2", "dc1", "127.0.0.3", "dc1");
+    final Set<String> datacenters = Collections.emptySet();
+    final Set<String> blacklistedTables = Collections.emptySet();
+    final double intensity = 0.5f;
+    final int repairThreadCount = 1;
+    final int segmentTimeout = 30;
+    final List<BigInteger> tokens = THREE_TOKENS;
+    final IStorageDao storage = new MemoryStorageFacade();
+    AppContext context = new AppContext();
+    context.storage = storage;
+    context.config = new ReaperApplicationConfiguration();
+    context.config.setAutoScheduling(new AutoSchedulingConfiguration());
+    context.config.setScheduleRetryOnError(true);
+    context.config.setScheduleRetryDelay(java.time.Duration.parse("PT1H"));
+    storage.getClusterDao().addCluster(cluster);
+    UUID cf = storage.getRepairUnitDao().addRepairUnit(
+        RepairUnit.builder().clusterName(cluster.getName())
+            .keyspaceName(ksName)
+            .columnFamilies(cfNames)
+            .incrementalRepair(incrementalRepair)
+            .subrangeIncrementalRepair(incrementalRepair)
+            .nodes(nodeSet)
+            .datacenters(datacenters)
+            .blacklistedTables(blacklistedTables)
+            .repairThreadCount(repairThreadCount)
+            .timeout(segmentTimeout))
+        .getId();
+    final Map<String, String> endpointToHostIDMap = endpointToHostIDMap();
+    RepairRun run = addNewRepairRun(nodeMap, intensity, storage, cf, null);
+    final UUID runId = run.getId();
+    final UUID segmentId = storage.getRepairSegmentDao().getNextFreeSegments(run.getId()).get(0).getId();
+    assertEquals(storage.getRepairSegmentDao()
+        .getRepairSegment(runId, segmentId).get().getState(), RepairSegment.State.NOT_STARTED);
+    final JmxCassandraManagementProxy jmx = CassandraManagementProxyTest.mockJmxProxyImpl();
+    when(jmx.getClusterName()).thenReturn(cluster.getName());
+    when(jmx.isConnectionAlive()).thenReturn(true);
+    when(jmx.getRangeToEndpointMap(anyString())).thenReturn(RepairRunnerTest.threeNodeClusterWithIps());
+    when(jmx.getEndpointToHostId()).thenReturn(endpointToHostIDMap);
+    when(jmx.getTokens()).thenReturn(tokens);
+    EndpointSnitchInfoMBean endpointSnitchInfoMBean = mock(EndpointSnitchInfoMBean.class);
+    when(endpointSnitchInfoMBean.getDatacenter()).thenReturn("dc1");
+    try {
+      when(endpointSnitchInfoMBean.getDatacenter(anyString())).thenReturn("dc1");
+    } catch (UnknownHostException ex) {
+      throw new AssertionError(ex);
+    }
+    context.repairManager = RepairManager.create(
+        context,
+        Executors.newScheduledThreadPool(10),
+        1,
+        TimeUnit.MILLISECONDS,
+        1,
+        context.storage.getRepairRunDao());
+    AtomicInteger repairNumberCounter = new AtomicInteger(1);
+    when(jmx.triggerRepair(any(), any(), any(), any(), any(), any(), any(), anyInt()))
+        .then(
+            (invocation) -> {
+              final int repairNumber = repairNumberCounter.getAndIncrement();
+              new Thread() {
+                @Override
+                public void run() {
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.STARTED),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.SESSION_SUCCESS),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.FINISHED),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                }
+              }.start();
+              return repairNumber;
+            });
+    context.managementConnectionFactory = new JmxManagementConnectionFactory(context, new NoopCrypotograph()) {
+      @Override
+      protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException {
+        return jmx;
+      }
+    };
+    assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
+    storage.getRepairRunDao().updateRepairRun(
+        run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId));
+    when(jmx.getRangeToEndpointMap(anyString())).thenReturn(fourNodeClusterAfterBootstrap());
+    context.repairManager.resumeRunningRepairRuns();
+    await().with().atMost(20, TimeUnit.SECONDS).until(() -> {
+      return RepairRun.RunState.ERROR == storage.getRepairRunDao().getRepairRun(runId).get().getRunState();
+    });
+  }
+
+  @Test
+  public void testAfterAdditiveChangeInTopologyNoRescheduleWhenScheduleRetryOnErrorIsFalse()
+      throws ReaperException, IOException {
+    final String ksName = "reaper";
+    final Set<String> cfNames = Sets.newHashSet("reaper");
+    final boolean incrementalRepair = false;
+    final Set<String> nodeSet = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
+    final Map<String, String> nodeMap = ImmutableMap.of(
+        "127.0.0.1", "dc1", "127.0.0.2", "dc1", "127.0.0.3", "dc1");
+    final Set<String> datacenters = Collections.emptySet();
+    final Set<String> blacklistedTables = Collections.emptySet();
+    final double intensity = 0.5f;
+    final int repairThreadCount = 1;
+    final int segmentTimeout = 30;
+    final List<BigInteger> tokens = THREE_TOKENS;
+    final IStorageDao storage = new MemoryStorageFacade();
+    AppContext context = new AppContext();
+    context.storage = storage;
+    context.config = new ReaperApplicationConfiguration();
+    context.config.setAutoScheduling(new AutoSchedulingConfiguration());
+    context.config.setScheduleRetryOnError(false);
+    context.config.setScheduleRetryDelay(java.time.Duration.parse("PT1H"));
+    storage.getClusterDao().addCluster(cluster);
+    UUID cf = storage.getRepairUnitDao().addRepairUnit(
+        RepairUnit.builder().clusterName(cluster.getName())
+            .keyspaceName(ksName)
+            .columnFamilies(cfNames)
+            .incrementalRepair(incrementalRepair)
+            .subrangeIncrementalRepair(incrementalRepair)
+            .nodes(nodeSet)
+            .datacenters(datacenters)
+            .blacklistedTables(blacklistedTables)
+            .repairThreadCount(repairThreadCount)
+            .timeout(segmentTimeout))
+        .getId();
+    DateTime initialActivationDate = DateTime.now();
+    DateTime nextActivationDate = initialActivationDate.plusHours(2);
+    storage.getRepairScheduleDao().addRepairSchedule(RepairSchedule.builder(cf)
+        .daysBetween(1).intensity(1).segmentCountPerNode(64)
+        .nextActivation(nextActivationDate)
+        .repairParallelism(RepairParallelism.DATACENTER_AWARE)
+    );
+    final Map<String, String> endpointToHostIDMap = endpointToHostIDMap();
+    RepairRun run = addNewRepairRun(nodeMap, intensity, storage, cf, null);
+    final UUID runId = run.getId();
+    final UUID segmentId = storage.getRepairSegmentDao().getNextFreeSegments(run.getId()).get(0).getId();
+    assertEquals(storage.getRepairSegmentDao()
+        .getRepairSegment(runId, segmentId).get().getState(), RepairSegment.State.NOT_STARTED);
+    final JmxCassandraManagementProxy jmx = CassandraManagementProxyTest.mockJmxProxyImpl();
+    when(jmx.getClusterName()).thenReturn(cluster.getName());
+    when(jmx.isConnectionAlive()).thenReturn(true);
+    when(jmx.getRangeToEndpointMap(anyString())).thenReturn(RepairRunnerTest.threeNodeClusterWithIps());
+    when(jmx.getEndpointToHostId()).thenReturn(endpointToHostIDMap);
+    when(jmx.getTokens()).thenReturn(tokens);
+    EndpointSnitchInfoMBean endpointSnitchInfoMBean = mock(EndpointSnitchInfoMBean.class);
+    when(endpointSnitchInfoMBean.getDatacenter()).thenReturn("dc1");
+    try {
+      when(endpointSnitchInfoMBean.getDatacenter(anyString())).thenReturn("dc1");
+    } catch (UnknownHostException ex) {
+      throw new AssertionError(ex);
+    }
+    context.repairManager = RepairManager.create(
+        context,
+        Executors.newScheduledThreadPool(10),
+        1,
+        TimeUnit.MILLISECONDS,
+        1,
+        context.storage.getRepairRunDao());
+    AtomicInteger repairNumberCounter = new AtomicInteger(1);
+    when(jmx.triggerRepair(any(), any(), any(), any(), any(), any(), any(), anyInt()))
+        .then(
+            (invocation) -> {
+              final int repairNumber = repairNumberCounter.getAndIncrement();
+              new Thread() {
+                @Override
+                public void run() {
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.STARTED),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.SESSION_SUCCESS),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.FINISHED),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                }
+              }.start();
+              return repairNumber;
+            });
+    context.managementConnectionFactory = new JmxManagementConnectionFactory(context, new NoopCrypotograph()) {
+      @Override
+      protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException {
+        return jmx;
+      }
+    };
+    assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
+    storage.getRepairRunDao().updateRepairRun(
+        run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId));
+    when(jmx.getRangeToEndpointMap(anyString())).thenReturn(fourNodeClusterAfterBootstrap());
+    context.repairManager.resumeRunningRepairRuns();
+    await().with().atMost(20, TimeUnit.SECONDS).until(() -> {
+      return RepairRun.RunState.ERROR == storage.getRepairRunDao().getRepairRun(runId).get().getRunState();
+    });
+    RepairSchedule updatedRepairSchedule = storage.getRepairScheduleDao()
+        .getRepairSchedulesForClusterAndKeyspace(cluster.getName(), ksName)
+        .iterator().next();
+    // Ensure that repair schedule has not been updated
+    assertEquals(updatedRepairSchedule.getNextActivation(), nextActivationDate);
+  }
+
+  @Test
+  public void testAfterAdditiveChangeInTopologyNoRescheduleWhenScheduleRetryDelayIsLaterThanNextActivation()
+      throws ReaperException, IOException {
+    final String ksName = "reaper";
+    final Set<String> cfNames = Sets.newHashSet("reaper");
+    final boolean incrementalRepair = false;
+    final Set<String> nodeSet = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
+    final Map<String, String> nodeMap = ImmutableMap.of(
+        "127.0.0.1", "dc1", "127.0.0.2", "dc1", "127.0.0.3", "dc1");
+    final Set<String> datacenters = Collections.emptySet();
+    final Set<String> blacklistedTables = Collections.emptySet();
+    final double intensity = 0.5f;
+    final int repairThreadCount = 1;
+    final int segmentTimeout = 30;
+    final List<BigInteger> tokens = THREE_TOKENS;
+    final IStorageDao storage = new MemoryStorageFacade();
+    AppContext context = new AppContext();
+    context.storage = storage;
+    context.config = new ReaperApplicationConfiguration();
+    context.config.setAutoScheduling(new AutoSchedulingConfiguration());
+    context.config.setScheduleRetryOnError(true);
+    context.config.setScheduleRetryDelay(java.time.Duration.parse("PT3H"));
+    storage.getClusterDao().addCluster(cluster);
+    UUID cf = storage.getRepairUnitDao().addRepairUnit(
+        RepairUnit.builder().clusterName(cluster.getName())
+            .keyspaceName(ksName)
+            .columnFamilies(cfNames)
+            .incrementalRepair(incrementalRepair)
+            .subrangeIncrementalRepair(incrementalRepair)
+            .nodes(nodeSet)
+            .datacenters(datacenters)
+            .blacklistedTables(blacklistedTables)
+            .repairThreadCount(repairThreadCount)
+            .timeout(segmentTimeout))
+        .getId();
+    DateTime initialActivationDate = DateTime.now();
+    DateTime nextActivationDate = initialActivationDate.plusHours(2);
+    storage.getRepairScheduleDao().addRepairSchedule(RepairSchedule.builder(cf).daysBetween(1).intensity(1)
+        .segmentCountPerNode(64)
+        .nextActivation(nextActivationDate)
+        .repairParallelism(RepairParallelism.DATACENTER_AWARE)
+    );
+    final Map<String, String> endpointToHostIDMap = endpointToHostIDMap();
+    RepairRun run = addNewRepairRun(nodeMap, intensity, storage, cf, null);
+    final UUID runId = run.getId();
+    final UUID segmentId = storage.getRepairSegmentDao().getNextFreeSegments(run.getId()).get(0).getId();
+    assertEquals(storage.getRepairSegmentDao()
+        .getRepairSegment(runId, segmentId).get().getState(), RepairSegment.State.NOT_STARTED);
+    final JmxCassandraManagementProxy jmx = CassandraManagementProxyTest.mockJmxProxyImpl();
+    when(jmx.getClusterName()).thenReturn(cluster.getName());
+    when(jmx.isConnectionAlive()).thenReturn(true);
+    when(jmx.getRangeToEndpointMap(anyString())).thenReturn(RepairRunnerTest.threeNodeClusterWithIps());
+    when(jmx.getEndpointToHostId()).thenReturn(endpointToHostIDMap);
+    when(jmx.getTokens()).thenReturn(tokens);
+    EndpointSnitchInfoMBean endpointSnitchInfoMBean = mock(EndpointSnitchInfoMBean.class);
+    when(endpointSnitchInfoMBean.getDatacenter()).thenReturn("dc1");
+    try {
+      when(endpointSnitchInfoMBean.getDatacenter(anyString())).thenReturn("dc1");
+    } catch (UnknownHostException ex) {
+      throw new AssertionError(ex);
+    }
+    context.repairManager = RepairManager.create(
+        context,
+        Executors.newScheduledThreadPool(10),
+        1,
+        TimeUnit.MILLISECONDS,
+        1,
+        context.storage.getRepairRunDao());
+    AtomicInteger repairNumberCounter = new AtomicInteger(1);
+    when(jmx.triggerRepair(any(), any(), any(), any(), any(), any(), any(), anyInt()))
+        .then(
+            (invocation) -> {
+              final int repairNumber = repairNumberCounter.getAndIncrement();
+              new Thread() {
+                @Override
+                public void run() {
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.STARTED),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.SESSION_SUCCESS),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.FINISHED),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                }
+              }.start();
+              return repairNumber;
+            });
+    context.managementConnectionFactory = new JmxManagementConnectionFactory(context, new NoopCrypotograph()) {
+      @Override
+      protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException {
+        return jmx;
+      }
+    };
+    assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
+    storage.getRepairRunDao().updateRepairRun(
+        run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId));
+    when(jmx.getRangeToEndpointMap(anyString())).thenReturn(fourNodeClusterAfterBootstrap());
+    context.repairManager.resumeRunningRepairRuns();
+    await().with().atMost(20, TimeUnit.SECONDS).until(() -> {
+      return RepairRun.RunState.ERROR == storage.getRepairRunDao().getRepairRun(runId).get().getRunState();
+    });
+    RepairSchedule updatedRepairSchedule = storage.getRepairScheduleDao()
+        .getRepairSchedulesForClusterAndKeyspace(cluster.getName(), ksName)
+        .iterator().next();
+    // Ensure that repair schedule has not been updated
+    assertEquals(updatedRepairSchedule.getNextActivation(), nextActivationDate);
+  }
+
+  @Test
+  public void testSuccessAfterSubtractiveChangeInTopology() throws InterruptedException, ReaperException,
+      MalformedObjectNameException, ReflectionException, IOException {
+    final String ksName = "reaper";
+    final Set<String> cfNames = Sets.newHashSet("reaper");
+    final boolean incrementalRepair = false;
+    final Set<String> nodeSet = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
+    final Map<String, String> nodeMap = ImmutableMap.of("127.0.0.1", "dc1",
+        "127.0.0.2", "dc1", "127.0.0.3", "dc1");
+    final Set<String> datacenters = Collections.emptySet();
+    final Set<String> blacklistedTables = Collections.emptySet();
+    final double intensity = 0.5f;
+    final int repairThreadCount = 1;
+    final int segmentTimeout = 30;
+    final List<BigInteger> tokens = THREE_TOKENS;
+    final IStorageDao storage = new MemoryStorageFacade();
+    AppContext context = new AppContext();
+    context.storage = storage;
+    context.config = new ReaperApplicationConfiguration();
+    storage.getClusterDao().addCluster(cluster);
+    UUID cf = storage.getRepairUnitDao().addRepairUnit(
+        RepairUnit.builder()
+            .clusterName(cluster.getName())
+            .keyspaceName(ksName)
+            .columnFamilies(cfNames)
+            .incrementalRepair(incrementalRepair)
+            .subrangeIncrementalRepair(incrementalRepair)
+            .nodes(nodeSet)
+            .datacenters(datacenters)
+            .blacklistedTables(blacklistedTables)
+            .repairThreadCount(repairThreadCount)
+            .timeout(segmentTimeout))
+        .getId();
+    final Map<String, String> endpointToHostIDMap = endpointToHostIDMap();
+    RepairRun run = addNewRepairRun(nodeMap, intensity, storage, cf, null);
+    final UUID runId = run.getId();
+    final UUID segmentId = storage.getRepairSegmentDao().getNextFreeSegments(run.getId()).get(0).getId();
+    assertEquals(storage.getRepairSegmentDao()
+        .getRepairSegment(runId, segmentId).get().getState(), RepairSegment.State.NOT_STARTED);
+    final JmxCassandraManagementProxy jmx = CassandraManagementProxyTest.mockJmxProxyImpl();
+    when(jmx.getClusterName()).thenReturn(cluster.getName());
+    when(jmx.isConnectionAlive()).thenReturn(true);
+    when(jmx.getRangeToEndpointMap(anyString())).thenReturn(RepairRunnerTest.threeNodeClusterWithIps());
+
+    when(jmx.getEndpointToHostId()).thenReturn(endpointToHostIDMap);
+    when(jmx.getTokens()).thenReturn(tokens);
+    EndpointSnitchInfoMBean endpointSnitchInfoMBean = mock(EndpointSnitchInfoMBean.class);
+    when(endpointSnitchInfoMBean.getDatacenter()).thenReturn("dc1");
+    try {
+      when(endpointSnitchInfoMBean.getDatacenter(anyString())).thenReturn("dc1");
+    } catch (UnknownHostException ex) {
+      throw new AssertionError(ex);
+    }
+    ClusterFacade clusterFacade = mock(ClusterFacade.class);
+    when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
+    when(clusterFacade.nodeIsDirectlyAccessible(any(), any())).thenReturn(true);
+    when(clusterFacade.tokenRangeToEndpoint(any(), anyString(), any()))
+        .thenReturn(Lists.newArrayList(nodeSet));
+    when(clusterFacade.getRangeToEndpointMap(any(), anyString()))
+        .thenReturn((Map) ImmutableMap.of(
+            Lists.newArrayList("0", "100"), Lists.newArrayList(nodeSet),
+            Lists.newArrayList("100", "200"), Lists.newArrayList(nodeSet)));
+    when(clusterFacade.getEndpointToHostId(any())).thenReturn(nodeMap);
+    when(clusterFacade.listActiveCompactions(any())).thenReturn(
+        CompactionStats.builder()
+            .withActiveCompactions(Collections.emptyList())
+            .withPendingCompactions(Optional.of(0))
+            .build());
+    context.repairManager = RepairManager.create(
+        context,
+        clusterFacade,
+        Executors.newScheduledThreadPool(10),
+        1,
+        TimeUnit.MILLISECONDS,
+        1,
+        context.storage.getRepairRunDao());
+    AtomicInteger repairNumberCounter = new AtomicInteger(1);
+    when(jmx.triggerRepair(any(), any(), any(), any(), any(), any(), any(), anyInt()))
+        .then(
+            (invocation) -> {
+              final int repairNumber = repairNumberCounter.getAndIncrement();
+              new Thread() {
+                @Override
+                public void run() {
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.STARTED),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.SESSION_SUCCESS),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                  ((RepairStatusHandler) invocation.getArgument(5))
+                      .handle(
+                          repairNumber,
+                          Optional.of(ActiveRepairService.Status.FINISHED),
+                          Optional.empty(),
+                          null,
+                          jmx);
+                }
+              }.start();
+              return repairNumber;
+            });
+    context.managementConnectionFactory = new JmxManagementConnectionFactory(context, new NoopCrypotograph()) {
+      @Override
+      protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException {
+        return jmx;
+      }
+    };
+    assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
+    storage.getRepairRunDao().updateRepairRun(
+        run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId));
+    when(jmx.getRangeToEndpointMap(anyString())).thenReturn(twoNodeClusterAfterBootstrap());
+    context.repairManager.resumeRunningRepairRuns();
+    // The repair run should succeed despite the topology change. Although token ranges change,
+    // they will become larger and still entirely enclose each previously calculated segment.
+    await().with().atMost(20, TimeUnit.SECONDS).until(() -> {
+      return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState();
+    });
+  }
+
+}
\ No newline at end of file

From ecb06634517cf06aa7d78391ec3c20ed31f918a6 Mon Sep 17 00:00:00 2001
From: Andres Beck-Ruiz
Date: Wed, 6 Nov 2024 17:08:40 -0500
Subject: [PATCH 2/3] Add config variables to cassandra-reaper.yaml

---
 src/packaging/resource/cassandra-reaper.yaml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/packaging/resource/cassandra-reaper.yaml b/src/packaging/resource/cassandra-reaper.yaml
index 04d933d4d..52307e157 100644
--- a/src/packaging/resource/cassandra-reaper.yaml
+++ b/src/packaging/resource/cassandra-reaper.yaml
@@ -32,6 +32,8 @@ activateQueryLogger: false
 jmxConnectionTimeoutInSeconds: 5
 useAddressTranslator: false
 maxParallelRepairs: 2
+scheduleRetryOnError: false
+scheduleRetryDelay: PT1H
 
 # purgeRecordsAfterInDays: 30
 # numberOfRunsToKeepPerUnit: 10

From 6b1266f340a7ef96d1031dd4e377d8c70cfcce2f Mon Sep 17 00:00:00 2001
From: Andres Beck-Ruiz
Date: Wed, 22 Jan 2025 11:19:18 -0500
Subject: [PATCH 3/3] Rebase and fix formatting

---
 src/server/src/checkstyle/suppressions.xml    |  1 +
 .../service/RepairRunnerHangingTest.java      | 15 ++++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/src/server/src/checkstyle/suppressions.xml b/src/server/src/checkstyle/suppressions.xml
index 43c02d5dd..33566daed 100644
--- a/src/server/src/checkstyle/suppressions.xml
+++ b/src/server/src/checkstyle/suppressions.xml
@@ -20,5 +20,6 @@
+
diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java
index 566052ee1..70ea7fb13 100644
--- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java
@@ -248,9 +248,12 @@ public void testHangingRepair() throws InterruptedException, ReaperException, JM
     final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL);
     storage.getClusterDao().addCluster(cluster);
     RepairUnit cf = storage.getRepairUnitDao().addRepairUnit(
-        RepairUnit.builder().clusterName(cluster.getName())
-            .keyspaceName("reaper").columnFamilies(cfNames)
-            .incrementalRepair(false).subrangeIncrementalRepair(false)
+        RepairUnit.builder()
+            .clusterName(cluster.getName())
+            .keyspaceName("reaper")
+            .columnFamilies(cfNames)
+            .incrementalRepair(false)
+            .subrangeIncrementalRepair(false)
             .nodes(nodeSet)
             .datacenters(datacenters)
             .blacklistedTables(blacklistedTables)
@@ -405,8 +408,10 @@ public void testHangingRepairNewApi() throws InterruptedException, ReaperExcepti
     DateTimeUtils.setCurrentMillisFixed(timeRun);
     RepairUnit cf = storage.getRepairUnitDao().addRepairUnit(
         RepairUnit.builder()
-            .clusterName(cluster.getName()).keyspaceName(ksName)
-            .columnFamilies(cfNames).incrementalRepair(incrementalRepair)
+            .clusterName(cluster.getName())
+            .keyspaceName(ksName)
+            .columnFamilies(cfNames)
+            .incrementalRepair(incrementalRepair)
             .subrangeIncrementalRepair(incrementalRepair)
             .nodes(nodeSet)
             .datacenters(datacenters)
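
Review note (not part of the patch series): the behavior PATCH 1/3 adds hinges on one rule in maybeScheduleRetryOnError() — the matching schedule's next activation is rewritten only when now + scheduleRetryDelay lands before the activation already on the books, so a failed run can pull a schedule forward but never push it back. That is why the PT3H test above expects no reschedule while the PT1H test expects an activation roughly an hour out. Below is a minimal standalone sketch of that decision, using joda-time as the patch does; the class and method names are illustrative only and not part of Reaper's API.

import java.time.Duration;
import org.joda.time.DateTime;

// Hypothetical sketch of the retry decision in RepairRunner.maybeScheduleRetryOnError().
public final class ScheduleRetrySketch {

  // Returns the activation a schedule should end up with after its run errors out.
  static DateTime nextActivationAfterError(DateTime currentActivation, Duration retryDelay) {
    // Same arithmetic as the patch: the delay is truncated to whole minutes.
    DateTime retryActivation = DateTime.now().plusMinutes((int) retryDelay.toMinutes());
    // Pull the schedule forward only; a retry that would fire after the
    // existing activation leaves the schedule untouched.
    return retryActivation.isBefore(currentActivation) ? retryActivation : currentActivation;
  }

  public static void main(String[] args) {
    DateTime scheduled = DateTime.now().plusHours(2);
    // PT1H is earlier than the activation two hours out, so the schedule moves up...
    System.out.println(nextActivationAfterError(scheduled, Duration.parse("PT1H")));
    // ...while PT3H overshoots it, so the activation is unchanged.
    System.out.println(nextActivationAfterError(scheduled, Duration.parse("PT3H")));
  }
}

Both branches mirror what the topology-change tests assert: a reschedule to about now + PT1H under the default delay, and no change when scheduleRetryOnError is false or the configured delay would land after the schedule's existing activation.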