Skip to content

Commit

Permalink
NMS-12987: report currentTimeMillis - flow.lastSwitched as kafka_drif…
Browse files Browse the repository at this point in the history
…t (watermarks are also based on lastSwitched)
  • Loading branch information
swachter committed Jun 4, 2021
1 parent ff85cb5 commit bbc2bbf
Showing 1 changed file with 2 additions and 1 deletion.
3 changes: 2 additions & 1 deletion main/src/main/java/org/opennms/nephron/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ public static class ReadFromKafka extends PTransform<PBegin, PCollection<FlowDoc
private final String topic;
private final Map<String, Object> kafkaConsumerConfig;

// metric name: flink_taskmanager_job_task_operator_flows_from_kafka_drift
private final Counter flowsFromKafka = Metrics.counter("flows", "from_kafka");
// a distribution would be more interesting for from_kafka_drift
// -> Unfortunately histograms are not supported Beam/Flink/Prometheus
Expand Down Expand Up @@ -379,7 +380,7 @@ public void processElement(ProcessContext c) {

// Metrics
flowsFromKafka.inc();
flowsFromKafkaDrift.set(System.currentTimeMillis() - flow.getTimestamp());
flowsFromKafkaDrift.set(System.currentTimeMillis() - flow.getLastSwitched().getValue());
}
}));
}
Expand Down

0 comments on commit bbc2bbf

Please sign in to comment.