Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reaper endpoints: Async Repair Endpoint #358

Merged
merged 17 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -739,96 +739,123 @@ public String repair(
@RpcParam(name = "keyspaceName") String keyspace,
@RpcParam(name = "tables") List<String> tables,
@RpcParam(name = "full") Boolean full,
@RpcParam(name = "notifications") boolean notifications)
@RpcParam(name = "notifications") boolean notifications,
@RpcParam(name = "repairParallelism") String repairParallelism,
@RpcParam(name = "datacenters") List<String> datacenters,
@RpcParam(name = "associatedTokens") String ringRangeString,
@RpcParam(name = "repairThreadCount") Integer repairThreadCount)
throws IOException {
// At least one keyspace is required
if (keyspace != null) {
// create the repair spec
Map<String, String> repairSpec = new HashMap<>();

// add any specified tables to the repair spec
if (tables != null && !tables.isEmpty()) {
// set the tables/column families
repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, String.join(",", tables));
assert (keyspace != null);
Map<String, String> repairSpec = new HashMap<>();
// add tables/column families
if (tables != null && !tables.isEmpty()) {
repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, String.join(",", tables));
}
// set incremental reapir
repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full));
// Parallelism should be set if it's requested OR if incremental repair is requested.
if (!full) {
// Incremental repair requested, make sure parallelism is correct
if (repairParallelism != null
&& !RepairParallelism.PARALLEL.getName().equals(repairParallelism)) {
throw new IOException(
"Invalid repair combination. Incremental repair if Parallelism is not set");
}

// handle incremental vs full
boolean isIncremental = Boolean.FALSE.equals(full);
repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(isIncremental));
if (isIncremental) {
// incremental repairs will fail if parallelism is not set
repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName());
// Incremental repair and parallelism should be set
repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName());
}
if (repairThreadCount != null) {
// if specified, the value should be at least 1
if (repairThreadCount.compareTo(Integer.valueOf(0)) <= 0) {
throw new IOException(
"Invalid repari thread count: "
+ repairThreadCount
+ ". Value should be greater than 0");
}
repairSpec.put(RepairOption.JOB_THREADS_KEY, repairThreadCount.toString());
}
repairSpec.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE));

// Since Cassandra provides us with a async, we don't need to use our executor interface for
// this.
final int repairJobId =
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec);
if (ringRangeString != null && !ringRangeString.isEmpty()) {
repairSpec.put(RepairOption.RANGES_KEY, ringRangeString);
}
// add datacenters to the repair spec
if (datacenters != null && !datacenters.isEmpty()) {
repairSpec.put(RepairOption.DATACENTERS_KEY, String.join(",", datacenters));
}

if (!notifications) {
return Integer.valueOf(repairJobId).toString();
}
// Since Cassandra provides us with a async, we don't need to use our executor interface for
// this.
final int repairJobId =
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec);

String jobId = String.format("repair-%d", repairJobId);
final Job job = service.createJob("repair", jobId);
if (!notifications) {
return Integer.valueOf(repairJobId).toString();
}

if (repairJobId == 0) {
// Job is done and won't continue
job.setStatusChange(ProgressEventType.COMPLETE, "");
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
service.updateJob(job);
return job.getJobId();
}

ShimLoader.instance
.get()
.getStorageService()
.addNotificationListener(
(notification, handback) -> {
if (notification.getType().equals("progress")) {
Map<String, Integer> data = (Map<String, Integer>) notification.getUserData();
ProgressEventType progress = ProgressEventType.values()[data.get("type")];

switch (progress) {
case START:
job.setStatusChange(progress, notification.getMessage());
job.setStartTime(System.currentTimeMillis());
break;
case NOTIFICATION:
case PROGRESS:
break;
case ERROR:
case ABORT:
job.setError(new RuntimeException(notification.getMessage()));
job.setStatus(Job.JobStatus.ERROR);
job.setFinishedTime(System.currentTimeMillis());
break;
case SUCCESS:
job.setStatusChange(progress, notification.getMessage());
// SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that)
break;
case COMPLETE:
job.setStatusChange(progress, notification.getMessage());
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
break;
}
service.updateJob(job);
}
},
(NotificationFilter)
notification -> {
final int repairNo =
Integer.parseInt(((String) notification.getSource()).split(":")[1]);
return repairNo == repairJobId;
},
null);
String jobId = String.format("repair-%d", repairJobId);
final Job job = service.createJob("repair", jobId);

if (repairJobId == 0) {
// Job is done and won't continue
job.setStatusChange(ProgressEventType.COMPLETE, "");
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
service.updateJob(job);
return job.getJobId();
}

throw new RuntimeException("At least one keyspace must be defined");
ShimLoader.instance
.get()
.getStorageService()
.addNotificationListener(
(notification, handback) -> {
if (notification.getType().equals("progress")) {
Map<String, Integer> data = (Map<String, Integer>) notification.getUserData();
ProgressEventType progress = ProgressEventType.values()[data.get("type")];

switch (progress) {
case START:
job.setStatusChange(progress, notification.getMessage());
job.setStartTime(System.currentTimeMillis());
break;
case NOTIFICATION:
case PROGRESS:
break;
case ERROR:
case ABORT:
job.setError(new RuntimeException(notification.getMessage()));
job.setStatus(Job.JobStatus.ERROR);
job.setFinishedTime(System.currentTimeMillis());
break;
case SUCCESS:
job.setStatusChange(progress, notification.getMessage());
// SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that)
break;
case COMPLETE:
job.setStatusChange(progress, notification.getMessage());
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
break;
}
service.updateJob(job);
}
},
(NotificationFilter)
notification -> {
final int repairNo =
Integer.parseInt(((String) notification.getSource()).split(":")[1]);
return repairNo == repairJobId;
},
null);

return job.getJobId();
}

@Rpc(name = "stopAllRepairs")
public void stopAllRepairs() {
ShimLoader.instance.get().getStorageService().forceTerminateAllRepairSessions();
}

@Rpc(name = "move")
Expand Down
119 changes: 116 additions & 3 deletions management-api-server/doc/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,75 @@
},
"summary" : "Rewrite sstables (for the requested tables) that are not on the current version (thus upgrading them to said current version). This operation is asynchronous and returns immediately."
}
},
"/api/v2/repairs" : {
"delete" : {
"operationId" : "deleteRepairsV2",
"responses" : {
"202" : {
"content" : {
"application/json" : {
"example" : "Accepted",
"schema" : {
"$ref" : "#/components/schemas/RepairRequestResponse"
}
}
},
"description" : "Cancel repairs Successfully requested"
}
},
"summary" : "Cancel all repairs"
},
"put" : {
"operationId" : "putRepairV2",
"requestBody" : {
"content" : {
"application/json" : {
"schema" : {
"$ref" : "#/components/schemas/RepairRequest"
}
}
}
},
"responses" : {
"202" : {
"content" : {
"application/json" : {
"example" : "Accepted",
"schema" : {
"$ref" : "#/components/schemas/RepairRequestResponse"
}
}
},
"description" : "Repair Successfully requested"
},
"400" : {
"content" : {
"text/plain" : {
"example" : "keyspace must be specified",
"schema" : {
"type" : "string",
"enum" : [ "OK", "Created", "Accepted", "No Content", "Reset Content", "Partial Content", "Moved Permanently", "Found", "See Other", "Not Modified", "Use Proxy", "Temporary Redirect", "Bad Request", "Unauthorized", "Payment Required", "Forbidden", "Not Found", "Method Not Allowed", "Not Acceptable", "Proxy Authentication Required", "Request Timeout", "Conflict", "Gone", "Length Required", "Precondition Failed", "Request Entity Too Large", "Request-URI Too Long", "Unsupported Media Type", "Requested Range Not Satisfiable", "Expectation Failed", "Precondition Required", "Too Many Requests", "Request Header Fields Too Large", "Internal Server Error", "Not Implemented", "Bad Gateway", "Service Unavailable", "Gateway Timeout", "HTTP Version Not Supported", "Network Authentication Required" ]
}
}
},
"description" : "Repair request missing Keyspace name"
},
"500" : {
"content" : {
"text/plain" : {
"example" : "internal error, we did not receive the expected repair ID from Cassandra.",
"schema" : {
"type" : "string",
"enum" : [ "OK", "Created", "Accepted", "No Content", "Reset Content", "Partial Content", "Moved Permanently", "Found", "See Other", "Not Modified", "Use Proxy", "Temporary Redirect", "Bad Request", "Unauthorized", "Payment Required", "Forbidden", "Not Found", "Method Not Allowed", "Not Acceptable", "Proxy Authentication Required", "Request Timeout", "Conflict", "Gone", "Length Required", "Precondition Failed", "Request Entity Too Large", "Request-URI Too Long", "Unsupported Media Type", "Requested Range Not Satisfiable", "Expectation Failed", "Precondition Required", "Too Many Requests", "Request Header Fields Too Large", "Internal Server Error", "Not Implemented", "Bad Gateway", "Service Unavailable", "Gateway Timeout", "HTTP Version Not Supported", "Network Authentication Required" ]
}
}
},
"description" : "internal error, we did not receive the expected repair ID from Cassandra."
}
},
"summary" : "Initiate a new repair"
}
}
},
"components" : {
Expand Down Expand Up @@ -1904,20 +1973,52 @@
"RepairRequest" : {
"type" : "object",
"properties" : {
"full" : {
"associated_tokens" : {
"type" : "array",
"items" : {
"$ref" : "#/components/schemas/RingRange"
}
},
"datacenters" : {
"type" : "array",
"items" : {
"type" : "string"
}
},
"full_repair" : {
"type" : "boolean"
},
"keyspace_name" : {
"keyspace" : {
"type" : "string"
},
"notifications" : {
"type" : "boolean"
},
"repair_parallelism" : {
"type" : "string",
"enum" : [ "sequential", "parallel", "dc_parallel" ]
},
"repair_thread_count" : {
"type" : "integer",
"format" : "int32"
},
"tables" : {
"type" : "array",
"items" : {
"type" : "string"
}
}
},
"required" : [ "keyspace_name" ]
"required" : [ "keyspace" ]
},
"RepairRequestResponse" : {
"type" : "object",
"properties" : {
"repair_id" : {
"type" : "string"
}
},
"required" : [ "repair_id" ]
},
"ReplicationSetting" : {
"type" : "object",
Expand All @@ -1932,6 +2033,18 @@
},
"required" : [ "dc_name", "replication_factor" ]
},
"RingRange" : {
"type" : "object",
"properties" : {
"end" : {
"type" : "integer"
},
"start" : {
"type" : "integer"
}
},
"required" : [ "end", "start" ]
},
"ScrubRequest" : {
"type" : "object",
"properties" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,17 @@ public Response repair(RepairRequest repairRequest) {
}
app.cqlService.executePreparedStatement(
app.dbUnixSocketFile,
"CALL NodeOps.repair(?, ?, ?, ?)",
"CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)",
repairRequest.keyspaceName,
repairRequest.tables,
repairRequest.full,
false);
false,
// The default repair does not allow for specifying things like parallelism,
// threadCounts, source DCs or ranges etc.
null,
null,
null,
null);

return Response.ok("OK").build();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ protected Response handle(Callable<Response> action) {
.entity("Internal connection to Cassandra closed")
.build();
} catch (Throwable t) {
t.printStackTrace();
return Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR)
.entity(t.getLocalizedMessage())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,15 @@ public Response repair(RepairRequest repairRequest) {
ResponseTools.getSingleRowStringResponse(
app.dbUnixSocketFile,
app.cqlService,
"CALL NodeOps.repair(?, ?, ?, ?)",
"CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)",
repairRequest.keyspaceName,
repairRequest.tables,
repairRequest.full,
true))
true,
null,
null,
null,
null))
.build();
});
}
Expand Down
Loading