diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java index 4adf3f153c..4abb33d679 100644 --- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java +++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java @@ -20,10 +20,10 @@ import org.apache.streampipes.commons.environment.Environment; import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.dataexplorer.commons.influx.serializer.RawFieldSerializer; import org.apache.streampipes.model.datalake.DataLakeMeasure; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.model.runtime.field.PrimitiveField; -import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.schema.EventPropertyPrimitive; import org.apache.streampipes.model.schema.PropertyScope; import org.apache.streampipes.vocabulary.SO; @@ -31,7 +31,6 @@ import org.influxdb.InfluxDB; import org.influxdb.dto.Point; -import org.influxdb.dto.Pong; import org.influxdb.dto.Query; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; @@ -49,6 +48,8 @@ public class InfluxStore { Map sanitizedRuntimeNames = new HashMap<>(); private InfluxDB influxDb = null; + private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer(); + public InfluxStore(DataLakeMeasure measure, InfluxConnectionSettings settings) { this.measure = measure; @@ -76,7 +77,7 @@ private void connect(InfluxConnectionSettings settings) throws SpRuntimeExceptio influxDb = InfluxClientProvider.getInfluxDBClient(settings); // Checking, if server is available - Pong response = influxDb.ping(); + var response = influxDb.ping(); if (response.getVersion().equalsIgnoreCase("unknown")) { throw new SpRuntimeException("Could not connect to InfluxDb Server: " + settings.getConnectionUrl()); } @@ -90,8 +91,8 @@ private void connect(InfluxConnectionSettings settings) throws SpRuntimeExceptio // setting up the database influxDb.setDatabase(databaseName); - int batchSize = 2000; - int flushDuration = 500; + var batchSize = 2000; + var flushDuration = 500; influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS); } @@ -122,28 +123,26 @@ public void onEvent(Event event) throws SpRuntimeException { } // sanitize event - for (String key : event.getRaw().keySet()) { + for (var key : event.getRaw().keySet()) { if (InfluxDbReservedKeywords.KEYWORD_LIST.stream().anyMatch(k -> k.equalsIgnoreCase(key))) { event.renameFieldByRuntimeName(key, key + "_"); } } - Long timestampValue = event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong(); - Point.Builder point = + var timestampValue = event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong(); + var point = Point.measurement(measure.getMeasureName()).time((long) timestampValue, TimeUnit.MILLISECONDS); - for (EventProperty ep : measure.getEventSchema().getEventProperties()) { - if (ep instanceof EventPropertyPrimitive) { - String runtimeName = ep.getRuntimeName(); - - // timestamp should not be added as a field - if (!measure.getTimestampField().endsWith(runtimeName)) { - String sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName); - - try { - var field = event.getOptionalFieldByRuntimeName(runtimeName); + for (var ep : measure.getEventSchema().getEventProperties()) { + var runtimeName = ep.getRuntimeName(); + // timestamp should not be added as a field + if (!measure.getTimestampField().endsWith(runtimeName)) { + var sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName); + var field = event.getOptionalFieldByRuntimeName(runtimeName); + try { + if (ep instanceof EventPropertyPrimitive) { if (field.isPresent()) { - PrimitiveField eventPropertyPrimitiveField = field.get().getAsPrimitive(); + var eventPropertyPrimitiveField = field.get().getAsPrimitive(); if (eventPropertyPrimitiveField.getRawValue() == null) { nullFields.add(sanitizedRuntimeName); } else { @@ -162,10 +161,18 @@ public void onEvent(Event event) throws SpRuntimeException { } else { missingFields.add(runtimeName); } - } catch (SpRuntimeException iae) { - LOG.warn("Runtime exception while extracting field value of field {} - this field will be ignored", - runtimeName, iae); + } else { + // Since InfluxDB can't store non-primitive types, store them as string + // and deserialize later in downstream processes + if (field.isPresent()) { + handleNonPrimitiveMeasurementProperty(point, event, sanitizedRuntimeName); + } else { + missingFields.add(runtimeName); + } } + } catch (SpRuntimeException iae) { + LOG.warn("Runtime exception while extracting field value of field {} - this field will be ignored", + runtimeName, iae); } } } @@ -189,7 +196,7 @@ private void handleMeasurementProperty(Point.Builder p, PrimitiveField eventPropertyPrimitiveField) { try { // Store property according to property type - String runtimeType = ep.getRuntimeType(); + var runtimeType = ep.getRuntimeType(); if (XSD.INTEGER.toString().equals(runtimeType)) { try { p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsInt()); @@ -218,6 +225,15 @@ private void handleMeasurementProperty(Point.Builder p, } } + private void handleNonPrimitiveMeasurementProperty(Point.Builder p, Event event, String preparedRuntimeName) { + try { + var json = rawFieldSerializer.serialize(event.getRaw().get(preparedRuntimeName)); + p.addField(preparedRuntimeName, json); + } catch (SpRuntimeException e) { + LOG.warn("Failed to serialize field {}, ignoring.", preparedRuntimeName); + } + } + /** * Shuts down the connection to the InfluxDB server */ diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/RawFieldSerializer.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/RawFieldSerializer.java new file mode 100644 index 0000000000..05380df10c --- /dev/null +++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/RawFieldSerializer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.dataexplorer.commons.influx.serializer; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator; + +public class RawFieldSerializer { + protected ObjectMapper objectMapper; + + public RawFieldSerializer() { + this.objectMapper = new ObjectMapper().activateDefaultTyping( + BasicPolymorphicTypeValidator.builder() + .allowIfBaseType(Object.class) + .build(), + ObjectMapper.DefaultTyping.EVERYTHING); + } + + public String serialize(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new SpRuntimeException(e.getCause()); + } + } + + public Object deserialize(String json) { + try { + return objectMapper.readValue(json, Object.class); + } catch (JsonProcessingException e) { + throw new SpRuntimeException(e.getCause()); + } + } +} diff --git a/streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawFieldSerializer.java b/streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawFieldSerializer.java new file mode 100644 index 0000000000..c9b85695d1 --- /dev/null +++ b/streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawFieldSerializer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.dataexplorer.commons.influx.serializer; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class TestRawFieldSerializer { + private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer(); + private Map primitives = new HashMap(); + + public TestRawFieldSerializer() { + primitives.put("Integer", 1); + primitives.put("Long", 1L); + primitives.put("Float", 1.0f); + primitives.put("Double", 1.0d); + primitives.put("Boolean", true); + primitives.put("String", "1"); + } + + // Test able to deserialize back the original data + @Test + public void testRawFieldSerializerListInMap() { + var rawListField = new ArrayList(); + rawListField.addAll(primitives.values()); + + var rawNestedField = new HashMap(); + rawNestedField.putAll(primitives); + rawNestedField.put("List", rawListField); + + var json = rawFieldSerializer.serialize(rawNestedField); + + assertEquals(rawNestedField, rawFieldSerializer.deserialize(json)); + } + + @Test + public void testRawFieldSerializerMapInList() { + var rawNestedField = new HashMap(); + rawNestedField.putAll(primitives); + + var rawListField = new ArrayList(); + rawListField.addAll(primitives.values()); + rawListField.add(rawNestedField); + + var json = rawFieldSerializer.serialize(rawListField); + + assertEquals(rawListField, rawFieldSerializer.deserialize(json)); + } +}