[#1865]: create processor to parse date time strings (#2207)
* Creation of DateTimeProcessor

Closes #1865

This processor parses a DateTime string. It converts the string into a
ZonedDateTime variable, and places the variable in the event variable.
Tests are made to ensure that the code works properly.

I am still somewhat new, so any feedback is welcome.

* Updates to parse datetime.

Incorporates feedback such as the following:
* Way to initialize variables with constructors
* simplifies methods with more efficient methods
* creates the resource file for the processor
* standardizes the names of inputs (i.e field)
* creates the documentation for the processor

* I had renamed the processor, but I forgot to rename the Testing file.

* improve assets

* improve assets

* adapt parameter name in test

* Registering Processing Element and Aligning Ids

* Drop down instead of radio group.

Also change inputTimeZone description in resource files

* improve parameter description

* put timestamp in event

* Temporary workaround

* Includes output strategy for time zone.

One possible workaround is to use the time zone to reconstitute the
datetime after the event has been collected.

* minor final improvements

* style: adapt to checkstyle

---------

Co-authored-by: bossenti <[email protected]>
dshunter107 and bossenti authored Dec 8, 2023
1 parent 8544084 commit da05e53
Showing 7 changed files with 505 additions and 1 deletion.
@@ -32,6 +32,7 @@
import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timekeeping.BooleanTimekeepingProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer.BooleanTimerProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.datetime.DateTimeFromStringProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.fieldrename.FiledRenameProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.hasher.FieldHasherProcessor;
import org.apache.streampipes.processors.transformation.jvm.processor.mapper.FieldMapperProcessor;
@@ -67,6 +68,7 @@ public List<IStreamPipesPipelineElement<?>> pipelineElements() {
new TimestampExtractorProcessor(),
new BooleanCounterProcessor(),
new BooleanInverterProcessor(),
new DateTimeFromStringProcessor(),
new BooleanTimekeepingProcessor(),
new BooleanTimerProcessor(),
new CsvMetadataEnrichmentProcessor(),
@@ -84,7 +86,7 @@ public List<IStreamPipesPipelineElement<?>> pipelineElements() {
new BooleanOperatorProcessor(),
new FiledRenameProcessor(),
new RoundProcessor()
);
);
}

@Override
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.transformation.jvm.processor.datetime;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DateTimeFromStringProcessor extends StreamPipesDataProcessor {

public static final String FIELD_ID = "inputField";
public static final String OUTPUT_TIMESTAMP_RUNTIME_NAME = "timestringInMillis";
public static final String OUTPUT_TIMEZONE_RUNTIME_NAME = "timeZone";
public static final String INPUT_TIMEZONE_KEY = "inputTimeZone";

private String streamInputDateTimeFieldName;
private String selectedTimeZone;

@Override
public DataProcessorDescription declareModel() {
return ProcessingElementBuilder
.create("org.apache.streampipes.processors.transformation.jvm.datetime", 0)
.category(DataProcessorType.STRING_OPERATOR, DataProcessorType.TIME)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.requiredStream(StreamRequirementsBuilder.create()
.requiredPropertyWithUnaryMapping(
EpRequirements.stringReq(),
Labels.withId(FIELD_ID),
PropertyScope.NONE
)
.build())
.requiredSingleValueSelection(Labels.withId(INPUT_TIMEZONE_KEY),
Options.from(getTimeZoneOptions()), true
)
.outputStrategy(
OutputStrategies.append(
EpProperties.timestampProperty(OUTPUT_TIMESTAMP_RUNTIME_NAME),
EpProperties.stringEp(
// We can use the labels from the input timezone here
Labels.withId(INPUT_TIMEZONE_KEY),
OUTPUT_TIMEZONE_RUNTIME_NAME,
SO.SCHEDULE_TIMEZONE
)
)
)
.build();
}

@Override
public void onInvocation(
ProcessorParams parameters, SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext
) throws SpRuntimeException {
ProcessingElementParameterExtractor extractor = parameters.extractor();
this.streamInputDateTimeFieldName = extractor.mappingPropertyValue(FIELD_ID);
this.selectedTimeZone = extractor.selectedSingleValue(INPUT_TIMEZONE_KEY, String.class);
}

@Override
public void onEvent(Event event, SpOutputCollector collector) {
String dateTimeString = event.getFieldBySelector(streamInputDateTimeFieldName)
.getAsPrimitive()
.getAsString();
DateTimeFormatter dtFormatter = DateTimeFormatter.ISO_DATE_TIME;
ZonedDateTime zdt = parseDateTime(dateTimeString, dtFormatter);

/*
* Temporary workaround: the event carries the ZonedDateTime as a long (epoch
* milliseconds) instead of the object itself. The time zone and the long can
* be used to reconstitute the actual time after the event has been sent.
* event.addField(OUTPUT_DATETIME_RUNTIME_NAME, zdt);
*/
event.addField(
OUTPUT_TIMESTAMP_RUNTIME_NAME,
zdt.toInstant()
.toEpochMilli()
);
event.addField(OUTPUT_TIMEZONE_RUNTIME_NAME, selectedTimeZone);

collector.collect(event);
}

@Override
public void onDetach() {

}

private ZonedDateTime parseDateTime(String dateTimeString, DateTimeFormatter dtf) {
ZonedDateTime zdt;
try {
zdt = ZonedDateTime.parse(dateTimeString);

} catch (DateTimeParseException e1) {
try {
LocalDateTime ldt = LocalDateTime.parse(dateTimeString, dtf);
ZoneId timeZoneId = ZoneId.of(selectedTimeZone);
zdt = ldt.atZone(timeZoneId);
} catch (DateTimeParseException e2) {
// Keep the original parse failure as the cause so it shows up in stack traces
throw new RuntimeException("Could not parse DateTime String: " + dateTimeString, e2);
}
}
return zdt;
}

private static String[] getTimeZoneOptions() {
List<String> timeZones = new ArrayList<>(ZoneId.getAvailableZoneIds());
Collections.sort(timeZones);
return timeZones.toArray(new String[0]);
}
}
@@ -0,0 +1,72 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

## Datetime From String

<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
</p>

***

## Overview

The "Datetime From String" processor parses a datetime string from an event property and converts it into a UNIX
timestamp in milliseconds, which other pipeline elements can process more easily than text.

### Why Use This Processor?

In the context of event streams, you may encounter dates and times formatted for human readability but not necessarily
optimized for computer processing. The "Datetime From String" processor addresses this by facilitating the conversion
of human-readable datetime information within your continuous stream of events.

***

## How It Works

When an event reaches this processor, the selected property must contain a datetime string in ISO 8601 format (such as
"2023-11-24T15:30:00"). The processor parses the string into a ZonedDateTime and adds it to the event as a UNIX
timestamp in milliseconds, together with the configured time zone.

### Example

Let's say you have an event stream with a property containing values like "2023-11-24T15:30:00" and you want other
pipeline elements to treat it as a point in time rather than as text. You can use this processor to convert it into a
timestamp.
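
The conversion can be sketched in plain Java as follows. This is a minimal illustration that mirrors the parsing order used by the processor, not the processor itself; the class `DateTimeParseSketch` and the helper `toEpochMillis` are hypothetical names introduced only for this example.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class DateTimeParseSketch {

  // Hypothetical helper: first try to parse a string that carries its own
  // zone or offset; if that fails, parse it as a local datetime and attach
  // the fallback zone.
  static long toEpochMillis(String dateTimeString, String fallbackZone) {
    ZonedDateTime zdt;
    try {
      zdt = ZonedDateTime.parse(dateTimeString);
    } catch (DateTimeParseException e) {
      LocalDateTime ldt = LocalDateTime.parse(dateTimeString, DateTimeFormatter.ISO_DATE_TIME);
      zdt = ldt.atZone(ZoneId.of(fallbackZone));
    }
    return zdt.toInstant().toEpochMilli();
  }

  public static void main(String[] args) {
    // No zone in the string: the fallback zone (UTC) is applied.
    System.out.println(toEpochMillis("2023-11-24T15:30:00", "UTC"));
    // Offset in the string: the fallback zone is not used.
    System.out.println(toEpochMillis("2023-11-24T15:30:00+01:00", "UTC"));
  }
}
```

Note that a string with a space instead of the `T` separator, such as "2023-11-24 15:30:00", is rejected by both parsing attempts in this sketch.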

***

## Getting Started

To use this processor, you need one thing in your data:

1. **Datetime String**: This is the name of the event property that contains the ISO 8601 formatted datetime string, like "2023-11-24T15:30:00".


### Configuration

The only thing you need to configure is the time zone.

1. **Time Zone**: Specify the time zone that applies to your datetime strings if they don't already contain this
information. If a string already includes a time zone or offset, the configured time zone is ignored when parsing.
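
The effect of the configured time zone on a string without zone information can be illustrated with a short sketch. The class `TimeZoneEffectSketch` and the helper `millisInZone` are hypothetical names used only here; the example assumes the standard `java.time` behavior for the `Europe/Berlin` zone, which is one hour ahead of UTC in November.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

class TimeZoneEffectSketch {

  // Hypothetical helper: interpret a zone-less ISO 8601 string in the given zone
  // and return the resulting instant as epoch milliseconds.
  static long millisInZone(String localDateTime, String zoneId) {
    return LocalDateTime.parse(localDateTime)
        .atZone(ZoneId.of(zoneId))
        .toInstant()
        .toEpochMilli();
  }

  public static void main(String[] args) {
    long utc = millisInZone("2023-11-24T15:30:00", "UTC");
    long berlin = millisInZone("2023-11-24T15:30:00", "Europe/Berlin");
    // The same wall-clock time maps to different instants depending on the zone.
    System.out.println(utc - berlin);
  }
}
```

The same text therefore yields timestamps that differ by the zone's offset, which is why the time zone matters for strings that do not carry one.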

## Output

After the conversion happens, the processor adds two fields to each event:

* **timestringInMillis**: This is the transformed datetime in a format that computers can easily work with (UNIX timestamp in milliseconds).
* **timeZone**: The name of the time zone the `timestringInMillis` value refers to. Can be used to reconstitute the actual time.
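
A consumer of the event could reconstitute the zoned time from the two output fields roughly as shown below. This is a sketch under the assumption that the consumer already holds both values as a `long` and a `String`; the class `ReconstituteSketch` and the helper `reconstitute` are hypothetical names.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class ReconstituteSketch {

  // Hypothetical helper: combine the epoch milliseconds and the zone name
  // from the event into a ZonedDateTime.
  static ZonedDateTime reconstitute(long timestringInMillis, String timeZone) {
    return Instant.ofEpochMilli(timestringInMillis).atZone(ZoneId.of(timeZone));
  }

  public static void main(String[] args) {
    ZonedDateTime zdt = reconstitute(1700839800000L, "Europe/Berlin");
    System.out.println(zdt);
  }
}
```

The `timeZone` field always holds the configured time zone, even when the input string carried its own zone or offset, so the reconstituted value expresses the instant in the configured zone.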
@@ -0,0 +1,26 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.streampipes.processors.transformation.jvm.datetime.title=Datetime From String
org.apache.streampipes.processors.transformation.jvm.datetime.description=Converts a human-readable timestamp from a textual (string) representation into a computer-readable object

inputField.title=DateTime String
inputField.description=The event property that contains the timestamp as an ISO 8601 formatted string, e.g., '2023-11-29T18:30:22'

inputTimeZone.title=Time Zone
inputTimeZone.description=The time zone for which the string applies (if the string already contains the time zone information, the user time zone will be ignored)
