Skip to content

Commit

Permalink
Chunk receive future to avoid hanging actor
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jul 4, 2024
1 parent fb65179 commit 2b31148
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 59 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,9 @@ pub enum FileSourceMessageType {
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
pub struct FileSourceSqs {
pub queue_url: String,
/// Polling wait time in seconds for receiving messages. Leave default value.
#[serde(default = "default_wait_time_seconds")]
pub wait_time_seconds: u8,
pub message_type: FileSourceMessageType,
}

fn default_wait_time_seconds() -> u8 {
20
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
pub struct FileSourceUri {
#[schema(value_type = String)]
Expand Down Expand Up @@ -337,9 +330,6 @@ pub struct PubSubSourceParams {
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
pub struct SqsSourceParams {
pub queue_url: String,
/// Polling wait time in seconds for receiving messages. Leave default value.
#[serde(default = "default_wait_time_seconds")]
pub wait_time_seconds: u8,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
Expand Down Expand Up @@ -864,7 +854,6 @@ mod tests {
FileSourceParams::Sqs(FileSourceSqs {
queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name"
.to_string(),
wait_time_seconds: default_wait_time_seconds(),
message_type: FileSourceMessageType::S3Notification,
})
);
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl Handler<SplitsUpdate> for Publisher {
);
return Ok(());
}
info!(new_splits=?split_ids, checkpoint_delta=?checkpoint_delta_opt, "publish-new-splits");
info!("publish-new-splits");
if let Some(source_mailbox) = self.source_mailbox_opt.as_ref() {
if let Some(checkpoint) = checkpoint_delta_opt {
// We voluntarily do not log anything here.
Expand Down
4 changes: 1 addition & 3 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use quickwit_proto::types::SourceId;

use super::doc_file_reader::{BatchReader, ObjectUriBatchReader, StdinBatchReader};
#[cfg(feature = "sqs")]
use super::queue_sources::QueueCoordinator;
use super::queue_sources::coordinator::QueueCoordinator;
use crate::actors::DocProcessor;
use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory};

Expand Down Expand Up @@ -443,8 +443,6 @@ mod localstack_tests {
// source setup
let source_params = FileSourceParams::Sqs(FileSourceSqs {
queue_url,
// decrease poll duration to avoid hanging actor shutdown
wait_time_seconds: 1,
message_type: FileSourceMessageType::RawUri,
});
let source_config = SourceConfig::for_test(
Expand Down
7 changes: 4 additions & 3 deletions quickwit/quickwit-indexing/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ use once_cell::sync::OnceCell;
#[cfg(feature = "pulsar")]
pub use pulsar_source::{PulsarSource, PulsarSourceFactory};
#[cfg(feature = "sqs")]
pub use queue_sources::sqs_queue;
#[cfg(feature = "sqs")]
pub use queue_sources::sqs_source::{SqsSource, SqsSourceFactory};
pub use queue_sources::{
sqs_queue,
sqs_source::{SqsSource, SqsSourceFactory},
};
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox};
use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
use quickwit_common::pubsub::EventBroker;
Expand Down
28 changes: 18 additions & 10 deletions quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,17 @@ use super::local_state::QueueLocalState;
use super::message::{MessageType, ReadyMessage};
use super::shared_state::{checkpoint_messages, QueueSharedState};
use super::visibility::{spawn_visibility_task, VisibilitySettings};
use super::Queue;
use super::{Queue, QueueReceiver};
use crate::actors::DocProcessor;
use crate::models::{NewPublishLock, NewPublishToken, PublishLock};
use crate::source::{SourceContext, SourceRuntime};

/// Maximum duration that the `emit_batches()` callback can wait for
/// `queue.receive()` calls. If too small, the actor loop will spin
/// un-necessarily. If too large, the actor loop will be slow to react to new
/// messages (or shutdown).
pub const RECEIVE_POLL_TIMEOUT: Duration = Duration::from_millis(500);

#[derive(Default, Serialize)]
pub struct QueueCoordinatorObservableState {
/// Number of bytes processed by the source.
Expand All @@ -60,6 +66,7 @@ pub struct QueueCoordinator {
pipeline_id: IndexingPipelineId,
source_type: SourceType,
queue: Arc<dyn Queue>,
queue_receiver: QueueReceiver,
observable_state: QueueCoordinatorObservableState,
message_type: MessageType,
publish_lock: PublishLock,
Expand Down Expand Up @@ -95,6 +102,7 @@ impl QueueCoordinator {
pipeline_id: source_runtime.pipeline_id,
source_type: source_runtime.source_config.source_type(),
storage_resolver: source_runtime.storage_resolver,
queue_receiver: QueueReceiver::new(queue.clone(), RECEIVE_POLL_TIMEOUT),
queue,
observable_state: QueueCoordinatorObservableState::default(),
message_type,
Expand All @@ -112,7 +120,7 @@ impl QueueCoordinator {
source_runtime: SourceRuntime,
) -> anyhow::Result<Self> {
use super::sqs_queue::SqsQueue;
let queue = SqsQueue::try_new(config.queue_url, config.wait_time_seconds).await?;
let queue = SqsQueue::try_new(config.queue_url).await?;
let message_type = match config.message_type {
FileSourceMessageType::S3Notification => MessageType::S3Notification,
FileSourceMessageType::RawUri => MessageType::RawUri,
Expand Down Expand Up @@ -142,14 +150,10 @@ impl QueueCoordinator {

/// Polls messages from the queue and prepares them for processing
async fn poll_messages(&mut self, ctx: &SourceContext) -> Result<(), ActorExitStatus> {
// receive() typically uses long polling so it can be long to respond
// TODO increase `max_messages` when previous messages were small
// TODO avoid blocking actor teardown with the long polling future
let raw_messages = ctx
.protect_future(
self.queue
.receive(1, self.visible_settings.deadline_for_receive),
)
let raw_messages = self
.queue_receiver
.receive(1, self.visible_settings.deadline_for_receive)
.await?;

if raw_messages.is_empty() {
Expand Down Expand Up @@ -265,14 +269,17 @@ impl QueueCoordinator {
let committed_partition_ids = checkpoint
.iter()
.filter(|(_, pos)| pos.is_eof())
.map(|(pid, _)| pid);
.map(|(pid, _)| pid)
.collect::<Vec<_>>();
tracing::info!(pids=?committed_partition_ids, "coordinator::suggest_truncate()");
let mut completed = Vec::new();
for partition_id in committed_partition_ids {
let ack_id_opt = self.local_state.mark_completed(partition_id);
if let Some(ack_id) = ack_id_opt {
completed.push(ack_id);
}
}
tracing::info!(ackids=?completed, "coordinator::suggest_truncate()");
self.queue.acknowledge(&completed).await
}

Expand Down Expand Up @@ -316,6 +323,7 @@ mod tests {
pipeline_id,
observable_state: QueueCoordinatorObservableState::default(),
publish_lock: PublishLock::default(),
queue_receiver: QueueReceiver::new(queue.clone(), Duration::from_millis(50)),
queue,
message_type: MessageType::RawUri,
source_type: SourceType::Unspecified,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl fmt::Debug for InnerState {
#[derive(Clone, Debug)]
pub struct MemoryQueueForTests {
inner_state: Arc<Mutex<InnerState>>,
receive_sleep: Duration,
}

impl MemoryQueueForTests {
Expand Down Expand Up @@ -79,6 +80,7 @@ impl MemoryQueueForTests {
});
MemoryQueueForTests {
inner_state: Arc::new(Mutex::new(InnerState::default())),
receive_sleep: Duration::from_millis(50),
}
}

Expand Down Expand Up @@ -108,7 +110,7 @@ impl MemoryQueueForTests {
#[async_trait]
impl Queue for MemoryQueueForTests {
async fn receive(
&self,
self: Arc<Self>,
max_messages: usize,
suggested_deadline: Duration,
) -> anyhow::Result<Vec<RawMessage>> {
Expand All @@ -135,7 +137,7 @@ impl Queue for MemoryQueueForTests {
}
}
// `sleep` to avoid using all the CPU when called in a loop
tokio::time::sleep(Duration::from_millis(50)).await;
tokio::time::sleep(self.receive_sleep).await;

Ok(vec![])
}
Expand Down Expand Up @@ -170,21 +172,22 @@ impl Queue for MemoryQueueForTests {
mod tests {
use super::*;

fn prefilled_queue(nb_message: usize) -> MemoryQueueForTests {
fn prefilled_queue(nb_message: usize) -> Arc<MemoryQueueForTests> {
let memory_queue = MemoryQueueForTests::new();
for i in 0..nb_message {
let payload = format!("Test message {}", i);
let ack_id = i.to_string();
memory_queue.send_message(payload.clone(), &ack_id);
}
memory_queue
Arc::new(memory_queue)
}

#[tokio::test]
async fn test_receive_1_by_1() {
let memory_queue = prefilled_queue(2);
for i in 0..2 {
let messages = memory_queue
.clone()
.receive(1, Duration::from_secs(5))
.await
.unwrap();
Expand Down
Loading

0 comments on commit 2b31148

Please sign in to comment.