From 4cecb6e68416d75fe32e04a537b768cabbba4527 Mon Sep 17 00:00:00 2001 From: Tim <50115603+bossenti@users.noreply.github.com> Date: Wed, 6 Dec 2023 11:42:08 +0100 Subject: [PATCH] fix: filter for running adapter instances in adapter health check (#2275) * fix: filter for running adapter instances in adapter health check * refactor: move strings to variables --- .../management/health/AdapterHealthCheck.java | 22 +++--- .../WorkerAdministrationManagement.java | 7 +- .../health/AdapterHealthCheckTest.java | 79 +++++++++++++++++++ .../core/StreamPipesCoreApplication.java | 6 +- 4 files changed, 102 insertions(+), 12 deletions(-) create mode 100644 streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java index e59187ea69..fabf477af5 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java @@ -24,7 +24,6 @@ import org.apache.streampipes.connect.management.util.WorkerPaths; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.storage.api.IAdapterStorage; -import org.apache.streampipes.storage.couchdb.CouchDbStorageManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,13 +40,10 @@ public class AdapterHealthCheck implements Runnable { private final IAdapterStorage adapterStorage; private final AdapterMasterManagement adapterMasterManagement; - public AdapterHealthCheck() { - this.adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage(); - this.adapterMasterManagement = new AdapterMasterManagement(); - } - - public AdapterHealthCheck(IAdapterStorage adapterStorage, - AdapterMasterManagement adapterMasterManagement) { + public AdapterHealthCheck( + IAdapterStorage adapterStorage, + AdapterMasterManagement adapterMasterManagement + ) { this.adapterStorage = adapterStorage; this.adapterMasterManagement = adapterMasterManagement; } @@ -82,8 +78,14 @@ public void checkAndRestoreAdapters() { public Map getAllRunningInstancesAdapterDescriptions() { Map result = new HashMap<>(); List allRunningInstancesAdapterDescription = this.adapterStorage.getAllAdapters(); - allRunningInstancesAdapterDescription.forEach(adapterDescription -> - result.put(adapterDescription.getElementId(), adapterDescription)); + allRunningInstancesAdapterDescription + .stream() + .filter(AdapterDescription::isRunning) + .forEach(adapterDescription -> + result.put( + adapterDescription.getElementId(), + adapterDescription + )); return result; } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java index df88329323..5378c9acc0 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerAdministrationManagement.java @@ -28,6 +28,7 @@ import org.apache.streampipes.resource.management.SpResourceManager; import org.apache.streampipes.storage.api.IAdapterStorage; import org.apache.streampipes.storage.couchdb.CouchDbStorageManager; +import org.apache.streampipes.storage.management.StorageDispatcher; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import org.slf4j.Logger; @@ -47,7 +48,11 @@ public class WorkerAdministrationManagement { private final AdapterHealthCheck adapterHealthCheck; public WorkerAdministrationManagement() { - this.adapterHealthCheck = new AdapterHealthCheck(); + this.adapterHealthCheck = new AdapterHealthCheck( + StorageDispatcher.INSTANCE.getNoSqlStore() + .getAdapterInstanceStorage(), + new AdapterMasterManagement() + ); this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage(); } diff --git a/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java b/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java new file mode 100644 index 0000000000..12f34c4c5c --- /dev/null +++ b/streampipes-connect-management/src/test/java/org/apache/streampipes/connect/management/health/AdapterHealthCheckTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.connect.management.health; + +import org.apache.streampipes.connect.management.management.AdapterMasterManagement; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.storage.api.IAdapterStorage; + +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class AdapterHealthCheckTest { + + private IAdapterStorage adapterInstanceStorageMock; + + @Before + public void setUp() { + adapterInstanceStorageMock = mock(IAdapterStorage.class); + } + + @Test + public void getAllRunningInstancesAdapterDescriptionsEmpty() { + when(adapterInstanceStorageMock.getAllAdapters()).thenReturn(List.of()); + + var healthCheck = new AdapterHealthCheck(adapterInstanceStorageMock, new AdapterMasterManagement()); + var result = healthCheck.getAllRunningInstancesAdapterDescriptions(); + + assertTrue(result.isEmpty()); + } + + @Test + public void getAllRunningInstancesAdapterDescriptionsMixed() { + + var nameRunningAdapter = "running-adapter"; + var nameStoppedAdapter = "stopped-adapter"; + + var stoppedAdapter = new AdapterDescription(); + stoppedAdapter.setElementId(nameStoppedAdapter); + stoppedAdapter.setRunning(false); + + var runningAdapter = new AdapterDescription(); + runningAdapter.setElementId(nameRunningAdapter); + runningAdapter.setRunning(true); + + when(adapterInstanceStorageMock.getAllAdapters()).thenReturn(List.of(stoppedAdapter, runningAdapter)); + + var healthCheck = new AdapterHealthCheck(adapterInstanceStorageMock, new AdapterMasterManagement()); + var result = healthCheck.getAllRunningInstancesAdapterDescriptions(); + + assertEquals(1, result.size()); + assertTrue(result.containsKey(nameRunningAdapter)); + assertEquals(runningAdapter, result.get(nameRunningAdapter)); + + } + +} diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java index f24ce00909..d8c9781bd7 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java @@ -19,6 +19,7 @@ import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.connect.management.health.AdapterHealthCheck; +import org.apache.streampipes.connect.management.management.AdapterMasterManagement; import org.apache.streampipes.manager.health.CoreInitialInstallationProgress; import org.apache.streampipes.manager.health.CoreServiceStatusManager; import org.apache.streampipes.manager.health.PipelineHealthCheck; @@ -139,7 +140,10 @@ public void init() { List.of( new ServiceHealthCheck(), new PipelineHealthCheck(), - new AdapterHealthCheck()) + new AdapterHealthCheck( + StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(), + new AdapterMasterManagement() + )) ); var logFetchInterval = env.getLogFetchIntervalInMillis().getValueOrDefault();