Skip to content

Commit

Permalink
Allow repair() RPC method to take more arguments (as required by Re…
Browse files Browse the repository at this point in the history
…aper). Get the old NodeOpsResources class calling the new RPC method correctly.
  • Loading branch information
Miles-Garnsey committed Aug 23, 2023
1 parent e84942f commit 30e2606
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.datastax.mgmtapi.rpc.Rpc;
import com.datastax.mgmtapi.rpc.RpcParam;
import com.datastax.mgmtapi.rpc.RpcRegistry;
import com.datastax.mgmtapi.rpc.models.RingRange;
import com.datastax.mgmtapi.util.Job;
import com.datastax.mgmtapi.util.JobExecutor;
import com.datastax.oss.driver.api.core.CqlIdentifier;
Expand Down Expand Up @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -739,34 +741,45 @@ public void repair(
@RpcParam(name = "keyspaceName") String keyspace,
@RpcParam(name = "tables") List<String> tables,
@RpcParam(name = "full") Boolean full,
@RpcParam(name = "beginToken") BigInteger beginToken,
@RpcParam(name = "endToken") BigInteger endToken,
@RpcParam(name = "repairParallelism") RepairParallelism repairParallelism,
@RpcParam(name = "datacenters") Collection<String> datacenters,
@RpcParam(name = "associatedTokens") List<RingRange> associatedTokens,
@RpcParam(name = "repairThreadCount") int repairThreadCount)
@RpcParam(name = "associatedTokens") Optional<List<RingRange>> associatedTokens,
@RpcParam(name = "repairParallelism") Optional<RepairParallelism> repairParallelism,
@RpcParam(name = "datacenters") Optional<Collection<String>> datacenters,
@RpcParam(name = "repairThreadCount") Optional<Integer> repairThreadCount)
throws IOException {
// At least one keyspace is required
if (keyspace != null) {
Map<String, String> options = new HashMap<>();
options.put(RepairOption.PARALLELISM_KEY, repairParallelism.getName());
options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full));
options.put(
RepairOption.JOB_THREADS_KEY,
Integer.toString(repairThreadCount == 0 ? 1 : repairThreadCount));
options.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE));
options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(tables, ","));
if (full) {
assert(keyspace != null);
Map<String, String> options = new HashMap<>();
repairParallelism.map(rPar ->
options.put(
RepairOption.PARALLELISM_KEY, rPar.getName()
)
);
options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full));
repairThreadCount.map(tCount ->
options.put(
RepairOption.JOB_THREADS_KEY,
Integer.toString(tCount == 0 ? 1 : tCount))
);
options.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE));
options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(tables, ","));
if (full) {
associatedTokens.map(aTokens ->
options.put(
RepairOption.RANGES_KEY,
StringUtils.join(
associatedTokens
aTokens
.stream()
.map(token -> token.getStart() + ":" + token.getEnd())
.collect(Collectors.toList()),
","));
}
",")
)
);
}

datacenters.map( dcs ->
options.put(RepairOption.DATACENTERS_KEY, StringUtils.join(dcs, ","))
);
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, options);
}

@Rpc(name = "move")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
Expand Down Expand Up @@ -502,10 +503,15 @@ public Response repair(RepairRequest repairRequest) {
}
app.cqlService.executePreparedStatement(
app.dbUnixSocketFile,
"CALL NodeOps.repair(?, ?, ?)",
"CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?)",
repairRequest.keyspaceName,
repairRequest.tables,
repairRequest.full);
repairRequest.full,
// The default repair does not allow for specifying things like parallelism, threadCounts, source DCs or ranges etc.
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());

return Response.ok("OK").build();
});
Expand Down Expand Up @@ -704,4 +710,4 @@ public Response move(@QueryParam(value = "newToken") String newToken) {
+ " \"language\": null,\n"
+ " \"encoding\": null\n"
+ "}";
}
}

0 comments on commit 30e2606

Please sign in to comment.