diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 7eed30ef197..db303b1b763 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use std::collections::HashMap; +use std::sync::OnceLock; use once_cell::sync::Lazy; use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder}; @@ -259,10 +260,10 @@ pub struct InFlightDataGauges { pub wal: IntGauge, pub fetch_stream: IntGauge, pub multi_fetch_stream: IntGauge, - pub sources: InFlightDataSourceGauges, pub doc_processor_mailbox: IntGauge, pub indexer_mailbox: IntGauge, pub index_writer: IntGauge, + in_flight_gauge_vec: IntGaugeVec<1>, } impl Default for InFlightDataGauges { @@ -282,37 +283,70 @@ impl Default for InFlightDataGauges { wal: in_flight_gauge_vec.with_label_values(["wal"]), fetch_stream: in_flight_gauge_vec.with_label_values(["fetch_stream"]), multi_fetch_stream: in_flight_gauge_vec.with_label_values(["multi_fetch_stream"]), - sources: InFlightDataSourceGauges::new(&in_flight_gauge_vec), doc_processor_mailbox: in_flight_gauge_vec.with_label_values(["doc_processor_mailbox"]), indexer_mailbox: in_flight_gauge_vec.with_label_values(["indexer_mailbox"]), index_writer: in_flight_gauge_vec.with_label_values(["index_writer"]), + in_flight_gauge_vec: in_flight_gauge_vec.clone(), } } } -/// TODO make those lazy. -#[derive(Clone)] -pub struct InFlightDataSourceGauges { - pub file: IntGauge, - pub ingest: IntGauge, - pub kafka: IntGauge, - pub kinesis: IntGauge, - pub pubsub: IntGauge, - pub pulsar: IntGauge, - pub other: IntGauge, -} +impl InFlightDataGauges { + #[inline] + pub fn file(&self) -> &IntGauge { + static GAUGE: OnceLock = OnceLock::new(); + GAUGE.get_or_init(|| self.in_flight_gauge_vec.with_label_values(["file_source"])) + } -impl InFlightDataSourceGauges { - pub fn new(in_flight_gauge_vec: &IntGaugeVec<1>) -> Self { - Self { - file: in_flight_gauge_vec.with_label_values(["file_source"]), - ingest: in_flight_gauge_vec.with_label_values(["ingest_source"]), - kafka: in_flight_gauge_vec.with_label_values(["kafka_source"]), - kinesis: in_flight_gauge_vec.with_label_values(["kinesis_source"]), - pubsub: in_flight_gauge_vec.with_label_values(["pubsub_source"]), - pulsar: in_flight_gauge_vec.with_label_values(["pulsar_source"]), - other: in_flight_gauge_vec.with_label_values(["other"]), - } + #[inline] + pub fn ingest(&self) -> &IntGauge { + static GAUGE: OnceLock = OnceLock::new(); + GAUGE.get_or_init(|| { + self.in_flight_gauge_vec + .with_label_values(["ingest_source"]) + }) + } + + #[inline] + pub fn kafka(&self) -> &IntGauge { + static GAUGE: OnceLock = OnceLock::new(); + GAUGE.get_or_init(|| self.in_flight_gauge_vec.with_label_values(["kafka_source"])) + } + + #[inline] + pub fn kinesis(&self) -> &IntGauge { + static GAUGE: OnceLock = OnceLock::new(); + GAUGE.get_or_init(|| { + self.in_flight_gauge_vec + .with_label_values(["kinesis_source"]) + }) + } + + #[inline] + pub fn pubsub(&self) -> &IntGauge { + static GAUGE: OnceLock = OnceLock::new(); + GAUGE.get_or_init(|| { + self.in_flight_gauge_vec + .with_label_values(["pubsub_source"]) + }) + } + + #[inline] + pub fn pulsar(&self) -> &IntGauge { + static GAUGE: OnceLock = OnceLock::new(); + GAUGE.get_or_init(|| { + self.in_flight_gauge_vec + .with_label_values(["pulsar_source"]) + }) + } + + #[inline] + pub fn other(&self) -> &IntGauge { + static GAUGE: OnceLock = OnceLock::new(); + GAUGE.get_or_init(|| { + self.in_flight_gauge_vec + .with_label_values(["pulsar_source"]) + }) } } diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index 21d23774bd2..ed01d8041af 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -492,13 +492,13 @@ impl BatchBuilder { pub fn with_capacity(capacity: usize, source_type: SourceType) -> Self { let gauge = match source_type { - SourceType::File => &MEMORY_METRICS.in_flight.sources.file, - SourceType::IngestV2 => &MEMORY_METRICS.in_flight.sources.ingest, - SourceType::Kafka => &MEMORY_METRICS.in_flight.sources.kafka, - SourceType::Kinesis => &MEMORY_METRICS.in_flight.sources.kinesis, - SourceType::PubSub => &MEMORY_METRICS.in_flight.sources.pubsub, - SourceType::Pulsar => &MEMORY_METRICS.in_flight.sources.pulsar, - _ => &MEMORY_METRICS.in_flight.sources.other, + SourceType::File => MEMORY_METRICS.in_flight.file(), + SourceType::IngestV2 => MEMORY_METRICS.in_flight.ingest(), + SourceType::Kafka => MEMORY_METRICS.in_flight.kafka(), + SourceType::Kinesis => MEMORY_METRICS.in_flight.kinesis(), + SourceType::PubSub => MEMORY_METRICS.in_flight.pubsub(), + SourceType::Pulsar => MEMORY_METRICS.in_flight.pulsar(), + _ => MEMORY_METRICS.in_flight.other(), }; let gauge_guard = GaugeGuard::from_gauge(gauge);