Skip to content

Commit

Permalink
Record object storage request latencies
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Oct 23, 2024
1 parent f43e374 commit d628047
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 39 deletions.
19 changes: 0 additions & 19 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,25 +90,6 @@ pub fn new_counter(
counter
}

pub fn new_counter_with_labels(
name: &str,
help: &str,
subsystem: &str,
const_labels: &[(&str, &str)],
) -> IntCounter {
let owned_const_labels: HashMap<String, String> = const_labels
.iter()
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
.collect();
let counter_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem)
.const_labels(owned_const_labels);
let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter");
prometheus::register(Box::new(counter.clone())).expect("failed to register counter");
counter
}

pub fn new_counter_vec<const N: usize>(
name: &str,
help: &str,
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Default for SearchMetrics {
),
root_search_request_duration_seconds: new_histogram_vec(
"root_search_request_duration_seconds",
"Duration of root search gRPC request in seconds.",
"Duration of root search gRPC requests in seconds.",
"search",
&[("kind", "server")],
["status"],
Expand All @@ -93,7 +93,7 @@ impl Default for SearchMetrics {
),
leaf_search_request_duration_seconds: new_histogram_vec(
"leaf_search_request_duration_seconds",
"Duration of leaf search gRPC request in seconds.",
"Duration of leaf search gRPC requests in seconds.",
"search",
&[("kind", "server")],
["status"],
Expand Down
59 changes: 54 additions & 5 deletions quickwit/quickwit-storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
new_counter, new_counter_vec, new_counter_with_labels, new_gauge, IntCounter, IntCounterVec,
IntGauge,
exponential_buckets, new_counter, new_counter_vec, new_gauge, new_histogram_vec, Histogram,
IntCounter, IntCounterVec, IntGauge,
};

/// Counters associated to storage operations.
Expand All @@ -41,6 +41,15 @@ pub struct StorageMetrics {
pub object_storage_put_parts: IntCounter,
pub object_storage_download_num_bytes: IntCounter,
pub object_storage_upload_num_bytes: IntCounter,

pub object_storage_delete_requests_total: IntCounter,
pub object_storage_bulk_delete_requests_total: IntCounter,

pub object_storage_get_request_duration: Histogram,
pub object_storage_put_request_duration: Histogram,
pub object_storage_upload_part_request_duration: Histogram,
pub object_storage_delete_request_duration: Histogram,
pub object_storage_bulk_delete_request_duration: Histogram,
}

impl Default for StorageMetrics {
Expand All @@ -60,6 +69,38 @@ impl Default for StorageMetrics {
];
let get_slice_timeout_all_timeouts =
get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]);

let object_storage_requests_total = new_counter_vec(
"object_storage_requests_total",
"Total number of object storage requests performed.",
"storage",
&[],
["action"],
);
let object_storage_delete_requests_total =
object_storage_requests_total.with_label_values(["delete_object"]);
let object_storage_bulk_delete_requests_total =
object_storage_requests_total.with_label_values(["delete_objects"]);

let object_storage_request_duration = new_histogram_vec(
"object_storage_request_duration_seconds",
"Duration of object storage requests in seconds.",
"storage",
&[],
["action"],
exponential_buckets(0.001, 2.0, 15).unwrap(),
);
let object_storage_get_request_duration =
object_storage_request_duration.with_label_values(["get_object"]);
let object_storage_put_request_duration =
object_storage_request_duration.with_label_values(["put_object"]);
let object_storage_upload_part_request_duration =
object_storage_request_duration.with_label_values(["upload_part"]);
let object_storage_delete_request_duration =
object_storage_request_duration.with_label_values(["delete_object"]);
let object_storage_bulk_delete_request_duration =
object_storage_request_duration.with_label_values(["delete_objects"]);

StorageMetrics {
fast_field_cache: CacheMetrics::for_component("fastfields"),
fd_cache_metrics: CacheMetrics::for_component("fd"),
Expand Down Expand Up @@ -107,6 +148,14 @@ impl Default for StorageMetrics {
"storage",
&[],
),
object_storage_delete_requests_total,
object_storage_bulk_delete_requests_total,

object_storage_get_request_duration,
object_storage_put_request_duration,
object_storage_upload_part_request_duration,
object_storage_delete_request_duration,
object_storage_bulk_delete_request_duration,
}
}
}
Expand Down Expand Up @@ -139,19 +188,19 @@ impl CacheMetrics {
CACHE_METRICS_NAMESPACE,
&[("component_name", component_name)],
),
hits_num_items: new_counter_with_labels(
hits_num_items: new_counter(
"cache_hits_total",
"Number of cache hits by component",
CACHE_METRICS_NAMESPACE,
&[("component_name", component_name)],
),
hits_num_bytes: new_counter_with_labels(
hits_num_bytes: new_counter(
"cache_hits_bytes",
"Number of cache hits in bytes by component",
CACHE_METRICS_NAMESPACE,
&[("component_name", component_name)],
),
misses_num_items: new_counter_with_labels(
misses_num_items: new_counter(
"cache_misses_total",
"Number of cache misses by component",
CACHE_METRICS_NAMESPACE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,15 @@ impl S3CompatibleObjectStorage {
.byte_stream()
.await
.map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?;

crate::STORAGE_METRICS.object_storage_put_parts.inc();
crate::STORAGE_METRICS
.object_storage_upload_num_bytes
.inc_by(len);
let _timer = crate::STORAGE_METRICS
.object_storage_put_request_duration
.start_timer();

self.s3_client
.put_object()
.bucket(bucket)
Expand All @@ -304,11 +313,6 @@ impl S3CompatibleObjectStorage {
Retry::Permanent(StorageError::from(sdk_error))
}
})?;

crate::STORAGE_METRICS.object_storage_put_parts.inc();
crate::STORAGE_METRICS
.object_storage_upload_num_bytes
.inc_by(len);
Ok(())
}

Expand Down Expand Up @@ -423,10 +427,14 @@ impl S3CompatibleObjectStorage {
.map_err(StorageError::from)
.map_err(Retry::Permanent)?;
let md5 = BASE64_STANDARD.encode(part.md5.0);

crate::STORAGE_METRICS.object_storage_put_parts.inc();
crate::STORAGE_METRICS
.object_storage_upload_num_bytes
.inc_by(part.len());
let _timer = crate::STORAGE_METRICS
.object_storage_upload_part_request_duration
.start_timer();

let upload_part_output = self
.s3_client
Expand All @@ -449,7 +457,7 @@ impl S3CompatibleObjectStorage {
})?;

let completed_part = CompletedPart::builder()
.set_e_tag(upload_part_output.e_tag().map(|tag| tag.to_string()))
.set_e_tag(upload_part_output.e_tag)
.part_number(part.part_number as i32)
.build();
Ok(completed_part)
Expand Down Expand Up @@ -538,14 +546,18 @@ impl S3CompatibleObjectStorage {
Ok(())
}

async fn create_get_object_request(
async fn get_object(
&self,
path: &Path,
range_opt: Option<Range<usize>>,
) -> Result<GetObjectOutput, SdkError<GetObjectError>> {
let key = self.key(path);
let range_str = range_opt.map(|range| format!("bytes={}-{}", range.start, range.end - 1));

crate::STORAGE_METRICS.object_storage_get_total.inc();
let _timer = crate::STORAGE_METRICS
.object_storage_get_request_duration
.start_timer();

let get_object_output = self
.s3_client
Expand All @@ -565,7 +577,7 @@ impl S3CompatibleObjectStorage {
) -> StorageResult<Vec<u8>> {
let cap = range_opt.as_ref().map(Range::len).unwrap_or(0);
let get_object_output = aws_retry(&self.retry_params, || {
self.create_get_object_request(path, range_opt.clone())
self.get_object(path, range_opt.clone())
})
.await?;
let mut buf: Vec<u8> = Vec::with_capacity(cap);
Expand Down Expand Up @@ -638,6 +650,12 @@ impl S3CompatibleObjectStorage {
for (path_chunk, delete) in &mut delete_requests_it {
let delete_objects_res: StorageResult<DeleteObjectsOutput> =
aws_retry(&self.retry_params, || async {
crate::STORAGE_METRICS
.object_storage_bulk_delete_requests_total
.inc();
let _timer = crate::STORAGE_METRICS
.object_storage_bulk_delete_request_duration
.start_timer();
self.s3_client
.delete_objects()
.bucket(self.bucket.clone())
Expand Down Expand Up @@ -752,10 +770,8 @@ impl Storage for S3CompatibleObjectStorage {

async fn copy_to(&self, path: &Path, output: &mut dyn SendableAsync) -> StorageResult<()> {
let _permit = REQUEST_SEMAPHORE.acquire().await;
let get_object_output = aws_retry(&self.retry_params, || {
self.create_get_object_request(path, None)
})
.await?;
let get_object_output =
aws_retry(&self.retry_params, || self.get_object(path, None)).await?;
let mut body_read = BufReader::new(get_object_output.body.into_async_read());
let num_bytes_copied = tokio::io::copy_buf(&mut body_read, output).await?;
STORAGE_METRICS
Expand All @@ -770,6 +786,12 @@ impl Storage for S3CompatibleObjectStorage {
let bucket = self.bucket.clone();
let key = self.key(path);
let delete_res = aws_retry(&self.retry_params, || async {
crate::STORAGE_METRICS
.object_storage_delete_requests_total
.inc();
let _timer = crate::STORAGE_METRICS
.object_storage_delete_request_duration
.start_timer();
self.s3_client
.delete_object()
.bucket(&bucket)
Expand Down Expand Up @@ -818,7 +840,7 @@ impl Storage for S3CompatibleObjectStorage {
) -> crate::StorageResult<Box<dyn AsyncRead + Send + Unpin>> {
let permit = REQUEST_SEMAPHORE.acquire().await;
let get_object_output = aws_retry(&self.retry_params, || {
self.create_get_object_request(path, Some(range.clone()))
self.get_object(path, Some(range.clone()))
})
.await?;
Ok(Box::new(S3AsyncRead {
Expand Down

0 comments on commit d628047

Please sign in to comment.