Skip to content

Commit

Permalink
fix(#2106) Serialize non-primitive types as string and store in Influ…
Browse files Browse the repository at this point in the history
…xDB (#2196)

* Serialize non-primitive types and store in Influx

* extract RawFieldSerializer

* rename test

* delete old
  • Loading branch information
muyangye authored Nov 27, 2023
1 parent 8d8054b commit 34b33bd
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@

import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataexplorer.commons.influx.serializer.RawFieldSerializer;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.PrimitiveField;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.vocabulary.XSD;

import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
Expand All @@ -49,6 +48,8 @@ public class InfluxStore {
Map<String, String> sanitizedRuntimeNames = new HashMap<>();
private InfluxDB influxDb = null;

private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer();

public InfluxStore(DataLakeMeasure measure,
InfluxConnectionSettings settings) {
this.measure = measure;
Expand Down Expand Up @@ -76,7 +77,7 @@ private void connect(InfluxConnectionSettings settings) throws SpRuntimeExceptio
influxDb = InfluxClientProvider.getInfluxDBClient(settings);

// Checking, if server is available
Pong response = influxDb.ping();
var response = influxDb.ping();
if (response.getVersion().equalsIgnoreCase("unknown")) {
throw new SpRuntimeException("Could not connect to InfluxDb Server: " + settings.getConnectionUrl());
}
Expand All @@ -90,8 +91,8 @@ private void connect(InfluxConnectionSettings settings) throws SpRuntimeExceptio

// setting up the database
influxDb.setDatabase(databaseName);
int batchSize = 2000;
int flushDuration = 500;
var batchSize = 2000;
var flushDuration = 500;
influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
}

Expand Down Expand Up @@ -122,28 +123,26 @@ public void onEvent(Event event) throws SpRuntimeException {
}

// sanitize event
for (String key : event.getRaw().keySet()) {
for (var key : event.getRaw().keySet()) {
if (InfluxDbReservedKeywords.KEYWORD_LIST.stream().anyMatch(k -> k.equalsIgnoreCase(key))) {
event.renameFieldByRuntimeName(key, key + "_");
}
}

Long timestampValue = event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong();
Point.Builder point =
var timestampValue = event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong();
var point =
Point.measurement(measure.getMeasureName()).time((long) timestampValue, TimeUnit.MILLISECONDS);

for (EventProperty ep : measure.getEventSchema().getEventProperties()) {
if (ep instanceof EventPropertyPrimitive) {
String runtimeName = ep.getRuntimeName();

// timestamp should not be added as a field
if (!measure.getTimestampField().endsWith(runtimeName)) {
String sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName);

try {
var field = event.getOptionalFieldByRuntimeName(runtimeName);
for (var ep : measure.getEventSchema().getEventProperties()) {
var runtimeName = ep.getRuntimeName();
// timestamp should not be added as a field
if (!measure.getTimestampField().endsWith(runtimeName)) {
var sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName);
var field = event.getOptionalFieldByRuntimeName(runtimeName);
try {
if (ep instanceof EventPropertyPrimitive) {
if (field.isPresent()) {
PrimitiveField eventPropertyPrimitiveField = field.get().getAsPrimitive();
var eventPropertyPrimitiveField = field.get().getAsPrimitive();
if (eventPropertyPrimitiveField.getRawValue() == null) {
nullFields.add(sanitizedRuntimeName);
} else {
Expand All @@ -162,10 +161,18 @@ public void onEvent(Event event) throws SpRuntimeException {
} else {
missingFields.add(runtimeName);
}
} catch (SpRuntimeException iae) {
LOG.warn("Runtime exception while extracting field value of field {} - this field will be ignored",
runtimeName, iae);
} else {
// Since InfluxDB can't store non-primitive types, store them as string
// and deserialize later in downstream processes
if (field.isPresent()) {
handleNonPrimitiveMeasurementProperty(point, event, sanitizedRuntimeName);
} else {
missingFields.add(runtimeName);
}
}
} catch (SpRuntimeException iae) {
LOG.warn("Runtime exception while extracting field value of field {} - this field will be ignored",
runtimeName, iae);
}
}
}
Expand All @@ -189,7 +196,7 @@ private void handleMeasurementProperty(Point.Builder p,
PrimitiveField eventPropertyPrimitiveField) {
try {
// Store property according to property type
String runtimeType = ep.getRuntimeType();
var runtimeType = ep.getRuntimeType();
if (XSD.INTEGER.toString().equals(runtimeType)) {
try {
p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsInt());
Expand Down Expand Up @@ -218,6 +225,15 @@ private void handleMeasurementProperty(Point.Builder p,
}
}

private void handleNonPrimitiveMeasurementProperty(Point.Builder p, Event event, String preparedRuntimeName) {
try {
var json = rawFieldSerializer.serialize(event.getRaw().get(preparedRuntimeName));
p.addField(preparedRuntimeName, json);
} catch (SpRuntimeException e) {
LOG.warn("Failed to serialize field {}, ignoring.", preparedRuntimeName);
}
}

/**
* Shuts down the connection to the InfluxDB server
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.dataexplorer.commons.influx.serializer;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator;

public class RawFieldSerializer {
protected ObjectMapper objectMapper;

public RawFieldSerializer() {
this.objectMapper = new ObjectMapper().activateDefaultTyping(
BasicPolymorphicTypeValidator.builder()
.allowIfBaseType(Object.class)
.build(),
ObjectMapper.DefaultTyping.EVERYTHING);
}

public String serialize(Object object) {
try {
return objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
throw new SpRuntimeException(e.getCause());
}
}

public Object deserialize(String json) {
try {
return objectMapper.readValue(json, Object.class);
} catch (JsonProcessingException e) {
throw new SpRuntimeException(e.getCause());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.dataexplorer.commons.influx.serializer;

import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;

public class TestRawFieldSerializer {
private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer();
private Map<String, Object> primitives = new HashMap<String, Object>();

public TestRawFieldSerializer() {
primitives.put("Integer", 1);
primitives.put("Long", 1L);
primitives.put("Float", 1.0f);
primitives.put("Double", 1.0d);
primitives.put("Boolean", true);
primitives.put("String", "1");
}

// Test able to deserialize back the original data
@Test
public void testRawFieldSerializerListInMap() {
var rawListField = new ArrayList<Object>();
rawListField.addAll(primitives.values());

var rawNestedField = new HashMap<String, Object>();
rawNestedField.putAll(primitives);
rawNestedField.put("List", rawListField);

var json = rawFieldSerializer.serialize(rawNestedField);

assertEquals(rawNestedField, rawFieldSerializer.deserialize(json));
}

@Test
public void testRawFieldSerializerMapInList() {
var rawNestedField = new HashMap<String, Object>();
rawNestedField.putAll(primitives);

var rawListField = new ArrayList<Object>();
rawListField.addAll(primitives.values());
rawListField.add(rawNestedField);

var json = rawFieldSerializer.serialize(rawListField);

assertEquals(rawListField, rawFieldSerializer.deserialize(json));
}
}

0 comments on commit 34b33bd

Please sign in to comment.