Skip to content

Commit

Permalink
Extending the control plane model (#3938)
Browse files Browse the repository at this point in the history
* The control plane model now offers a concise view of the Metastore.

The indexing scheduler query the metastore at all.

Other refactoring

We rely on SourceUid more broadly in the code.
The unit test were using IndexUid in a broken way in places. This is now
fixed.

This PR introduces a LogicalIndexTask. The idea is to have it hold all
information necessary for us to do the placement.
Right now this means only the max pipeline per index parameter (allowing
us to remove the ugly lookup in the source config map).

In the next PR we will most likely also break the 1:1 relationship
between LogicalIndexTask and IndexTask.

The idea would be to have a single logical index task, and possibly
break it into several IndexTask.

This PR also cleans up the hack the populates the list of indexing task
with ingest v2 tasks by mutating the list of tasks
  • Loading branch information
fulmicoton authored Oct 13, 2023
1 parent 8fa6e45 commit 6851950
Show file tree
Hide file tree
Showing 10 changed files with 524 additions and 447 deletions.
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub const CLI_INGEST_SOURCE_ID: &str = "_ingest-cli-source";
pub const INGEST_API_SOURCE_ID: &str = "_ingest-api-source";

/// Reserved source ID used for native Quickwit ingest.
/// (this is for ingest v2)
pub const INGEST_SOURCE_ID: &str = "_ingest-source";

pub const RESERVED_SOURCE_IDS: &[&str] =
Expand Down
55 changes: 31 additions & 24 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_actors::{
};
use quickwit_config::{IndexConfig, SourceConfig};
use quickwit_ingest::IngesterPool;
use quickwit_metastore::Metastore;
use quickwit_metastore::{IndexMetadata, Metastore};
use quickwit_proto::control_plane::{
ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsRequest,
GetOrCreateOpenShardsResponse,
Expand All @@ -42,8 +42,8 @@ use serde::Serialize;
use tracing::error;

use crate::control_plane_model::{ControlPlaneModel, ControlPlaneModelMetrics};
use crate::indexing_scheduler::{IndexingScheduler, IndexingSchedulerState};
use crate::ingest::IngestController;
use crate::scheduler::{IndexingScheduler, IndexingSchedulerState};
use crate::IndexerPool;

/// Interval between two controls (or checks) of the desired plan VS running plan.
Expand Down Expand Up @@ -126,15 +126,14 @@ impl Actor for ControlPlane {
self.model
.load_from_metastore(&*self.metastore, ctx.progress())
.await
.context("Failed to intiialize the model")?;
.context("failed to intialize the model")?;

if let Err(error) = self
.indexing_scheduler
.schedule_indexing_plan_if_needed()
.await
.schedule_indexing_plan_if_needed(&self.model)
{
// TODO inspect error.
error!("Error when scheduling indexing plan: `{}`.", error);
error!("error when scheduling indexing plan: `{}`.", error);
}

ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
Expand All @@ -153,7 +152,7 @@ impl Handler<ControlPlanLoop> for ControlPlane {
_message: ControlPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Err(error) = self.indexing_scheduler.control_running_plan().await {
if let Err(error) = self.indexing_scheduler.control_running_plan(&self.model) {
error!("error when controlling the running plan: `{}`", error);
}
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
Expand All @@ -176,7 +175,7 @@ fn convert_metastore_error<T>(
) -> Result<ControlPlaneResult<T>, ActorExitStatus> {
// If true, we know that the transactions has not been recorded in the Metastore.
// If false, we simply are not sure whether the transaction has been recorded or not.
let metastore_failure_is_certain = match &metastore_error {
let is_transaction_certainly_aborted = match &metastore_error {
MetastoreError::AlreadyExists(_)
| MetastoreError::FailedPrecondition { .. }
| MetastoreError::Forbidden { .. }
Expand All @@ -190,15 +189,16 @@ fn convert_metastore_error<T>(
| MetastoreError::Connection { .. }
| MetastoreError::Db { .. } => false,
};
if metastore_failure_is_certain {
// If the metastore failure is certain, this is actually a good thing.
if is_transaction_certainly_aborted {
// If the metastore transaction is certain to have been aborted,
// this is actually a good thing.
// We do not need to restart the control plane.
error!(err=?metastore_error, transaction_outcome="certainly-failed", "metastore-error: The transaction certainly failed. We do not need to restart the control plane.");
error!(err=?metastore_error, transaction_outcome="aborted", "metastore error");
Ok(Err(ControlPlaneError::Metastore(metastore_error)))
} else {
// If the metastore failure is uncertain, we need to restart the control plane
// If the metastore transaction may have been executed, we need to restart the control plane
// so that it gets resynced with the metastore state.
error!(err=?metastore_error, transaction_outcome="uncertain", "metastore-error: Transaction outcome is uncertain. Restarting control plane.");
error!(err=?metastore_error, transaction_outcome="maybe-executed", "metastore error");
Err(ActorExitStatus::from(anyhow::anyhow!(metastore_error)))
}
}
Expand All @@ -222,14 +222,17 @@ impl Handler<CreateIndexRequest> for ControlPlane {
}
};

let index_uid = match self.metastore.create_index(index_config).await {
let index_uid = match self.metastore.create_index(index_config.clone()).await {
Ok(index_uid) => index_uid,
Err(metastore_error) => {
return convert_metastore_error(metastore_error);
}
};

self.model.add_index(index_uid.clone());
let index_metadata: IndexMetadata =
IndexMetadata::new_with_index_uid(index_uid.clone(), index_config);

self.model.add_index(index_metadata);

let response = CreateIndexResponse {
index_uid: index_uid.into(),
Expand Down Expand Up @@ -262,7 +265,7 @@ impl Handler<DeleteIndexRequest> for ControlPlane {

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.indexing_scheduler.on_index_change().await?;
self.indexing_scheduler.on_index_change(&self.model).await?;

Ok(Ok(response))
}
Expand All @@ -287,21 +290,22 @@ impl Handler<AddSourceRequest> for ControlPlane {
return Ok(Err(ControlPlaneError::from(error)));
}
};
let source_id = source_config.source_id.clone();

if let Err(metastore_error) = self
.metastore
.add_source(index_uid.clone(), source_config)
.add_source(index_uid.clone(), source_config.clone())
.await
{
return convert_metastore_error(metastore_error);
};

self.model.add_source(&index_uid, &source_id);
self.model
.add_source(&index_uid, source_config)
.context("failed to add source")?;

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.indexing_scheduler.on_index_change().await?;
self.indexing_scheduler.on_index_change(&self.model).await?;

let response = EmptyResponse {};
Ok(Ok(response))
Expand Down Expand Up @@ -333,7 +337,7 @@ impl Handler<ToggleSourceRequest> for ControlPlane {

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.indexing_scheduler.on_index_change().await?;
self.indexing_scheduler.on_index_change(&self.model).await?;

let response = EmptyResponse {};
Ok(Ok(response))
Expand Down Expand Up @@ -362,7 +366,7 @@ impl Handler<DeleteSourceRequest> for ControlPlane {
}

self.model.delete_source(&index_uid, &request.source_id);
self.indexing_scheduler.on_index_change().await?;
self.indexing_scheduler.on_index_change(&self.model).await?;
let response = EmptyResponse {};
Ok(Ok(response))
}
Expand Down Expand Up @@ -542,6 +546,9 @@ mod tests {
let indexer_pool = IndexerPool::default();
let ingester_pool = IngesterPool::default();

let index_metadata = IndexMetadata::for_test("test-index", "ram://test");
let index_uid = index_metadata.index_uid.clone();

let mut mock_metastore = MockMetastore::default();
mock_metastore
.expect_add_source()
Expand All @@ -553,7 +560,7 @@ mod tests {
});
mock_metastore
.expect_list_indexes_metadatas()
.returning(|_| Ok(Vec::new()));
.returning(move |_| Ok(vec![index_metadata.clone()]));
let metastore = Arc::new(mock_metastore);
let replication_factor = 1;

Expand All @@ -568,7 +575,7 @@ mod tests {
);
let source_config = SourceConfig::for_test("test-source", SourceParams::void());
let add_source_request = AddSourceRequest {
index_uid: "test-index:0".to_string(),
index_uid: index_uid.to_string(),
source_config_json: serde_json::to_string(&source_config).unwrap(),
};
control_plane_mailbox
Expand Down
Loading

0 comments on commit 6851950

Please sign in to comment.