From ef3674e6033998c880a40383b99799db97eaf289 Mon Sep 17 00:00:00 2001 From: Muyang Ye Date: Mon, 13 Nov 2023 22:43:36 -0800 Subject: [PATCH] fix(#2166): sanitize event keys in influx sink (#2172) * implement new round processor * add English locale, icon, and documentation * fix checkstyle * support different rounding modes * add rounding mode in documentation * fix time display * let NaryMapping selection account for property scope * implement boolean filter unit tests * add common StoreEventCollector class and refactor TestChangedValueDetectionProcessor * add new class * show associated pipelines' names and allow one click deletion * center text * fix minor error * replace magic number * add timeout * restore newline * changeb baseurl * revert port * revert timeout * implement pipelines owner check * undo automatic changes * enable admin to delete pipelines no matter ownership * sanitize event * add newline back * fix iter is on a copy --------- Co-authored-by: bossenti --- .../dataexplorer/commons/influx/InfluxStore.java | 7 +++++++ .../java/org/apache/streampipes/model/runtime/Event.java | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java index e36b6e9cf9..4adf3f153c 100644 --- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java +++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java @@ -121,6 +121,13 @@ public void onEvent(Event event) throws SpRuntimeException { throw new SpRuntimeException("event is null"); } + // sanitize event + for (String key : event.getRaw().keySet()) { + if (InfluxDbReservedKeywords.KEYWORD_LIST.stream().anyMatch(k -> k.equalsIgnoreCase(key))) { + event.renameFieldByRuntimeName(key, key + "_"); + } + } + Long timestampValue = event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong(); Point.Builder point = Point.measurement(measure.getMeasureName()).time((long) timestampValue, TimeUnit.MILLISECONDS); diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java index 58281241e4..375f863aff 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java @@ -119,6 +119,14 @@ public void updateFieldBySelector(String selector, AbstractField field) { } } + public void renameFieldByRuntimeName(String oldRuntimeName, String newRuntimeName) { + AbstractField field = getFieldByRuntimeName(oldRuntimeName); + String selector = makeKey(field); + removeFieldBySelector(selector); + field.rename(newRuntimeName); + addField(field); + } + private void updateFieldMap(Map currentFieldMap, String selector, Integer position, AbstractField field) {