From d01f3cd438317c688313d8e2c652f92f1e498773 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Thu, 10 Aug 2023 11:17:52 +0300 Subject: [PATCH 1/4] Add stubbed polling of job details from the mgmt-api. This will not work without the actual client implementation Implement using apiClient the triggerRepair, getJobDetails, scheduler as well as add a simple test to ensure the state is managed correctly --- src/server/pom.xml | 2 +- .../io/cassandrareaper/ReaperApplication.java | 9 +- .../http/HttpCassandraManagementProxy.java | 101 ++++++++++++++++-- .../http/HttpManagementConnectionFactory.java | 18 +++- .../http/models/JobStatusTracker.java | 23 ++++ .../HttpCassandraManagementProxyTests.java | 77 +++++++++++++ 6 files changed, 217 insertions(+), 13 deletions(-) create mode 100644 src/server/src/main/java/io/cassandrareaper/management/http/models/JobStatusTracker.java create mode 100644 src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTests.java diff --git a/src/server/pom.xml b/src/server/pom.xml index 5d83036cb..c0327dfa6 100644 --- a/src/server/pom.xml +++ b/src/server/pom.xml @@ -274,7 +274,7 @@ io.k8ssandra datastax-mgmtapi-client-openapi - 0.1.0-c22a2fc + 0.1.0-4d2a772 diff --git a/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java b/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java index 577f12112..6b2a8938e 100644 --- a/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java +++ b/src/server/src/main/java/io/cassandrareaper/ReaperApplication.java @@ -179,7 +179,7 @@ public void run(ReaperApplicationConfiguration config, Environment environment) Cryptograph cryptograph = context.config == null || context.config.getCryptograph() == null ? new NoopCrypotograph() : context.config.getCryptograph().create(); - initializeManagement(context, cryptograph); + initializeManagement(context, environment, cryptograph); context.repairManager = RepairManager.create( context, @@ -299,14 +299,17 @@ public void run(ReaperApplicationConfiguration config, Environment environment) } - private void initializeManagement(AppContext context, Cryptograph cryptograph) { + private void initializeManagement(AppContext context, Environment environment, Cryptograph cryptograph) { if (context.managementConnectionFactory == null) { LOG.info("no management connection factory given in context, creating default"); if (context.config.getHttpManagement() == null || !context.config.getHttpManagement().isEnabled()) { LOG.info("HTTP management connection config not set, or set disabled. Creating JMX connection factory instead"); context.managementConnectionFactory = new JmxManagementConnectionFactory(context, cryptograph); } else { - context.managementConnectionFactory = new HttpManagementConnectionFactory(context); + ScheduledExecutorService jobStatusPollerExecutor = environment.lifecycle() + .scheduledExecutorService("JobStatusPoller") + .threads(2).build(); + context.managementConnectionFactory = new HttpManagementConnectionFactory(context, jobStatusPollerExecutor); } } } diff --git a/src/server/src/main/java/io/cassandrareaper/management/http/HttpCassandraManagementProxy.java b/src/server/src/main/java/io/cassandrareaper/management/http/HttpCassandraManagementProxy.java index d2efaf59f..d7db3436d 100644 --- a/src/server/src/main/java/io/cassandrareaper/management/http/HttpCassandraManagementProxy.java +++ b/src/server/src/main/java/io/cassandrareaper/management/http/HttpCassandraManagementProxy.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 The Last Pickle Ltd + * Copyright 2023-2023 DataStax, Inc. * * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,6 +22,7 @@ import io.cassandrareaper.core.Table; import io.cassandrareaper.management.ICassandraManagementProxy; import io.cassandrareaper.management.RepairStatusHandler; +import io.cassandrareaper.management.http.models.JobStatusTracker; import io.cassandrareaper.service.RingRange; import java.io.IOException; @@ -35,25 +36,37 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.management.JMException; import javax.management.openmbean.CompositeData; import javax.validation.constraints.NotNull; import com.codahale.metrics.MetricRegistry; import com.datastax.mgmtapi.client.api.DefaultApi; -import com.datastax.mgmtapi.client.invoker.ApiClient; import com.datastax.mgmtapi.client.invoker.ApiException; import com.datastax.mgmtapi.client.model.EndpointStates; +import com.datastax.mgmtapi.client.model.Job; +import com.datastax.mgmtapi.client.model.RepairRequest; import com.datastax.mgmtapi.client.model.SnapshotDetails; +import com.datastax.mgmtapi.client.model.StatusChange; import com.datastax.mgmtapi.client.model.TakeSnapshotRequest; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import org.apache.cassandra.repair.RepairParallelism; +import org.apache.cassandra.utils.progress.ProgressEventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HttpCassandraManagementProxy implements ICassandraManagementProxy { + + public static final int DEFAULT_POLL_INTERVAL_IN_MILLISECONDS = 5000; private static final Logger LOG = LoggerFactory.getLogger(HttpCassandraManagementProxy.class); final String host; final MetricRegistry metricRegistry; @@ -61,16 +74,28 @@ public class HttpCassandraManagementProxy implements ICassandraManagementProxy { final InetSocketAddress endpoint; final DefaultApi apiClient; + final ConcurrentMap repairStatusHandlers = Maps.newConcurrentMap(); + final ConcurrentMap jobTracker = Maps.newConcurrentMap(); + final ConcurrentMap repairStatusExecutors = Maps.newConcurrentMap(); + + + private ScheduledExecutorService statusTracker; + public HttpCassandraManagementProxy(MetricRegistry metricRegistry, String rootPath, - InetSocketAddress endpoint + InetSocketAddress endpoint, + ScheduledExecutorService executor, + DefaultApi apiClient ) { this.host = endpoint.getHostString(); this.metricRegistry = metricRegistry; this.rootPath = rootPath; this.endpoint = endpoint; - this.apiClient = new DefaultApi( - new ApiClient().setBasePath("http://" + endpoint.getHostName() + ":" + endpoint.getPort() + rootPath)); + this.apiClient = apiClient; + this.statusTracker = executor; + + // TODO Perhaps the poll interval should be configurable through context.config ? + this.scheduleJobPoller(statusTracker, DEFAULT_POLL_INTERVAL_IN_MILLISECONDS); } @Override @@ -192,13 +217,31 @@ public int triggerRepair( List associatedTokens, int repairThreadCount) throws ReaperException { - return 1; //TODO: implement me + String jobId; + try { + jobId = apiClient.repair1(new RepairRequest()); + } catch (ApiException e) { + throw new ReaperException(e); + } + + int repairNo = Integer.parseInt(jobId.substring(7)); + + repairStatusExecutors.putIfAbsent(repairNo, Executors.newSingleThreadExecutor()); + repairStatusHandlers.putIfAbsent(repairNo, repairStatusHandler); + jobTracker.put(jobId, new JobStatusTracker()); + return repairNo; } @Override public void removeRepairStatusHandler(int repairNo) { - // TODO: implement me. + repairStatusHandlers.remove(repairNo); + ExecutorService repairStatusExecutor = repairStatusExecutors.remove(repairNo); + if (null != repairStatusExecutor) { + repairStatusExecutor.shutdown(); + } + String jobId = String.format("repair-%d", repairNo); + jobTracker.remove(jobId); } @Override @@ -348,4 +391,48 @@ public String getUntranslatedHost() { //TODO: implement me return ""; } + + Job getJobStatus(String id) { + // Poll with HTTP client the job's status + try { + Job job = apiClient.getJobStatus(id); + return job; + } catch (ApiException e) { + throw new RuntimeException(e); + } + } + + private void scheduleJobPoller(ScheduledExecutorService scheduler, int pollInterval) { + scheduler.scheduleWithFixedDelay( + () -> { + if (jobTracker.size() > 0) { + for (Map.Entry entry : jobTracker.entrySet()) { + Job job = getJobStatus(entry.getKey()); + int availableNotifications = job.getStatusChanges().size(); + int currentNotificationCount = entry.getValue().latestNotificationCount.get(); + if (currentNotificationCount < availableNotifications) { + // We need to process the new ones + for (int i = currentNotificationCount; i < availableNotifications; i++) { + StatusChange statusChange = job.getStatusChanges().get(i); + // remove "repair-" prefix + int repairNo = Integer.parseInt(job.getId().substring(7)); + ProgressEventType progressType = ProgressEventType.valueOf(statusChange.toString()); + repairStatusExecutors.get(repairNo).submit(() -> { + repairStatusHandlers + .get(repairNo) + .handle(repairNo, Optional.empty(), Optional.of(progressType), + statusChange.getMessage(), this); + }); + + // Update the count as we process them + entry.getValue().latestNotificationCount.incrementAndGet(); + } + } + } + } + }, + 10000, + pollInterval, + TimeUnit.MILLISECONDS); + } } diff --git a/src/server/src/main/java/io/cassandrareaper/management/http/HttpManagementConnectionFactory.java b/src/server/src/main/java/io/cassandrareaper/management/http/HttpManagementConnectionFactory.java index f689ca216..c730b9761 100644 --- a/src/server/src/main/java/io/cassandrareaper/management/http/HttpManagementConnectionFactory.java +++ b/src/server/src/main/java/io/cassandrareaper/management/http/HttpManagementConnectionFactory.java @@ -31,10 +31,14 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; import javax.ws.rs.core.Response; import com.codahale.metrics.Gauge; +import com.codahale.metrics.InstrumentedScheduledExecutorService; import com.codahale.metrics.MetricRegistry; +import com.datastax.mgmtapi.client.api.DefaultApi; +import com.datastax.mgmtapi.client.invoker.ApiClient; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -47,14 +51,17 @@ public class HttpManagementConnectionFactory implements IManagementConnectionFac private final MetricRegistry metricRegistry; private final HostConnectionCounters hostConnectionCounters; + private ScheduledExecutorService jobStatusPollerExecutor; + private final Set accessibleDatacenters = Sets.newHashSet(); // Constructor for HttpManagementConnectionFactory - public HttpManagementConnectionFactory(AppContext context) { + public HttpManagementConnectionFactory(AppContext context, ScheduledExecutorService jobStatusPollerExecutor) { this.metricRegistry = context.metricRegistry == null ? new MetricRegistry() : context.metricRegistry; hostConnectionCounters = new HostConnectionCounters(metricRegistry); registerConnectionsGauge(); + this.jobStatusPollerExecutor = jobStatusPollerExecutor; } public ICassandraManagementProxy connectAny(Collection nodes) throws ReaperException { @@ -118,10 +125,17 @@ private ICassandraManagementProxy connectImpl(Node node) if (pidResponse.getStatus() != 200) { throw new ReaperException("Could not get PID for node " + node.getHostname()); } + DefaultApi apiClient = new DefaultApi( + new ApiClient().setBasePath("https://" + node.getHostname() + ":" + managementPort + rootPath)); + + InstrumentedScheduledExecutorService statusTracker = new InstrumentedScheduledExecutorService( + jobStatusPollerExecutor, metricRegistry); return new HttpCassandraManagementProxy( metricRegistry, rootPath, - new InetSocketAddress(node.getHostname(), managementPort) + new InetSocketAddress(node.getHostname(), managementPort), + statusTracker, + apiClient ); } diff --git a/src/server/src/main/java/io/cassandrareaper/management/http/models/JobStatusTracker.java b/src/server/src/main/java/io/cassandrareaper/management/http/models/JobStatusTracker.java new file mode 100644 index 000000000..05d997cd1 --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/management/http/models/JobStatusTracker.java @@ -0,0 +1,23 @@ +/* + * Copyright 2023-2023 DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.management.http.models; + +import java.util.concurrent.atomic.AtomicInteger; + +public class JobStatusTracker { + public AtomicInteger latestNotificationCount = new AtomicInteger(0); +} diff --git a/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTests.java b/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTests.java new file mode 100644 index 000000000..363f73039 --- /dev/null +++ b/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTests.java @@ -0,0 +1,77 @@ +/* + * Copyright 2023-2023 DataStax, Inc. + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.management.http; + +import io.cassandrareaper.management.RepairStatusHandler; +import io.cassandrareaper.management.http.models.JobStatusTracker; + +import java.math.BigInteger; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; + +import com.codahale.metrics.MetricRegistry; +import com.datastax.mgmtapi.client.api.DefaultApi; +import org.apache.cassandra.repair.RepairParallelism; +import org.apache.commons.lang3.concurrent.ConcurrentUtils; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +public class HttpCassandraManagementProxyTests { + + @Test + public void testRepairHandlers() throws Exception { + HttpManagementConnectionFactory connectionFactory = Mockito.mock(HttpManagementConnectionFactory.class); + DefaultApi mockClient = Mockito.mock(DefaultApi.class); + Mockito.doReturn("repair-123456789").when(mockClient).repair1(any()); + ScheduledExecutorService executorService = Mockito.mock(ScheduledExecutorService.class); + Mockito.doReturn(ConcurrentUtils.constantFuture(null)).when(executorService).submit(any(Callable.class)); + + HttpCassandraManagementProxy httpCassandraManagementProxy = new HttpCassandraManagementProxy( + Mockito.mock(MetricRegistry.class), "", + Mockito.mock(InetSocketAddress.class), executorService, mockClient); + when(connectionFactory.connectAny(any())).thenReturn(httpCassandraManagementProxy); + + RepairStatusHandler repairStatusHandler = Mockito.mock(RepairStatusHandler.class); + + int repairNo = httpCassandraManagementProxy.triggerRepair(BigInteger.ZERO, BigInteger.ONE, "ks", + RepairParallelism.PARALLEL, + Collections.singleton("table"), true, Collections.emptyList(), repairStatusHandler, Collections.emptyList(), 1); + + assertEquals(123456789, repairNo); + assertEquals(1, httpCassandraManagementProxy.jobTracker.size()); + String jobId = String.format("repair-%d", repairNo); + assertTrue(httpCassandraManagementProxy.jobTracker.containsKey(jobId)); + JobStatusTracker jobStatus = httpCassandraManagementProxy.jobTracker.get(jobId); + assertEquals(0, jobStatus.latestNotificationCount.get()); + assertEquals(1, httpCassandraManagementProxy.repairStatusExecutors.size()); + assertEquals(1, httpCassandraManagementProxy.repairStatusHandlers.size()); + assertTrue(httpCassandraManagementProxy.repairStatusHandlers.containsKey(repairNo)); + + httpCassandraManagementProxy.removeRepairStatusHandler(repairNo); + assertEquals(0, httpCassandraManagementProxy.jobTracker.size()); + assertEquals(0, httpCassandraManagementProxy.repairStatusExecutors.size()); + assertEquals(0, httpCassandraManagementProxy.repairStatusHandlers.size()); + } +} From 23fb8391f8ef4ea5719554097e54786b556ac183 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Wed, 23 Aug 2023 13:56:52 +0300 Subject: [PATCH 2/4] Merge test files after the rebase --- .../HttpCassandraManagementProxyTest.java | 54 ++++++++++++- .../HttpCassandraManagementProxyTests.java | 77 ------------------- 2 files changed, 53 insertions(+), 78 deletions(-) delete mode 100644 src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTests.java diff --git a/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java b/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java index b48ff3518..a87a9cfbb 100644 --- a/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java +++ b/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java @@ -18,24 +18,41 @@ package io.cassandrareaper.management.http; import io.cassandrareaper.core.Snapshot; +import io.cassandrareaper.management.RepairStatusHandler; +import io.cassandrareaper.management.http.models.JobStatusTracker; +import java.math.BigInteger; import java.net.InetSocketAddress; +import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import com.codahale.metrics.MetricRegistry; +import com.datastax.mgmtapi.client.api.DefaultApi; import com.datastax.mgmtapi.client.model.SnapshotDetails; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.cassandra.repair.RepairParallelism; +import org.apache.commons.lang3.concurrent.ConcurrentUtils; import org.junit.Test; +import org.mockito.Mockito; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; public class HttpCassandraManagementProxyTest { @Test public void testConvertSnapshots() throws Exception { + DefaultApi mockClient = Mockito.mock(DefaultApi.class); + ScheduledExecutorService executorService = Mockito.mock(ScheduledExecutorService.class); + Mockito.doReturn(ConcurrentUtils.constantFuture(null)).when(executorService).submit(any(Callable.class)); HttpCassandraManagementProxy proxy = new HttpCassandraManagementProxy( - null, "/", InetSocketAddress.createUnresolved("localhost", 8080)); + null, "/", InetSocketAddress.createUnresolved("localhost", 8080), executorService, mockClient); SnapshotDetails details = jsonFromResourceFile("example_snapshot_details.json", SnapshotDetails.class); List snapshots = proxy.convertSnapshots(details); assertEquals(3, snapshots.size()); @@ -83,4 +100,39 @@ private T jsonFromResourceFile(String filename, Class claz return new ObjectMapper().readValue( this.getClass().getResource(filename).openStream(), clazz); } + + @Test + public void testRepairHandlers() throws Exception { + HttpManagementConnectionFactory connectionFactory = Mockito.mock(HttpManagementConnectionFactory.class); + DefaultApi mockClient = Mockito.mock(DefaultApi.class); + Mockito.doReturn("repair-123456789").when(mockClient).repair1(any()); + ScheduledExecutorService executorService = Mockito.mock(ScheduledExecutorService.class); + Mockito.doReturn(ConcurrentUtils.constantFuture(null)).when(executorService).submit(any(Callable.class)); + + HttpCassandraManagementProxy httpCassandraManagementProxy = new HttpCassandraManagementProxy( + Mockito.mock(MetricRegistry.class), "", + Mockito.mock(InetSocketAddress.class), executorService, mockClient); + when(connectionFactory.connectAny(any())).thenReturn(httpCassandraManagementProxy); + + RepairStatusHandler repairStatusHandler = Mockito.mock(RepairStatusHandler.class); + + int repairNo = httpCassandraManagementProxy.triggerRepair(BigInteger.ZERO, BigInteger.ONE, "ks", + RepairParallelism.PARALLEL, + Collections.singleton("table"), true, Collections.emptyList(), repairStatusHandler, Collections.emptyList(), 1); + + assertEquals(123456789, repairNo); + assertEquals(1, httpCassandraManagementProxy.jobTracker.size()); + String jobId = String.format("repair-%d", repairNo); + assertTrue(httpCassandraManagementProxy.jobTracker.containsKey(jobId)); + JobStatusTracker jobStatus = httpCassandraManagementProxy.jobTracker.get(jobId); + assertEquals(0, jobStatus.latestNotificationCount.get()); + assertEquals(1, httpCassandraManagementProxy.repairStatusExecutors.size()); + assertEquals(1, httpCassandraManagementProxy.repairStatusHandlers.size()); + assertTrue(httpCassandraManagementProxy.repairStatusHandlers.containsKey(repairNo)); + + httpCassandraManagementProxy.removeRepairStatusHandler(repairNo); + assertEquals(0, httpCassandraManagementProxy.jobTracker.size()); + assertEquals(0, httpCassandraManagementProxy.repairStatusExecutors.size()); + assertEquals(0, httpCassandraManagementProxy.repairStatusHandlers.size()); + } } diff --git a/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTests.java b/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTests.java deleted file mode 100644 index 363f73039..000000000 --- a/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTests.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2023-2023 DataStax, Inc. - * - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.cassandrareaper.management.http; - -import io.cassandrareaper.management.RepairStatusHandler; -import io.cassandrareaper.management.http.models.JobStatusTracker; - -import java.math.BigInteger; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.concurrent.Callable; -import java.util.concurrent.ScheduledExecutorService; - -import com.codahale.metrics.MetricRegistry; -import com.datastax.mgmtapi.client.api.DefaultApi; -import org.apache.cassandra.repair.RepairParallelism; -import org.apache.commons.lang3.concurrent.ConcurrentUtils; -import org.junit.Test; -import org.mockito.Mockito; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; - -public class HttpCassandraManagementProxyTests { - - @Test - public void testRepairHandlers() throws Exception { - HttpManagementConnectionFactory connectionFactory = Mockito.mock(HttpManagementConnectionFactory.class); - DefaultApi mockClient = Mockito.mock(DefaultApi.class); - Mockito.doReturn("repair-123456789").when(mockClient).repair1(any()); - ScheduledExecutorService executorService = Mockito.mock(ScheduledExecutorService.class); - Mockito.doReturn(ConcurrentUtils.constantFuture(null)).when(executorService).submit(any(Callable.class)); - - HttpCassandraManagementProxy httpCassandraManagementProxy = new HttpCassandraManagementProxy( - Mockito.mock(MetricRegistry.class), "", - Mockito.mock(InetSocketAddress.class), executorService, mockClient); - when(connectionFactory.connectAny(any())).thenReturn(httpCassandraManagementProxy); - - RepairStatusHandler repairStatusHandler = Mockito.mock(RepairStatusHandler.class); - - int repairNo = httpCassandraManagementProxy.triggerRepair(BigInteger.ZERO, BigInteger.ONE, "ks", - RepairParallelism.PARALLEL, - Collections.singleton("table"), true, Collections.emptyList(), repairStatusHandler, Collections.emptyList(), 1); - - assertEquals(123456789, repairNo); - assertEquals(1, httpCassandraManagementProxy.jobTracker.size()); - String jobId = String.format("repair-%d", repairNo); - assertTrue(httpCassandraManagementProxy.jobTracker.containsKey(jobId)); - JobStatusTracker jobStatus = httpCassandraManagementProxy.jobTracker.get(jobId); - assertEquals(0, jobStatus.latestNotificationCount.get()); - assertEquals(1, httpCassandraManagementProxy.repairStatusExecutors.size()); - assertEquals(1, httpCassandraManagementProxy.repairStatusHandlers.size()); - assertTrue(httpCassandraManagementProxy.repairStatusHandlers.containsKey(repairNo)); - - httpCassandraManagementProxy.removeRepairStatusHandler(repairNo); - assertEquals(0, httpCassandraManagementProxy.jobTracker.size()); - assertEquals(0, httpCassandraManagementProxy.repairStatusExecutors.size()); - assertEquals(0, httpCassandraManagementProxy.repairStatusHandlers.size()); - } -} From a1446bb52774225c9f6ddce093663225408e86e4 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Wed, 23 Aug 2023 17:08:07 +0300 Subject: [PATCH 3/4] Add a test to verify the behavior of the notifications polling --- .../http/HttpCassandraManagementProxy.java | 68 +++++++++-------- .../HttpCassandraManagementProxyTest.java | 74 ++++++++++++++++++- 2 files changed, 108 insertions(+), 34 deletions(-) diff --git a/src/server/src/main/java/io/cassandrareaper/management/http/HttpCassandraManagementProxy.java b/src/server/src/main/java/io/cassandrareaper/management/http/HttpCassandraManagementProxy.java index d7db3436d..75a02948b 100644 --- a/src/server/src/main/java/io/cassandrareaper/management/http/HttpCassandraManagementProxy.java +++ b/src/server/src/main/java/io/cassandrareaper/management/http/HttpCassandraManagementProxy.java @@ -392,47 +392,53 @@ public String getUntranslatedHost() { return ""; } - Job getJobStatus(String id) { + private Job getJobStatus(String id) { // Poll with HTTP client the job's status try { - Job job = apiClient.getJobStatus(id); - return job; + return apiClient.getJobStatus(id); } catch (ApiException e) { throw new RuntimeException(e); } } + @VisibleForTesting private void scheduleJobPoller(ScheduledExecutorService scheduler, int pollInterval) { scheduler.scheduleWithFixedDelay( - () -> { - if (jobTracker.size() > 0) { - for (Map.Entry entry : jobTracker.entrySet()) { - Job job = getJobStatus(entry.getKey()); - int availableNotifications = job.getStatusChanges().size(); - int currentNotificationCount = entry.getValue().latestNotificationCount.get(); - if (currentNotificationCount < availableNotifications) { - // We need to process the new ones - for (int i = currentNotificationCount; i < availableNotifications; i++) { - StatusChange statusChange = job.getStatusChanges().get(i); - // remove "repair-" prefix - int repairNo = Integer.parseInt(job.getId().substring(7)); - ProgressEventType progressType = ProgressEventType.valueOf(statusChange.toString()); - repairStatusExecutors.get(repairNo).submit(() -> { - repairStatusHandlers - .get(repairNo) - .handle(repairNo, Optional.empty(), Optional.of(progressType), - statusChange.getMessage(), this); - }); - - // Update the count as we process them - entry.getValue().latestNotificationCount.incrementAndGet(); - } - } - } - } - }, - 10000, + notificationsTracker(), + pollInterval * 2, pollInterval, TimeUnit.MILLISECONDS); } + + @VisibleForTesting + Runnable notificationsTracker() { + return () -> { + if (jobTracker.size() > 0) { + for (Map.Entry entry : jobTracker.entrySet()) { + Job job = getJobStatus(entry.getKey()); + int availableNotifications = job.getStatusChanges().size(); + int currentNotificationCount = entry.getValue().latestNotificationCount.get(); + + if (currentNotificationCount < availableNotifications) { + // We need to process the new ones + for (int i = currentNotificationCount; i < availableNotifications; i++) { + StatusChange statusChange = job.getStatusChanges().get(i); + // remove "repair-" prefix + int repairNo = Integer.parseInt(job.getId().substring(7)); + ProgressEventType progressType = ProgressEventType.valueOf(statusChange.getStatus()); + repairStatusExecutors.get(repairNo).submit(() -> { + repairStatusHandlers + .get(repairNo) + .handle(repairNo, Optional.empty(), Optional.of(progressType), + statusChange.getMessage(), this); + }); + + // Update the count as we process them + entry.getValue().latestNotificationCount.incrementAndGet(); + } + } + } + } + }; + } } diff --git a/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java b/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java index a87a9cfbb..6a4ad4955 100644 --- a/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java +++ b/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java @@ -23,15 +23,20 @@ import java.math.BigInteger; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import com.codahale.metrics.MetricRegistry; import com.datastax.mgmtapi.client.api.DefaultApi; +import com.datastax.mgmtapi.client.model.Job; import com.datastax.mgmtapi.client.model.SnapshotDetails; +import com.datastax.mgmtapi.client.model.StatusChange; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.cassandra.repair.RepairParallelism; import org.apache.commons.lang3.concurrent.ConcurrentUtils; import org.junit.Test; @@ -42,6 +47,10 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class HttpCassandraManagementProxyTest { @@ -50,7 +59,7 @@ public class HttpCassandraManagementProxyTest { public void testConvertSnapshots() throws Exception { DefaultApi mockClient = Mockito.mock(DefaultApi.class); ScheduledExecutorService executorService = Mockito.mock(ScheduledExecutorService.class); - Mockito.doReturn(ConcurrentUtils.constantFuture(null)).when(executorService).submit(any(Callable.class)); + doReturn(ConcurrentUtils.constantFuture(null)).when(executorService).submit(any(Callable.class)); HttpCassandraManagementProxy proxy = new HttpCassandraManagementProxy( null, "/", InetSocketAddress.createUnresolved("localhost", 8080), executorService, mockClient); SnapshotDetails details = jsonFromResourceFile("example_snapshot_details.json", SnapshotDetails.class); @@ -105,9 +114,9 @@ private T jsonFromResourceFile(String filename, Class claz public void testRepairHandlers() throws Exception { HttpManagementConnectionFactory connectionFactory = Mockito.mock(HttpManagementConnectionFactory.class); DefaultApi mockClient = Mockito.mock(DefaultApi.class); - Mockito.doReturn("repair-123456789").when(mockClient).repair1(any()); + doReturn("repair-123456789").when(mockClient).repair1(any()); ScheduledExecutorService executorService = Mockito.mock(ScheduledExecutorService.class); - Mockito.doReturn(ConcurrentUtils.constantFuture(null)).when(executorService).submit(any(Callable.class)); + doReturn(ConcurrentUtils.constantFuture(null)).when(executorService).submit(any(Callable.class)); HttpCassandraManagementProxy httpCassandraManagementProxy = new HttpCassandraManagementProxy( Mockito.mock(MetricRegistry.class), "", @@ -135,4 +144,63 @@ public void testRepairHandlers() throws Exception { assertEquals(0, httpCassandraManagementProxy.repairStatusExecutors.size()); assertEquals(0, httpCassandraManagementProxy.repairStatusHandlers.size()); } + + @Test + public void testNotificationsTracker() throws Exception { + HttpManagementConnectionFactory connectionFactory = Mockito.mock(HttpManagementConnectionFactory.class); + DefaultApi mockClient = mock(DefaultApi.class); + doReturn("repair-123456789").when(mockClient).repair1(any()); + ScheduledExecutorService executorService = mock(ScheduledExecutorService.class); + doReturn(ConcurrentUtils.constantFuture(null)).when(executorService).submit(any(Callable.class)); + HttpCassandraManagementProxy httpCassandraManagementProxy = new HttpCassandraManagementProxy( + mock(MetricRegistry.class), "", + mock(InetSocketAddress.class), executorService, mockClient); + when(connectionFactory.connectAny(any())).thenReturn(httpCassandraManagementProxy); + + final AtomicInteger callTimes = new AtomicInteger(0); + RepairStatusHandler workAroundHandler = (repairNumber, status, progress, message, cassandraManagementProxy) + -> callTimes.incrementAndGet(); + + // RepairStatusHandler repairStatusHandler = mock(RepairStatusHandler.class); + // doNothing().when(repairStatusHandler).handle(any(), any(), any(), any(), any()); + int repairNo = httpCassandraManagementProxy.triggerRepair(BigInteger.ZERO, BigInteger.ONE, "ks", + RepairParallelism.PARALLEL, + Collections.singleton("table"), true, Collections.emptyList(), workAroundHandler, + Collections.emptyList(), 1); + + // We want the execution to happen in the same thread for this test + httpCassandraManagementProxy.repairStatusExecutors.put(repairNo, MoreExecutors.newDirectExecutorService()); + + Job job = new Job(); + job.setId("repair-123456789"); + StatusChange firstSc = new StatusChange(); + firstSc.setStatus("START"); + firstSc.setMessage(""); + List statusChanges = new ArrayList<>(); + statusChanges.add(firstSc); + job.setStatusChanges(statusChanges); + doReturn(job).when(mockClient).getJobStatus("repair-123456789"); + + httpCassandraManagementProxy.notificationsTracker().run(); + + assertEquals(1, httpCassandraManagementProxy.jobTracker.size()); + String jobId = String.format("repair-%d", repairNo); + assertTrue(httpCassandraManagementProxy.jobTracker.containsKey(jobId)); + JobStatusTracker jobStatus = httpCassandraManagementProxy.jobTracker.get(jobId); + assertEquals(1, jobStatus.latestNotificationCount.get()); + + verify(mockClient, times(1)).getJobStatus(any()); + assertEquals(1, callTimes.get()); + // verify(repairStatusHandler, times(1)).handle(any(), any(), any(), any(), any()); + + StatusChange secondSc = new StatusChange(); + secondSc.setStatus("COMPLETE"); + secondSc.setMessage(""); + statusChanges.add(secondSc); + + httpCassandraManagementProxy.notificationsTracker().run(); + jobStatus = httpCassandraManagementProxy.jobTracker.get(jobId); + assertEquals(2, jobStatus.latestNotificationCount.get()); + assertEquals(2, callTimes.get()); + } } From 62226ddb53b135b7f61bb5666285f21c52ffaebb Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Mon, 28 Aug 2023 10:47:58 +0300 Subject: [PATCH 4/4] Address comments --- .../management/http/HttpCassandraManagementProxy.java | 6 +++--- .../http/HttpCassandraManagementProxyTest.java | 9 +++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/server/src/main/java/io/cassandrareaper/management/http/HttpCassandraManagementProxy.java b/src/server/src/main/java/io/cassandrareaper/management/http/HttpCassandraManagementProxy.java index 75a02948b..3227a0419 100644 --- a/src/server/src/main/java/io/cassandrareaper/management/http/HttpCassandraManagementProxy.java +++ b/src/server/src/main/java/io/cassandrareaper/management/http/HttpCassandraManagementProxy.java @@ -95,7 +95,7 @@ public HttpCassandraManagementProxy(MetricRegistry metricRegistry, this.statusTracker = executor; // TODO Perhaps the poll interval should be configurable through context.config ? - this.scheduleJobPoller(statusTracker, DEFAULT_POLL_INTERVAL_IN_MILLISECONDS); + this.scheduleJobPoller(DEFAULT_POLL_INTERVAL_IN_MILLISECONDS); } @Override @@ -402,8 +402,8 @@ private Job getJobStatus(String id) { } @VisibleForTesting - private void scheduleJobPoller(ScheduledExecutorService scheduler, int pollInterval) { - scheduler.scheduleWithFixedDelay( + private void scheduleJobPoller(int pollInterval) { + statusTracker.scheduleWithFixedDelay( notificationsTracker(), pollInterval * 2, pollInterval, diff --git a/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java b/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java index 6a4ad4955..de47970b7 100644 --- a/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java +++ b/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java @@ -110,8 +110,10 @@ private T jsonFromResourceFile(String filename, Class claz this.getClass().getResource(filename).openStream(), clazz); } + // Verify all the maps are correctly updated in the triggerRepair and removeRepairHandler which get called + // from other classes @Test - public void testRepairHandlers() throws Exception { + public void testRepairProcessMapHandlers() throws Exception { HttpManagementConnectionFactory connectionFactory = Mockito.mock(HttpManagementConnectionFactory.class); DefaultApi mockClient = Mockito.mock(DefaultApi.class); doReturn("repair-123456789").when(mockClient).repair1(any()); @@ -157,12 +159,12 @@ public void testNotificationsTracker() throws Exception { mock(InetSocketAddress.class), executorService, mockClient); when(connectionFactory.connectAny(any())).thenReturn(httpCassandraManagementProxy); + // Since we don't have existing implementation of RepairStatusHandler interface, we'll create a small "mock + // implementation" here to catch all the calls to the handler() method final AtomicInteger callTimes = new AtomicInteger(0); RepairStatusHandler workAroundHandler = (repairNumber, status, progress, message, cassandraManagementProxy) -> callTimes.incrementAndGet(); - // RepairStatusHandler repairStatusHandler = mock(RepairStatusHandler.class); - // doNothing().when(repairStatusHandler).handle(any(), any(), any(), any(), any()); int repairNo = httpCassandraManagementProxy.triggerRepair(BigInteger.ZERO, BigInteger.ONE, "ks", RepairParallelism.PARALLEL, Collections.singleton("table"), true, Collections.emptyList(), workAroundHandler, @@ -191,7 +193,6 @@ public void testNotificationsTracker() throws Exception { verify(mockClient, times(1)).getJobStatus(any()); assertEquals(1, callTimes.get()); - // verify(repairStatusHandler, times(1)).handle(any(), any(), any(), any(), any()); StatusChange secondSc = new StatusChange(); secondSc.setStatus("COMPLETE");