Commit aab1e5e

Issue/4244 delete index delete shard (#4270)
* Remove shards from ingesters on source/index deletion.

The PR works by adding an RPC on the ingester to "retain shards":
the control plane can send the list of shards that it expects to see
on an ingester, and the ingester deletes the shards that are not in
that list.
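
A minimal sketch of the "retain" semantics on the ingester side (the
types and the free function `retain_shards` below are illustrative
stand-ins, not the exact Quickwit API):

use std::collections::{BTreeSet, HashMap};

// Illustrative aliases standing in for Quickwit's SourceUid / ShardId.
type SourceUid = String;
type ShardId = u64;

/// Keeps only the shards the control plane expects to see; any other
/// shard belonging to one of the listed sources is deleted.
fn retain_shards(
    local_shards: &mut HashMap<SourceUid, BTreeSet<ShardId>>,
    retain_shards_for_sources: &HashMap<SourceUid, BTreeSet<ShardId>>,
) {
    for (source_uid, expected_shard_ids) in retain_shards_for_sources {
        if let Some(shard_ids) = local_shards.get_mut(source_uid) {
            shard_ids.retain(|shard_id| expected_shard_ids.contains(shard_id));
        }
    }
}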

The control plane performs this RPC when a source is deleted,
when an index is deleted, or upon restart.
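
On the control plane side, a resync amounts to grouping the shards that
should survive by the ingester hosting them, then sending each node its
expected list. A minimal sketch, reusing the illustrative types above:

/// Builds the per-ingester "retain" lists:
/// ingester node id -> source -> shard ids the node should keep.
fn build_retain_lists(
    shards: &[(SourceUid, ShardId, String)], // (source, shard, leader node id)
) -> HashMap<String, HashMap<SourceUid, BTreeSet<ShardId>>> {
    let mut per_ingester: HashMap<String, HashMap<SourceUid, BTreeSet<ShardId>>> =
        HashMap::new();
    for (source_uid, shard_id, leader_id) in shards {
        per_ingester
            .entry(leader_id.clone())
            .or_default()
            .entry(source_uid.clone())
            .or_default()
            .insert(*shard_id);
    }
    per_ingester
}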

This PR is incomplete and needs some work as described in #4274.
The "retain" mechanics was preferred over sending the list of shard
to remove precisely in prevision of #4274.

Also:
- Bugfix in the num shards metric (we were returning the number of sources)
- Added an ingester -> shard table to the control plane model (sketched below)
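
The ingester -> shard table is essentially a reverse index, letting the
control plane answer "which shards does this ingester host?" without
scanning every source. A hypothetical shape, not the actual model code:

type IngesterShardTable = HashMap<String, BTreeSet<(SourceUid, ShardId)>>;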

Closes #4244.

Co-authored-by: Adrien Guillo <[email protected]>
fulmicoton and guilload authored Dec 15, 2023
1 parent 7546a48 commit aab1e5e
Showing 10 changed files with 1,012 additions and 81 deletions.
238 changes: 233 additions & 5 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -17,6 +17,7 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.

+use std::collections::BTreeSet;
 use std::time::Duration;

 use anyhow::Context;
@@ -134,8 +135,13 @@ impl Actor for ControlPlane {
         self.indexing_scheduler
             .schedule_indexing_plan_if_needed(&self.model);

+        self.ingest_controller
+            .sync_with_all_ingesters(&self.model)
+            .await;
+
         ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
             .await;

         Ok(())
     }
 }
@@ -184,7 +190,10 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
         shard_positions_update: ShardPositionsUpdate,
         _ctx: &ActorContext<Self>,
     ) -> Result<(), ActorExitStatus> {
-        let Some(shard_entries) = self.model.list_shards(&shard_positions_update.source_uid) else {
+        let Some(shard_entries) = self
+            .model
+            .list_shards_for_source(&shard_positions_update.source_uid)
+        else {
             // The source no longer exists.
             return Ok(());
         };
@@ -332,15 +341,24 @@ impl Handler<DeleteIndexRequest> for ControlPlane {
             return convert_metastore_error(metastore_error);
         };

-        self.model.delete_index(&index_uid);
+        let ingester_needing_resync: BTreeSet<NodeId> = self
+            .model
+            .list_shards_for_index(&index_uid)
+            .flat_map(|shard_entry| shard_entry.ingester_nodes())
+            .collect();

-        let response = EmptyResponse {};
+        self.ingest_controller
+            .sync_with_ingesters(&ingester_needing_resync, &self.model)
+            .await;
+
+        self.model.delete_index(&index_uid);

         // TODO: Refine the event. Notifying the index will have the effect of reloading the
         // entire state from the metastore. We should update the state of the control plane.
         self.indexing_scheduler
             .schedule_indexing_plan_if_needed(&self.model);

+        let response = EmptyResponse {};
         Ok(Ok(response))
     }
 }
@@ -422,16 +440,42 @@ impl Handler<DeleteSourceRequest> for ControlPlane {
         &mut self,
         request: DeleteSourceRequest,
         _ctx: &ActorContext<Self>,
-    ) -> Result<Self::Reply, ActorExitStatus> {
+    ) -> Result<ControlPlaneResult<EmptyResponse>, ActorExitStatus> {
         let index_uid: IndexUid = request.index_uid.clone().into();
         let source_id = request.source_id.clone();

+        let source_uid = SourceUid {
+            index_uid: index_uid.clone(),
+            source_id: source_id.clone(),
+        };
+
         if let Err(metastore_error) = self.metastore.delete_source(request).await {
+            // TODO: If the metastore returns an error but somehow succeeds in deleting the
+            // source, the control plane will restart and the shards will remain on the
+            // ingesters.
+            //
+            // This is tracked in #4274.
             return convert_metastore_error(metastore_error);
         };
-        self.model.delete_source(&index_uid, &source_id);

+        let ingester_needing_resync: BTreeSet<NodeId> =
+            if let Some(shards) = self.model.list_shards_for_source(&source_uid) {
+                shards
+                    .flat_map(|shard_entry| shard_entry.ingester_nodes())
+                    .collect()
+            } else {
+                BTreeSet::new()
+            };
+
+        self.ingest_controller
+            .sync_with_ingesters(&ingester_needing_resync, &self.model)
+            .await;
+
+        self.model.delete_source(&source_uid);
+
         self.indexing_scheduler
             .schedule_indexing_plan_if_needed(&self.model);
         let response = EmptyResponse {};

         Ok(Ok(response))
     }
 }
@@ -531,6 +575,7 @@ mod tests {
     };
     use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest;
     use quickwit_proto::indexing::{ApplyIndexingPlanRequest, CpuCapacity, IndexingServiceClient};
+    use quickwit_proto::ingest::ingester::{IngesterServiceClient, RetainShardsResponse};
     use quickwit_proto::ingest::{Shard, ShardState};
     use quickwit_proto::metastore::{
         DeleteShardsResponse, EntityKind, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
@@ -1246,4 +1291,187 @@

        universe.assert_quit().await;
    }

    #[tokio::test]
    async fn test_delete_index() {
        quickwit_common::setup_logging_for_tests();
        let universe = Universe::default();
        let node_id = NodeId::new("control-plane-node".to_string());
        let indexer_pool = IndexerPool::default();

        let ingester_pool = IngesterPool::default();
        let mut ingester_mock = IngesterServiceClient::mock();
        ingester_mock
            .expect_retain_shards()
            .times(2)
            .returning(|mut request| {
                assert_eq!(request.retain_shards_for_sources.len(), 1);
                let retain_shards_for_source = request.retain_shards_for_sources.pop().unwrap();
                assert_eq!(&retain_shards_for_source.shard_ids, &[15]);
                Ok(RetainShardsResponse {})
            });
        ingester_pool.insert("node1".into(), ingester_mock.into());

        let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0");
        let index_uid_clone = index_0.index_uid.clone();

        let mut mock_metastore = MetastoreServiceClient::mock();
        mock_metastore.expect_delete_index().return_once(
            move |delete_index_request: DeleteIndexRequest| {
                assert_eq!(delete_index_request.index_uid, index_uid_clone.to_string());
                Ok(EmptyResponse {})
            },
        );

        let mut source = SourceConfig::ingest_v2_default();
        source.enabled = true;
        index_0.add_source(source.clone()).unwrap();

        let index_0_clone = index_0.clone();
        mock_metastore.expect_list_indexes_metadata().return_once(
            move |list_indexes_request: ListIndexesMetadataRequest| {
                assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
                Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![
                    index_0_clone.clone()
                ])
                .unwrap())
            },
        );

        let index_uid_clone = index_0.index_uid.clone();
        mock_metastore.expect_list_shards().return_once(
            move |_list_shards_request: ListShardsRequest| {
                let list_shards_resp = ListShardsResponse {
                    subresponses: vec![ListShardsSubresponse {
                        index_uid: index_uid_clone.to_string(),
                        source_id: INGEST_SOURCE_ID.to_string(),
                        shards: vec![Shard {
                            index_uid: index_uid_clone.to_string(),
                            source_id: source.source_id.to_string(),
                            shard_id: 15,
                            leader_id: "node1".to_string(),
                            follower_id: None,
                            shard_state: ShardState::Open as i32,
                            publish_position_inclusive: None,
                            publish_token: None,
                        }],
                        next_shard_id: 18,
                    }],
                };
                Ok(list_shards_resp)
            },
        );

        let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn(
            &universe,
            "cluster".to_string(),
            node_id,
            indexer_pool,
            ingester_pool,
            MetastoreServiceClient::from(mock_metastore),
            1,
        );
        // Deleting the index should trigger a `retain_shards` resync on the ingester.
        control_plane_mailbox
            .ask(DeleteIndexRequest {
                index_uid: index_0.index_uid.to_string(),
            })
            .await
            .unwrap()
            .unwrap();

        universe.assert_quit().await;
    }

    #[tokio::test]
    async fn test_delete_source() {
        quickwit_common::setup_logging_for_tests();
        let universe = Universe::default();
        let node_id = NodeId::new("control-plane-node".to_string());
        let indexer_pool = IndexerPool::default();

        let ingester_pool = IngesterPool::default();
        let mut ingester_mock = IngesterServiceClient::mock();
        ingester_mock
            .expect_retain_shards()
            .times(2)
            .returning(|mut request| {
                assert_eq!(request.retain_shards_for_sources.len(), 1);
                let retain_shards_for_source = request.retain_shards_for_sources.pop().unwrap();
                assert_eq!(&retain_shards_for_source.shard_ids, &[15]);
                Ok(RetainShardsResponse {})
            });
        ingester_pool.insert("node1".into(), ingester_mock.into());

        let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0");
        let index_uid_clone = index_0.index_uid.clone();

        let mut mock_metastore = MetastoreServiceClient::mock();
        mock_metastore.expect_delete_source().return_once(
            move |delete_source_request: DeleteSourceRequest| {
                assert_eq!(delete_source_request.index_uid, index_uid_clone.to_string());
                assert_eq!(&delete_source_request.source_id, INGEST_SOURCE_ID);
                Ok(EmptyResponse {})
            },
        );

        let mut source = SourceConfig::ingest_v2_default();
        source.enabled = true;
        index_0.add_source(source.clone()).unwrap();

        let index_0_clone = index_0.clone();
        mock_metastore.expect_list_indexes_metadata().return_once(
            move |list_indexes_request: ListIndexesMetadataRequest| {
                assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
                Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![
                    index_0_clone.clone()
                ])
                .unwrap())
            },
        );

        let index_uid_clone = index_0.index_uid.clone();
        mock_metastore.expect_list_shards().return_once(
            move |_list_shards_request: ListShardsRequest| {
                let list_shards_resp = ListShardsResponse {
                    subresponses: vec![ListShardsSubresponse {
                        index_uid: index_uid_clone.to_string(),
                        source_id: INGEST_SOURCE_ID.to_string(),
                        shards: vec![Shard {
                            index_uid: index_uid_clone.to_string(),
                            source_id: source.source_id.to_string(),
                            shard_id: 15,
                            leader_id: "node1".to_string(),
                            follower_id: None,
                            shard_state: ShardState::Open as i32,
                            publish_position_inclusive: None,
                            publish_token: None,
                        }],
                        next_shard_id: 18,
                    }],
                };
                Ok(list_shards_resp)
            },
        );

        let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn(
            &universe,
            "cluster".to_string(),
            node_id,
            indexer_pool,
            ingester_pool,
            MetastoreServiceClient::from(mock_metastore),
            1,
        );
        // Deleting the source should trigger a `retain_shards` resync on the ingester.
        control_plane_mailbox
            .ask(DeleteSourceRequest {
                index_uid: index_0.index_uid.to_string(),
                source_id: INGEST_SOURCE_ID.to_string(),
            })
            .await
            .unwrap()
            .unwrap();

        universe.assert_quit().await;
    }
}
@@ -140,7 +140,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
         SourceType::IngestV2 => {
             // Expect: the source should exist since we just read it from `get_source_configs`.
             let shard_ids: Vec<ShardId> = model
-                .list_shards(&source_uid)
+                .list_shards_for_source(&source_uid)
                 .expect("source should exist")
                 .map(|shard| shard.shard_id)
                 .collect();
