diff --git a/main/src/main/java/org/opennms/nephron/Pipeline.java b/main/src/main/java/org/opennms/nephron/Pipeline.java index 9800a210..184b2f82 100644 --- a/main/src/main/java/org/opennms/nephron/Pipeline.java +++ b/main/src/main/java/org/opennms/nephron/Pipeline.java @@ -340,6 +340,7 @@ public static class ReadFromKafka extends PTransform kafkaConsumerConfig; + // metric name: flink_taskmanager_job_task_operator_flows_from_kafka_drift private final Counter flowsFromKafka = Metrics.counter("flows", "from_kafka"); // a distribution would be more interesting for from_kafka_drift // -> Unfortunately histograms are not supported Beam/Flink/Prometheus @@ -379,7 +380,7 @@ public void processElement(ProcessContext c) { // Metrics flowsFromKafka.inc(); - flowsFromKafkaDrift.set(System.currentTimeMillis() - flow.getTimestamp()); + flowsFromKafkaDrift.set(System.currentTimeMillis() - flow.getLastSwitched().getValue()); } })); }