From d631cf6aa8fd3f7db625a9620bc329226eec6e14 Mon Sep 17 00:00:00 2001 From: Miles-Garnsey Date: Wed, 23 Aug 2023 19:14:02 +1000 Subject: [PATCH] More problems due to rebase... --- .../com/datastax/mgmtapi/NodeOpsProvider.java | 167 ++++++++---------- .../mgmtapi/rpc/models/RingRange.java | 6 +- .../mgmtapi/resources/NodeOpsResources.java | 5 +- .../resources/v1/NodeOpsResources.java | 9 +- .../mgmtapi/resources/v2/RepairResources.java | 62 ++++--- .../v2/models/RepairParallelism.java | 3 +- .../resources/v2/models/RepairRequest.java | 49 +++-- .../v2/models/RepairRequestResponse.java | 9 +- .../resources/v2/models/RingRange.java | 10 +- 9 files changed, 171 insertions(+), 149 deletions(-) diff --git a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java index 1c573550..fee9ca92 100644 --- a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java +++ b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java @@ -25,7 +25,6 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import java.io.IOException; -import java.math.BigInteger; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -750,105 +749,95 @@ public String repair( @RpcParam(name = "repairThreadCount") Optional repairThreadCount) throws IOException { // At least one keyspace is required - assert(keyspace != null); + assert (keyspace != null); Map options = new HashMap<>(); - repairParallelism.map(rPar -> - options.put( - RepairOption.PARALLELISM_KEY, rPar.getName() - ) - ); + repairParallelism.map(rPar -> options.put(RepairOption.PARALLELISM_KEY, rPar.getName())); options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full)); - repairThreadCount.map(tCount -> - options.put( - RepairOption.JOB_THREADS_KEY, - Integer.toString(tCount == 0 ? 1 : tCount)) - ); + repairThreadCount.map( + tCount -> + options.put(RepairOption.JOB_THREADS_KEY, Integer.toString(tCount == 0 ? 1 : tCount))); options.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE)); options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(tables, ",")); if (full) { - associatedTokens.map(aTokens -> - options.put( - RepairOption.RANGES_KEY, - StringUtils.join( - aTokens - .stream() - .map(token -> token.getStart() + ":" + token.getEnd()) - .collect(Collectors.toList()), - ",") - ) - ); - } - datacenters.map( dcs -> - options.put(RepairOption.DATACENTERS_KEY, StringUtils.join(dcs, ",")) - ); + associatedTokens.map( + aTokens -> + options.put( + RepairOption.RANGES_KEY, + StringUtils.join( + aTokens.stream() + .map(token -> token.getStart() + ":" + token.getEnd()) + .collect(Collectors.toList()), + ","))); + } + datacenters.map(dcs -> options.put(RepairOption.DATACENTERS_KEY, StringUtils.join(dcs, ","))); - // Since Cassandra provides us with a async, we don't need to use our executor interface for - // this. - final int repairJobId = - ShimLoader.instance.get().getStorageService().repairAsync(keyspace, options); + // Since Cassandra provides us with a async, we don't need to use our executor interface for + // this. + final int repairJobId = + ShimLoader.instance.get().getStorageService().repairAsync(keyspace, options); - if (!notifications) { - return Integer.valueOf(repairJobId).toString(); - } + if (!notifications) { + return Integer.valueOf(repairJobId).toString(); + } - String jobId = String.format("repair-%d", repairJobId); - final Job job = service.createJob("repair", jobId); + String jobId = String.format("repair-%d", repairJobId); + final Job job = service.createJob("repair", jobId); - if (repairJobId == 0) { - // Job is done and won't continue - job.setStatusChange(ProgressEventType.COMPLETE, ""); - job.setStatus(Job.JobStatus.COMPLETED); - job.setFinishedTime(System.currentTimeMillis()); - service.updateJob(job); - return job.getJobId(); - } + if (repairJobId == 0) { + // Job is done and won't continue + job.setStatusChange(ProgressEventType.COMPLETE, ""); + job.setStatus(Job.JobStatus.COMPLETED); + job.setFinishedTime(System.currentTimeMillis()); + service.updateJob(job); + return job.getJobId(); + } - ShimLoader.instance - .get() - .getStorageService() - .addNotificationListener( - (notification, handback) -> { - if (notification.getType().equals("progress")) { - Map data = (Map) notification.getUserData(); - ProgressEventType progress = ProgressEventType.values()[data.get("type")]; - - switch (progress) { - case START: - job.setStatusChange(progress, notification.getMessage()); - job.setStartTime(System.currentTimeMillis()); - break; - case NOTIFICATION: - case PROGRESS: - break; - case ERROR: - case ABORT: - job.setError(new RuntimeException(notification.getMessage())); - job.setStatus(Job.JobStatus.ERROR); - job.setFinishedTime(System.currentTimeMillis()); - break; - case SUCCESS: - job.setStatusChange(progress, notification.getMessage()); - // SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that) - break; - case COMPLETE: - job.setStatusChange(progress, notification.getMessage()); - job.setStatus(Job.JobStatus.COMPLETED); - job.setFinishedTime(System.currentTimeMillis()); - break; - } - service.updateJob(job); + ShimLoader.instance + .get() + .getStorageService() + .addNotificationListener( + (notification, handback) -> { + if (notification.getType().equals("progress")) { + Map data = (Map) notification.getUserData(); + ProgressEventType progress = ProgressEventType.values()[data.get("type")]; + + switch (progress) { + case START: + job.setStatusChange(progress, notification.getMessage()); + job.setStartTime(System.currentTimeMillis()); + break; + case NOTIFICATION: + case PROGRESS: + break; + case ERROR: + case ABORT: + job.setError(new RuntimeException(notification.getMessage())); + job.setStatus(Job.JobStatus.ERROR); + job.setFinishedTime(System.currentTimeMillis()); + break; + case SUCCESS: + job.setStatusChange(progress, notification.getMessage()); + // SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that) + break; + case COMPLETE: + job.setStatusChange(progress, notification.getMessage()); + job.setStatus(Job.JobStatus.COMPLETED); + job.setFinishedTime(System.currentTimeMillis()); + break; } - }, - (NotificationFilter) - notification -> { - final int repairNo = - Integer.parseInt(((String) notification.getSource()).split(":")[1]); - return repairNo == repairJobId; - }, - null); + service.updateJob(job); + } + }, + (NotificationFilter) + notification -> { + final int repairNo = + Integer.parseInt(((String) notification.getSource()).split(":")[1]); + return repairNo == repairJobId; + }, + null); - return job.getJobId(); - } + return job.getJobId(); + } @Rpc(name = "move") public String move( @@ -868,4 +857,4 @@ public String move( return submitJob("move", moveOperation, async); } -} \ No newline at end of file +} diff --git a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/rpc/models/RingRange.java b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/rpc/models/RingRange.java index 8d81b8ed..57fc1065 100644 --- a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/rpc/models/RingRange.java +++ b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/rpc/models/RingRange.java @@ -11,8 +11,8 @@ public final class RingRange { - public static final Comparator START_COMPARATOR - = (RingRange o1, RingRange o2) -> o1.start.compareTo(o2.start); + public static final Comparator START_COMPARATOR = + (RingRange o1, RingRange o2) -> o1.start.compareTo(o2.start); public final BigInteger start; public final BigInteger end; @@ -34,4 +34,4 @@ public BigInteger getStart() { public BigInteger getEnd() { return end; } -} \ No newline at end of file +} diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java index 2337d548..a90d1511 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java @@ -508,7 +508,8 @@ public Response repair(RepairRequest repairRequest) { repairRequest.keyspaceName, repairRequest.tables, repairRequest.full, - // The default repair does not allow for specifying things like parallelism, threadCounts, source DCs or ranges etc. + // The default repair does not allow for specifying things like parallelism, + // threadCounts, source DCs or ranges etc. Optional.empty(), Optional.empty(), Optional.empty(), @@ -642,4 +643,4 @@ public Response move(@QueryParam(value = "newToken") String newToken) { + " \"language\": null,\n" + " \"encoding\": null\n" + "}"; -} \ No newline at end of file +} diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java index cd7a016c..cc31c8c3 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java @@ -18,6 +18,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -154,11 +155,15 @@ public Response repair(RepairRequest repairRequest) { ResponseTools.getSingleRowStringResponse( app.dbUnixSocketFile, app.cqlService, - "CALL NodeOps.repair(?, ?, ?, ?)", + "CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)", repairRequest.keyspaceName, repairRequest.tables, repairRequest.full, - true)) + true, + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty())) .build(); }); } diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/RepairResources.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/RepairResources.java index 8184a422..acd00321 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/RepairResources.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/RepairResources.java @@ -1,28 +1,25 @@ package com.datastax.mgmtapi.resources.v2; -import javax.ws.rs.Consumes; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - import com.datastax.mgmtapi.ManagementApplication; import com.datastax.mgmtapi.resources.common.BaseResources; import com.datastax.mgmtapi.resources.v2.models.RepairRequest; import com.datastax.mgmtapi.resources.v2.models.RepairRequestResponse; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.datastax.oss.driver.api.core.cql.ResultSet; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.ExampleObject; import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.responses.ApiResponse; +import javax.ws.rs.Consumes; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; @Path("/api/v2/repairs") public class RepairResources extends BaseResources { -private static final ObjectMapper jsonMapper = new ObjectMapper(); - public RepairResources(ManagementApplication application) { super(application); } @@ -47,6 +44,17 @@ public RepairResources(ManagementApplication application) { mediaType = MediaType.TEXT_PLAIN, schema = @Schema(implementation = Response.Status.class), examples = @ExampleObject(value = "keyspace must be specified"))) + @ApiResponse( + responseCode = "500", + description = "internal error, we did not receive the expected repair ID from Cassandra.", + content = + @Content( + mediaType = MediaType.TEXT_PLAIN, + schema = @Schema(implementation = Response.Status.class), + examples = + @ExampleObject( + value = + "internal error, we did not receive the expected repair ID from Cassandra."))) public final Response repair(RepairRequest request) { return handle( () -> { @@ -55,19 +63,27 @@ public final Response repair(RepairRequest request) { .entity("keyspaceName must be specified") .build(); } - app.cqlService.executePreparedStatement( - app.dbUnixSocketFile, - "CALL NodeOps.repair(?, ?, ?)", - repairRequest.keyspaceName, - repairRequest.tables, - repairRequest.full); + ResultSet res = + app.cqlService.executePreparedStatement( + app.dbUnixSocketFile, + "CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?, ?)", + request.keyspace, + request.tables, + request.fullRepair, + request.notifications, + request.repairParallelism, + request.datacenters, + request.associatedTokens, + request.repairThreadCount); - return Response.ok("OK").build(); + try { + String repairID = res.one().getString(0); + return Response.accepted(new RepairRequestResponse(repairID)).build(); + } catch (Exception e) { + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity("Repair request failed: " + e.getMessage()) + .build(); + } }); - - String repairID = ""; // TODO: implement me - return Response.ok(new RepairRequestResponse(repairID)).build(); } - - -} \ No newline at end of file +} diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairParallelism.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairParallelism.java index 416abe90..c109790d 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairParallelism.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairParallelism.java @@ -1,6 +1,5 @@ package com.datastax.mgmtapi.resources.v2.models; - import com.fasterxml.jackson.annotation.JsonValue; public enum RepairParallelism { @@ -30,4 +29,4 @@ public String getName() { public String toString() { return this.getName(); } -} \ No newline at end of file +} diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequest.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequest.java index 066d6875..24513d10 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequest.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequest.java @@ -1,40 +1,51 @@ package com.datastax.mgmtapi.resources.v2.models; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Collection; import java.util.List; import java.util.Objects; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - public class RepairRequest { @JsonProperty(value = "keyspace", required = true) public final String keyspace; + @JsonProperty(value = "tables", required = true) public final List tables; + @JsonProperty(value = "full_repair", defaultValue = "true") public final Boolean fullRepair; + + @JsonProperty(value = "notifications", defaultValue = "true") + public final Boolean notifications; + @JsonProperty(value = "associated_tokens") public final List associatedTokens; + @JsonProperty(value = "repair_parallelism") public final RepairParallelism repairParallelism; + @JsonProperty(value = "datacenters") public final Collection datacenters; + @JsonProperty(value = "repair_thread_count") public final int repairThreadCount; + @JsonCreator public RepairRequest( @JsonProperty(value = "keyspace", required = true) String keyspace, @JsonProperty(value = "tables", required = true) List tables, @JsonProperty(value = "full_repair", defaultValue = "true") Boolean fullRepair, + @JsonProperty(value = "notifications", defaultValue = "true") boolean notifications, @JsonProperty(value = "associated_tokens") List associatedTokens, @JsonProperty(value = "repair_parallelism") RepairParallelism repairParallelism, @JsonProperty(value = "datacenters") Collection datacenters, - @JsonProperty(value = "repair_thread_count") int repairThreadCount){ + @JsonProperty(value = "repair_thread_count") int repairThreadCount) { this.keyspace = keyspace; this.tables = tables; this.fullRepair = fullRepair; + this.notifications = notifications; this.associatedTokens = associatedTokens; this.datacenters = datacenters; this.repairParallelism = repairParallelism; @@ -48,22 +59,22 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - return Objects.equals(keyspace, ((RepairRequest) o).keyspace) && - Objects.equals(tables, ((RepairRequest) o).tables) && - Objects.equals(fullRepair, ((RepairRequest) o).fullRepair) && - Objects.equals(associatedTokens, ((RepairRequest) o).associatedTokens) && - Objects.equals(datacenters, ((RepairRequest) o).datacenters) && - Objects.equals(repairParallelism, ((RepairRequest) o).repairParallelism) && - Objects.equals(repairThreadCount, ((RepairRequest) o).repairThreadCount); + return Objects.equals(keyspace, ((RepairRequest) o).keyspace) + && Objects.equals(tables, ((RepairRequest) o).tables) + && Objects.equals(fullRepair, ((RepairRequest) o).fullRepair) + && Objects.equals(associatedTokens, ((RepairRequest) o).associatedTokens) + && Objects.equals(datacenters, ((RepairRequest) o).datacenters) + && Objects.equals(repairParallelism, ((RepairRequest) o).repairParallelism) + && Objects.equals(repairThreadCount, ((RepairRequest) o).repairThreadCount); } public int hashCode() { - return Objects.hashCode(keyspace) + - Objects.hashCode(tables) + - Objects.hashCode(fullRepair) + - Objects.hashCode(associatedTokens) + - Objects.hashCode(datacenters) + - Objects.hashCode(repairParallelism) + - Objects.hashCode(repairThreadCount); + return Objects.hashCode(keyspace) + + Objects.hashCode(tables) + + Objects.hashCode(fullRepair) + + Objects.hashCode(associatedTokens) + + Objects.hashCode(datacenters) + + Objects.hashCode(repairParallelism) + + Objects.hashCode(repairThreadCount); } -} \ No newline at end of file +} diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequestResponse.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequestResponse.java index 0392949b..4871d2de 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequestResponse.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RepairRequestResponse.java @@ -1,15 +1,16 @@ package com.datastax.mgmtapi.resources.v2.models; -import java.util.Objects; - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; public class RepairRequestResponse { @JsonProperty(value = "repair_id", required = true) public final String repairID; + @JsonCreator - public RepairRequestResponse(@JsonProperty(value = "repair_id", required = true) String repairID) { + public RepairRequestResponse( + @JsonProperty(value = "repair_id", required = true) String repairID) { this.repairID = repairID; } @@ -26,4 +27,4 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hashCode(repairID); } -} \ No newline at end of file +} diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RingRange.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RingRange.java index f20fcf42..c5947ad3 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RingRange.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v2/models/RingRange.java @@ -1,16 +1,16 @@ package com.datastax.mgmtapi.resources.v2.models; +import com.fasterxml.jackson.annotation.JsonProperty; import java.math.BigInteger; import java.util.Comparator; -import com.fasterxml.jackson.annotation.JsonProperty; - public final class RingRange { - public static final Comparator START_COMPARATOR - = (RingRange o1, RingRange o2) -> o1.start.compareTo(o2.start); + public static final Comparator START_COMPARATOR = + (RingRange o1, RingRange o2) -> o1.start.compareTo(o2.start); @JsonProperty(value = "start", required = true) public final BigInteger start; + @JsonProperty(value = "end", required = true) public final BigInteger end; @@ -33,4 +33,4 @@ public BigInteger getStart() { public BigInteger getEnd() { return end; } -} \ No newline at end of file +}