Skip to content

Commit

Permalink
Making the source in flight component lazy. (#4881)
Browse files Browse the repository at this point in the history
Right now we pollute our metrics with unused sources (like pulsar, kafka
etc.).
After this PR, only sources that are actually used will show up.
  • Loading branch information
fulmicoton authored Apr 19, 2024
1 parent d2d5809 commit 45bb963
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 31 deletions.
82 changes: 58 additions & 24 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::sync::OnceLock;

use once_cell::sync::Lazy;
use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder};
Expand Down Expand Up @@ -259,10 +260,10 @@ pub struct InFlightDataGauges {
pub wal: IntGauge,
pub fetch_stream: IntGauge,
pub multi_fetch_stream: IntGauge,
pub sources: InFlightDataSourceGauges,
pub doc_processor_mailbox: IntGauge,
pub indexer_mailbox: IntGauge,
pub index_writer: IntGauge,
in_flight_gauge_vec: IntGaugeVec<1>,
}

impl Default for InFlightDataGauges {
Expand All @@ -282,37 +283,70 @@ impl Default for InFlightDataGauges {
wal: in_flight_gauge_vec.with_label_values(["wal"]),
fetch_stream: in_flight_gauge_vec.with_label_values(["fetch_stream"]),
multi_fetch_stream: in_flight_gauge_vec.with_label_values(["multi_fetch_stream"]),
sources: InFlightDataSourceGauges::new(&in_flight_gauge_vec),
doc_processor_mailbox: in_flight_gauge_vec.with_label_values(["doc_processor_mailbox"]),
indexer_mailbox: in_flight_gauge_vec.with_label_values(["indexer_mailbox"]),
index_writer: in_flight_gauge_vec.with_label_values(["index_writer"]),
in_flight_gauge_vec: in_flight_gauge_vec.clone(),
}
}
}

/// TODO make those lazy.
#[derive(Clone)]
pub struct InFlightDataSourceGauges {
pub file: IntGauge,
pub ingest: IntGauge,
pub kafka: IntGauge,
pub kinesis: IntGauge,
pub pubsub: IntGauge,
pub pulsar: IntGauge,
pub other: IntGauge,
}
impl InFlightDataGauges {
/// Returns the gauge carrying the `file_source` label.
///
/// The gauge is created from `in_flight_gauge_vec` on first access rather
/// than at construction time, so the label only exists once it is used.
///
/// The cache is a function-local `static`: it is shared by every
/// `InFlightDataGauges` value and is initialized from whichever instance
/// calls this method first.
#[inline]
pub fn file(&self) -> &IntGauge {
    static FILE_GAUGE: OnceLock<IntGauge> = OnceLock::new();
    let gauge_vec = &self.in_flight_gauge_vec;
    FILE_GAUGE.get_or_init(|| gauge_vec.with_label_values(["file_source"]))
}

impl InFlightDataSourceGauges {
pub fn new(in_flight_gauge_vec: &IntGaugeVec<1>) -> Self {
Self {
file: in_flight_gauge_vec.with_label_values(["file_source"]),
ingest: in_flight_gauge_vec.with_label_values(["ingest_source"]),
kafka: in_flight_gauge_vec.with_label_values(["kafka_source"]),
kinesis: in_flight_gauge_vec.with_label_values(["kinesis_source"]),
pubsub: in_flight_gauge_vec.with_label_values(["pubsub_source"]),
pulsar: in_flight_gauge_vec.with_label_values(["pulsar_source"]),
other: in_flight_gauge_vec.with_label_values(["other"]),
}
/// Returns the gauge carrying the `ingest_source` label.
///
/// The gauge is built from `in_flight_gauge_vec` the first time this method
/// is called and then reused for all later calls.
///
/// The cache is a function-local `static`, so it is shared across all
/// `InFlightDataGauges` values; the first caller's gauge vec is the one used.
#[inline]
pub fn ingest(&self) -> &IntGauge {
    static INGEST_GAUGE: OnceLock<IntGauge> = OnceLock::new();
    let make_gauge = || self.in_flight_gauge_vec.with_label_values(["ingest_source"]);
    INGEST_GAUGE.get_or_init(make_gauge)
}

/// Returns the gauge carrying the `kafka_source` label.
///
/// Creation is deferred until the first call, so the label is not
/// instantiated unless this accessor is actually used.
///
/// The cache is a function-local `static`, shared by every
/// `InFlightDataGauges` value and initialized by the first caller.
#[inline]
pub fn kafka(&self) -> &IntGauge {
    static KAFKA_GAUGE: OnceLock<IntGauge> = OnceLock::new();
    let gauge_vec = &self.in_flight_gauge_vec;
    KAFKA_GAUGE.get_or_init(|| gauge_vec.with_label_values(["kafka_source"]))
}

/// Returns the gauge carrying the `kinesis_source` label.
///
/// The gauge is built from `in_flight_gauge_vec` on first access and cached
/// for subsequent calls.
///
/// The cache is a function-local `static`, so it is shared across all
/// `InFlightDataGauges` values; the first caller's gauge vec is the one used.
#[inline]
pub fn kinesis(&self) -> &IntGauge {
    static KINESIS_GAUGE: OnceLock<IntGauge> = OnceLock::new();
    let make_gauge = || self.in_flight_gauge_vec.with_label_values(["kinesis_source"]);
    KINESIS_GAUGE.get_or_init(make_gauge)
}

/// Returns the gauge carrying the `pubsub_source` label.
///
/// The gauge is built from `in_flight_gauge_vec` on first access and cached
/// for subsequent calls.
///
/// The cache is a function-local `static`, so it is shared across all
/// `InFlightDataGauges` values; the first caller's gauge vec is the one used.
#[inline]
pub fn pubsub(&self) -> &IntGauge {
    static PUBSUB_GAUGE: OnceLock<IntGauge> = OnceLock::new();
    let make_gauge = || self.in_flight_gauge_vec.with_label_values(["pubsub_source"]);
    PUBSUB_GAUGE.get_or_init(make_gauge)
}

/// Returns the gauge carrying the `pulsar_source` label.
///
/// The gauge is built from `in_flight_gauge_vec` on first access and cached
/// for subsequent calls.
///
/// The cache is a function-local `static`, so it is shared across all
/// `InFlightDataGauges` values; the first caller's gauge vec is the one used.
#[inline]
pub fn pulsar(&self) -> &IntGauge {
    static PULSAR_GAUGE: OnceLock<IntGauge> = OnceLock::new();
    let make_gauge = || self.in_flight_gauge_vec.with_label_values(["pulsar_source"]);
    PULSAR_GAUGE.get_or_init(make_gauge)
}

/// Returns the gauge carrying the `other` label, used for source types that
/// have no dedicated accessor.
///
/// The gauge is built from `in_flight_gauge_vec` on first access and cached
/// for subsequent calls.
///
/// The cache is a function-local `static`, so it is shared across all
/// `InFlightDataGauges` values; the first caller's gauge vec is the one used.
#[inline]
pub fn other(&self) -> &IntGauge {
    static GAUGE: OnceLock<IntGauge> = OnceLock::new();
    // The label must be "other": using "pulsar_source" here would fold every
    // uncategorized source into the pulsar gauge and skew that metric.
    GAUGE.get_or_init(|| self.in_flight_gauge_vec.with_label_values(["other"]))
}
}

Expand Down
14 changes: 7 additions & 7 deletions quickwit/quickwit-indexing/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,13 +492,13 @@ impl BatchBuilder {

pub fn with_capacity(capacity: usize, source_type: SourceType) -> Self {
let gauge = match source_type {
SourceType::File => &MEMORY_METRICS.in_flight.sources.file,
SourceType::IngestV2 => &MEMORY_METRICS.in_flight.sources.ingest,
SourceType::Kafka => &MEMORY_METRICS.in_flight.sources.kafka,
SourceType::Kinesis => &MEMORY_METRICS.in_flight.sources.kinesis,
SourceType::PubSub => &MEMORY_METRICS.in_flight.sources.pubsub,
SourceType::Pulsar => &MEMORY_METRICS.in_flight.sources.pulsar,
_ => &MEMORY_METRICS.in_flight.sources.other,
SourceType::File => MEMORY_METRICS.in_flight.file(),
SourceType::IngestV2 => MEMORY_METRICS.in_flight.ingest(),
SourceType::Kafka => MEMORY_METRICS.in_flight.kafka(),
SourceType::Kinesis => MEMORY_METRICS.in_flight.kinesis(),
SourceType::PubSub => MEMORY_METRICS.in_flight.pubsub(),
SourceType::Pulsar => MEMORY_METRICS.in_flight.pulsar(),
_ => MEMORY_METRICS.in_flight.other(),
};
let gauge_guard = GaugeGuard::from_gauge(gauge);

Expand Down

0 comments on commit 45bb963

Please sign in to comment.