Skip to content

Commit

Permalink
fix: filter for running adapter instances in adapter health check (#2275
Browse files Browse the repository at this point in the history
)

* fix: filter for running adapter instances in adapter health check

* refactor: move strings to variables
  • Loading branch information
bossenti authored Dec 6, 2023
1 parent 3363175 commit 4cecb6e
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.streampipes.connect.management.util.WorkerPaths;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,13 +40,10 @@ public class AdapterHealthCheck implements Runnable {
private final IAdapterStorage adapterStorage;
private final AdapterMasterManagement adapterMasterManagement;

public AdapterHealthCheck() {
this.adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage();
this.adapterMasterManagement = new AdapterMasterManagement();
}

public AdapterHealthCheck(IAdapterStorage adapterStorage,
AdapterMasterManagement adapterMasterManagement) {
public AdapterHealthCheck(
IAdapterStorage adapterStorage,
AdapterMasterManagement adapterMasterManagement
) {
this.adapterStorage = adapterStorage;
this.adapterMasterManagement = adapterMasterManagement;
}
Expand Down Expand Up @@ -82,8 +78,14 @@ public void checkAndRestoreAdapters() {
public Map<String, AdapterDescription> getAllRunningInstancesAdapterDescriptions() {
Map<String, AdapterDescription> result = new HashMap<>();
List<AdapterDescription> allRunningInstancesAdapterDescription = this.adapterStorage.getAllAdapters();
allRunningInstancesAdapterDescription.forEach(adapterDescription ->
result.put(adapterDescription.getElementId(), adapterDescription));
allRunningInstancesAdapterDescription
.stream()
.filter(AdapterDescription::isRunning)
.forEach(adapterDescription ->
result.put(
adapterDescription.getElementId(),
adapterDescription
));

return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import org.slf4j.Logger;
Expand All @@ -47,7 +48,11 @@ public class WorkerAdministrationManagement {
private final AdapterHealthCheck adapterHealthCheck;

public WorkerAdministrationManagement() {
this.adapterHealthCheck = new AdapterHealthCheck();
this.adapterHealthCheck = new AdapterHealthCheck(
StorageDispatcher.INSTANCE.getNoSqlStore()
.getAdapterInstanceStorage(),
new AdapterMasterManagement()
);
this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.connect.management.health;

import org.apache.streampipes.connect.management.management.AdapterMasterManagement;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.storage.api.IAdapterStorage;

import org.junit.Before;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class AdapterHealthCheckTest {

private IAdapterStorage adapterInstanceStorageMock;

@Before
public void setUp() {
adapterInstanceStorageMock = mock(IAdapterStorage.class);
}

@Test
public void getAllRunningInstancesAdapterDescriptionsEmpty() {
when(adapterInstanceStorageMock.getAllAdapters()).thenReturn(List.of());

var healthCheck = new AdapterHealthCheck(adapterInstanceStorageMock, new AdapterMasterManagement());
var result = healthCheck.getAllRunningInstancesAdapterDescriptions();

assertTrue(result.isEmpty());
}

@Test
public void getAllRunningInstancesAdapterDescriptionsMixed() {

var nameRunningAdapter = "running-adapter";
var nameStoppedAdapter = "stopped-adapter";

var stoppedAdapter = new AdapterDescription();
stoppedAdapter.setElementId(nameStoppedAdapter);
stoppedAdapter.setRunning(false);

var runningAdapter = new AdapterDescription();
runningAdapter.setElementId(nameRunningAdapter);
runningAdapter.setRunning(true);

when(adapterInstanceStorageMock.getAllAdapters()).thenReturn(List.of(stoppedAdapter, runningAdapter));

var healthCheck = new AdapterHealthCheck(adapterInstanceStorageMock, new AdapterMasterManagement());
var result = healthCheck.getAllRunningInstancesAdapterDescriptions();

assertEquals(1, result.size());
assertTrue(result.containsKey(nameRunningAdapter));
assertEquals(runningAdapter, result.get(nameRunningAdapter));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.connect.management.health.AdapterHealthCheck;
import org.apache.streampipes.connect.management.management.AdapterMasterManagement;
import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
import org.apache.streampipes.manager.health.CoreServiceStatusManager;
import org.apache.streampipes.manager.health.PipelineHealthCheck;
Expand Down Expand Up @@ -139,7 +140,10 @@ public void init() {
List.of(
new ServiceHealthCheck(),
new PipelineHealthCheck(),
new AdapterHealthCheck())
new AdapterHealthCheck(
StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(),
new AdapterMasterManagement()
))
);

var logFetchInterval = env.getLogFetchIntervalInMillis().getValueOrDefault();
Expand Down

0 comments on commit 4cecb6e

Please sign in to comment.