diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java index fc7cdb5cd6..3afa98d2f2 100644 --- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java @@ -32,6 +32,7 @@ import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timekeeping.BooleanTimekeepingProcessor; import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer.BooleanTimerProcessor; import org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentProcessor; +import org.apache.streampipes.processors.transformation.jvm.processor.datetime.DateTimeFromStringProcessor; import org.apache.streampipes.processors.transformation.jvm.processor.fieldrename.FiledRenameProcessor; import org.apache.streampipes.processors.transformation.jvm.processor.hasher.FieldHasherProcessor; import org.apache.streampipes.processors.transformation.jvm.processor.mapper.FieldMapperProcessor; @@ -67,6 +68,7 @@ public List> pipelineElements() { new TimestampExtractorProcessor(), new BooleanCounterProcessor(), new BooleanInverterProcessor(), + new DateTimeFromStringProcessor(), new BooleanTimekeepingProcessor(), new BooleanTimerProcessor(), new CsvMetadataEnrichmentProcessor(), @@ -84,7 +86,7 @@ public List> pipelineElements() { new BooleanOperatorProcessor(), new FiledRenameProcessor(), new RoundProcessor() - ); + ); } @Override diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/DateTimeFromStringProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/DateTimeFromStringProcessor.java new file mode 100644 index 0000000000..be5b329468 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/DateTimeFromStringProcessor.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.transformation.jvm.processor.datetime; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; +import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; +import org.apache.streampipes.model.DataProcessorType; +import org.apache.streampipes.model.graph.DataProcessorDescription; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.model.schema.PropertyScope; +import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; +import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; +import org.apache.streampipes.sdk.helpers.EpProperties; +import org.apache.streampipes.sdk.helpers.EpRequirements; +import org.apache.streampipes.sdk.helpers.Labels; +import org.apache.streampipes.sdk.helpers.Locales; +import org.apache.streampipes.sdk.helpers.Options; +import org.apache.streampipes.sdk.helpers.OutputStrategies; +import org.apache.streampipes.sdk.utils.Assets; +import org.apache.streampipes.vocabulary.SO; +import org.apache.streampipes.wrapper.params.compat.ProcessorParams; +import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class DateTimeFromStringProcessor extends StreamPipesDataProcessor { + + public static final String FIELD_ID = "inputField"; + public static final String OUTPUT_TIMESTAMP_RUNTIME_NAME = "timestringInMillis"; + public static final String OUTPUT_TIMEZONE_RUNTIME_NAME = "timeZone"; + public static final String INPUT_TIMEZONE_KEY = "inputTimeZone"; + + private String streamInputDateTimeFieldName; + private String selectedTimeZone; + + @Override + public DataProcessorDescription declareModel() { + return ProcessingElementBuilder + .create("org.apache.streampipes.processors.transformation.jvm.datetime", 0) + .category(DataProcessorType.STRING_OPERATOR, DataProcessorType.TIME) + .withLocales(Locales.EN) + .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .requiredStream(StreamRequirementsBuilder.create() + .requiredPropertyWithUnaryMapping( + EpRequirements.stringReq(), + Labels.withId(FIELD_ID), + PropertyScope.NONE + ) + .build()) + .requiredSingleValueSelection(Labels.withId(INPUT_TIMEZONE_KEY), + Options.from(getTimeZoneOptions()), true + ) + .outputStrategy( + OutputStrategies.append( + EpProperties.timestampProperty(OUTPUT_TIMESTAMP_RUNTIME_NAME), + EpProperties.stringEp( + // We can use the labels from the input timezone here + Labels.withId(INPUT_TIMEZONE_KEY), + OUTPUT_TIMEZONE_RUNTIME_NAME, + SO.SCHEDULE_TIMEZONE + ) + ) + ) + .build(); + } + + @Override + public void onInvocation( + ProcessorParams parameters, SpOutputCollector spOutputCollector, + EventProcessorRuntimeContext runtimeContext + ) throws SpRuntimeException { + ProcessingElementParameterExtractor extractor = parameters.extractor(); + this.streamInputDateTimeFieldName = extractor.mappingPropertyValue(FIELD_ID); + this.selectedTimeZone = extractor.selectedSingleValue(INPUT_TIMEZONE_KEY, String.class); + } + + @Override + public void onEvent(Event event, SpOutputCollector collector) { + String dateTimeString = event.getFieldBySelector(streamInputDateTimeFieldName) + .getAsPrimitive() + .getAsString(); + DateTimeFormatter dtFormatter = DateTimeFormatter.ISO_DATE_TIME; + ZonedDateTime zdt = parseDateTime(dateTimeString, dtFormatter); + + /* + * A temporary workaround is in place to put a long represent the + * zonedDateTimeVariable One possible workaround is to use the time zone and the + * long to reconstitute the actual time after the event has been sent. + * event.addField(OUTPUT_DATETIME_RUNTIME_NAME, zdt); + */ + event.addField( + OUTPUT_TIMESTAMP_RUNTIME_NAME, + zdt.toInstant() + .toEpochMilli() + ); + event.addField(OUTPUT_TIMEZONE_RUNTIME_NAME, selectedTimeZone); + + collector.collect(event); + } + + @Override + public void onDetach() { + + } + + private ZonedDateTime parseDateTime(String dateTimeString, DateTimeFormatter dtf) { + ZonedDateTime zdt; + try { + zdt = ZonedDateTime.parse(dateTimeString); + + } catch (DateTimeParseException e1) { + try { + LocalDateTime ldt = LocalDateTime.parse(dateTimeString, dtf); + ZoneId timeZoneId = ZoneId.of(selectedTimeZone); + zdt = ldt.atZone(timeZoneId); + } catch (DateTimeParseException e2) { + throw new RuntimeException("Could not parse DateTime String: " + dateTimeString); + } + } + return zdt; + } + + private static String[] getTimeZoneOptions() { + List timeZones = new ArrayList<>(ZoneId.getAvailableZoneIds()); + Collections.sort(timeZones); + return timeZones.toArray(new String[0]); + } +} diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.datetime/documentation.md b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.datetime/documentation.md new file mode 100644 index 0000000000..3dfcb6657f --- /dev/null +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.datetime/documentation.md @@ -0,0 +1,72 @@ + + +## Datetime From String + +

+ +

+ +*** + +## Overview + +The "Datetime From String" processor is a handy tool that helps convert human-readable datetime information into a +format that machines can understand. This is particularly useful when dealing with data that includes dates and times. + +### Why Use This Processor? + +In the context of event streams, you may encounter dates and times formatted for human readability but not necessarily +optimized for computer processing. The "Datetime From String" processor addresses this by facilitating the conversion +of human-readable datetime information within your continuous stream of events. + +*** + +## How It Works + +When you input a data stream into this processor containing a datetime in a specific format (such as "2023-11-24 15:30: +00"), it +undergoes a transformation. The processor converts it into a computer-friendly format called a ZonedDateTime object. + +### Example + +Let's say you have an event stream with a property containing values like "2023-11-24 15:30:00" and you want to make +sure your computer understands it. You can use +this processor to convert it into a format that's machine-friendly. + +*** + +## Getting Started + +To use this processor, you need one thing in your data: + +1. **Datetime String**: This is the name of the event property that contains the human-readable datetime string, like "2023-11-24 15:30:00". + + +### Configuration + +The only thing you need to configure is the time zone. +1. **Time Zone**: Specify the time zone that applies to your datetime if it doesn't already have this information.This ensures that the processor understands the context of your +datetime. + +## Output + +After the conversion happens, the processor adds a new piece of information to your data stream: + +* **timestringInMillis**: This is the transformed datetime in a format that computers can easily work with (UNIX timestamp in milliseconds). +* **timeZone**: The name of the timezone the `dateTime` value refers to. Can be used to reconstitute the actual time. \ No newline at end of file diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.datetime/icon.png b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.datetime/icon.png new file mode 100644 index 0000000000..c26c37e060 Binary files /dev/null and b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.datetime/icon.png differ diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.datetime/strings.en b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.datetime/strings.en new file mode 100644 index 0000000000..a7b2e84277 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.datetime/strings.en @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.streampipes.processors.transformation.jvm.datetime.title=Datetime From String +org.apache.streampipes.processors.transformation.jvm.datetime.description=Converts a human-readable timestamp from a textual (string) representation into a computer-readable object + +inputField.title=DateTime String +inputField.description=The event property that contains the timestamp ISO 8601 formatted strings, e.g., '2023-11-29T18:30:22' + +inputTimeZone.title=Time Zone +inputTimeZone.description=The time zone for which the string applies (if the string already contains the time zone information, the user time zone will be ignored) + diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/TestDateTimeFromStringProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/TestDateTimeFromStringProcessor.java new file mode 100644 index 0000000000..14543c3d13 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/datetime/TestDateTimeFromStringProcessor.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.transformation.jvm.processor.datetime; + +import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; +import org.apache.streampipes.model.graph.DataProcessorDescription; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.model.runtime.EventFactory; +import org.apache.streampipes.model.runtime.SchemaInfo; +import org.apache.streampipes.model.runtime.SourceInfo; +import org.apache.streampipes.model.staticproperty.MappingPropertyUnary; +import org.apache.streampipes.model.staticproperty.OneOfStaticProperty; +import org.apache.streampipes.model.staticproperty.Option; +import org.apache.streampipes.sdk.helpers.Tuple2; +import org.apache.streampipes.test.extensions.api.StoreEventCollector; +import org.apache.streampipes.test.generator.EventStreamGenerator; +import org.apache.streampipes.test.generator.InvocationGraphGenerator; +import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator; +import org.apache.streampipes.wrapper.params.compat.ProcessorParams; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class TestDateTimeFromStringProcessor { + private static final Logger LOG = LoggerFactory.getLogger(TestDateTimeFromStringProcessor.class); + + @org.junit.runners.Parameterized.Parameters + public static Iterable data() { + + var expectedTimestamp = 1700222655537L; + + return Arrays.asList(new Object[][]{ + // the first test just demonstrates that the testing and the source code is + // functioning + { + "inputField", "US/Eastern", + List.of("2020-11-13T21:07:38.146120+01:00", "2023-11-14T16:17:01.286299-05:00", + "2023-11-14T14:05:57.519543100" + ), + List.of(1605298058146L, 1699996621286L, 1699988757519L) + }, + /* + * the next three tests demonstrate two things: (1) if the DateTime string has + * the zone information already in it, the user input on the timezone will not + * affect the DateTime variable. (2) The same instant in the real world will + * result in the same instant in the datetime variable + */ + { + "inputField", "US/Pacific", List.of("2023-11-17T04:04:15.537187600-08:00[US/Pacific]"), + List.of(expectedTimestamp) + }, + { + "inputField", "US/Pacific", List.of("2023-11-17T05:04:15.537187600-07:00[US/Arizona]"), + List.of(expectedTimestamp) + }, + { + "inputField", "US/Pacific", List.of("2023-11-17T07:04:15.537187600-05:00[US/Eastern]"), + List.of(expectedTimestamp) + }, + /* + * The next three tests demonstrate that if a localdatetime is given, when the + * user selects a time zone. An instant in time will be created for that + * specific timezone. + */ + { + "inputField", "US/Pacific", List.of("2023-11-17T04:04:15.537187600"), + List.of(expectedTimestamp) + }, + { + "inputField", "US/Arizona", List.of("2023-11-17T04:04:15.537187600"), + List.of(1700219055537L) + }, + { + "inputField", "US/Eastern", List.of("2023-11-17T04:04:15.537187600"), + List.of(1700211855537L) + }, + + }); + } + + @org.junit.runners.Parameterized.Parameter + public String streamInputDateTimeFieldName; + + @org.junit.runners.Parameterized.Parameter(1) + public String selectedTimeZone; + + @org.junit.runners.Parameterized.Parameter(2) + public List eventsString; + + @org.junit.runners.Parameterized.Parameter(3) + public List expectedValues; + + public static final String DEFAULT_STREAM_PREFIX = "Stream"; + + @Test + public void testDateTime() { + DateTimeFromStringProcessor dateTime = new DateTimeFromStringProcessor(); + DataProcessorDescription originalGraph = dateTime.declareModel(); + originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); + + DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); + graph.setInputStreams(Collections.singletonList( + EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stream")))); + graph.setOutputStream( + EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("out-stream"))); + graph.getOutputStream() + .getEventGrounding() + .getTransportProtocol() + .getTopicDefinition() + .setActualTopicName("output-topic"); + + MappingPropertyUnary mappingPropertyUnary = graph.getStaticProperties() + .stream() + .filter(p -> p instanceof MappingPropertyUnary) + .map(p -> (MappingPropertyUnary) p) + .filter(p -> DateTimeFromStringProcessor.FIELD_ID.equals( + p.getInternalName() + ) + ) + .findFirst() + .orElse(null); + + assert mappingPropertyUnary != null; + mappingPropertyUnary.setSelectedProperty(DEFAULT_STREAM_PREFIX + "::" + streamInputDateTimeFieldName); + + OneOfStaticProperty selectedTimeZoneProperty = graph.getStaticProperties() + .stream() + .filter(p -> p instanceof OneOfStaticProperty) + .map(p -> (OneOfStaticProperty) p) + .filter(p -> ( + DateTimeFromStringProcessor.INPUT_TIMEZONE_KEY.equals( + p.getInternalName() + ) + )) + .findFirst() + .orElse(null); + assert selectedTimeZoneProperty != null; + Option selectedTimeZoneOption = selectedTimeZoneProperty.getOptions() + .stream() + .filter(item -> item.getName() + .equals(selectedTimeZone)) + .findFirst() + .orElse(null); + assert selectedTimeZoneOption != null; + selectedTimeZoneOption.setSelected(true); + + ProcessorParams params = new ProcessorParams(graph); + SpOutputCollector spOut = new StoreEventCollector(); + dateTime.onInvocation(params, spOut, null); + Tuple2> res = sendEvents(dateTime, spOut); + List resValues = res.v; + assert eventsString.size() == expectedValues.size(); + int size = eventsString.size(); + for (int i = 0; i < size; i++) { + LOG.info("Expected value is {}.", expectedValues.get(i)); + LOG.info( + "Actual value is {}.", + resValues.get(i) + .toString() + ); + assertEquals( + expectedValues.get(i), + resValues.get(i) + ); + } + } + + private Tuple2> sendEvents(DateTimeFromStringProcessor dateTime, SpOutputCollector spOut) { + String field = ""; + Long timestampValue = null; + List events = makeEvents(); + List dateTimeValueList = new ArrayList<>(); + for (Event event : events) { + LOG.info("sending event with value " + + event.getFieldBySelector(DEFAULT_STREAM_PREFIX + "::" + streamInputDateTimeFieldName) + .getAsPrimitive() + .getAsString()); + dateTime.onEvent(event, spOut); + + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try { + field = event.getFieldBySelector(DEFAULT_STREAM_PREFIX + "::" + DateTimeFromStringProcessor.FIELD_ID) + .getAsPrimitive() + .getAsString(); + timestampValue = event + .getFieldBySelector(DateTimeFromStringProcessor.OUTPUT_TIMESTAMP_RUNTIME_NAME) + .getAsPrimitive() + .getAsLong(); + LOG.info(field + ":" + timestampValue); + } catch (IllegalArgumentException e) { + throw new RuntimeException(e); + } + dateTimeValueList.add(timestampValue); + } + return new Tuple2<>(field, dateTimeValueList); + } + + private List makeEvents() { + List events = new ArrayList<>(); + for (String eventString : eventsString) { + events.add(makeEvent(eventString)); + } + return events; + } + + private Event makeEvent(String value) { + Map map = new HashMap<>(); + map.put(streamInputDateTimeFieldName, value); + return EventFactory.fromMap( + map, + new SourceInfo("test-topic", DEFAULT_STREAM_PREFIX), + new SchemaInfo(null, new ArrayList<>()) + ); + } +} diff --git a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/SO.java b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/SO.java index dfea7eaaff..c4abd3e3ac 100644 --- a/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/SO.java +++ b/streampipes-vocabulary/src/main/java/org/apache/streampipes/vocabulary/SO.java @@ -374,6 +374,7 @@ public class SO { public static final String SAME_AS = "http://schema.org/sameAs"; public static final String SAMPLE_TYPE = "http://schema.org/sampleType"; public static final String SATURATED_FAT_CONTENT = "http://schema.org/saturatedFatContent"; + public static final String SCHEDULE_TIMEZONE = "https://schema.org/scheduleTimezone"; public static final String SCHEDULED_TIME = "http://schema.org/scheduledTime"; public static final String SCREENSHOT = "http://schema.org/screenshot"; public static final String SEASON_NUMBER = "http://schema.org/seasonNumber";