Minor refactoring (#4233)
- last_observation no longer clones unconditionally: it returns a Deref guard instead
- assign_shards unwraps the AssignShards message early
- apply_delta logs at debug instead of info, etc.
fulmicoton authored Dec 4, 2023
1 parent c97919c commit b31bf72
Showing 9 changed files with 45 additions and 49 deletions.
11 changes: 6 additions & 5 deletions quickwit/quickwit-actors/src/actor_handle.rs
@@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::fmt;
+use std::ops::Deref;

use serde::Serialize;
use tokio::sync::{oneshot, watch};
@@ -193,7 +194,7 @@ impl<A: Actor> ActorHandle<A> {
);
}
}
-        let state = self.last_observation();
+        let state = self.last_observation().clone();
Observation {
obs_type: ObservationType::PostMortem,
state,
@@ -254,8 +255,8 @@ impl<A: Actor> ActorHandle<A> {
(exit_status, observation)
}

-    pub fn last_observation(&self) -> A::ObservableState {
-        self.last_state.borrow().clone()
+    pub fn last_observation(&self) -> impl Deref<Target = A::ObservableState> + '_ {
+        self.last_state.borrow()
}

async fn wait_for_observable_state_callback(
@@ -271,12 +272,12 @@ impl<A: Actor> ActorHandle<A> {
Observation { obs_type, state }
}
Ok(Err(_)) => {
-                let state = self.last_observation();
+                let state = self.last_observation().clone();
let obs_type = ObservationType::PostMortem;
Observation { obs_type, state }
}
Err(_) => {
-                let state = self.last_observation();
+                let state = self.last_observation().clone();
let obs_type = if self.actor_context.state().is_exit() {
ObservationType::PostMortem
} else {
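The last_observation change above is the heart of the first commit-message bullet: instead of cloning the observable state on every call, the handle now hands out the watch channel's borrow guard, and only call sites that need an owned value pay for a clone. A minimal sketch of the pattern, with a simplified Handle<S> standing in for ActorHandle<A>:

use std::ops::Deref;

use tokio::sync::watch;

struct Handle<S> {
    last_state: watch::Receiver<S>,
}

impl<S> Handle<S> {
    // watch::Receiver::borrow returns a watch::Ref<'_, S>, which
    // dereferences to S. Callers that need ownership clone explicitly:
    // let owned = handle.last_observation().clone();
    fn last_observation(&self) -> impl Deref<Target = S> + '_ {
        self.last_state.borrow()
    }
}

One caveat: watch::Ref holds a read lock on the channel while it is alive, so the guard should be dropped promptly rather than held across an .await, which is presumably why the call sites touched by this commit clone right away.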
2 changes: 1 addition & 1 deletion quickwit/quickwit-actors/src/supervisor.rs
@@ -67,7 +67,7 @@ impl<A: Actor> Actor for Supervisor<A> {
let state_opt: Option<A::ObservableState> = self
.handle_opt
.as_ref()
-            .map(|handle| handle.last_observation());
+            .map(|handle| handle.last_observation().clone());
SupervisorState {
metrics: self.metrics,
state_opt,
6 changes: 4 additions & 2 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -250,10 +250,12 @@ impl IndexingScheduler {
}
}

-        let mut indexers = self.get_indexers_from_indexer_pool();
+        let mut indexers: Vec<(String, IndexerNodeInfo)> = self.get_indexers_from_indexer_pool();
let running_indexing_tasks_by_node_id: FnvHashMap<String, Vec<IndexingTask>> = indexers
.iter()
-            .map(|indexer| (indexer.0.clone(), indexer.1.indexing_tasks.clone()))
+            .map(|(indexer_id, indexer_node_info)| {
+                (indexer_id.clone(), indexer_node_info.indexing_tasks.clone())
+            })
.collect();

let indexing_plans_diff = get_indexing_plans_diff(
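This scheduler hunk is a pure readability refactor: the closure destructures the (String, IndexerNodeInfo) tuple into named bindings instead of indexing with .0 and .1. A standalone sketch of the same pattern, with IndexerNodeInfo reduced to a stand-in struct and a plain HashMap in place of FnvHashMap:

use std::collections::HashMap;

struct IndexerNodeInfo {
    indexing_tasks: Vec<String>, // stand-in for Vec<IndexingTask>
}

fn running_tasks_by_node_id(
    indexers: &[(String, IndexerNodeInfo)],
) -> HashMap<String, Vec<String>> {
    indexers
        .iter()
        // Named bindings make the intent clearer than indexer.0.clone().
        .map(|(indexer_id, indexer_node_info)| {
            (indexer_id.clone(), indexer_node_info.indexing_tasks.clone())
        })
        .collect()
}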
15 changes: 9 additions & 6 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -146,7 +146,7 @@ impl Actor for IndexingPipeline {
) -> anyhow::Result<()> {
// We update the observation to ensure our last "black box" observation
// is up to date.
-        self.perform_observe(ctx).await;
+        self.perform_observe(ctx);
Ok(())
}
}
@@ -237,7 +237,7 @@ impl IndexingPipeline {
self.statistics.generation
}

-    async fn perform_observe(&mut self, ctx: &ActorContext<Self>) {
+    fn perform_observe(&mut self, ctx: &ActorContext<Self>) {
let Some(handles) = &self.handles_opt else {
return;
};
@@ -494,7 +494,7 @@ impl Handler<SuperviseLoop> for IndexingPipeline {
supervise_loop_token: SuperviseLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
-        self.perform_observe(ctx).await;
+        self.perform_observe(ctx);
self.perform_health_check(ctx).await?;
ctx.schedule_self_msg(SUPERVISE_INTERVAL, supervise_loop_token)
.await;
@@ -542,15 +542,18 @@ impl Handler<AssignShards> for IndexingPipeline {

async fn handle(
&mut self,
-        message: AssignShards,
+        assign_shards_message: AssignShards,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Some(handles) = &mut self.handles_opt {
info!(
-                shard_ids=?message.0.shard_ids,
+                shard_ids=?assign_shards_message.0.shard_ids,
"assigning shards to indexing pipeline."
);
-            handles.source_mailbox.send_message(message).await?;
+            handles
+                .source_mailbox
+                .send_message(assign_shards_message)
+                .await?;
}
Ok(())
}
11 changes: 6 additions & 5 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -132,7 +132,7 @@ pub struct IndexingService {
}

impl Debug for IndexingService {
-    fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+    fn fmt(&self, formatter: &mut Formatter) -> std::fmt::Result {
formatter
.debug_struct("IndexingService")
.field("cluster_id", &self.cluster.cluster_id())
@@ -424,7 +424,8 @@ impl IndexingService {
merge_pipeline_mailbox_handle.handle.state().is_running()
});
self.counters.num_running_merge_pipelines = self.merge_pipeline_handles.len();
-        self.update_cluster_running_indexing_tasks().await;
+        self.update_cluster_running_indexing_tasks_in_chitchat()
+            .await;

let pipeline_metrics: HashMap<&IndexingPipelineId, PipelineMetrics> = self
.indexing_pipelines
@@ -536,7 +537,8 @@ impl IndexingService {
// Shut down currently running pipelines that are missing in the new plan.
self.shutdown_pipelines(&pipeline_uid_to_remove).await;

-        self.update_cluster_running_indexing_tasks().await;
+        self.update_cluster_running_indexing_tasks_in_chitchat()
+            .await;

if !failed_spawning_pipeline_ids.is_empty() {
return Err(IndexingError::SpawnPipelinesError {
@@ -642,8 +644,7 @@ impl IndexingService {
}
}

-    /// Updates running indexing tasks in chitchat cluster state.
-    async fn update_cluster_running_indexing_tasks(&self) {
+    async fn update_cluster_running_indexing_tasks_in_chitchat(&self) {
let indexing_tasks = self
.indexing_pipelines
.values()
24 changes: 6 additions & 18 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -48,7 +48,7 @@ use tracing::{debug, error, info, warn};
use ulid::Ulid;

use super::{
-    Assignment, BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
+    BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
};
use crate::actors::DocProcessor;
@@ -376,7 +376,7 @@ impl Source for IngestSource {

async fn assign_shards(
&mut self,
-        assignment: Assignment,
+        mut new_assigned_shard_ids: Vec<ShardId>,
doc_processor_mailbox: &Mailbox<DocProcessor>,
ctx: &SourceContext,
) -> anyhow::Result<()> {
@@ -388,7 +388,6 @@
.sorted()
.collect::<Vec<ShardId>>();

-        let mut new_assigned_shard_ids: Vec<ShardId> = assignment.shard_ids;
new_assigned_shard_ids.sort();

if current_assigned_shard_ids == new_assigned_shard_ids {
@@ -640,14 +639,10 @@
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);

// In this scenario, the indexer will only be able to acquire shard 1.
-        let assignment = Assignment {
-            shard_ids: vec![1, 2],
-        };
let publish_lock = source.publish_lock.clone();
// let publish_token = source.publish_token.clone();

source
-            .assign_shards(assignment, &doc_processor_mailbox, &ctx)
+            .assign_shards(vec![1, 2], &doc_processor_mailbox, &ctx)
.await
.unwrap();

@@ -784,11 +779,8 @@
let ctx: SourceContext =
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);

-        // In this scenario, the indexer will only be able to acquire shard 1.
-        let assignment = Assignment { shard_ids: vec![1] };
-
source
-            .assign_shards(assignment, &doc_processor_mailbox, &ctx)
+            .assign_shards(vec![1], &doc_processor_mailbox, &ctx)
.await
.unwrap();

@@ -940,18 +932,14 @@
let ctx: SourceContext =
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);

-        // In this scenario, the indexer will only be able to acquire shard 1.
-        let assignment = Assignment {
-            shard_ids: vec![1, 2],
-        };
-
assert_eq!(
shard_positions_update_rx.try_recv().unwrap_err(),
TryRecvError::Empty
);

+        // In this scenario, the indexer will only be able to acquire shard 1.
source
-            .assign_shards(assignment, &doc_processor_mailbox, &ctx)
+            .assign_shards(vec![1, 2], &doc_processor_mailbox, &ctx)
.await
.unwrap();

8 changes: 4 additions & 4 deletions quickwit/quickwit-indexing/src/source/mod.rs
@@ -238,7 +238,7 @@ pub trait Source: Send + 'static {
/// plane.
async fn assign_shards(
&mut self,
-        _assignement: Assignment,
+        _shard_ids: Vec<ShardId>,
_doc_processor_mailbox: &Mailbox<DocProcessor>,
_ctx: &SourceContext,
) -> anyhow::Result<()> {
@@ -367,12 +367,12 @@ impl Handler<AssignShards> for SourceActor {

async fn handle(
&mut self,
-        message: AssignShards,
+        assign_shards_message: AssignShards,
ctx: &SourceContext,
) -> Result<(), ActorExitStatus> {
-        let AssignShards(assignment) = message;
+        let AssignShards(Assignment { shard_ids }) = assign_shards_message;
self.source
-            .assign_shards(assignment, &self.doc_processor_mailbox, ctx)
+            .assign_shards(shard_ids, &self.doc_processor_mailbox, ctx)
.await?;
Ok(())
}
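Together with the IngestSource changes above, this implements the second commit-message bullet: the AssignShards message is unwrapped once, at the actor boundary, so the Source trait only ever sees the shard IDs. A self-contained sketch of the pattern, with simplified stand-ins for the Quickwit types (ShardId is shortened to u64 here):

type ShardId = u64; // simplified stand-in for Quickwit's ShardId

struct Assignment {
    shard_ids: Vec<ShardId>,
}

// Actor message wrapping an assignment.
struct AssignShards(Assignment);

trait Source {
    // The trait receives plain data, not the actor message wrapper.
    fn assign_shards(&mut self, shard_ids: Vec<ShardId>);
}

// The handler destructures the wrapper exactly once, at the boundary.
fn handle(source: &mut dyn Source, assign_shards_message: AssignShards) {
    let AssignShards(Assignment { shard_ids }) = assign_shards_message;
    source.assign_shards(shard_ids);
}

Unwrapping early keeps Assignment a transport detail of the actor layer, which is why ingest/mod.rs can drop its Assignment import and the tests can pass vec![1, 2] directly.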
12 changes: 6 additions & 6 deletions quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
@@ -270,12 +270,12 @@ impl Handler<Observe> for DeleteTaskPipeline {
handles.uploader.refresh_observe();
handles.publisher.refresh_observe();
self.state = DeleteTaskPipelineState {
-            delete_task_planner: handles.delete_task_planner.last_observation(),
-            downloader: handles.downloader.last_observation(),
-            delete_task_executor: handles.delete_task_executor.last_observation(),
-            packager: handles.packager.last_observation(),
-            uploader: handles.uploader.last_observation(),
-            publisher: handles.publisher.last_observation(),
+            delete_task_planner: handles.delete_task_planner.last_observation().clone(),
+            downloader: handles.downloader.last_observation().clone(),
+            delete_task_executor: handles.delete_task_executor.last_observation().clone(),
+            packager: handles.packager.last_observation().clone(),
+            uploader: handles.uploader.last_observation().clone(),
+            publisher: handles.publisher.last_observation().clone(),
}
}
ctx.schedule_self_msg(OBSERVE_PIPELINE_INTERVAL, Observe)
5 changes: 3 additions & 2 deletions quickwit/quickwit-metastore/src/checkpoint.rs
@@ -28,8 +28,9 @@ use std::sync::Arc;
use quickwit_proto::types::{Position, SourceId};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize};
+/// Updates running indexing tasks in chitchat cluster state.
use thiserror::Error;
-use tracing::{info, warn};
+use tracing::{debug, warn};

/// A `PartitionId` uniquely identifies a partition for a given source.
#[derive(Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
Expand Down Expand Up @@ -327,7 +328,7 @@ impl SourceCheckpoint {
delta: SourceCheckpointDelta,
) -> Result<(), IncompatibleCheckpointDelta> {
self.check_compatibility(&delta)?;
-        info!(delta=?delta, checkpoint=?self, "applying delta to checkpoint");
+        debug!(delta=?delta, checkpoint=?self, "applying delta to checkpoint");

for (partition_id, partition_position) in delta.per_partition {
self.per_partition
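The info! to debug! switch covers the third commit-message bullet: apply_delta runs for every ingested batch, so its per-call trace belongs below the default filter. A quick illustration of the resulting behavior, assuming a subscriber configured at the usual info level:

use tracing::{debug, warn};

fn apply_delta_logging() {
    // Filtered out under an info-level subscriber (e.g. RUST_LOG=info);
    // visible only when debug logging is explicitly enabled.
    debug!("applying delta to checkpoint");
    // Warnings still surface at the default level.
    warn!("incompatible checkpoint delta");
}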
