diff --git a/quickwit/quickwit-actors/src/actor_handle.rs b/quickwit/quickwit-actors/src/actor_handle.rs
index 3077b3d9ee8..cca9bef65a5 100644
--- a/quickwit/quickwit-actors/src/actor_handle.rs
+++ b/quickwit/quickwit-actors/src/actor_handle.rs
@@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::fmt;
+use std::ops::Deref;
use serde::Serialize;
use tokio::sync::{oneshot, watch};
@@ -193,7 +194,7 @@ impl ActorHandle {
);
}
}
- let state = self.last_observation();
+ let state = self.last_observation().clone();
Observation {
obs_type: ObservationType::PostMortem,
state,
@@ -254,8 +255,8 @@ impl ActorHandle {
(exit_status, observation)
}
- pub fn last_observation(&self) -> A::ObservableState {
- self.last_state.borrow().clone()
+ pub fn last_observation(&self) -> impl Deref<Target = A::ObservableState> + '_ {
+ self.last_state.borrow()
}
async fn wait_for_observable_state_callback(
@@ -271,12 +272,12 @@ impl ActorHandle {
Observation { obs_type, state }
}
Ok(Err(_)) => {
- let state = self.last_observation();
+ let state = self.last_observation().clone();
let obs_type = ObservationType::PostMortem;
Observation { obs_type, state }
}
Err(_) => {
- let state = self.last_observation();
+ let state = self.last_observation().clone();
let obs_type = if self.actor_context.state().is_exit() {
ObservationType::PostMortem
} else {
diff --git a/quickwit/quickwit-actors/src/supervisor.rs b/quickwit/quickwit-actors/src/supervisor.rs
index b038aa5d58f..4eb4d6cc3f2 100644
--- a/quickwit/quickwit-actors/src/supervisor.rs
+++ b/quickwit/quickwit-actors/src/supervisor.rs
@@ -67,7 +67,7 @@ impl Actor for Supervisor {
let state_opt: Option<A::ObservableState> = self
.handle_opt
.as_ref()
- .map(|handle| handle.last_observation());
+ .map(|handle| handle.last_observation().clone());
SupervisorState {
metrics: self.metrics,
state_opt,
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
index d47f6072806..8b54bdebdbd 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -250,10 +250,12 @@ impl IndexingScheduler {
}
}
- let mut indexers = self.get_indexers_from_indexer_pool();
+ let mut indexers: Vec<(String, IndexerNodeInfo)> = self.get_indexers_from_indexer_pool();
let running_indexing_tasks_by_node_id: FnvHashMap<String, Vec<IndexingTask>> = indexers
.iter()
- .map(|indexer| (indexer.0.clone(), indexer.1.indexing_tasks.clone()))
+ .map(|(indexer_id, indexer_node_info)| {
+ (indexer_id.clone(), indexer_node_info.indexing_tasks.clone())
+ })
.collect();
let indexing_plans_diff = get_indexing_plans_diff(
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
index ae65a142659..f18be528af5 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -146,7 +146,7 @@ impl Actor for IndexingPipeline {
) -> anyhow::Result<()> {
// We update the observation to ensure our last "black box" observation
// is up to date.
- self.perform_observe(ctx).await;
+ self.perform_observe(ctx);
Ok(())
}
}
@@ -237,7 +237,7 @@ impl IndexingPipeline {
self.statistics.generation
}
- async fn perform_observe(&mut self, ctx: &ActorContext<Self>) {
+ fn perform_observe(&mut self, ctx: &ActorContext<Self>) {
let Some(handles) = &self.handles_opt else {
return;
};
@@ -494,7 +494,7 @@ impl Handler for IndexingPipeline {
supervise_loop_token: SuperviseLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
- self.perform_observe(ctx).await;
+ self.perform_observe(ctx);
self.perform_health_check(ctx).await?;
ctx.schedule_self_msg(SUPERVISE_INTERVAL, supervise_loop_token)
.await;
@@ -542,15 +542,18 @@ impl Handler for IndexingPipeline {
async fn handle(
&mut self,
- message: AssignShards,
+ assign_shards_message: AssignShards,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Some(handles) = &mut self.handles_opt {
info!(
- shard_ids=?message.0.shard_ids,
+ shard_ids=?assign_shards_message.0.shard_ids,
"assigning shards to indexing pipeline."
);
- handles.source_mailbox.send_message(message).await?;
+ handles
+ .source_mailbox
+ .send_message(assign_shards_message)
+ .await?;
}
Ok(())
}
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
index 19710d24704..57970df36a6 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -132,7 +132,7 @@ pub struct IndexingService {
}
impl Debug for IndexingService {
- fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+ fn fmt(&self, formatter: &mut Formatter) -> std::fmt::Result {
formatter
.debug_struct("IndexingService")
.field("cluster_id", &self.cluster.cluster_id())
@@ -424,7 +424,8 @@ impl IndexingService {
merge_pipeline_mailbox_handle.handle.state().is_running()
});
self.counters.num_running_merge_pipelines = self.merge_pipeline_handles.len();
- self.update_cluster_running_indexing_tasks().await;
+ self.update_cluster_running_indexing_tasks_in_chitchat()
+ .await;
let pipeline_metrics: HashMap<&IndexingPipelineId, PipelineMetrics> = self
.indexing_pipelines
@@ -536,7 +537,8 @@ impl IndexingService {
// Shut down currently running pipelines that are missing in the new plan.
self.shutdown_pipelines(&pipeline_uid_to_remove).await;
- self.update_cluster_running_indexing_tasks().await;
+ self.update_cluster_running_indexing_tasks_in_chitchat()
+ .await;
if !failed_spawning_pipeline_ids.is_empty() {
return Err(IndexingError::SpawnPipelinesError {
@@ -642,8 +644,7 @@ impl IndexingService {
}
}
- /// Updates running indexing tasks in chitchat cluster state.
- async fn update_cluster_running_indexing_tasks(&self) {
+ async fn update_cluster_running_indexing_tasks_in_chitchat(&self) {
let indexing_tasks = self
.indexing_pipelines
.values()
diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
index 254e62c7c87..749df2f8bd0 100644
--- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -48,7 +48,7 @@ use tracing::{debug, error, info, warn};
use ulid::Ulid;
use super::{
- Assignment, BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
+ BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
};
use crate::actors::DocProcessor;
@@ -376,7 +376,7 @@ impl Source for IngestSource {
async fn assign_shards(
&mut self,
- assignment: Assignment,
+ mut new_assigned_shard_ids: Vec<ShardId>,
doc_processor_mailbox: &Mailbox<DocProcessor>,
ctx: &SourceContext,
) -> anyhow::Result<()> {
@@ -388,7 +388,6 @@ impl Source for IngestSource {
.sorted()
.collect::<Vec<ShardId>>();
- let mut new_assigned_shard_ids: Vec<ShardId> = assignment.shard_ids;
new_assigned_shard_ids.sort();
if current_assigned_shard_ids == new_assigned_shard_ids {
@@ -640,14 +639,10 @@ mod tests {
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);
// In this scenario, the indexer will only be able to acquire shard 1.
- let assignment = Assignment {
- shard_ids: vec![1, 2],
- };
let publish_lock = source.publish_lock.clone();
- // let publish_token = source.publish_token.clone();
source
- .assign_shards(assignment, &doc_processor_mailbox, &ctx)
+ .assign_shards(vec![1, 2], &doc_processor_mailbox, &ctx)
.await
.unwrap();
@@ -784,11 +779,8 @@ mod tests {
let ctx: SourceContext =
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);
- // In this scenario, the indexer will only be able to acquire shard 1.
- let assignment = Assignment { shard_ids: vec![1] };
-
source
- .assign_shards(assignment, &doc_processor_mailbox, &ctx)
+ .assign_shards(vec![1], &doc_processor_mailbox, &ctx)
.await
.unwrap();
@@ -940,18 +932,14 @@ mod tests {
let ctx: SourceContext =
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);
- // In this scenario, the indexer will only be able to acquire shard 1.
- let assignment = Assignment {
- shard_ids: vec![1, 2],
- };
-
assert_eq!(
shard_positions_update_rx.try_recv().unwrap_err(),
TryRecvError::Empty
);
+ // In this scenario, the indexer will only be able to acquire shard 1.
source
- .assign_shards(assignment, &doc_processor_mailbox, &ctx)
+ .assign_shards(vec![1, 2], &doc_processor_mailbox, &ctx)
.await
.unwrap();
diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs
index 55217865e53..addda75a6c2 100644
--- a/quickwit/quickwit-indexing/src/source/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/mod.rs
@@ -238,7 +238,7 @@ pub trait Source: Send + 'static {
/// plane.
async fn assign_shards(
&mut self,
- _assignement: Assignment,
+ _shard_ids: Vec<ShardId>,
_doc_processor_mailbox: &Mailbox<DocProcessor>,
_ctx: &SourceContext,
) -> anyhow::Result<()> {
@@ -367,12 +367,12 @@ impl Handler for SourceActor {
async fn handle(
&mut self,
- message: AssignShards,
+ assign_shards_message: AssignShards,
ctx: &SourceContext,
) -> Result<(), ActorExitStatus> {
- let AssignShards(assignment) = message;
+ let AssignShards(Assignment { shard_ids }) = assign_shards_message;
self.source
- .assign_shards(assignment, &self.doc_processor_mailbox, ctx)
+ .assign_shards(shard_ids, &self.doc_processor_mailbox, ctx)
.await?;
Ok(())
}
diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
index d64db0b4ba5..4b9847a8335 100644
--- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
+++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
@@ -270,12 +270,12 @@ impl Handler for DeleteTaskPipeline {
handles.uploader.refresh_observe();
handles.publisher.refresh_observe();
self.state = DeleteTaskPipelineState {
- delete_task_planner: handles.delete_task_planner.last_observation(),
- downloader: handles.downloader.last_observation(),
- delete_task_executor: handles.delete_task_executor.last_observation(),
- packager: handles.packager.last_observation(),
- uploader: handles.uploader.last_observation(),
- publisher: handles.publisher.last_observation(),
+ delete_task_planner: handles.delete_task_planner.last_observation().clone(),
+ downloader: handles.downloader.last_observation().clone(),
+ delete_task_executor: handles.delete_task_executor.last_observation().clone(),
+ packager: handles.packager.last_observation().clone(),
+ uploader: handles.uploader.last_observation().clone(),
+ publisher: handles.publisher.last_observation().clone(),
}
}
ctx.schedule_self_msg(OBSERVE_PIPELINE_INTERVAL, Observe)
diff --git a/quickwit/quickwit-metastore/src/checkpoint.rs b/quickwit/quickwit-metastore/src/checkpoint.rs
index 594965df01b..74fbed6bd9b 100644
--- a/quickwit/quickwit-metastore/src/checkpoint.rs
+++ b/quickwit/quickwit-metastore/src/checkpoint.rs
@@ -28,8 +28,8 @@ use std::sync::Arc;
use quickwit_proto::types::{Position, SourceId};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize};
use thiserror::Error;
-use tracing::{info, warn};
+use tracing::{debug, warn};
/// A `PartitionId` uniquely identifies a partition for a given source.
#[derive(Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
@@ -327,7 +328,7 @@ impl SourceCheckpoint {
delta: SourceCheckpointDelta,
) -> Result<(), IncompatibleCheckpointDelta> {
self.check_compatibility(&delta)?;
- info!(delta=?delta, checkpoint=?self, "applying delta to checkpoint");
+ debug!(delta=?delta, checkpoint=?self, "applying delta to checkpoint");
for (partition_id, partition_position) in delta.per_partition {
self.per_partition