Reaper endpoints: Async Repair Endpoint #358

Merged
merged 17 commits into from
Aug 31, 2023
@@ -8,6 +8,7 @@
import com.datastax.mgmtapi.rpc.Rpc;
import com.datastax.mgmtapi.rpc.RpcParam;
import com.datastax.mgmtapi.rpc.RpcRegistry;
import com.datastax.mgmtapi.rpc.models.RingRange;
import com.datastax.mgmtapi.util.Job;
import com.datastax.mgmtapi.util.JobExecutor;
import com.datastax.oss.driver.api.core.CqlIdentifier;
@@ -34,6 +35,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -54,6 +56,7 @@
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.progress.ProgressEventType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -737,98 +740,106 @@ public void clearSnapshots(
@Rpc(name = "repair")
public String repair(
@RpcParam(name = "keyspaceName") String keyspace,
@RpcParam(name = "tables") List<String> tables,
@RpcParam(name = "tables") Optional<List<String>> tables,
@RpcParam(name = "full") Boolean full,
@RpcParam(name = "notifications") boolean notifications)
@RpcParam(name = "notifications") boolean notifications,
@RpcParam(name = "repairParallelism") Optional<RepairParallelism> repairParallelism,
Contributor

Issue:
There are 2 issues here:

  • RepairParallelism and Optional are not included in the various com.datastax.mgmtapi.rpc.GenericSerializer classes
  • RepairParallelism is a class that exists in Cassandra (org.apache.cassandra.repair.RepairParallelism) and in the Management API server code (com.datastax.mgmtapi.resources.v2.models.RepairParallelism)

We could add the classes to the GenericSerializers, but that's a slippery slope. Right now, they support only Java primitives and basic collection types.

The second issue is more of a confusing thing. The v2 model object looks a lot like the Cassandra object, but they are not the same. And in the Resource code, we are not sending the Cassandra object, we are sending the V2 model object. So this will never deserialize correctly since we are expecting the Cassandra object in this method.
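
One way to sidestep both issues, sketched here under assumptions rather than taken from this PR: pass the parallelism across the RPC boundary as a plain String and resolve it to the Cassandra-side enum inside the agent. `ModelParallelism`, `AgentParallelism`, and `ParallelismWire` are hypothetical stand-ins for the two real `RepairParallelism` types, and the sketch assumes String parameters already pass through the RPC layer. The wire names are the ones listed for `repair_parallelism` in openapi.json.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical stand-in for the server-side v2 model enum.
enum ModelParallelism {
  SEQUENTIAL("sequential"),
  PARALLEL("parallel"),
  DATACENTER_AWARE("dc_parallel");

  private final String wireName;

  ModelParallelism(String wireName) {
    this.wireName = wireName;
  }

  String getName() {
    return wireName;
  }
}

// Hypothetical stand-in for the agent-side (Cassandra) enum.
enum AgentParallelism {
  SEQUENTIAL("sequential"),
  PARALLEL("parallel"),
  DATACENTER_AWARE("dc_parallel");

  private final String wireName;

  AgentParallelism(String wireName) {
    this.wireName = wireName;
  }

  // Resolves a wire name to the agent-side enum; null means "not specified".
  static Optional<AgentParallelism> fromWire(String wire) {
    if (wire == null) {
      return Optional.empty();
    }
    String normalized = wire.toLowerCase(Locale.ROOT);
    for (AgentParallelism p : values()) {
      if (p.wireName.equals(normalized)) {
        return Optional.of(p);
      }
    }
    throw new IllegalArgumentException("Unknown repair parallelism: " + wire);
  }
}

final class ParallelismWire {
  private ParallelismWire() {}

  // Server side: only a String crosses the RPC boundary, never an enum instance.
  static String toWire(ModelParallelism p) {
    return p == null ? null : p.getName();
  }
}
```

With this shape neither enum needs a serializer entry, and the two types never have to be assignable to each other.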

Member Author

> The second issue is more of a confusing thing. The v2 model object looks a lot like the Cassandra object, but they are not the same. And in the Resource code, we are not sending the Cassandra object, we are sending the V2 model object. So this will never deserialize correctly since we are expecting the Cassandra object in this method.

Whoops... Sorry, I should have spotted that one :(

Wouldn't we be better off adding a generic serializer for the collections interface to make things more generic? I have to be honest, I don't like how little the Management API uses the type system throughout. I understand some of that is unavoidable due to the way the RPC calls work, but if we can't even serialise our own types, that is unfortunate.

Member Author

Actually, I need to understand why we aren't just serializing straight to JSON or using some off-the-shelf serialization here. Was there a reason for writing our own serializer?

@RpcParam(name = "datacenters") Optional<Collection<String>> datacenters,
Miles-Garnsey marked this conversation as resolved.
@RpcParam(name = "associatedTokens") Optional<List<RingRange>> associatedTokens,
Contributor

Issue:
We likely have the same GenericSerialization issue for RingRanges as we do for RepairParallelism above
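
A possible workaround, shown only as a sketch: since the method body already flattens each range to `start:end` before putting it into the repair spec, the ranges could cross the RPC boundary as a `List<String>` in that same form, which stays within the basic collection types. `TokenRange` and `RangeCodec` below are hypothetical names, not classes from this PR.

```java
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for RingRange: an immutable pair of tokens.
final class TokenRange {
  final BigInteger start;
  final BigInteger end;

  TokenRange(BigInteger start, BigInteger end) {
    this.start = start;
    this.end = end;
  }
}

final class RangeCodec {
  private RangeCodec() {}

  // Encodes each range as "start:end" so only a List<String> has to be serialized.
  static List<String> encode(List<TokenRange> ranges) {
    return ranges.stream().map(r -> r.start + ":" + r.end).collect(Collectors.toList());
  }

  // Parses a single "start:end" string back into a range; tokens may be negative.
  static TokenRange decode(String encoded) {
    String[] parts = encoded.split(":", -1);
    if (parts.length != 2) {
      throw new IllegalArgumentException("Expected start:end but got: " + encoded);
    }
    return new TokenRange(new BigInteger(parts[0]), new BigInteger(parts[1]));
  }

  // Joins already-encoded ranges into the comma-separated form used for the repair spec.
  static String toRepairOption(List<String> encoded) {
    return String.join(",", encoded);
  }
}
```

The agent side would then only need the joined string, so decoding back into a typed range is optional and mostly useful for validation.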

@RpcParam(name = "repairThreadCount") Optional<Integer> repairThreadCount)
throws IOException {
// At least one keyspace is required
if (keyspace != null) {
// create the repair spec
Map<String, String> repairSpec = new HashMap<>();

// add any specified tables to the repair spec
if (tables != null && !tables.isEmpty()) {
// set the tables/column families
repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, String.join(",", tables));
}

// handle incremental vs full
boolean isIncremental = Boolean.FALSE.equals(full);
repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(isIncremental));
if (isIncremental) {
// incremental repairs will fail if parallelism is not set
repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName());
}

// Since Cassandra provides us with a async, we don't need to use our executor interface for
// this.
final int repairJobId =
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec);

if (!notifications) {
return Integer.valueOf(repairJobId).toString();
}
assert (keyspace != null);
Map<String, String> repairSpec = new HashMap<>();
repairParallelism.map(rPar -> repairSpec.put(RepairOption.PARALLELISM_KEY, rPar.getName()));
repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full));
repairThreadCount.map(
tCount ->
repairSpec.put(
RepairOption.JOB_THREADS_KEY, Integer.toString(tCount == 0 ? 1 : tCount)));
repairSpec.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE));
tables.map(
tabs -> repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(tabs, ",")));
if (full) {
associatedTokens.map(
Contributor

Issue:
Probably need to guard against a null associatedTokens here.

Member Author

I think input sanitization should be handled in the Resources layer, no? By the time it gets to the NodeOpsProvider I'd rather have everything in Optionals, but tell me if you disagree. I do wonder if introducing Optionals here just adds more complexity as now we have a third type of nullity.
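
To make the trade-off concrete, here is a minimal sketch of the "third type of nullity": an `Optional` parameter can be present, empty, or itself `null`, and calling `.map` on a `null` reference throws a `NullPointerException`. `OptionalGuard`, `orEmpty`, and the `"ranges"` key are hypothetical and used only for illustration. The sketch also uses `ifPresent` rather than `map`, since the lambda is run purely for its side effect.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class OptionalGuard {
  private OptionalGuard() {}

  // Treats a null Optional reference the same as Optional.empty().
  static <T> Optional<T> orEmpty(Optional<T> maybe) {
    return maybe == null ? Optional.empty() : maybe;
  }

  // Builds a spec map; the "ranges" key is a placeholder for illustration only.
  static Map<String, String> buildSpec(Optional<List<String>> ranges) {
    Map<String, String> spec = new HashMap<>();
    orEmpty(ranges)
        .filter(r -> !r.isEmpty()) // skip empty lists as well as absent ones
        .ifPresent(r -> spec.put("ranges", String.join(",", r)));
    return spec;
  }
}
```

Whether this normalization lives in the Resources layer or in the provider is the design question raised above; the helper works in either place.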

aTokens ->
repairSpec.put(
RepairOption.RANGES_KEY,
StringUtils.join(
aTokens.stream()
.map(token -> token.getStart() + ":" + token.getEnd())
.collect(Collectors.toList()),
",")));
}
datacenters.map(
dcs -> repairSpec.put(RepairOption.DATACENTERS_KEY, StringUtils.join(dcs, ",")));

String jobId = String.format("repair-%d", repairJobId);
final Job job = service.createJob("repair", jobId);
// Since Cassandra provides us with a async, we don't need to use our executor interface for
// this.
final int repairJobId =
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec);

if (repairJobId == 0) {
// Job is done and won't continue
job.setStatusChange(ProgressEventType.COMPLETE, "");
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
service.updateJob(job);
return job.getJobId();
}
if (!notifications) {
return Integer.valueOf(repairJobId).toString();
}

ShimLoader.instance
.get()
.getStorageService()
.addNotificationListener(
(notification, handback) -> {
if (notification.getType().equals("progress")) {
Map<String, Integer> data = (Map<String, Integer>) notification.getUserData();
ProgressEventType progress = ProgressEventType.values()[data.get("type")];

switch (progress) {
case START:
job.setStatusChange(progress, notification.getMessage());
job.setStartTime(System.currentTimeMillis());
break;
case NOTIFICATION:
case PROGRESS:
break;
case ERROR:
case ABORT:
job.setError(new RuntimeException(notification.getMessage()));
job.setStatus(Job.JobStatus.ERROR);
job.setFinishedTime(System.currentTimeMillis());
break;
case SUCCESS:
job.setStatusChange(progress, notification.getMessage());
// SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that)
break;
case COMPLETE:
job.setStatusChange(progress, notification.getMessage());
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
break;
}
service.updateJob(job);
}
},
(NotificationFilter)
notification -> {
final int repairNo =
Integer.parseInt(((String) notification.getSource()).split(":")[1]);
return repairNo == repairJobId;
},
null);
String jobId = String.format("repair-%d", repairJobId);
final Job job = service.createJob("repair", jobId);

if (repairJobId == 0) {
// Job is done and won't continue
job.setStatusChange(ProgressEventType.COMPLETE, "");
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
service.updateJob(job);
return job.getJobId();
}

throw new RuntimeException("At least one keyspace must be defined");
ShimLoader.instance
.get()
.getStorageService()
.addNotificationListener(
(notification, handback) -> {
if (notification.getType().equals("progress")) {
Map<String, Integer> data = (Map<String, Integer>) notification.getUserData();
ProgressEventType progress = ProgressEventType.values()[data.get("type")];

switch (progress) {
case START:
job.setStatusChange(progress, notification.getMessage());
job.setStartTime(System.currentTimeMillis());
break;
case NOTIFICATION:
case PROGRESS:
break;
case ERROR:
case ABORT:
job.setError(new RuntimeException(notification.getMessage()));
job.setStatus(Job.JobStatus.ERROR);
job.setFinishedTime(System.currentTimeMillis());
break;
case SUCCESS:
job.setStatusChange(progress, notification.getMessage());
// SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that)
break;
case COMPLETE:
job.setStatusChange(progress, notification.getMessage());
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
break;
}
service.updateJob(job);
}
},
(NotificationFilter)
notification -> {
final int repairNo =
Integer.parseInt(((String) notification.getSource()).split(":")[1]);
return repairNo == repairJobId;
},
null);

return job.getJobId();
}

@Rpc(name = "move")
@@ -0,0 +1,37 @@
/*
* Copyright DataStax, Inc.
*
* Please see the included license file for details.
*/

package com.datastax.mgmtapi.rpc.models;

import java.math.BigInteger;
import java.util.Comparator;

public final class RingRange {

public static final Comparator<RingRange> START_COMPARATOR =
(RingRange o1, RingRange o2) -> o1.start.compareTo(o2.start);

public final BigInteger start;
public final BigInteger end;

public RingRange(BigInteger start, BigInteger end) {
this.start = start;
this.end = end;
}

public RingRange(String... range) {
start = new BigInteger(range[0]);
end = new BigInteger(range[1]);
}

public BigInteger getStart() {
return start;
}

public BigInteger getEnd() {
return end;
}
}
102 changes: 99 additions & 3 deletions management-api-server/doc/openapi.json
@@ -1676,6 +1676,58 @@
},
"summary" : "Rewrite sstables (for the requested tables) that are not on the current version (thus upgrading them to said current version). This operation is asynchronous and returns immediately."
}
},
"/api/v2/repairs" : {
"put" : {
"operationId" : "putRepairV2",
"requestBody" : {
"content" : {
"application/json" : {
"schema" : {
"$ref" : "#/components/schemas/RepairRequest"
}
}
}
},
"responses" : {
"200" : {
"content" : {
"application/json" : {
"example" : "OK",
"schema" : {
"$ref" : "#/components/schemas/RepairRequestResponse"
}
}
},
"description" : "Repair Successfully requested"
},
"400" : {
"content" : {
"text/plain" : {
"example" : "keyspace must be specified",
"schema" : {
"type" : "string",
"enum" : [ "OK", "Created", "Accepted", "No Content", "Reset Content", "Partial Content", "Moved Permanently", "Found", "See Other", "Not Modified", "Use Proxy", "Temporary Redirect", "Bad Request", "Unauthorized", "Payment Required", "Forbidden", "Not Found", "Method Not Allowed", "Not Acceptable", "Proxy Authentication Required", "Request Timeout", "Conflict", "Gone", "Length Required", "Precondition Failed", "Request Entity Too Large", "Request-URI Too Long", "Unsupported Media Type", "Requested Range Not Satisfiable", "Expectation Failed", "Precondition Required", "Too Many Requests", "Request Header Fields Too Large", "Internal Server Error", "Not Implemented", "Bad Gateway", "Service Unavailable", "Gateway Timeout", "HTTP Version Not Supported", "Network Authentication Required" ]
}
}
},
"description" : "Repair request missing Keyspace name"
},
"500" : {
"content" : {
"text/plain" : {
"example" : "internal error, we did not receive the expected repair ID from Cassandra.",
"schema" : {
"type" : "string",
"enum" : [ "OK", "Created", "Accepted", "No Content", "Reset Content", "Partial Content", "Moved Permanently", "Found", "See Other", "Not Modified", "Use Proxy", "Temporary Redirect", "Bad Request", "Unauthorized", "Payment Required", "Forbidden", "Not Found", "Method Not Allowed", "Not Acceptable", "Proxy Authentication Required", "Request Timeout", "Conflict", "Gone", "Length Required", "Precondition Failed", "Request Entity Too Large", "Request-URI Too Long", "Unsupported Media Type", "Requested Range Not Satisfiable", "Expectation Failed", "Precondition Required", "Too Many Requests", "Request Header Fields Too Large", "Internal Server Error", "Not Implemented", "Bad Gateway", "Service Unavailable", "Gateway Timeout", "HTTP Version Not Supported", "Network Authentication Required" ]
}
}
},
"description" : "internal error, we did not receive the expected repair ID from Cassandra."
}
},
"summary" : "Initiate a new repair"
}
}
},
"components" : {
@@ -1904,20 +1956,52 @@
"RepairRequest" : {
"type" : "object",
"properties" : {
"full" : {
"associated_tokens" : {
"type" : "array",
"items" : {
"$ref" : "#/components/schemas/RingRange"
}
},
"datacenters" : {
"type" : "array",
"items" : {
"type" : "string"
}
},
"full_repair" : {
"type" : "boolean"
},
"keyspace_name" : {
"keyspace" : {
"type" : "string"
},
"notifications" : {
"type" : "boolean"
},
"repair_parallelism" : {
"type" : "string",
"enum" : [ "sequential", "parallel", "dc_parallel" ]
},
"repair_thread_count" : {
"type" : "integer",
"format" : "int32"
},
"tables" : {
"type" : "array",
"items" : {
"type" : "string"
}
}
},
"required" : [ "keyspace_name" ]
"required" : [ "keyspace" ]
},
"RepairRequestResponse" : {
"type" : "object",
"properties" : {
"repair_id" : {
"type" : "string"
}
},
"required" : [ "repair_id" ]
},
"ReplicationSetting" : {
"type" : "object",
@@ -1932,6 +2016,18 @@
},
"required" : [ "dc_name", "replication_factor" ]
},
"RingRange" : {
"type" : "object",
"properties" : {
"end" : {
"type" : "integer"
},
"start" : {
"type" : "integer"
}
},
"required" : [ "end", "start" ]
},
"ScrubRequest" : {
"type" : "object",
"properties" : {