diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java index e36b6e9cf9..4adf3f153c 100644 --- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java +++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java @@ -121,6 +121,13 @@ public void onEvent(Event event) throws SpRuntimeException { throw new SpRuntimeException("event is null"); } + // sanitize event + for (String key : event.getRaw().keySet()) { + if (InfluxDbReservedKeywords.KEYWORD_LIST.stream().anyMatch(k -> k.equalsIgnoreCase(key))) { + event.renameFieldByRuntimeName(key, key + "_"); + } + } + Long timestampValue = event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong(); Point.Builder point = Point.measurement(measure.getMeasureName()).time((long) timestampValue, TimeUnit.MILLISECONDS); diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java index 58281241e4..375f863aff 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java @@ -119,6 +119,14 @@ public void updateFieldBySelector(String selector, AbstractField field) { } } + public void renameFieldByRuntimeName(String oldRuntimeName, String newRuntimeName) { + AbstractField field = getFieldByRuntimeName(oldRuntimeName); + String selector = makeKey(field); + removeFieldBySelector(selector); + field.rename(newRuntimeName); + addField(field); + } + private void updateFieldMap(Map currentFieldMap, String selector, Integer position, AbstractField field) {