Skip to content

Commit

Permalink
fix(#2549): consider renaming rules for timestamp extraction (#2559)
Browse files Browse the repository at this point in the history
* fix(#2549): consider renaming rules for timestamp retrieval

* style: fix indentation

* style: fix indentation

* fix(#2549): extract method
  • Loading branch information
bossenti authored Mar 15, 2024
1 parent 3e17c93 commit 92a71b8
Showing 1 changed file with 125 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.streampipes.extensions.management.connect.adapter.parser.xml.XmlParser;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.connect.rules.schema.RenameRuleDescription;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Alternatives;
Expand All @@ -48,6 +49,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -74,6 +76,8 @@ public class FileReplayAdapter implements StreamPipesAdapter {
private boolean replaceTimestamp;
private String timestampRuntimeName;

private String timestampSourceFieldName;

private float speedUp;

private long timestampLastEvent = -1;
Expand All @@ -85,25 +89,43 @@ public IAdapterConfiguration declareConfig() {
new JsonParsers(),
new CsvParser(),
new XmlParser(),
new ImageParser())
new ImageParser()
)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.withLocales(Locales.EN)
.withCategory(AdapterType.Generic)
.requiredFile(Labels.withId(FILE_PATH), Filetypes.CSV, Filetypes.JSON, Filetypes.XML)
.requiredFile(
Labels.withId(FILE_PATH),
Filetypes.CSV,
Filetypes.JSON,
Filetypes.XML
)
.requiredMultiValueSelection(Labels.withId(REPLACE_TIMESTAMP), Options.from(""))
.requiredSingleValueSelection(Labels.withId(REPLAY_ONCE), Options.from("no", "yes"))
.requiredSingleValueSelection(
Labels.withId(REPLAY_ONCE),
Options.from("no", "yes")
)
.requiredAlternatives(Labels.withId(SPEED),
Alternatives.from(Labels.withId(KEEP_ORIGINAL_TIME), true),
Alternatives.from(Labels.withId(FASTEST)), Alternatives.from(Labels.withId(SPEED_UP_FACTOR),
StaticProperties.group(Labels.withId(SPEED_UP_FACTOR_GROUP),
StaticProperties.doubleFreeTextProperty(Labels.withId(SPEED_UP)))))
Alternatives.from(Labels.withId(FASTEST)),
Alternatives.from(
Labels.withId(SPEED_UP_FACTOR),
StaticProperties.group(
Labels.withId(SPEED_UP_FACTOR_GROUP),
StaticProperties.doubleFreeTextProperty(Labels.withId(
SPEED_UP))
)
)
)
.buildConfiguration();
}

@Override
public void onAdapterStarted(IAdapterParameterExtractor extractor,
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
public void onAdapterStarted(
IAdapterParameterExtractor extractor,
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext
) throws AdapterException {

// extract user input
executor = Executors.newScheduledThreadPool(1);
Expand All @@ -114,7 +136,7 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
var replaceTimestampStringList = extractor
.getStaticPropertyExtractor()
.selectedMultiValues(REPLACE_TIMESTAMP, String.class);
replaceTimestamp = replaceTimestampStringList.size() != 0;
replaceTimestamp = !replaceTimestampStringList.isEmpty();
var speedUpAlternative = extractor
.getStaticPropertyExtractor()
.selectedAlternativeInternalId(SPEED);
Expand All @@ -133,66 +155,123 @@ public void onAdapterStarted(IAdapterParameterExtractor extractor,
.getEventProperties()
.stream()
.filter(eventProperty ->
eventProperty.getDomainProperties().contains(URI.create(SO.DATE_TIME)))
eventProperty.getDomainProperties()
.contains(URI.create(SO.DATE_TIME)))
.findFirst();

if (timestampField.isEmpty()) {
throw new AdapterException("Could not find a timestamp field in event schema");
} else {
timestampRuntimeName = timestampField.get().getRuntimeName();
timestampRuntimeName = timestampField.get()
.getRuntimeName();
}

determineSourceTimestampField(extractor);

// start replay adapter
if (replayOnce) {
executor.schedule(() -> parseFile(extractor, collector, adapterRuntimeContext),
executor.schedule(
() -> parseFile(extractor, collector, adapterRuntimeContext),
0,
TimeUnit.SECONDS);
TimeUnit.SECONDS
);
} else {
executor.scheduleAtFixedRate(() -> parseFile(extractor, collector, adapterRuntimeContext),
executor.scheduleAtFixedRate(
() -> parseFile(extractor, collector, adapterRuntimeContext),
0,
1,
TimeUnit.SECONDS);
TimeUnit.SECONDS
);
}
}

private void parseFile(IAdapterParameterExtractor extractor,
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext) {
/**
 * Resolves the name under which the timestamp is looked up in the parsed events and stores it in
 * {@code timestampSourceFieldName}.
 * <p>
 * The rename rules that target {@code timestampRuntimeName} are obtained via
 * {@link #getRenamingRulesTimestamp(IAdapterParameterExtractor)}:
 * <ul>
 *   <li>no such rule: the source field name equals {@code timestampRuntimeName}</li>
 *   <li>exactly one rule: the source field name is the old runtime key of that rule</li>
 *   <li>more than one rule: the configuration is rejected</li>
 * </ul>
 *
 * @param extractor gives access to the adapter description that holds the configured rules
 * @throws AdapterException if more than one rename rule targets the timestamp property
 */
private void determineSourceTimestampField(IAdapterParameterExtractor extractor) throws AdapterException {
  var matchingRules = getRenamingRulesTimestamp(extractor);

  switch (matchingRules.size()) {
    // timestamp property is not renamed, so raw name and runtime name are identical
    case 0 -> timestampSourceFieldName = timestampRuntimeName;
    // a single rule renames the timestamp property: fall back to the name it had before renaming
    case 1 -> timestampSourceFieldName = matchingRules.get(0).getOldRuntimeKey();
    default -> throw new AdapterException(
        "Invalid configuration - multiple renaming rules detected which affect the "
            + "timestamp property.");
  }
}

/**
 * Collects all {@link RenameRuleDescription}s of the adapter whose new runtime key equals
 * {@code timestampRuntimeName}.
 *
 * @param extractor gives access to the adapter description and its rules
 * @return the rename rules whose new runtime key is the timestamp runtime name; empty if there are none
 */
private List<RenameRuleDescription> getRenamingRulesTimestamp(IAdapterParameterExtractor extractor) {
  return extractor.getAdapterDescription()
      .getRules()
      .stream()
      // only rename rules are of interest, all other rule types are dropped
      .filter(RenameRuleDescription.class::isInstance)
      .map(RenameRuleDescription.class::cast)
      .filter(renameRule -> renameRule.getNewRuntimeKey().equals(timestampRuntimeName))
      .toList();
}

private void parseFile(
IAdapterParameterExtractor extractor,
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext
) {
try {
InputStream inputStream = getDataFromEndpoint(extractor
.getStaticPropertyExtractor()
.selectedFilename(FILE_PATH));

extractor.selectedParser().parse(inputStream, (event) -> {

long actualEventTimestamp = -1;
if (event.get(timestampRuntimeName) instanceof Long) {
actualEventTimestamp = (long) event.get(timestampRuntimeName);
} else {
LOG.error(
"The timestamp field is not a unix timestamp in ms. Value: %s"
.formatted(event.get(timestampRuntimeName)));
}

if (timestampLastEvent != -1) {
long sleepTime = (long) ((actualEventTimestamp - timestampLastEvent) / speedUp);
// speed up is set to Float.MAX_VALUE when user selected fastest option
if (sleepTime > 0 && speedUp != Float.MAX_VALUE) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
LOG.info("File stream adapter was stopped, the current replay is interuppted", e);
extractor.selectedParser()
.parse(inputStream, (event) -> {

long actualEventTimestamp = -1;
if (event.get(timestampSourceFieldName) instanceof Long) {
actualEventTimestamp = (long) event.get(timestampSourceFieldName);
} else {
LOG.error(
"The timestamp field is not a unix timestamp in ms. Value: %s"
.formatted(event.get(timestampSourceFieldName)));
}
}
}

timestampLastEvent = actualEventTimestamp;
if (replaceTimestamp) {
event.put(timestampRuntimeName, System.currentTimeMillis());
}
if (timestampLastEvent != -1) {
long sleepTime = (long) ((actualEventTimestamp - timestampLastEvent) / speedUp);
// speed up is set to Float.MAX_VALUE when user selected fastest option
if (sleepTime > 0 && speedUp != Float.MAX_VALUE) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
LOG.info("File stream adapter was stopped, the current replay is interuppted", e);
}
}
}

timestampLastEvent = actualEventTimestamp;
if (replaceTimestamp) {
event.put(timestampSourceFieldName, System.currentTimeMillis());
}

collector.collect(event);
});
collector.collect(event);
});

} catch (AdapterException e) {
adapterRuntimeContext
Expand Down

0 comments on commit 92a71b8

Please sign in to comment.