Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into mtp_test
Browse files Browse the repository at this point in the history
# Conflicts:
#	streampipes-test-utils/pom.xml
#	streampipes-test-utils/src/main/java/org/apache/streampipes/test/extensions/api/StoreEventCollector.java
  • Loading branch information
bossenti committed Oct 13, 2023
2 parents 26575e1 + 71a0ea3 commit 87e4461
Show file tree
Hide file tree
Showing 12 changed files with 309 additions and 78 deletions.
6 changes: 0 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.9</httpcore.version>
<httpcore-osgi.version>4.4.9</httpcore-osgi.version>
<immutable-value.version>2.9.3</immutable-value.version>
<influxdb.version>2.23</influxdb.version>
<j2html.version>1.6.0</j2html.version>
<jackson.version>2.15.0</jackson.version>
Expand Down Expand Up @@ -1314,11 +1313,6 @@
<artifactId>jna</artifactId>
<version>${jna.version}</version>
</dependency>
<dependency>
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
<version>${immutable-value.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.filters.jvm.processor.booleanfilter;

import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventFactory;
import org.apache.streampipes.model.runtime.SchemaInfo;
import org.apache.streampipes.model.runtime.SourceInfo;
import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
import org.apache.streampipes.test.extensions.api.StoreEventCollector;
import org.apache.streampipes.test.generator.EventStreamGenerator;
import org.apache.streampipes.test.generator.InvocationGraphGenerator;
import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;

@RunWith(Parameterized.class)
public class TestBooleanFilterProcessor {

private static final Logger LOG = LoggerFactory.getLogger(TestBooleanFilterProcessor.class);

@org.junit.runners.Parameterized.Parameters
public static Iterable<Object[]> data() {
List<Boolean> allTrue = Arrays.asList(true, true, true);
List<Boolean> allFalse = Arrays.asList(false, false, false);
List<Boolean> someTrueSomeFalse = Arrays.asList(true, false, true, false, true, false, true, false, true);
List<Boolean> empty = Arrays.asList();
return Arrays.asList(new Object[][]{
{"True", "Test", someTrueSomeFalse, 5},
{"True", "Test", allTrue, 3},
{"True", "Test", allFalse, 0},
{"True", "Test", empty, 0},
{"False", "Test", someTrueSomeFalse, 4},
{"False", "Test", allTrue, 0},
{"False", "Test", allFalse, 3},
{"False", "Test", empty, 0},
});
}

@org.junit.runners.Parameterized.Parameter
public String boolToKeep;

@org.junit.runners.Parameterized.Parameter(1)
public String fieldName;

@org.junit.runners.Parameterized.Parameter(2)
public List<Boolean> eventBooleans;

@org.junit.runners.Parameterized.Parameter(3)
public int expectedFilteredBooleanCount;

@Test
public void testBoolenFilter() {
BooleanFilterProcessor bfp = new BooleanFilterProcessor();
DataProcessorDescription originalGraph = bfp.declareModel();
originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());

DataProcessorInvocation graph =
InvocationGraphGenerator.makeEmptyInvocation(originalGraph);

graph.setInputStreams(Collections
.singletonList(EventStreamGenerator
.makeStreamWithProperties(Collections.singletonList(fieldName))));

graph.setOutputStream(EventStreamGenerator.makeStreamWithProperties(Collections.singletonList(fieldName)));

graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()
.setActualTopicName("output-topic");

graph.getStaticProperties().stream()
.filter(p -> p instanceof MappingPropertyUnary)
.map((p -> (MappingPropertyUnary) p))
// Must hardcode since BOOLEAN_MAPPING is private
.filter(p -> p.getInternalName().equals("boolean-mapping"))
.findFirst().get().setSelectedProperty("s0::" + fieldName);
ProcessorParams params = new ProcessorParams(graph);
params.extractor().getStaticPropertyByName(BooleanFilterProcessor.VALUE, OneOfStaticProperty.class).getOptions()
.stream().filter(ot -> ot.getName().equals(boolToKeep)).findFirst()
.get().setSelected(true);
StoreEventCollector collector = new StoreEventCollector();

bfp.onInvocation(params, collector, null);

int result = sendEvents(bfp, collector);

LOG.info("Expected filtered boolean count is {}", expectedFilteredBooleanCount);
LOG.info("Actual filtered boolean count is {}", result);
assertEquals(expectedFilteredBooleanCount, result);
}

private int sendEvents(BooleanFilterProcessor processor, StoreEventCollector collector) {
List<Event> events = makeEvents();
for (Event event : events) {
LOG.info("Sending event with value "
+ event.getFieldBySelector("s0::" + fieldName).getAsPrimitive().getAsBoolean());
processor.onEvent(event, collector);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return collector.getEvents().size();
}

private List<Event> makeEvents() {
List<Event> events = new ArrayList<>();
for (Boolean eventSetting : eventBooleans) {
events.add(makeEvent(eventSetting));
}
return events;
}

private Event makeEvent(Boolean value) {
Map<String, Object> map = new HashMap<>();
map.put(fieldName, value);
return EventFactory.fromMap(map, new SourceInfo("test" + "-topic", "s0"),
new SchemaInfo(null, new ArrayList<>()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

package org.apache.streampipes.processors.transformation.jvm.processor.value.change;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventFactory;
Expand All @@ -31,14 +28,14 @@
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.test.extensions.api.StoreEventCollector;
import org.apache.streampipes.test.generator.InvocationGraphGenerator;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;

import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
Expand All @@ -60,8 +57,8 @@ public void getDimensionKeyForOneDimension() {

// Create event with no DIMENSION_PROPERTY
event.getSchemaInfo()
.getEventSchema()
.getEventProperties().get(0)
.getEventSchema()
.getEventProperties().get(0)
.setPropertyScope(PropertyScope.MEASUREMENT_PROPERTY.name());

assertEquals("l1", processor.getDimensionKey(event));
Expand All @@ -87,9 +84,9 @@ public void detectChangedValue() {
DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(processor.declareModel());

graph.getStaticProperties().stream()
.filter(p -> p instanceof MappingPropertyUnary)
.map((p -> (MappingPropertyUnary) p))
.findFirst().get().setSelectedProperty("s0::value");
.filter(p -> p instanceof MappingPropertyUnary)
.map((p -> (MappingPropertyUnary) p))
.findFirst().get().setSelectedProperty("s0::value");

ProcessorParams params = new ProcessorParams(graph);
processor.onInvocation(params, null, null);
Expand Down Expand Up @@ -117,9 +114,9 @@ public void detectChangedValueMultiDim() {
DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(processor.declareModel());

graph.getStaticProperties().stream()
.filter(p -> p instanceof MappingPropertyUnary)
.map((p -> (MappingPropertyUnary) p))
.findFirst().get().setSelectedProperty("s0::value");
.filter(p -> p instanceof MappingPropertyUnary)
.map((p -> (MappingPropertyUnary) p))
.findFirst().get().setSelectedProperty("s0::value");

ProcessorParams params = new ProcessorParams(graph);
processor.onInvocation(params, null, null);
Expand Down Expand Up @@ -170,31 +167,4 @@ private Event createTestEvent(Integer value, String location) {

return EventFactory.fromMap(map, new SourceInfo("", "s0"), new SchemaInfo(eventSchema, new ArrayList<>()));
}

class StoreEventCollector implements SpOutputCollector {

ArrayList<Event> events = new ArrayList<Event>();

@Override
public void registerConsumer(String routeId, InternalEventProcessor<Map<String, Object>> consumer) {}

@Override
public void unregisterConsumer(String routeId) {}

@Override
public void connect() throws SpRuntimeException {}

@Override
public void disconnect() throws SpRuntimeException {}

@Override
public void collect(Event event) {
events.add(event);
}

public List<Event> getEvents() {
return this.events;
}

}
}
12 changes: 6 additions & 6 deletions streampipes-test-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-messaging</artifactId>
<version>0.93.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-messaging</artifactId>
<version>0.93.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-extensions-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.Map;

public class StoreEventCollector implements SpOutputCollector {
ArrayList<Event> events = new ArrayList<Event>();
ArrayList<Event> events = new ArrayList<>();

@Override
public void registerConsumer(String routeId,
Expand Down
43 changes: 43 additions & 0 deletions ui/cypress/tests/configuration/labels/general.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

describe('Change basic settings', () => {
beforeEach('Setup Test', () => {
cy.initStreamPipesTest();
});

it('Perform Test', () => {
cy.visit('#/configuration/general');

// Rename app, change localhost and port
cy.dataCy('general-config-app-name').clear();
cy.dataCy('general-config-app-name').type('TEST APP');
cy.dataCy('general-config-hostname').clear();
cy.dataCy('general-config-hostname').type('testhost');
cy.dataCy('general-config-port').clear();
cy.dataCy('general-config-port').type('123');
cy.dataCy('sp-element-general-config-save').click();

// Leave, Re-visit configuration and check values
cy.visit('#/dashboard');
cy.visit('#/configuration/general');
cy.dataCy('general-config-app-name').should('have.value', 'TEST APP');
cy.dataCy('general-config-hostname').should('have.value', 'testhost');
cy.dataCy('general-config-port').should('have.value', '123');
});
});
41 changes: 41 additions & 0 deletions ui/cypress/tests/configuration/labels/labels.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

describe('Add and Delete Label', () => {
beforeEach('Setup Test', () => {
cy.initStreamPipesTest();
});

it('Perform Test', () => {
cy.visit('#/configuration/labels');

// Add new label
cy.dataCy('new-label-button').click();
cy.dataCy('label-name').type('test');
cy.dataCy('label-description').type('test test');
cy.dataCy('save-label-button').click();

// Check label
cy.dataCy('available-labels-list').should('have.length', 1);
cy.dataCy('label-text').should('have.text', ' test\n');

// Delete label
cy.dataCy('delete-label-button').click();
cy.dataCy('available-labels-list').should('have.length', 0);
});
});
Loading

0 comments on commit 87e4461

Please sign in to comment.