Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail repair after additive change in cluster topology #1521

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/packaging/resource/cassandra-reaper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ activateQueryLogger: false
jmxConnectionTimeoutInSeconds: 5
useAddressTranslator: false
maxParallelRepairs: 2
# When true, a repair schedule whose run ends in ERROR is retried after scheduleRetryDelay
scheduleRetryOnError: false
# Delay before retrying a failed scheduled repair, in ISO-8601 duration format (default: 1 hour)
scheduleRetryDelay: PT1H
# purgeRecordsAfterInDays: 30
# numberOfRunsToKeepPerUnit: 10

Expand Down
2 changes: 2 additions & 0 deletions src/server/src/main/docker/cassandra-reaper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ repairManagerSchedulingIntervalSeconds: ${REAPER_REPAIR_MANAGER_SCHEDULING_INTER
jmxConnectionTimeoutInSeconds: ${REAPER_JMX_CONNECTION_TIMEOUT_IN_SECONDS}
useAddressTranslator: ${REAPER_USE_ADDRESS_TRANSLATOR}
maxParallelRepairs: ${REAPER_MAX_PARALLEL_REPAIRS}
# When true, a repair schedule whose run ends in ERROR is retried after scheduleRetryDelay
scheduleRetryOnError: ${REAPER_SCHEDULE_RETRY_ON_ERROR:-false}
# Delay before retrying a failed scheduled repair, ISO-8601 duration format (default: 1 hour)
scheduleRetryDelay: ${REAPER_SCHEDULE_RETRY_DELAY:-PT1H}

# datacenterAvailability has three possible values: ALL | LOCAL | EACH
# the correct value to use depends on whether jmx ports to C* nodes in remote datacenters are accessible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ public final class ReaperApplicationConfiguration extends Configuration {
@Nullable
private String persistenceStoragePath;

@JsonProperty
private Boolean scheduleRetryOnError;

@JsonProperty
private Duration scheduleRetryDelay;

  /** Returns the HTTP management configuration section. */
  public HttpManagement getHttpManagement() {
    return httpManagement;
  }
Expand Down Expand Up @@ -533,6 +539,22 @@ public String getPersistenceStoragePath() {
return persistenceStoragePath;
}

public Boolean isScheduleRetryOnError() {
return scheduleRetryOnError != null ? scheduleRetryOnError : false;
}

  /**
   * Sets whether a repair schedule should be retried after one of its runs ends in error.
   *
   * @param scheduleRetryOnError the flag value; may be {@code null} to fall back to the default
   */
  public void setScheduleRetryOnError(Boolean scheduleRetryOnError) {
    this.scheduleRetryOnError = scheduleRetryOnError;
  }

public Duration getScheduleRetryDelay() {
return scheduleRetryDelay != null ? scheduleRetryDelay : Duration.ofMinutes(60);
}

  /**
   * Sets the delay to wait before retrying a scheduled repair that ended in error.
   *
   * @param scheduleRetryDelay the delay; may be {@code null} to fall back to the default
   */
  public void setScheduleRetryDelay(Duration scheduleRetryDelay) {
    this.scheduleRetryDelay = scheduleRetryDelay;
  }

public enum DatacenterAvailability {
/* We require direct JMX access to all nodes across all datacenters */
ALL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,42 @@ private void endRepairRun() {
}
}

  /**
   * Optionally pulls the next activation of this run's repair schedule closer, so a repair that
   * just failed gets retried after the configured delay instead of waiting for the regular cycle.
   *
   * <p>No-op unless {@code scheduleRetryOnError} is enabled in the configuration. Only moves the
   * activation EARLIER: if the schedule is already due to fire before now + retry delay, it is
   * left untouched. If no schedule references this run's repair unit, nothing happens.
   */
  private void maybeScheduleRetryOnError() {
    if (context.config.isScheduleRetryOnError()) {
      // Check if a schedule exists for this keyspace and cluster before rescheduling
      Collection<RepairSchedule> schedulesForKeyspace
          = context.storage.getRepairScheduleDao()
          .getRepairSchedulesForClusterAndKeyspace(clusterName, repairUnit.getKeyspaceName());
      // Narrow to schedules for exactly this repair unit (keyspace can have several units).
      List<RepairSchedule> repairSchedules = schedulesForKeyspace.stream()
          .filter(schedule -> schedule.getRepairUnitId().equals(repairUnit.getId()))
          .collect(Collectors.toList());

      if (!repairSchedules.isEmpty()) {
        // Set precondition that only a single schedule should match
        Preconditions.checkArgument(repairSchedules.size() == 1,
            String.format("Update for repair run %s and unit %s "
                + "should impact a single schedule. %d were found",
                repairRunId,
                repairUnit.getId(),
                repairSchedules.size())
        );
        RepairSchedule scheduleToTune = repairSchedules.get(0);

        // Retry delay is configured as a java.time.Duration; converted to whole minutes here.
        int minuteRetryDelay = (int) context.config.getScheduleRetryDelay().toMinutes();
        DateTime nextRepairRun = DateTime.now().plusMinutes(minuteRetryDelay);

        // Only reschedule when the retry time is earlier than the schedule's next activation;
        // never push an already-imminent activation further out.
        if (nextRepairRun.isBefore(scheduleToTune.getNextActivation())) {
          LOG.debug("Scheduling next repair run at {} for repair schedule {}", nextRepairRun,
              scheduleToTune.getId());

          RepairSchedule newSchedule
              = scheduleToTune.with().nextActivation(nextRepairRun).build(scheduleToTune.getId());
          context.storage.getRepairScheduleDao().updateRepairSchedule(newSchedule);
        }
      }
    }
  }

/**
* Tune segment timeout and number of segments for adaptive schedules.
* Checks that the run was triggered by an adaptive schedule and gathers info on the run to apply tunings.
Expand Down Expand Up @@ -508,6 +544,9 @@ private void startNextSegment() throws ReaperException, InterruptedException {
} else {
potentialReplicas.addAll(potentialReplicaMap.keySet());
}
if (potentialReplicas.isEmpty()) {
failRepairDueToOutdatedSegment(segment.getId(), segment.getTokenRange());
}
LOG.debug("Potential replicas for segment {}: {}", segment.getId(), potentialReplicas);
ICassandraManagementProxy coordinator = clusterFacade.connect(cluster, potentialReplicas);
if (nodesReadyForNewRepair(coordinator, segment, potentialReplicaMap, repairRunId)) {
Expand Down Expand Up @@ -684,26 +723,7 @@ private boolean repairSegment(final UUID segmentId, Segment segment, Collection<
return true;
}
if (potentialCoordinators.isEmpty()) {
LOG.warn(
"Segment #{} is faulty, no potential coordinators for range: {}",
segmentId,
segment.toString());
// This segment has a faulty token range. Abort the entire repair run.
synchronized (this) {
repairRunDao.updateRepairRun(
repairRunDao.getRepairRun(repairRunId).get()
.with()
.runState(RepairRun.RunState.ERROR)
.lastEvent(String.format("No coordinators for range %s", segment))
.endTime(DateTime.now())
.build(repairRunId));

context.metricRegistry.counter(
MetricRegistry.name(RepairManager.class, "repairDone", RepairRun.RunState.ERROR.toString())).inc();

killAndCleanupRunner();
}

failRepairDueToOutdatedSegment(segmentId, segment);
return false;
}
} else {
Expand Down Expand Up @@ -754,6 +774,35 @@ public void onFailure(Throwable throwable) {
return true;
}

private synchronized void failRepairDueToOutdatedSegment(UUID segmentId, Segment segment) {
// This segment has a faulty token range possibly due to an additive change in cluster topology
// during repair. Abort the entire repair run.
LOG.warn("Segment #{} is faulty, no potential coordinators for range: {}", segmentId,
segment.toString());
// If the segment has been removed, ignore. Should only happen in tests on backends
// that delete repair segments.
Optional<RepairRun> repairRun = repairRunDao.getRepairRun(repairRunId);
if (repairRun.isPresent()) {
try {
repairRunDao.updateRepairRun(
repairRunDao.getRepairRun(repairRunId).get()
.with()
.runState(RepairRun.RunState.ERROR)
.lastEvent(String.format("No coordinators for range %s", segment))
.endTime(DateTime.now())
.build(repairRunId));

context.metricRegistry.counter(
MetricRegistry.name(RepairManager.class, "repairDone",
RepairRun.RunState.ERROR.toString())).inc();

maybeScheduleRetryOnError();
} finally {
killAndCleanupRunner();
}
}
}

private List<String> filterPotentialCoordinatorsByDatacenters(
Collection<String> datacenters,
List<String> potentialCoordinators) throws ReaperException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public final class RepairRunnerHangingTest {
Expand Down Expand Up @@ -245,12 +247,9 @@ public void testHangingRepair() throws InterruptedException, ReaperException, JM
final IStorageDao storage = new MemoryStorageFacade();
storage.getClusterDao().addCluster(cluster);
RepairUnit cf = storage.getRepairUnitDao().addRepairUnit(
RepairUnit.builder()
.clusterName(cluster.getName())
.keyspaceName("reaper")
.columnFamilies(cfNames)
.incrementalRepair(false)
.subrangeIncrementalRepair(false)
RepairUnit.builder().clusterName(cluster.getName())
.keyspaceName("reaper").columnFamilies(cfNames)
.incrementalRepair(false).subrangeIncrementalRepair(false)
.nodes(nodeSet)
.datacenters(datacenters)
.blacklistedTables(blacklistedTables)
Expand All @@ -266,7 +265,7 @@ public void testHangingRepair() throws InterruptedException, ReaperException, JM
Collections.singleton(
RepairSegment.builder(
Segment.builder()
.withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("100")))
.withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("50")))
.withReplicas(replicas)
.build(),
cf.getId())));
Expand Down Expand Up @@ -382,6 +381,9 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
}
});
assertEquals(RepairRun.RunState.DONE, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
verify(jmx, times(2)).triggerRepair(
any(), any(), any(), any(), any(), any(), any(), anyInt()
);
}

@Test
Expand All @@ -402,10 +404,8 @@ public void testHangingRepairNewApi() throws InterruptedException, ReaperExcepti
DateTimeUtils.setCurrentMillisFixed(timeRun);
RepairUnit cf = storage.getRepairUnitDao().addRepairUnit(
RepairUnit.builder()
.clusterName(cluster.getName())
.keyspaceName(ksName)
.columnFamilies(cfNames)
.incrementalRepair(incrementalRepair)
.clusterName(cluster.getName()).keyspaceName(ksName)
.columnFamilies(cfNames).incrementalRepair(incrementalRepair)
.subrangeIncrementalRepair(incrementalRepair)
.nodes(nodeSet)
.datacenters(datacenters)
Expand All @@ -420,7 +420,7 @@ public void testHangingRepairNewApi() throws InterruptedException, ReaperExcepti
Collections.singleton(
RepairSegment.builder(
Segment.builder()
.withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("100")))
.withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("50")))
.withReplicas(replicas)
.build(),
cf.getId())));
Expand Down Expand Up @@ -534,6 +534,9 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
}
});
assertEquals(RepairRun.RunState.DONE, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
verify(jmx, times(2)).triggerRepair(
any(), any(), any(), any(), any(), any(), any(), anyInt()
);
}

@Test
Expand Down
Loading
Loading