diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/SupportsNestedTransformationRule.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/SupportsNestedTransformationRule.java index 86a36b7d6c..d4ee006249 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/SupportsNestedTransformationRule.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/SupportsNestedTransformationRule.java @@ -18,7 +18,7 @@ package org.apache.streampipes.connect.shared.preprocessing; -import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule; +import org.apache.streampipes.extensions.api.connect.TransformationRule; import java.util.List; import java.util.Map; diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/AdapterTransformationPipelineElement.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/AdapterTransformationPipelineElement.java index 2b6d5070b2..c3d6bb2445 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/AdapterTransformationPipelineElement.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/AdapterTransformationPipelineElement.java @@ -19,9 +19,9 @@ package org.apache.streampipes.connect.shared.preprocessing.elements; import org.apache.streampipes.connect.shared.preprocessing.generator.TransformationRuleGeneratorVisitor; -import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule; import org.apache.streampipes.connect.shared.preprocessing.utils.Utils; import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement; +import org.apache.streampipes.extensions.api.connect.TransformationRule; import org.apache.streampipes.model.connect.rules.TransformationRuleDescription; import java.util.List; diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/generator/TransformationRuleGeneratorVisitor.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/generator/TransformationRuleGeneratorVisitor.java index 2b6a4edaff..87c9bb6881 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/generator/TransformationRuleGeneratorVisitor.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/generator/TransformationRuleGeneratorVisitor.java @@ -18,7 +18,7 @@ package org.apache.streampipes.connect.shared.preprocessing.generator; -import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule; +import org.apache.streampipes.extensions.api.connect.TransformationRule; import org.apache.streampipes.model.connect.rules.ITransformationRuleVisitor; import java.util.ArrayList; diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/AddValueTransformationRule.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/AddValueTransformationRule.java index 77b8b12f20..8a31382ebe 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/AddValueTransformationRule.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/AddValueTransformationRule.java @@ -19,7 +19,7 @@ package org.apache.streampipes.connect.shared.preprocessing.transform.schema; import org.apache.streampipes.connect.shared.DatatypeUtils; -import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule; +import org.apache.streampipes.extensions.api.connect.TransformationRule; import java.util.Map; diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/MoveTransformationRule.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/MoveTransformationRule.java index f6506f376c..b584b9eff2 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/MoveTransformationRule.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/MoveTransformationRule.java @@ -18,7 +18,7 @@ package org.apache.streampipes.connect.shared.preprocessing.transform.schema; -import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule; +import org.apache.streampipes.extensions.api.connect.TransformationRule; import java.util.HashMap; import java.util.List; diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/stream/DuplicateFilterPipelineElement.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/stream/DuplicateFilterPipelineElement.java index bbca98fb1e..fedf106a24 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/stream/DuplicateFilterPipelineElement.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/stream/DuplicateFilterPipelineElement.java @@ -18,7 +18,7 @@ package org.apache.streampipes.connect.shared.preprocessing.transform.stream; -import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule; +import org.apache.streampipes.extensions.api.connect.TransformationRule; import java.util.HashMap; import java.util.Map; diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/stream/EventRateTransformationRule.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/stream/EventRateTransformationRule.java index 229952c8b3..b5b586f65e 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/stream/EventRateTransformationRule.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/stream/EventRateTransformationRule.java @@ -18,7 +18,7 @@ package org.apache.streampipes.connect.shared.preprocessing.transform.stream; -import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule; +import org.apache.streampipes.extensions.api.connect.TransformationRule; import java.util.Map; diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/AddTimestampTransformationRule.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/AddTimestampTransformationRule.java index c8267ef34a..f075933c33 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/AddTimestampTransformationRule.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/AddTimestampTransformationRule.java @@ -18,7 +18,7 @@ package org.apache.streampipes.connect.shared.preprocessing.transform.value; -import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule; +import org.apache.streampipes.extensions.api.connect.TransformationRule; import java.util.Map; diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/DatatypeTransformationRule.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/DatatypeTransformationRule.java index ce0f0509a5..df61f2e131 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/DatatypeTransformationRule.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/DatatypeTransformationRule.java @@ -20,7 +20,7 @@ import org.apache.streampipes.connect.shared.DatatypeUtils; -import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule; +import org.apache.streampipes.extensions.api.connect.TransformationRule; import java.util.Map; diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/TimestampTransformationRule.java b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/TimestampTransformationRule.java index 5f9f30de61..05810d6072 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/TimestampTransformationRule.java +++ b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/TimestampTransformationRule.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; +import static java.util.TimeZone.getTimeZone; + public class TimestampTransformationRule extends SupportsNestedTransformationRule { private final List eventKey; @@ -48,6 +50,7 @@ public TimestampTransformationRule(List eventKey, if (mode == TimestampTranformationRuleMode.FORMAT_STRING) { dateFormatter = new SimpleDateFormat(formatString); + dateFormatter.setTimeZone(getTimeZone("UTC")); } } diff --git a/streampipes-connect-shared/src/test/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/SchemaEventTransformerTest.java b/streampipes-connect-shared/src/test/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/SchemaEventTransformerTest.java index ec87937de1..56400a8066 100644 --- a/streampipes-connect-shared/src/test/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/SchemaEventTransformerTest.java +++ b/streampipes-connect-shared/src/test/java/org/apache/streampipes/connect/shared/preprocessing/transform/schema/SchemaEventTransformerTest.java @@ -18,7 +18,7 @@ package org.apache.streampipes.connect.shared.preprocessing.transform.schema; -import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule; +import org.apache.streampipes.extensions.api.connect.TransformationRule; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; diff --git a/streampipes-connect-shared/src/test/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/ValueEventTransformerTest.java b/streampipes-connect-shared/src/test/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/ValueEventTransformerTest.java index 89cd25be8a..286be99a00 100644 --- a/streampipes-connect-shared/src/test/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/ValueEventTransformerTest.java +++ b/streampipes-connect-shared/src/test/java/org/apache/streampipes/connect/shared/preprocessing/transform/value/ValueEventTransformerTest.java @@ -18,7 +18,7 @@ package org.apache.streampipes.connect.shared.preprocessing.transform.value; -import org.apache.streampipes.connect.shared.preprocessing.transform.TransformationRule; +import org.apache.streampipes.extensions.api.connect.TransformationRule; import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.schema.EventPropertyPrimitive; import org.apache.streampipes.model.schema.EventSchema; diff --git a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/StreamPipesAdapter.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/StreamPipesAdapter.java index 4a5fb1d362..8651b05ee5 100644 --- a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/StreamPipesAdapter.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/StreamPipesAdapter.java @@ -22,11 +22,26 @@ import org.apache.streampipes.extensions.api.connect.context.IAdapterGuessSchemaContext; import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext; import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.connect.guess.GuessSchema; public interface StreamPipesAdapter { IAdapterConfiguration declareConfig(); + /** + * Preprocesses the adapter description before the adapter is invoked. + * + *

This method is designed to allow adapters to modify the adapter description prior to invocation. + * It is particularly useful for adapters that need to manipulate certain values internally, + * e.g. bypassing the adapter preprocessing pipeline. An example of such an adapter is the FileReplayAdapter, + * which manipulates timestamp values.

+ * + *

This is a default method and does not need to be overridden unless specific preprocessing is required.

+ * + * @param adapterDescription The adapter description to be preprocessed. + */ + default void preprocessAdapterDescription(AdapterDescription adapterDescription) {}; + void onAdapterStarted(IAdapterParameterExtractor extractor, IEventCollector collector, IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException; diff --git a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/TransformationRule.java b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/TransformationRule.java similarity index 92% rename from streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/TransformationRule.java rename to streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/TransformationRule.java index f6443afc62..91b877825d 100644 --- a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/transform/TransformationRule.java +++ b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/connect/TransformationRule.java @@ -16,7 +16,7 @@ * */ -package org.apache.streampipes.connect.shared.preprocessing.transform; +package org.apache.streampipes.extensions.api.connect; import java.util.Map; diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java index 958d413f1f..0d9cfd31f2 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/AdapterWorkerManagement.java @@ -63,6 +63,11 @@ public void invokeAdapter(AdapterDescription adapterDescription) throws AdapterE newAdapterInstance, adapterDescription); + // This method allows adapters to modify the adapter description prior to invocation. + // It is particularly useful for adapters like FileReplayAdapter that need to manipulate timestamp values + // internally, bypassing the adapter preprocessing pipeline. + newAdapterInstance.preprocessAdapterDescription(adapterDescription); + var registeredParsers = newAdapterInstance.declareConfig().getSupportedParsers(); var extractor = AdapterParameterExtractor.from(adapterDescription, registeredParsers); var eventCollector = EventCollector.from(adapterDescription); diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java index 6ca6217553..ed0d256605 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapter.java @@ -20,8 +20,10 @@ import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils; +import org.apache.streampipes.connect.shared.preprocessing.generator.StatelessTransformationRuleGeneratorVisitor; import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration; import org.apache.streampipes.extensions.api.connect.IEventCollector; +import org.apache.streampipes.extensions.api.connect.IParser; import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter; import org.apache.streampipes.extensions.api.connect.context.IAdapterGuessSchemaContext; import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext; @@ -31,8 +33,11 @@ import org.apache.streampipes.extensions.management.connect.adapter.parser.JsonParsers; import org.apache.streampipes.extensions.management.connect.adapter.parser.xml.XmlParser; import org.apache.streampipes.model.AdapterType; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.connect.guess.GuessSchema; import org.apache.streampipes.model.connect.rules.schema.RenameRuleDescription; +import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription; +import org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription; import org.apache.streampipes.model.extensions.ExtensionAssetType; import org.apache.streampipes.sdk.StaticProperties; import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder; @@ -50,6 +55,7 @@ import java.io.InputStream; import java.net.URI; import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -76,6 +82,8 @@ public class FileReplayAdapter implements StreamPipesAdapter { private boolean replaceTimestamp; private String timestampRuntimeName; + private TimestampTranfsformationRuleDescription timestampTranfsformationRuleDescription; + private String timestampSourceFieldName; private float speedUp; @@ -84,7 +92,8 @@ public class FileReplayAdapter implements StreamPipesAdapter { @Override public IAdapterConfiguration declareConfig() { - return AdapterConfigurationBuilder.create(ID, 0, FileReplayAdapter::new) + return AdapterConfigurationBuilder + .create(ID, 0, FileReplayAdapter::new) .withSupportedParsers( new JsonParsers(), new CsvParser(), @@ -105,7 +114,8 @@ public IAdapterConfiguration declareConfig() { Labels.withId(REPLAY_ONCE), Options.from("no", "yes") ) - .requiredAlternatives(Labels.withId(SPEED), + .requiredAlternatives( + Labels.withId(SPEED), Alternatives.from(Labels.withId(KEEP_ORIGINAL_TIME), true), Alternatives.from(Labels.withId(FASTEST)), Alternatives.from( @@ -127,8 +137,54 @@ public void onAdapterStarted( IAdapterRuntimeContext adapterRuntimeContext ) throws AdapterException { - // extract user input + throwExceptionWhenAddTimestampRuleIsSelected(extractor); + + boolean replayOnce = extractUserInputsAndReturnValueOfReplayOnce(extractor); + + determineTimestampRuntimeName(extractor); + + determineSourceTimestampField(extractor); + + startAdapterReplayThread(extractor, collector, adapterRuntimeContext, replayOnce); + } + + protected static void throwExceptionWhenAddTimestampRuleIsSelected(IAdapterParameterExtractor extractor) + throws AdapterException { + boolean ruleExists = extractor.getAdapterDescription() + .getRules() + .stream() + .anyMatch(rule -> rule instanceof AddTimestampRuleDescription); + + if (ruleExists) { + throw new AdapterException("The file replay adapter requires a valid timestamp within the file. The add " + + "timestamp option in the schema editor is not supported. Please edit the " + + "adater to resolve this problem."); + } + } + + private void startAdapterReplayThread( + IAdapterParameterExtractor extractor, + IEventCollector collector, + IAdapterRuntimeContext adapterRuntimeContext, + boolean replayOnce + ) { executor = Executors.newScheduledThreadPool(1); + if (replayOnce) { + executor.schedule( + () -> getFileFromEndpointAndParseFile(extractor, collector, adapterRuntimeContext), + 0, + TimeUnit.SECONDS + ); + } else { + executor.scheduleAtFixedRate( + () -> getFileFromEndpointAndParseFile(extractor, collector, adapterRuntimeContext), + 0, + 1, + TimeUnit.SECONDS); + } + } + + private boolean extractUserInputsAndReturnValueOfReplayOnce(IAdapterParameterExtractor extractor) { boolean replayOnce = extractor .getStaticPropertyExtractor() .selectedSingleValue(REPLAY_ONCE, String.class) @@ -147,41 +203,25 @@ public void onAdapterStarted( .singleValueParameter(SPEED_UP, Float.class); default -> 1.0f; }; + return replayOnce; + } - // get timestamp field + private void determineTimestampRuntimeName(IAdapterParameterExtractor extractor) throws AdapterException { var timestampField = extractor .getAdapterDescription() .getEventSchema() .getEventProperties() .stream() .filter(eventProperty -> - eventProperty.getDomainProperties() - .contains(URI.create(SO.DATE_TIME))) + eventProperty.getDomainProperties() + .contains(URI.create(SO.DATE_TIME))) .findFirst(); if (timestampField.isEmpty()) { throw new AdapterException("Could not find a timestamp field in event schema"); } else { timestampRuntimeName = timestampField.get() - .getRuntimeName(); - } - - determineSourceTimestampField(extractor); - - // start replay adapter - if (replayOnce) { - executor.schedule( - () -> parseFile(extractor, collector, adapterRuntimeContext), - 0, - TimeUnit.SECONDS - ); - } else { - executor.scheduleAtFixedRate( - () -> parseFile(extractor, collector, adapterRuntimeContext), - 0, - 1, - TimeUnit.SECONDS - ); + .getRuntimeName(); } } @@ -199,10 +239,10 @@ private void determineSourceTimestampField(IAdapterParameterExtractor extractor) if (!renamingRulesTimestamp.isEmpty()) { if (renamingRulesTimestamp.size() == 1) { timestampSourceFieldName = renamingRulesTimestamp.get(0) - .getOldRuntimeKey(); + .getOldRuntimeKey(); } else { throw new AdapterException("Invalid configuration - multiple renaming rules detected which affect the " - + "timestamp property."); + + "timestamp property."); } } else { // no renaming rules can be found, timestamp name does not change @@ -219,73 +259,32 @@ private void determineSourceTimestampField(IAdapterParameterExtractor extractor) private List getRenamingRulesTimestamp(IAdapterParameterExtractor extractor) { var renamingRules = extractor.getAdapterDescription() - .getRules() - .stream() - .filter(rule -> rule instanceof RenameRuleDescription) - .map(rule -> (RenameRuleDescription) rule) - .toList(); + .getRules() + .stream() + .filter(rule -> rule instanceof RenameRuleDescription) + .map(rule -> (RenameRuleDescription) rule) + .toList(); return renamingRules.stream() - .filter(rule -> rule.getNewRuntimeKey() - .equals(timestampRuntimeName)) - .toList(); + .filter(rule -> rule.getNewRuntimeKey() + .equals(timestampRuntimeName)) + .toList(); } - private void parseFile( + private void getFileFromEndpointAndParseFile( IAdapterParameterExtractor extractor, IEventCollector collector, IAdapterRuntimeContext adapterRuntimeContext ) { try { - InputStream inputStream = getDataFromEndpoint(extractor - .getStaticPropertyExtractor() - .selectedFilename(FILE_PATH)); - - extractor.selectedParser() - .parse(inputStream, (event) -> { - - long actualEventTimestamp = -1; - var timestampFieldValue = event.get(timestampSourceFieldName); - if (timestampFieldValue instanceof Long) { - actualEventTimestamp = (Long) timestampFieldValue; - } else if (timestampFieldValue instanceof Integer) { - actualEventTimestamp = (Integer) timestampFieldValue; - } else if (!(timestampFieldValue == null && replaceTimestamp)){ - adapterRuntimeContext - .getLogger() - .error( - new AdapterException("Timestamp field is not a unix timestamp in ms, skipping event. Value: %s" - .formatted(event.get(timestampSourceFieldName)) - )); - } - - if (actualEventTimestamp == -1 && !replaceTimestamp) { - // Do not emit any data if timestamp could not be processed - return; - } - - long sleepTime; - if (timestampLastEvent != -1 && actualEventTimestamp != -1) { - sleepTime = (long) ((actualEventTimestamp - timestampLastEvent) / speedUp); - } else { - sleepTime = 1; - } - // speed up is set to Float.MAX_VALUE when user selected fastest option - if (sleepTime > 0 && speedUp != Float.MAX_VALUE) { - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - LOG.info("File stream adapter was stopped, the current replay is interrupted", e); - } - } - - timestampLastEvent = actualEventTimestamp; - if (replaceTimestamp) { - event.put(timestampSourceFieldName, System.currentTimeMillis()); - } - - collector.collect(event); - }); + var inputStream = getFileAsInputStreamFromEndpoint(extractor); + + parseFile( + extractor.selectedParser(), + collector, + inputStream, + adapterRuntimeContext + ); } catch (AdapterException e) { adapterRuntimeContext @@ -294,6 +293,103 @@ private void parseFile( } } + private void parseFile( + IParser parser, + IEventCollector collector, + InputStream inputStream, + IAdapterRuntimeContext adapterRuntimeContext + ) { + // The parse method does not throw AdapterExceptions, that's why the logging is handeled within the catch blog here + parser.parse(inputStream, (event) -> { + try { + processEvent(collector, event); + } catch (AdapterException e) { + adapterRuntimeContext + .getLogger() + .error(e); + } + }); + } + + protected void processEvent( + IEventCollector collector, + Map event + ) throws AdapterException { + + long actualEventTimestamp = getTimestampFromEvent(event); + + reduceReplaySpeedIfRequired(actualEventTimestamp); + + // This must be the last step, because the original timestamp must be used to simulate the replay frequency + // of the original file + replaceTimestampIfRequired(event); + + timestampLastEvent = actualEventTimestamp; + collector.collect(event); + } + + protected long getTimestampFromEvent(Map event) throws AdapterException { + long actualEventTimestamp = -1; + + var timestampFieldValue = event.get(timestampSourceFieldName); + + if (timestampFieldValue instanceof Long) { + actualEventTimestamp = (Long) timestampFieldValue; + } else if (timestampFieldValue instanceof Integer) { + actualEventTimestamp = (Integer) timestampFieldValue; + } + + // transform timestamp if transformation rule is present + actualEventTimestamp = transformTimestampIfTransformationRuleIsPresent(event, actualEventTimestamp); + + + if (actualEventTimestamp == -1 && !replaceTimestamp) { + throw new AdapterException("Timestamp field could not be parsed, skipping event. " + + "Value: %s".formatted(event.get(timestampSourceFieldName))); + } + + return actualEventTimestamp; + } + + private long transformTimestampIfTransformationRuleIsPresent(Map event, long actualEventTimestamp) { + if (timestampTranfsformationRuleDescription != null) { + var transformationRuleDescription = timestampTranfsformationRuleDescription; + + var transformationRuleVisitor = new StatelessTransformationRuleGeneratorVisitor(); + transformationRuleVisitor.visit(transformationRuleDescription); + var timestampTransformationRule = transformationRuleVisitor.getTransformationRules() + .get(0); + + actualEventTimestamp = (Long) ( + timestampTransformationRule.apply(event) + .get(timestampSourceFieldName)); + } + return actualEventTimestamp; + } + + private void reduceReplaySpeedIfRequired(long actualEventTimestamp) { + long sleepTime; + if (timestampLastEvent != -1 && actualEventTimestamp != -1) { + sleepTime = (long) ((actualEventTimestamp - timestampLastEvent) / speedUp); + } else { + sleepTime = 1; + } + // speed up is set to Float.MAX_VALUE when user selected fastest option + if (sleepTime > 0 && speedUp != Float.MAX_VALUE) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.info("File stream adapter was stopped, the current replay is interrupted", e); + } + } + } + + private void replaceTimestampIfRequired(Map event) { + if (replaceTimestamp) { + event.put(timestampSourceFieldName, System.currentTimeMillis()); + } + } + @Override public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext) { executor.shutdownNow(); @@ -301,19 +397,63 @@ public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRunti } @Override - public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor, - IAdapterGuessSchemaContext adapterGuessSchemaContext) throws AdapterException { - var inputStream = getDataFromEndpoint(extractor - .getStaticPropertyExtractor() - .selectedFilename(FILE_PATH)); - return extractor.selectedParser().getGuessSchema(inputStream); + public GuessSchema onSchemaRequested( + IAdapterParameterExtractor extractor, + IAdapterGuessSchemaContext adapterGuessSchemaContext + ) throws AdapterException { + var inputStream = getFileAsInputStreamFromEndpoint(extractor); + return extractor.selectedParser() + .getGuessSchema(inputStream); } - private InputStream getDataFromEndpoint(String selectedFileName) throws AdapterException { + private InputStream getFileAsInputStreamFromEndpoint(IAdapterParameterExtractor extractor) throws AdapterException { + var selectedFileName = extractor + .getStaticPropertyExtractor() + .selectedFilename(FILE_PATH); + try { return FileProtocolUtils.getFileInputStream(selectedFileName); } catch (IOException e) { throw new AdapterException("Could not find file: " + selectedFileName, e); } } + + protected void setTimestampSourceFieldName(String timestampSourceFieldName) { + this.timestampSourceFieldName = timestampSourceFieldName; + } + + protected void setReplaceTimestamp(boolean replaceTimestamp) { + this.replaceTimestamp = replaceTimestamp; + } + + protected void setTimestampTranfsformationRuleDescription( + TimestampTranfsformationRuleDescription timestampTranfsformationRuleDescription + ) { + this.timestampTranfsformationRuleDescription = timestampTranfsformationRuleDescription; + } + + /** + * Removes the timestamp transformation rules from the adapter description. + * + *

The FileReplay adapter manages timestamp transformations internally to accurately simulate the replay frequency. + * This is necessary as the timestamp field values are crucial for this simulation. As a result, the timestamp rule + * description is stored locally within the FileReplay adapter and is applied when the onAdapterStarted method is + * invoked.

+ */ + @Override + public void preprocessAdapterDescription(AdapterDescription adapterDescription) { + + this.timestampTranfsformationRuleDescription = adapterDescription + .getRules() + .stream() + .filter(rule -> rule instanceof TimestampTranfsformationRuleDescription) + .map(rule -> (TimestampTranfsformationRuleDescription) rule) + .findFirst() + .orElse(null); + + // remove timestamp preprocessing rule + adapterDescription + .getRules() + .removeIf(rule -> rule instanceof TimestampTranfsformationRuleDescription); + } } diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/test/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapterTest.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/test/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapterTest.java new file mode 100644 index 0000000000..8a6c3d91a2 --- /dev/null +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/test/java/org/apache/streampipes/connect/iiot/protocol/stream/FileReplayAdapterTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.connect.iiot.protocol.stream; + +import org.apache.streampipes.commons.exceptions.connect.AdapterException; +import org.apache.streampipes.connect.shared.preprocessing.transform.value.TimestampTranformationRuleMode; +import org.apache.streampipes.extensions.api.connect.IEventCollector; +import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription; +import org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +class FileReplayAdapterTest { + + private FileReplayAdapter fileReplayAdapter; + private IEventCollector collector; + private static final String TIMESTAMP = "timestamp"; + private static final long TIMESTAMP_VALUE = 1622544682000L; + private Map event; + private ArgumentCaptor> resultEventCapture; + private IAdapterParameterExtractor extractor; + private AdapterDescription adapterDescription; + + @BeforeEach + void setUp() { + collector = mock(IEventCollector.class); + extractor = mock(IAdapterParameterExtractor.class); + adapterDescription = mock(AdapterDescription.class); + when(extractor.getAdapterDescription()).thenReturn(adapterDescription); + fileReplayAdapter = new FileReplayAdapter(); + fileReplayAdapter.setTimestampSourceFieldName(TIMESTAMP); + event = new HashMap<>(); + resultEventCapture = ArgumentCaptor.forClass(Map.class); + } + + + @Test + public void testThrowExceptionWhenAddTimestampRuleIsSelected_withAddTimestampRule() { + when(adapterDescription.getRules()).thenReturn(List.of(new AddTimestampRuleDescription())); + + assertThrows( + AdapterException.class, + () -> FileReplayAdapter.throwExceptionWhenAddTimestampRuleIsSelected(extractor) + ); + } + + @Test + public void testThrowExceptionWhenAddTimestampRuleIsSelected_withoutAddTimestampRule() { + when(adapterDescription.getRules()).thenReturn(Collections.emptyList()); + } + + @Test + void processEvent_shouldCollectEventWhenTimestampIsLong() throws AdapterException { + event.put(TIMESTAMP, TIMESTAMP_VALUE); + + fileReplayAdapter.processEvent(collector, event); + + verify(collector, times(1)).collect(event); + } + + @Test + void processEvent_shouldCollectEventWhenTimestampIsInteger() throws AdapterException { + event.put(TIMESTAMP, 1622544682); + + fileReplayAdapter.processEvent(collector, event); + + verify(collector, times(1)).collect(event); + } + + @Test + void processEvent_shouldThrowAdapterExceptionWhenTimestampIsNotUnixTimestampInMs() { + event.put(TIMESTAMP, "not a timestamp"); + + assertThrows(AdapterException.class, () -> fileReplayAdapter.processEvent(collector, event)); + } + + @Test + void processEvent_shouldNotCollectEventWhenTimestampCouldNotBeProcessed() { + event.put(TIMESTAMP, -1); + + assertThrows(AdapterException.class, () -> fileReplayAdapter.processEvent(collector, event)); + } + + @Test + void processEvent_shouldCollectEventWhenTimestampIsReplaced() throws AdapterException { + event.put(TIMESTAMP, TIMESTAMP_VALUE); + + fileReplayAdapter.setReplaceTimestamp(true); + fileReplayAdapter.processEvent(collector, event); + + verify(collector, times(1)).collect(resultEventCapture.capture()); + + var restultEvent = resultEventCapture.getValue(); + assertEquals(restultEvent.size(), 1); + assertNotEquals(restultEvent.get(TIMESTAMP), TIMESTAMP_VALUE); + } + + + @Test + void getTimestampFromEvent_returnsLongTimestamp() throws AdapterException { + event.put(TIMESTAMP, TIMESTAMP_VALUE); + + long actualEventTimestamp = fileReplayAdapter.getTimestampFromEvent(event); + + assertEquals(TIMESTAMP_VALUE, actualEventTimestamp); + } + + @Test + void getTimestampFromEvent_withLongInSeconds() throws AdapterException { + setupNumberTransformationRule(1622544682L); + assertEventTimestamp(TIMESTAMP_VALUE); + } + + @Test + void getTimestampFromEvent_withIntegerInSeconds() throws AdapterException { + setupNumberTransformationRule(1622544682); + assertEventTimestamp(TIMESTAMP_VALUE); + } + + @Test + void getTimestampFromEvent_withStringFormatInUtc() throws AdapterException { + setupStringTransformationRule( + "2024-07-01T12:00:00.000Z", + "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" + ); + assertEventTimestamp(1719835200000L); + } + + @Test + void getTimestampFromEvent_withStringFormatAndPositiveOffset() throws AdapterException { + setupStringTransformationRule( + "2024-07-01T12:00:00.000+03:00", + "yyyy-MM-dd'T'HH:mm:ss.SSSXXX" + ); + assertEventTimestamp(1719824400000L); + } + + @Test + void getTimestampFromEvent_withStringFormatAndNegativeOffset() throws AdapterException { + setupStringTransformationRule( + "2024-07-01T05:00:00.000-10:00", + "yyyy-MM-dd'T'HH:mm:ss.SSSXXX" + ); + assertEventTimestamp(1719846000000L); + } + + @Test + void getTimestampFromEvent_withStringFormatAndZeroOffset() throws AdapterException { + setupStringTransformationRule( + "2024-07-01T12:00:00.000+00:00", + "yyyy-MM-dd'T'HH:mm:ss.SSSXXX" + ); + assertEventTimestamp(1719835200000L); + } + + @Test + void getTimestampFromEvent_withStringFormatWithoutTimeZone() throws AdapterException { + setupStringTransformationRule( + "01.07.2024 12:00:00", + "dd.MM.yyyy HH:mm:ss" + ); + assertEventTimestamp(1719835200000L); + } + + + /** + * Sets up a string transformation rule for timestamp processing. + * This method configures a transformation rule that interprets and converts + * a timestamp string according to the specified format string. + */ + private void setupStringTransformationRule( + String timestampValue, + String formatString + ) { + event.put(TIMESTAMP, timestampValue); + + var rule = new TimestampTranfsformationRuleDescription(); + rule.setRuntimeKey(TIMESTAMP); + rule.setMode(TimestampTranformationRuleMode.FORMAT_STRING.internalName()); + rule.setFormatString(formatString); + + fileReplayAdapter.setTimestampTranfsformationRuleDescription(rule); + } + + /** + * Sets up a number transformation rule for timestamp processing. + * This method configures a transformation rule that multiplies a given timestamp value + * by a specified multiplier. + */ + private void setupNumberTransformationRule( + long timestampValue + ) { + event.put(TIMESTAMP, timestampValue); + + var rule = new TimestampTranfsformationRuleDescription(); + rule.setRuntimeKey(TIMESTAMP); + rule.setMode(TimestampTranformationRuleMode.TIME_UNIT.internalName()); + rule.setMultiplier(1000L); + + fileReplayAdapter.setTimestampTranfsformationRuleDescription(rule); + } + + private void assertEventTimestamp(long expected) throws AdapterException { + assertEquals(expected, fileReplayAdapter.getTimestampFromEvent(event)); + } + +} \ No newline at end of file diff --git a/ui/cypress/fixtures/connect/fileReplay/noTimestamp.csv b/ui/cypress/fixtures/connect/fileReplay/noTimestamp.csv new file mode 100644 index 0000000000..6f3e084fc7 --- /dev/null +++ b/ui/cypress/fixtures/connect/fileReplay/noTimestamp.csv @@ -0,0 +1,2 @@ +count;randomnumber;randomtext +122.0;62.0;c diff --git a/ui/cypress/fixtures/connect/fileReplay/timestampInMilliseconds/expected.csv b/ui/cypress/fixtures/connect/fileReplay/timestampInMilliseconds/expected.csv new file mode 100644 index 0000000000..b19de0291b --- /dev/null +++ b/ui/cypress/fixtures/connect/fileReplay/timestampInMilliseconds/expected.csv @@ -0,0 +1,4 @@ +timestamp;count +1623871499111;122.0 +1623871500222;123.0 +1623871508333;131.0 diff --git a/ui/cypress/fixtures/connect/fileReplay/timestampInMilliseconds/input.csv b/ui/cypress/fixtures/connect/fileReplay/timestampInMilliseconds/input.csv new file mode 100644 index 0000000000..b19de0291b --- /dev/null +++ b/ui/cypress/fixtures/connect/fileReplay/timestampInMilliseconds/input.csv @@ -0,0 +1,4 @@ +timestamp;count +1623871499111;122.0 +1623871500222;123.0 +1623871508333;131.0 diff --git a/ui/cypress/fixtures/connect/fileReplay/timestampInSeconds/expected.csv b/ui/cypress/fixtures/connect/fileReplay/timestampInSeconds/expected.csv new file mode 100644 index 0000000000..bba7da1ee1 --- /dev/null +++ b/ui/cypress/fixtures/connect/fileReplay/timestampInSeconds/expected.csv @@ -0,0 +1,4 @@ +timestamp;count +1623871499000;122.0 +1623871500000;123.0 +1623871508000;131.0 diff --git a/ui/cypress/fixtures/connect/fileReplay/timestampInSeconds/input.csv b/ui/cypress/fixtures/connect/fileReplay/timestampInSeconds/input.csv new file mode 100644 index 0000000000..af50b6c07c --- /dev/null +++ b/ui/cypress/fixtures/connect/fileReplay/timestampInSeconds/input.csv @@ -0,0 +1,4 @@ +timestamp;count +1623871499;122.0 +1623871500;123.0 +1623871508;131.0 diff --git a/ui/cypress/fixtures/connect/schemaRules/input.csv b/ui/cypress/fixtures/connect/schemaRules/input.csv index f92f0b8ceb..be4e9eff3b 100644 --- a/ui/cypress/fixtures/connect/schemaRules/input.csv +++ b/ui/cypress/fixtures/connect/schemaRules/input.csv @@ -1,2 +1,2 @@ -count;density;temperature -122.0;62.0;11 +timestamp;count;density;temperature +1720018277000;122.0;62.0;11 diff --git a/ui/cypress/support/builder/AdapterBuilder.ts b/ui/cypress/support/builder/AdapterBuilder.ts index c8c029c599..d590d95005 100644 --- a/ui/cypress/support/builder/AdapterBuilder.ts +++ b/ui/cypress/support/builder/AdapterBuilder.ts @@ -41,6 +41,12 @@ export class AdapterBuilder { public setTimestampProperty(timestsmpProperty: string) { this.adapterInput.timestampProperty = timestsmpProperty; + this.adapterInput.autoAddTimestamp; + return this; + } + + public setAutoAddTimestampPropery() { + this.adapterInput.autoAddTimestamp = true; return this; } diff --git a/ui/cypress/support/utils/connect/ConnectBtns.ts b/ui/cypress/support/utils/connect/ConnectBtns.ts index f828bcf148..f31db5f0e8 100644 --- a/ui/cypress/support/utils/connect/ConnectBtns.ts +++ b/ui/cypress/support/utils/connect/ConnectBtns.ts @@ -71,6 +71,48 @@ export class ConnectBtns { // ======================================================================== + // ===================== Event Schema buttons ========================== + + public static schemaUnitFromDropdown() { + return cy.dataCy('connect-schema-unit-from-dropdown'); + } + + public static schemaUnitTransformBtn() { + return cy.dataCy('connect-schema-unit-transform-btn'); + } + + public static schemaUnitToDropdown() { + return cy.dataCy('connect-schema-unit-to-dropdown'); + } + + public static saveEditProperty() { + return cy.dataCy('sp-save-edit-property', { timeout: 10000 }); + } + + public static markAsTimestampBtn() { + return cy.dataCy('sp-mark-as-timestamp').children(); + } + + public static setTimestampConverter(option: 'Number' | 'String') { + cy.dataCy('connect-timestamp-converter') + .click() + .get('mat-option') + .contains(option) + .click(); + } + + public static timestampStringRegex() { + return cy.dataCy('connect-timestamp-string-regex', { timeout: 10000 }); + } + + public static timestampNumberDropdown() { + return cy.dataCy('connect-timestamp-number-dropdown', { + timeout: 10000, + }); + } + + // ======================================================================== + // ===================== Format configurations ========================== public static csvDelimiter() { diff --git a/ui/cypress/support/utils/connect/ConnectEventSchemaUtils.ts b/ui/cypress/support/utils/connect/ConnectEventSchemaUtils.ts index 098ea68554..1e302003ec 100644 --- a/ui/cypress/support/utils/connect/ConnectEventSchemaUtils.ts +++ b/ui/cypress/support/utils/connect/ConnectEventSchemaUtils.ts @@ -35,7 +35,7 @@ export class ConnectEventSchemaUtils { ConnectEventSchemaUtils.clickEditProperty(propertyName); // Mark as timestamp - cy.dataCy('sp-mark-as-timestamp').children().click(); + ConnectBtns.markAsTimestampBtn().click(); // Close cy.dataCy('sp-save-edit-property').click(); @@ -51,33 +51,55 @@ export class ConnectEventSchemaUtils { this.eventSchemaNextBtnEnabled(); } - public static editTimestampProperty( + public static editTimestampPropertyWithRegex( propertyName: string, timestampRegex: string, ) { ConnectEventSchemaUtils.clickEditProperty(propertyName); - cy.dataCy('sp-mark-as-timestamp').children().click(); - cy.dataCy('connect-timestamp-converter') - .click() - .get('mat-option') - .contains('String') - .click(); - cy.dataCy('connect-timestamp-string-regex').type(timestampRegex); - cy.dataCy('sp-save-edit-property').click(); + ConnectBtns.markAsTimestampBtn().click(); + ConnectBtns.setTimestampConverter('String'); + + ConnectBtns.timestampStringRegex().type(timestampRegex); + + ConnectBtns.saveEditProperty().click(); + // The following code validates that the regex is persisted by reopening the edit dialog again cy.dataCy('edit-' + propertyName.toLowerCase(), { timeout: 10000, }).click({ force: true }); - cy.dataCy('connect-timestamp-string-regex', { timeout: 10000 }).should( - 'have.value', - timestampRegex, - ); - cy.dataCy('sp-save-edit-property', { timeout: 10000 }).should( - 'have.length', - 1, + ConnectBtns.timestampStringRegex().should('have.value', timestampRegex); + + ConnectBtns.saveEditProperty().should('have.length', 1); + ConnectBtns.saveEditProperty().click(); + } + + public static editTimestampPropertyWithNumber( + propertyName: string, + configurationValue: 'Seconds' | 'Milliseconds', + ) { + ConnectEventSchemaUtils.clickEditProperty(propertyName); + ConnectBtns.markAsTimestampBtn().click(); + + ConnectBtns.setTimestampConverter('Number'); + + ConnectBtns.timestampNumberDropdown() + .click({ force: true }) + .get('mat-option') + .contains(configurationValue) + .click(); + + ConnectBtns.saveEditProperty().click(); + + // Check if the configuration is persisted by reopening the edit dialog + ConnectEventSchemaUtils.clickEditProperty(propertyName); + + ConnectBtns.timestampNumberDropdown().should( + 'contain', + configurationValue, ); - cy.dataCy('sp-save-edit-property').click(); + + ConnectBtns.saveEditProperty().click(); } public static numberTransformation(propertyName: string, value: string) { @@ -111,37 +133,15 @@ export class ConnectEventSchemaUtils { toUnit: string, ) { ConnectEventSchemaUtils.clickEditProperty(propertyName); - cy.dataCy('connect-schema-unit-from-dropdown').type(fromUnit); - cy.dataCy('connect-schema-unit-transform-btn').click(); - cy.dataCy('connect-schema-unit-to-dropdown') - .click() + ConnectBtns.schemaUnitFromDropdown().type(fromUnit); + ConnectBtns.schemaUnitTransformBtn().click(); + ConnectBtns.schemaUnitToDropdown().click(); + + ConnectBtns.schemaUnitToDropdown() .get('mat-option') .contains(toUnit) .click(); - cy.dataCy('sp-save-edit-property').click(); - - cy.dataCy('edit-' + propertyName.toLowerCase(), { - timeout: 10000, - }).click({ force: true }); - - cy.dataCy('connect-schema-unit-from-dropdown').eq(0).clear(); - cy.dataCy('connect-schema-unit-from-dropdown').eq(0).type(fromUnit); - cy.dataCy('connect-schema-unit-from-dropdown', { - timeout: 10000, - }).should('have.value', fromUnit); - //cy.dataCy("connect-schema-unit-transform-btn").click(); - cy.dataCy('connect-schema-unit-to-dropdown') - .contains(toUnit) - .click({ force: true }); - cy.dataCy('connect-schema-unit-to-dropdown', { - timeout: 10000, - }).contains(toUnit); - - cy.dataCy('sp-save-edit-property', { timeout: 10000 }).should( - 'have.length', - 1, - ); - cy.dataCy('sp-save-edit-property').click({ force: true }); + ConnectBtns.saveEditProperty().click(); } public static addStaticProperty( diff --git a/ui/cypress/support/utils/connect/ConnectUtils.ts b/ui/cypress/support/utils/connect/ConnectUtils.ts index fa30cd01e0..056ab9d2a9 100644 --- a/ui/cypress/support/utils/connect/ConnectUtils.ts +++ b/ui/cypress/support/utils/connect/ConnectUtils.ts @@ -26,7 +26,10 @@ import { UserUtils } from '../UserUtils'; import { PipelineUtils } from '../PipelineUtils'; export class ConnectUtils { - public static testAdapter(adapterConfiguration: AdapterInput) { + public static testAdapter( + adapterConfiguration: AdapterInput, + adapterStartFails = false, + ) { ConnectUtils.goToConnect(); ConnectUtils.goToNewAdapterPage(); @@ -47,7 +50,11 @@ export class ConnectUtils { ConnectEventSchemaUtils.finishEventSchemaConfiguration(); - ConnectUtils.startStreamAdapter(adapterConfiguration); + ConnectUtils.startAdapter( + adapterConfiguration, + false, + adapterStartFails, + ); } public static addAdapter(adapterConfiguration: AdapterInput) { @@ -100,7 +107,7 @@ export class ConnectUtils { ConnectEventSchemaUtils.finishEventSchemaConfiguration(); - ConnectUtils.startStreamAdapter(configuration); + ConnectUtils.startAdapter(configuration); } public static goToConnect() { @@ -148,13 +155,10 @@ export class ConnectUtils { cy.get('#event-schema-next-button').click(); } - public static startStreamAdapter(adapterInput: AdapterInput) { - ConnectUtils.startAdapter(adapterInput); - } - public static startAdapter( adapterInput: AdapterInput, noLiveDataView = false, + adapterStartFails = false, ) { // Set adapter name cy.dataCy('sp-adapter-name').type(adapterInput.adapterName); @@ -175,14 +179,20 @@ export class ConnectUtils { ConnectBtns.adapterSettingsStartAdapter().click(); - if (adapterInput.startAdapter && !noLiveDataView) { - cy.dataCy('sp-connect-adapter-success-live-preview', { + if (adapterStartFails) { + cy.dataCy('sp-connect-adapter-error-message', { timeout: 60000, }).should('be.visible'); } else { - cy.dataCy('sp-connect-adapter-success-added', { - timeout: 60000, - }).should('be.visible'); + if (adapterInput.startAdapter && !noLiveDataView) { + cy.dataCy('sp-connect-adapter-success-live-preview', { + timeout: 60000, + }).should('be.visible'); + } else { + cy.dataCy('sp-connect-adapter-success-added', { + timeout: 60000, + }).should('be.visible'); + } } this.closeAdapterPreview(); diff --git a/ui/cypress/tests/adapter/fileStream.smoke.spec.ts b/ui/cypress/tests/adapter/fileStream.smoke.spec.ts index 262228877b..7024335b05 100644 --- a/ui/cypress/tests/adapter/fileStream.smoke.spec.ts +++ b/ui/cypress/tests/adapter/fileStream.smoke.spec.ts @@ -22,7 +22,7 @@ import { AdapterBuilder } from '../../support/builder/AdapterBuilder'; import { ConnectBtns } from '../../support/utils/connect/ConnectBtns'; describe( - 'Test File Stream Adapter', + 'Test File Replay Adapter', { retries: { runMode: 4, @@ -32,10 +32,10 @@ describe( () => { beforeEach('Setup Test', () => { cy.initStreamPipesTest(); - FileManagementUtils.addFile('fileTest/random.csv'); }); - it('Perform Test', () => { + it('Test successful adapter generation for file stream adapter', () => { + FileManagementUtils.addFile('fileTest/random.csv'); const adapterInput = AdapterBuilder.create('File_Stream') .setName('File Stream Adapter Test') .setTimestampProperty('timestamp') @@ -48,5 +48,58 @@ describe( ConnectUtils.testAdapter(adapterInput); ConnectUtils.deleteAdapter(); }); + + it('File stream adapter should not allow add timestamp option in schema editor', () => { + FileManagementUtils.addFile('connect/fileReplay/noTimestamp.csv'); + const adapterInput = AdapterBuilder.create('File_Stream') + .setName('File Stream Adapter Test') + .setAutoAddTimestampPropery() + .setFormat('csv') + .addFormatInput('input', ConnectBtns.csvDelimiter(), ';') + .addFormatInput('checkbox', ConnectBtns.csvHeader(), 'check') + .build(); + + ConnectUtils.testAdapter(adapterInput, true); + }); + + // it('File Stream adapter with unix timestamp in seconds', () => { + // FileManagementUtils.addFile('connect/fileReplay/timestampInSeconds/input.csv'); + // const adapterConfiguration = + // ConnectUtils.setUpPreprocessingRuleTest(false); + // + // // Edit timestamp property + // ConnectEventSchemaUtils.editTimestampPropertyWithNumber( + // 'timestamp', + // 'Seconds', + // ); + // + // ConnectEventSchemaUtils.finishEventSchemaConfiguration(); + // ConnectUtils.tearDownPreprocessingRuleTest( + // adapterConfiguration, + // 'cypress/fixtures/connect/fileReplay/timestampInSeconds/expected.csv', + // false, + // 2000, + // ); + // }); + + // it('File Stream adapter with unix timestamp in milliseconds', () => { + // FileManagementUtils.addFile('connect/fileReplay/timestampInMilliseconds/input.csv'); + // const adapterConfiguration = + // ConnectUtils.setUpPreprocessingRuleTest(false); + // + // // Edit timestamp property + // ConnectEventSchemaUtils.editTimestampPropertyWithNumber( + // 'timestamp', + // 'Milliseconds', + // ); + // + // ConnectEventSchemaUtils.finishEventSchemaConfiguration(); + // ConnectUtils.tearDownPreprocessingRuleTest( + // adapterConfiguration, + // 'cypress/fixtures/connect/fileReplay/timestampInMilliseconds/expected.csv', + // false, + // 2000, + // ); + // }); }, ); diff --git a/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts b/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts index 1aa4d14ce6..673cf31ebf 100644 --- a/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts +++ b/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts @@ -43,7 +43,7 @@ describe('Connect schema rule transformations', () => { ); // Add a timestamp property - ConnectEventSchemaUtils.addTimestampProperty(); + ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp'); ConnectEventSchemaUtils.finishEventSchemaConfiguration(); diff --git a/ui/cypress/tests/adapter/rules/valueRules.ts b/ui/cypress/tests/adapter/rules/valueRules.spec.ts similarity index 97% rename from ui/cypress/tests/adapter/rules/valueRules.ts rename to ui/cypress/tests/adapter/rules/valueRules.spec.ts index 2163f454d2..b4b698e2a9 100644 --- a/ui/cypress/tests/adapter/rules/valueRules.ts +++ b/ui/cypress/tests/adapter/rules/valueRules.spec.ts @@ -31,7 +31,7 @@ describe('Connect value rule transformations', () => { ConnectUtils.setUpPreprocessingRuleTest(false); // Edit timestamp property - ConnectEventSchemaUtils.editTimestampProperty( + ConnectEventSchemaUtils.editTimestampPropertyWithRegex( 'timestamp', "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", ); diff --git a/ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.html b/ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.html index 85346b2101..b600a282ae 100644 --- a/ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.html +++ b/ui/src/app/connect/dialog/adapter-started/adapter-started-success/adapter-started-success.component.html @@ -67,6 +67,7 @@