improve ingest lock #5296

Open · wants to merge 4 commits into base: main

1 change: 1 addition & 0 deletions quickwit/quickwit-cli/src/lib.rs
@@ -335,6 +335,7 @@ pub mod busy_detector {
use std::time::Instant;

use once_cell::sync::Lazy;
use quickwit_common::metrics::Vector;
use tracing::debug;

use crate::metrics::CLI_METRICS;
2 changes: 1 addition & 1 deletion quickwit/quickwit-common/src/io.rs
@@ -42,7 +42,7 @@ use pin_project::pin_project;
use prometheus::IntCounter;
use tokio::io::AsyncWrite;

-use crate::metrics::{new_counter_vec, IntCounterVec};
+use crate::metrics::{new_counter_vec, IntCounterVec, Vector};
use crate::{KillSwitch, Progress, ProtectedZoneGuard};

// Max 1MB at a time.
23 changes: 17 additions & 6 deletions quickwit/quickwit-common/src/metrics.rs
@@ -28,13 +28,17 @@ pub use prometheus::{
};
use prometheus::{Gauge, HistogramOpts, Opts, TextEncoder};

pub trait Vector<const N: usize, T> {
fn with_label_values(&self, label_values: [&str; N]) -> T;
}

#[derive(Clone)]
pub struct HistogramVec<const N: usize> {
underlying: PrometheusHistogramVec,
}

-impl<const N: usize> HistogramVec<N> {
-pub fn with_label_values(&self, label_values: [&str; N]) -> Histogram {
+impl<const N: usize> Vector<N, Histogram> for HistogramVec<N> {
+fn with_label_values(&self, label_values: [&str; N]) -> Histogram {
self.underlying.with_label_values(&label_values)
}
}
@@ -44,8 +48,8 @@ pub struct IntCounterVec<const N: usize> {
underlying: PrometheusIntCounterVec,
}

-impl<const N: usize> IntCounterVec<N> {
-pub fn with_label_values(&self, label_values: [&str; N]) -> IntCounter {
+impl<const N: usize> Vector<N, IntCounter> for IntCounterVec<N> {
+fn with_label_values(&self, label_values: [&str; N]) -> IntCounter {
self.underlying.with_label_values(&label_values)
}
}
@@ -55,8 +59,8 @@ pub struct IntGaugeVec<const N: usize> {
underlying: PrometheusIntGaugeVec,
}

-impl<const N: usize> IntGaugeVec<N> {
-pub fn with_label_values(&self, label_values: [&str; N]) -> IntGauge {
+impl<const N: usize> Vector<N, IntGauge> for IntGaugeVec<N> {
+fn with_label_values(&self, label_values: [&str; N]) -> IntGauge {
self.underlying.with_label_values(&label_values)
}
}
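
The new `Vector<N, T>` trait unifies the histogram, counter, and gauge vector wrappers behind a single `with_label_values` method, so helper code can be written once for any metric vector. A minimal sketch of that kind of caller (the helper and metric names below are illustrative, not part of this PR):

```rust
use quickwit_common::metrics::{IntCounter, IntCounterVec, Vector};

/// Works for `IntCounterVec`, `IntGaugeVec`, or `HistogramVec`, since they all
/// implement `Vector<N, T>` after this change.
fn resolve_child<T, V: Vector<2, T>>(vec: &V, index: &str, source: &str) -> T {
    // Resolve the labeled child once, outside the hot path.
    vec.with_label_values([index, source])
}

// Usage sketch, assuming `docs_total` is an `IntCounterVec<2>` built elsewhere
// with `new_counter_vec` and labeled by (index, source):
fn record_doc(docs_total: &IntCounterVec<2>) {
    let counter: IntCounter = resolve_child(docs_total, "my-index", "my-source");
    counter.inc();
}
```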
@@ -465,4 +469,11 @@ pub fn index_label(index_name: &str) -> &str {
}
}

// TODO wishlist: a macro to generate static metric vectors.
// It could be used to simplify:
// - quickwit-ingest/src/ingest_v2/metrics.rs (15 labels)
// - quickwit-ingest/src/ingest_v2/metrics.rs (12 labels)
// - quickwit-common/src/metrics.rs (10 labels)
// and would encourage using that pattern in more places.

pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);
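
The TODO above asks for something along these lines; the following is only an illustrative guess at the shape (macro name, struct name, and label handling are all hypothetical, not part of the PR):

```rust
// Hypothetical sketch: expand a list of label values into a struct of
// pre-resolved counters, so call sites skip the `with_label_values` boilerplate.
// Assumes `IntCounter` and the `Vector` trait are in scope.
macro_rules! static_counter_children {
    ($name:ident, $vec:expr, [$($label:ident),+ $(,)?]) => {
        pub struct $name {
            $(pub $label: IntCounter,)+
        }
        impl $name {
            pub fn new() -> Self {
                let vec = $vec;
                Self {
                    $($label: vec.with_label_values([stringify!($label)]),)+
                }
            }
        }
    };
}

// e.g. `static_counter_children!(IngestResults, &SOME_COUNTER_VEC, [success, error]);`
// would generate a struct with one pre-resolved `IntCounter` per outcome.
```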
2 changes: 1 addition & 1 deletion quickwit/quickwit-common/src/thread_pool.rs
@@ -26,7 +26,7 @@ use prometheus::IntGauge;
use tokio::sync::oneshot;
use tracing::error;

-use crate::metrics::{new_gauge_vec, GaugeGuard, IntGaugeVec, OwnedGaugeGuard};
+use crate::metrics::{new_gauge_vec, GaugeGuard, IntGaugeVec, OwnedGaugeGuard, Vector};

/// An executor backed by a thread pool to run CPU-intensive tasks.
///
1 change: 1 addition & 0 deletions quickwit/quickwit-common/src/tower/metrics.rs
@@ -28,6 +28,7 @@ use tower::{Layer, Service};

use crate::metrics::{
new_counter_vec, new_gauge_vec, new_histogram_vec, HistogramVec, IntCounterVec, IntGaugeVec,
Vector,
};

pub trait RpcName {
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/metrics.rs
@@ -19,7 +19,7 @@

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
-new_counter, new_gauge, new_gauge_vec, IntCounter, IntGauge, IntGaugeVec,
+new_counter, new_gauge, new_gauge_vec, IntCounter, IntGauge, IntGaugeVec, Vector,
};

#[derive(Debug, Clone, Copy)]
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/src/model/shard_table.rs
@@ -23,6 +23,7 @@ use std::ops::{Deref, DerefMut};
use std::time::Duration;

use fnv::{FnvHashMap, FnvHashSet};
use quickwit_common::metrics::Vector;
use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
use quickwit_common::tower::ConstantRate;
use quickwit_ingest::{RateMibPerSec, ShardInfo, ShardInfos};
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/doc_processor.rs
@@ -25,7 +25,7 @@ use anyhow::{bail, Context};
use async_trait::async_trait;
use bytes::Bytes;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
-use quickwit_common::metrics::IntCounter;
+use quickwit_common::metrics::{IntCounter, Vector};
use quickwit_common::rate_limited_tracing::rate_limited_warn;
use quickwit_common::runtimes::RuntimeType;
use quickwit_config::{SourceInputFormat, TransformConfig};
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -27,6 +27,7 @@ use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Mailbox, QueueCapacity,
Supervisable, HEARTBEAT,
};
use quickwit_common::metrics::Vector;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::KillSwitch;
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -26,6 +26,7 @@ use quickwit_actors::{
SpawnContext, Supervisable, HEARTBEAT,
};
use quickwit_common::io::{IoControls, Limiter};
use quickwit_common::metrics::Vector;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::KillSwitch;
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/actors/uploader.rs
@@ -29,6 +29,7 @@ use fail::fail_point;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_common::metrics::Vector;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::spawn_named_task;
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
11 changes: 8 additions & 3 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;

use bytes::{BufMut, BytesMut};
use bytesize::ByteSize;
-use futures::StreamExt;
+use futures::{FutureExt, StreamExt};
use mrecordlog::Record;
use quickwit_common::metrics::MEMORY_METRICS;
use quickwit_common::retry::RetryParams;
@@ -133,8 +133,13 @@ impl FetchStreamTask {
let mut mrecord_buffer = BytesMut::with_capacity(self.batch_num_bytes);
let mut mrecord_lengths = Vec::new();

-let mrecordlog_guard =
-with_lock_metrics!(self.mrecordlog.read().await, "fetch", "read");
+let Some(mrecordlog_guard) =
+with_lock_metrics!(self.mrecordlog.read().map(Some), fetch, read)
+else {
+// we always get a `Some`: the `map(Some)` layer is only there to satisfy
+// `with_lock_metrics`, which expects a future resolving to a `Result` or an `Option`
+unreachable!();
+};

let Ok(mrecords) = mrecordlog_guard
.as_ref()
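
Note the call-site change throughout this PR: the lock future is passed without `.await` and the labels are bare identifiers rather than string literals, so the macro now awaits the lock itself and can time the acquisition. A rough sketch of that shape (the `INGEST_METRICS` handle and `wal_lock_wait_secs` field are assumptions standing in for whatever the real macro records; its actual definition lives in quickwit-ingest and is not part of this diff):

```rust
// Hedged sketch only, not the PR's implementation.
macro_rules! with_lock_metrics {
    ($lock_future:expr, $operation:ident, $lock_kind:ident) => {{
        let start = std::time::Instant::now();
        let guard = $lock_future.await;
        INGEST_METRICS
            .wal_lock_wait_secs
            .with_label_values([stringify!($operation), stringify!($lock_kind)])
            .observe(start.elapsed().as_secs_f64());
        // The comment in fetch.rs above says the real macro also expects the
        // future to resolve to a `Result` or an `Option`; that part is not
        // modeled in this sketch.
        guard
    }};
}
```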
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/idle.rs
@@ -64,7 +64,7 @@ impl CloseIdleShardsTask {
return;
};
let Ok(mut state_guard) =
with_lock_metrics!(state.lock_partially(), "close_idle_shards", "write").await
with_lock_metrics!(state.lock_partially(), close_idle_shards, write)
else {
return;
};
27 changes: 11 additions & 16 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -32,7 +32,7 @@ use futures::StreamExt;
use mrecordlog::error::CreateQueueError;
use once_cell::sync::OnceCell;
use quickwit_cluster::Cluster;
-use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
+use quickwit_common::metrics::{GaugeGuard, Vector, MEMORY_METRICS};
use quickwit_common::pretty::PrettyDisplay;
use quickwit_common::pubsub::{EventBroker, EventSubscriber};
use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
@@ -295,7 +295,7 @@ impl Ingester {

let mut per_source_shard_ids: HashMap<(IndexUid, SourceId), Vec<ShardId>> = HashMap::new();

-let state_guard = with_lock_metrics!(self.state.lock_fully().await, "reset_shards", "read")
+let state_guard = with_lock_metrics!(self.state.lock_fully(), reset_shards, read)
.expect("ingester should be ready");

for queue_id in state_guard.mrecordlog.list_queues() {
@@ -328,7 +328,7 @@
match advise_reset_shards_result {
Ok(Ok(advise_reset_shards_response)) => {
let mut state_guard =
with_lock_metrics!(self.state.lock_fully().await, "reset_shards", "write")
with_lock_metrics!(self.state.lock_fully(), reset_shards, write)
.expect("ingester should be ready");

state_guard
@@ -461,8 +461,7 @@
let force_commit = commit_type == CommitTypeV2::Force;
let leader_id: NodeId = persist_request.leader_id.into();

-let mut state_guard =
-with_lock_metrics!(self.state.lock_fully().await, "persist", "write")?;
+let mut state_guard = with_lock_metrics!(self.state.lock_fully(), persist, write)?;

if state_guard.status() != IngesterStatus::Ready {
persist_failures.reserve_exact(persist_request.subrequests.len());
@@ -937,8 +936,9 @@
&self,
init_shards_request: InitShardsRequest,
) -> IngestV2Result<InitShardsResponse> {
-let mut state_guard =
-with_lock_metrics!(self.state.lock_fully().await, "init_shards", "write")?;
+let mut state_guard = with_lock_metrics!(self.state.lock_fully(), init_shards, write)?;
+// reborrow so that multiple fields can be borrowed simultaneously
+let state_guard = &mut *state_guard;

if state_guard.status() != IngesterStatus::Ready {
return Err(IngestV2Error::Internal("node decommissioned".to_string()));
@@ -992,8 +992,7 @@
self.self_node_id, truncate_shards_request.ingester_id,
)));
}
-let mut state_guard =
-with_lock_metrics!(self.state.lock_fully().await, "truncate_shards", "write")?;
+let mut state_guard = with_lock_metrics!(self.state.lock_fully(), truncate_shards, write)?;

for subrequest in truncate_shards_request.subrequests {
let queue_id = subrequest.queue_id();
@@ -1019,8 +1018,7 @@
&self,
close_shards_request: CloseShardsRequest,
) -> IngestV2Result<CloseShardsResponse> {
-let mut state_guard =
-with_lock_metrics!(self.state.lock_partially().await, "close_shards", "write")?;
+let mut state_guard = with_lock_metrics!(self.state.lock_partially(), close_shards, write)?;

let mut successes = Vec::with_capacity(close_shards_request.shard_pkeys.len());

@@ -1174,8 +1172,7 @@
})
})
.collect();
-let mut state_guard =
-with_lock_metrics!(self.state.lock_fully(), "retain_shards", "write").await?;
+let mut state_guard = with_lock_metrics!(self.state.lock_fully(), retain_shards, write)?;
let remove_queue_ids: HashSet<QueueId> = state_guard
.shards
.keys()
@@ -1219,9 +1216,7 @@
warn!("ingester state update failed");
return;
};
-let Ok(mut state_guard) =
-with_lock_metrics!(state.lock_fully().await, "gc_shards", "write")
-else {
+let Ok(mut state_guard) = with_lock_metrics!(state.lock_fully(), gc_shards, write) else {
error!("failed to lock the ingester state");
return;
};
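
The `let state_guard = &mut *state_guard;` reborrow added in `init_shards` deserves a word: borrowing two fields through a lock guard calls `deref_mut` twice, each call mutably borrowing the whole guard, which the compiler rejects; reborrowing once as a plain `&mut` lets the per-field borrows be split. A standalone illustration (not quickwit code, the field names are made up):

```rust
use std::sync::Mutex;

struct State {
    shards: Vec<u64>,
    rate_trackers: Vec<u64>,
}

fn update(state: &Mutex<State>) {
    let mut guard = state.lock().unwrap();
    // `let a = &mut guard.shards; let b = &mut guard.rate_trackers;` would not
    // compile: each field access goes through `DerefMut` and mutably borrows the
    // whole guard. Reborrowing once yields a plain `&mut State` whose fields can
    // be borrowed independently.
    let state = &mut *guard;
    let shards = &mut state.shards;
    let rate_trackers = &mut state.rate_trackers;
    shards.push(1);
    rate_trackers.push(2);
}
```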