Skip to content

Commit

Permalink
Create and init shards on ingesters via control plane (#4205)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Nov 29, 2023
1 parent 778e7e3 commit 9679d56
Show file tree
Hide file tree
Showing 9 changed files with 1,246 additions and 309 deletions.
59 changes: 53 additions & 6 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::fmt;
use std::time::Duration;

use fnv::FnvHashSet;
use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use quickwit_common::{PrettySample, Progress};
use quickwit_ingest::IngesterPool;
Expand All @@ -29,14 +29,14 @@ use quickwit_proto::control_plane::{
GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsRequest,
GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess,
};
use quickwit_proto::ingest::ingester::{IngesterService, PingRequest};
use quickwit_proto::ingest::{IngestV2Error, ShardIds, ShardState};
use quickwit_proto::ingest::ingester::{IngesterService, InitShardsRequest, PingRequest};
use quickwit_proto::ingest::{IngestV2Error, Shard, ShardIds, ShardState};
use quickwit_proto::metastore;
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::{IndexUid, NodeId};
use rand::seq::SliceRandom;
use tokio::time::timeout;
use tracing::info;
use tracing::{info, warn};

use crate::control_plane_model::ControlPlaneModel;

Expand Down Expand Up @@ -302,6 +302,9 @@ impl IngestController {
let open_shards_response = progress
.protect_future(self.metastore.open_shards(open_shards_request))
.await?;

self.init_shards(&open_shards_response, progress).await;

for open_shards_subresponse in open_shards_response.subresponses {
let index_uid: IndexUid = open_shards_subresponse.index_uid.clone().into();
let source_id = open_shards_subresponse.source_id.clone();
Expand Down Expand Up @@ -330,6 +333,38 @@ impl IngestController {
failures: get_or_create_open_shards_failures,
})
}

/// Calls init shards on the leaders hosting newly opened shards.
async fn init_shards(
&self,
open_shard_response: &metastore::OpenShardsResponse,
progress: &Progress,
) {
let mut per_leader_opened_shards: FnvHashMap<&String, Vec<Shard>> = FnvHashMap::default();

for subresponse in &open_shard_response.subresponses {
for shard in &subresponse.opened_shards {
per_leader_opened_shards
.entry(&shard.leader_id)
.or_default()
.push(shard.clone());
}
}
for (leader_id, shards) in per_leader_opened_shards {
let init_shards_request = InitShardsRequest { shards };

let Some(mut leader) = self.ingester_pool.get(leader_id) else {
warn!("failed to init shards: ingester `{leader_id}` is unavailable");
continue;
};
if let Err(error) = progress
.protect_future(leader.init_shards(init_shards_request))
.await
{
warn!("failed to init shards: {error}");
}
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand All @@ -345,7 +380,7 @@ mod tests {
use quickwit_metastore::IndexMetadata;
use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest;
use quickwit_proto::ingest::ingester::{
IngesterServiceClient, MockIngesterService, PingResponse,
IngesterServiceClient, InitShardsResponse, MockIngesterService, PingResponse,
};
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::types::SourceId;
Expand Down Expand Up @@ -606,7 +641,19 @@ mod tests {
let ingester: IngesterServiceClient = mock_ingester.into();
ingester_pool.insert("test-ingester-1".into(), ingester.clone());

let mock_ingester = MockIngesterService::default();
let mut mock_ingester = MockIngesterService::default();
mock_ingester
.expect_init_shards()
.once()
.returning(|request| {
assert_eq!(request.shards.len(), 1);
assert_eq!(request.shards[0].index_uid, "test-index-1:0");
assert_eq!(request.shards[0].source_id, "test-source");
assert_eq!(request.shards[0].shard_id, 1);
assert_eq!(request.shards[0].leader_id, "test-ingester-2");

Ok(InitShardsResponse {})
});
let ingester: IngesterServiceClient = mock_ingester.into();
ingester_pool.insert("test-ingester-2".into(), ingester.clone());

Expand Down
Loading

0 comments on commit 9679d56

Please sign in to comment.