Skip to content

Commit

Permalink
More problems due to rebase...
Browse files Browse the repository at this point in the history
  • Loading branch information
Miles-Garnsey committed Aug 24, 2023
1 parent fa342ab commit d631cf6
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.io.IOException;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand Down Expand Up @@ -750,105 +749,95 @@ public String repair(
@RpcParam(name = "repairThreadCount") Optional<Integer> repairThreadCount)
throws IOException {
// At least one keyspace is required
assert(keyspace != null);
assert (keyspace != null);
Map<String, String> options = new HashMap<>();
repairParallelism.map(rPar ->
options.put(
RepairOption.PARALLELISM_KEY, rPar.getName()
)
);
repairParallelism.map(rPar -> options.put(RepairOption.PARALLELISM_KEY, rPar.getName()));
options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full));
repairThreadCount.map(tCount ->
options.put(
RepairOption.JOB_THREADS_KEY,
Integer.toString(tCount == 0 ? 1 : tCount))
);
repairThreadCount.map(
tCount ->
options.put(RepairOption.JOB_THREADS_KEY, Integer.toString(tCount == 0 ? 1 : tCount)));
options.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE));
options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(tables, ","));
if (full) {
associatedTokens.map(aTokens ->
options.put(
RepairOption.RANGES_KEY,
StringUtils.join(
aTokens
.stream()
.map(token -> token.getStart() + ":" + token.getEnd())
.collect(Collectors.toList()),
",")
)
);
}
datacenters.map( dcs ->
options.put(RepairOption.DATACENTERS_KEY, StringUtils.join(dcs, ","))
);
associatedTokens.map(
aTokens ->
options.put(
RepairOption.RANGES_KEY,
StringUtils.join(
aTokens.stream()
.map(token -> token.getStart() + ":" + token.getEnd())
.collect(Collectors.toList()),
",")));
}
datacenters.map(dcs -> options.put(RepairOption.DATACENTERS_KEY, StringUtils.join(dcs, ",")));

// Since Cassandra provides us with a async, we don't need to use our executor interface for
// this.
final int repairJobId =
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, options);
// Since Cassandra provides us with a async, we don't need to use our executor interface for
// this.
final int repairJobId =
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, options);

if (!notifications) {
return Integer.valueOf(repairJobId).toString();
}
if (!notifications) {
return Integer.valueOf(repairJobId).toString();
}

String jobId = String.format("repair-%d", repairJobId);
final Job job = service.createJob("repair", jobId);
String jobId = String.format("repair-%d", repairJobId);
final Job job = service.createJob("repair", jobId);

if (repairJobId == 0) {
// Job is done and won't continue
job.setStatusChange(ProgressEventType.COMPLETE, "");
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
service.updateJob(job);
return job.getJobId();
}
if (repairJobId == 0) {
// Job is done and won't continue
job.setStatusChange(ProgressEventType.COMPLETE, "");
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
service.updateJob(job);
return job.getJobId();
}

ShimLoader.instance
.get()
.getStorageService()
.addNotificationListener(
(notification, handback) -> {
if (notification.getType().equals("progress")) {
Map<String, Integer> data = (Map<String, Integer>) notification.getUserData();
ProgressEventType progress = ProgressEventType.values()[data.get("type")];

switch (progress) {
case START:
job.setStatusChange(progress, notification.getMessage());
job.setStartTime(System.currentTimeMillis());
break;
case NOTIFICATION:
case PROGRESS:
break;
case ERROR:
case ABORT:
job.setError(new RuntimeException(notification.getMessage()));
job.setStatus(Job.JobStatus.ERROR);
job.setFinishedTime(System.currentTimeMillis());
break;
case SUCCESS:
job.setStatusChange(progress, notification.getMessage());
// SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that)
break;
case COMPLETE:
job.setStatusChange(progress, notification.getMessage());
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
break;
}
service.updateJob(job);
ShimLoader.instance
.get()
.getStorageService()
.addNotificationListener(
(notification, handback) -> {
if (notification.getType().equals("progress")) {
Map<String, Integer> data = (Map<String, Integer>) notification.getUserData();
ProgressEventType progress = ProgressEventType.values()[data.get("type")];

switch (progress) {
case START:
job.setStatusChange(progress, notification.getMessage());
job.setStartTime(System.currentTimeMillis());
break;
case NOTIFICATION:
case PROGRESS:
break;
case ERROR:
case ABORT:
job.setError(new RuntimeException(notification.getMessage()));
job.setStatus(Job.JobStatus.ERROR);
job.setFinishedTime(System.currentTimeMillis());
break;
case SUCCESS:
job.setStatusChange(progress, notification.getMessage());
// SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that)
break;
case COMPLETE:
job.setStatusChange(progress, notification.getMessage());
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
break;
}
},
(NotificationFilter)
notification -> {
final int repairNo =
Integer.parseInt(((String) notification.getSource()).split(":")[1]);
return repairNo == repairJobId;
},
null);
service.updateJob(job);
}
},
(NotificationFilter)
notification -> {
final int repairNo =
Integer.parseInt(((String) notification.getSource()).split(":")[1]);
return repairNo == repairJobId;
},
null);

return job.getJobId();
}
return job.getJobId();
}

@Rpc(name = "move")
public String move(
Expand All @@ -868,4 +857,4 @@ public String move(

return submitJob("move", moveOperation, async);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

public final class RingRange {

public static final Comparator<RingRange> START_COMPARATOR
= (RingRange o1, RingRange o2) -> o1.start.compareTo(o2.start);
public static final Comparator<RingRange> START_COMPARATOR =
(RingRange o1, RingRange o2) -> o1.start.compareTo(o2.start);

public final BigInteger start;
public final BigInteger end;
Expand All @@ -34,4 +34,4 @@ public BigInteger getStart() {
public BigInteger getEnd() {
return end;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,8 @@ public Response repair(RepairRequest repairRequest) {
repairRequest.keyspaceName,
repairRequest.tables,
repairRequest.full,
// The default repair does not allow for specifying things like parallelism, threadCounts, source DCs or ranges etc.
// The default repair does not allow for specifying things like parallelism,
// threadCounts, source DCs or ranges etc.
Optional.empty(),
Optional.empty(),
Optional.empty(),
Expand Down Expand Up @@ -642,4 +643,4 @@ public Response move(@QueryParam(value = "newToken") String newToken) {
+ " \"language\": null,\n"
+ " \"encoding\": null\n"
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand Down Expand Up @@ -154,11 +155,15 @@ public Response repair(RepairRequest repairRequest) {
ResponseTools.getSingleRowStringResponse(
app.dbUnixSocketFile,
app.cqlService,
"CALL NodeOps.repair(?, ?, ?, ?)",
"CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)",
repairRequest.keyspaceName,
repairRequest.tables,
repairRequest.full,
true))
true,
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty()))
.build();
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
package com.datastax.mgmtapi.resources.v2;

import javax.ws.rs.Consumes;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import com.datastax.mgmtapi.ManagementApplication;
import com.datastax.mgmtapi.resources.common.BaseResources;
import com.datastax.mgmtapi.resources.v2.models.RepairRequest;
import com.datastax.mgmtapi.resources.v2.models.RepairRequestResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.ExampleObject;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/api/v2/repairs")
public class RepairResources extends BaseResources {

private static final ObjectMapper jsonMapper = new ObjectMapper();

public RepairResources(ManagementApplication application) {
super(application);
}
Expand All @@ -47,6 +44,17 @@ public RepairResources(ManagementApplication application) {
mediaType = MediaType.TEXT_PLAIN,
schema = @Schema(implementation = Response.Status.class),
examples = @ExampleObject(value = "keyspace must be specified")))
@ApiResponse(
responseCode = "500",
description = "internal error, we did not receive the expected repair ID from Cassandra.",
content =
@Content(
mediaType = MediaType.TEXT_PLAIN,
schema = @Schema(implementation = Response.Status.class),
examples =
@ExampleObject(
value =
"internal error, we did not receive the expected repair ID from Cassandra.")))
public final Response repair(RepairRequest request) {
return handle(
() -> {
Expand All @@ -55,19 +63,27 @@ public final Response repair(RepairRequest request) {
.entity("keyspaceName must be specified")
.build();
}
app.cqlService.executePreparedStatement(
app.dbUnixSocketFile,
"CALL NodeOps.repair(?, ?, ?)",
repairRequest.keyspaceName,
repairRequest.tables,
repairRequest.full);
ResultSet res =
app.cqlService.executePreparedStatement(
app.dbUnixSocketFile,
"CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)",
request.keyspace,
request.tables,
request.fullRepair,
request.notifications,
request.repairParallelism,
request.datacenters,
request.associatedTokens,
request.repairThreadCount);

return Response.ok("OK").build();
try {
String repairID = res.one().getString(0);
return Response.accepted(new RepairRequestResponse(repairID)).build();
} catch (Exception e) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity("Repair request failed: " + e.getMessage())
.build();
}
});

String repairID = ""; // TODO: implement me
return Response.ok(new RepairRequestResponse(repairID)).build();
}


}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.datastax.mgmtapi.resources.v2.models;


import com.fasterxml.jackson.annotation.JsonValue;

public enum RepairParallelism {
Expand Down Expand Up @@ -30,4 +29,4 @@ public String getName() {
public String toString() {
return this.getName();
}
}
}
Loading

0 comments on commit d631cf6

Please sign in to comment.