Guarding the router writer path with a semaphore. (#4263)
The idea is to ensure we have at most one write request waiting in the
implicit mrecordlog mutex queue.

In addition, the semaphore limits the number of waiters, so that we can
push the burden of retry to the client.
fulmicoton authored Dec 14, 2023
1 parent 4ab9d46 commit 5676bdc
Showing 5 changed files with 123 additions and 4 deletions.
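
Before the per-file changes: the mechanism reduces to a small admission policy. One write holds the permit, up to ten more may wait, and any request beyond that is rejected outright so callers retry instead of queueing behind the mrecordlog mutex. A minimal sketch of that policy, using the `SemaphoreWithMaxWaiters` type introduced below; `guarded_write` is a hypothetical helper, not part of the diff:

async fn guarded_write(semaphore: &SemaphoreWithMaxWaiters) -> IngestV2Result<()> {
    // `new(1, 10)` (as wired into `IngestRouter::new` below) means: 1 permit,
    // at most 10 waiters. A 12th concurrent request fails here immediately.
    let _permit = semaphore
        .acquire()
        .await
        .map_err(|()| IngestV2Error::TooManyRequests)?;
    // ... perform the write while holding the permit ...
    Ok(())
}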
8 changes: 8 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -47,6 +47,7 @@ use super::ingester::PERSIST_REQUEST_TIMEOUT;
use super::routing_table::RoutingTable;
use super::workbench::IngestWorkbench;
use super::IngesterPool;
use crate::semaphore_with_waiter::SemaphoreWithMaxWaiters;
use crate::{with_request_metrics, LeaderId};

/// Duration after which ingest requests time out with [`IngestV2Error::Timeout`].
@@ -67,6 +68,7 @@ pub struct IngestRouter {
ingester_pool: IngesterPool,
state: Arc<RwLock<RouterState>>,
replication_factor: usize,
write_semaphore: SemaphoreWithMaxWaiters,
}

struct RouterState {
@@ -101,6 +103,7 @@ impl IngestRouter {
ingester_pool,
state,
replication_factor,
write_semaphore: SemaphoreWithMaxWaiters::new(1, 10),
}
}

@@ -419,6 +422,11 @@ impl IngestRouterService for IngestRouter {
&mut self,
ingest_request: IngestRequestV2,
) -> IngestV2Result<IngestResponseV2> {
let _permit = self
.write_semaphore
.acquire()
.await
.map_err(|()| IngestV2Error::TooManyRequests)?;
with_request_metrics!(
self.ingest_timeout(ingest_request, INGEST_REQUEST_TIMEOUT)
.await,
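
Since `TooManyRequests` maps to gRPC `ResourceExhausted` (see the `quickwit-proto` changes further down), the retry burden lands on the caller, as the commit message intends. An illustrative client-side loop, assuming the service method is named `ingest` as above and that `IngestRequestV2` is `Clone`; the backoff values are made up for the sketch:

use std::time::Duration;

async fn ingest_with_retry(
    router: &mut IngestRouter,
    request: IngestRequestV2,
) -> IngestV2Result<IngestResponseV2> {
    let mut backoff = Duration::from_millis(100);
    loop {
        match router.ingest(request.clone()).await {
            // The router shed load: back off, then try again.
            Err(IngestV2Error::TooManyRequests) => {
                tokio::time::sleep(backoff).await;
                backoff = (backoff * 2).min(Duration::from_secs(5));
            }
            result => return result,
        }
    }
}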
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/lib.rs
@@ -30,6 +30,7 @@ mod metrics;
mod notifications;
mod position;
mod queue;
mod semaphore_with_waiter;

use std::collections::HashMap;
use std::path::{Path, PathBuf};
104 changes: 104 additions & 0 deletions quickwit/quickwit-ingest/src/semaphore_with_waiter.rs
@@ -0,0 +1,104 @@
// Copyright (C) 2023 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::sync::Arc;

use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};

/// `SemaphoreWithMaxWaiters` is an extension of a semaphore
/// that also limits the number of waiters.
///
/// If `max_num_waiters` tasks are already waiting, `acquire` returns an error.
#[derive(Clone)]
pub struct SemaphoreWithMaxWaiters {
permits: Arc<Semaphore>,
// Implementation detail:
// We never await on the `waiter_count` semaphore, so its async mechanics go unused.
// The waiter count could have been implemented with a simple atomic counter;
// using a semaphore avoids reimplementing the looping compare-and-swap dance.
waiter_count: Arc<Semaphore>,
}

impl SemaphoreWithMaxWaiters {
/// Creates a new `SemaphoreWithMaxWaiters`.
pub fn new(num_permits: usize, max_num_waiters: usize) -> Self {
Self {
permits: Arc::new(Semaphore::new(num_permits)),
waiter_count: Arc::new(Semaphore::new(max_num_waiters)),
}
}

/// Acquires a permit.
pub async fn acquire(&self) -> Result<OwnedSemaphorePermit, ()> {
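// Fast path: take a permit immediately if one is available.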
match Semaphore::try_acquire_owned(self.permits.clone()) {
Ok(permit) => {
return Ok(permit);
}
Err(TryAcquireError::NoPermits) => {}
Err(TryAcquireError::Closed) => {
panic!("semaphore closed (this should never happen)");
}
};
// We bind the waiter permit to a variable to extend its lifetime
// (we keep holding it until we have acquired an actual permit).
let _wait_permit = match Semaphore::try_acquire_owned(self.waiter_count.clone()) {
Ok(wait_permit) => wait_permit,
Err(TryAcquireError::NoPermits) => {
return Err(());
}
Err(TryAcquireError::Closed) => {
// The `waiter_count` semaphore should never be closed because we don't expose the
// `close` API and never call `close` internally.
panic!("semaphore closed");
}
};
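// Slow path: wait for a permit while holding a waiter slot.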
let permit = Semaphore::acquire_owned(self.permits.clone())
.await
.expect("semaphore closed "); // (See justification above)
Ok(permit)
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

#[tokio::test]
async fn test_semaphore_with_waiters() {
let semaphore_with_waiters = super::SemaphoreWithMaxWaiters::new(1, 1);
let permit = semaphore_with_waiters.acquire().await.unwrap();
let semaphore_with_waiters_clone = semaphore_with_waiters.clone();
let join_handle =
tokio::task::spawn(async move { semaphore_with_waiters_clone.acquire().await });
assert!(!join_handle.is_finished());
tokio::time::sleep(Duration::from_millis(500)).await;
for _ in 0..10 {
tokio::task::yield_now().await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(semaphore_with_waiters.acquire().await.is_err());
assert!(!join_handle.is_finished());
drop(permit);
for _ in 0..10 {
tokio::task::yield_now().await;
}
assert!(join_handle.await.is_ok());
assert!(semaphore_with_waiters.acquire().await.is_ok());
}
}
12 changes: 9 additions & 3 deletions quickwit/quickwit-proto/src/ingest/mod.rs
@@ -43,6 +43,8 @@ pub enum IngestV2Error {
ShardNotFound { shard_id: ShardId },
#[error("request timed out")]
Timeout,
#[error("too many requests")]
TooManyRequests,
// TODO: Merge `Transport` and `IngesterUnavailable` into a single `Unavailable` error.
#[error("transport error: {0}")]
Transport(String),
@@ -70,6 +72,7 @@ impl From<IngestV2Error> for tonic::Status {
IngestV2Error::Internal(_) => tonic::Code::Internal,
IngestV2Error::ShardNotFound { .. } => tonic::Code::NotFound,
IngestV2Error::Timeout { .. } => tonic::Code::DeadlineExceeded,
IngestV2Error::TooManyRequests => tonic::Code::ResourceExhausted,
IngestV2Error::Transport { .. } => tonic::Code::Unavailable,
};
let message: String = error.to_string();
@@ -79,10 +82,12 @@ impl From<IngestV2Error> for tonic::Status {

impl From<tonic::Status> for IngestV2Error {
fn from(status: tonic::Status) -> Self {
- if status.code() == tonic::Code::Unavailable {
- return IngestV2Error::Transport(status.message().to_string());
- }
- IngestV2Error::Internal(status.message().to_string())
+ dbg!(&status);
+ match status.code() {
+ tonic::Code::Unavailable => IngestV2Error::Transport(status.message().to_string()),
+ tonic::Code::ResourceExhausted => IngestV2Error::TooManyRequests,
+ _ => IngestV2Error::Internal(status.message().to_string()),
+ }
}
}

@@ -94,6 +99,7 @@ impl ServiceError for IngestV2Error {
Self::ShardNotFound { .. } => ServiceErrorCode::NotFound,
Self::Timeout { .. } => ServiceErrorCode::Timeout,
Self::Transport { .. } => ServiceErrorCode::Unavailable,
Self::TooManyRequests => ServiceErrorCode::RateLimited,
}
}
}
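
Taken together, the two `From` impls above give `TooManyRequests` a lossless round trip across the gRPC boundary. A hypothetical test (not part of the commit) stating the property they establish:

#[test]
fn test_too_many_requests_round_trips_through_tonic() {
    // IngestV2Error -> tonic::Status uses code ResourceExhausted...
    let status: tonic::Status = IngestV2Error::TooManyRequests.into();
    assert_eq!(status.code(), tonic::Code::ResourceExhausted);
    // ...and tonic::Status -> IngestV2Error maps that code back.
    let error = IngestV2Error::from(status);
    assert!(matches!(error, IngestV2Error::TooManyRequests));
}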
2 changes: 1 addition & 1 deletion quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
@@ -188,7 +188,7 @@ fn convert_ingest_response_v2(
IngestFailureReason::Internal => IngestServiceError::Internal("Internal error".to_string()),
IngestFailureReason::NoShardsAvailable => IngestServiceError::Unavailable,
IngestFailureReason::RateLimited => IngestServiceError::RateLimited,
- IngestFailureReason::ResourceExhausted => IngestServiceError::Unavailable,
+ IngestFailureReason::ResourceExhausted => IngestServiceError::RateLimited,
})
}

