diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index e26fe2d021d..26b6a00b7be 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -626,6 +626,7 @@ impl Handler for ControlPlane { .model .list_shards_for_index(&index_uid) .flat_map(|shard_entry| shard_entry.ingesters()) + .map(|node_id_ref| node_id_ref.to_owned()) .collect(); self.model.delete_index(&index_uid); @@ -750,6 +751,7 @@ impl Handler for ControlPlane { shard_entries .values() .flat_map(|shard_entry| shard_entry.ingesters()) + .map(|node_id_ref| node_id_ref.to_owned()) .collect() } else { BTreeSet::new() diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 25185da878e..2ccaead6801 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -17,7 +17,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::{BTreeSet, HashMap}; +use std::collections::btree_map::Entry; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::fmt; use std::future::Future; use std::sync::Arc; @@ -26,7 +27,6 @@ use std::time::Duration; use fnv::FnvHashSet; use futures::stream::FuturesUnordered; use futures::StreamExt; -use itertools::Itertools; use quickwit_actors::Mailbox; use quickwit_common::pretty::PrettySample; use quickwit_common::Progress; @@ -48,7 +48,10 @@ use quickwit_proto::metastore::{ serde_utils, MetastoreResult, MetastoreService, MetastoreServiceClient, OpenShardSubrequest, OpenShardsRequest, OpenShardsResponse, }; -use quickwit_proto::types::{IndexUid, NodeId, Position, ShardId, SourceUid}; +use quickwit_proto::types::{IndexUid, NodeId, NodeIdRef, Position, ShardId, SourceUid}; +use rand::rngs::ThreadRng; +use rand::seq::SliceRandom; +use rand::{thread_rng, Rng, RngCore}; use serde::{Deserialize, Serialize}; use tokio::sync::{Mutex, OwnedMutexGuard}; use tokio::task::JoinHandle; @@ -91,6 +94,110 @@ fn fire_and_forget( }); } +// Returns a random position of the els `slice`, such that the element in this array is NOT +// `except_el`. +fn pick_position( + els: &[&NodeIdRef], + except_el_opt: Option<&NodeIdRef>, + rng: &mut ThreadRng, +) -> Option { + let except_pos_opt = + except_el_opt.and_then(|except_el| els.iter().position(|el| *el == except_el)); + if let Some(except_pos) = except_pos_opt { + let pos = rng.gen_range(0..els.len() - 1); + if pos >= except_pos { + Some(pos + 1) + } else { + Some(pos) + } + } else { + Some(rng.gen_range(0..els.len())) + } +} + +/// Pick a node from the `shard_count_to_node_ids` that is different from `except_node_opt`. +/// We pick in priority nodes with the least number of shards, and we break any tie randomly. +/// +/// Once a node has been found, we update the `shard_count_to_node_ids` to reflect the new state. +/// In particular, the ingester node is moved from its previous shard_count level to its new +/// shard_count level. In particular, a shard_count entry that is empty should be removed from the +/// BTreeMap. +fn pick_one<'a>( + shard_count_to_node_ids: &mut BTreeMap>, + except_node_opt: Option<&'a NodeIdRef>, + rng: &mut ThreadRng, +) -> Option<&'a NodeIdRef> { + let (&shard_count, _) = shard_count_to_node_ids.iter().find(|(_, node_ids)| { + let Some(except_node) = except_node_opt else { + return true; + }; + if node_ids.len() >= 2 { + return true; + } + let Some(&single_node_id) = node_ids.first() else { + return false; + }; + single_node_id != except_node + })?; + let mut shard_entry = shard_count_to_node_ids.entry(shard_count); + let Entry::Occupied(occupied_shard_entry) = &mut shard_entry else { + panic!(); + }; + let nodes = occupied_shard_entry.get_mut(); + let position = pick_position(nodes, except_node_opt, rng)?; + + let node_id = nodes.swap_remove(position); + let new_shard_count = shard_count + 1; + let should_remove_entry = nodes.is_empty(); + + if should_remove_entry { + shard_count_to_node_ids.remove(&shard_count); + } + shard_count_to_node_ids + .entry(new_shard_count) + .or_default() + .push(node_id); + Some(node_id) +} + +/// Pick two ingester nodes from `shard_count_to_node_ids` different one from each other. +/// Ingesters with the lower number of shards are preferred. +fn pick_two<'a>( + shard_count_to_node_ids: &mut BTreeMap>, + rng: &mut ThreadRng, +) -> Option<(&'a NodeIdRef, &'a NodeIdRef)> { + let leader = pick_one(shard_count_to_node_ids, None, rng)?; + let follower = pick_one(shard_count_to_node_ids, Some(leader), rng)?; + Some((leader, follower)) +} + +fn allocate_shards( + node_id_shard_counts: &HashMap, + num_shards: usize, + replication_enabled: bool, +) -> Option)>> { + let mut shard_count_to_node_ids: BTreeMap> = BTreeMap::default(); + for (node_id, &num_shards) in node_id_shard_counts { + shard_count_to_node_ids + .entry(num_shards) + .or_default() + .push(node_id.as_ref()); + } + let mut rng = thread_rng(); + let mut shard_allocations: Vec<(&NodeIdRef, Option<&NodeIdRef>)> = + Vec::with_capacity(num_shards); + for _ in 0..num_shards { + if replication_enabled { + let (leader, follower) = pick_two(&mut shard_count_to_node_ids, &mut rng)?; + shard_allocations.push((leader, Some(follower))); + } else { + let leader = pick_one(&mut shard_count_to_node_ids, None, &mut rng)?; + shard_allocations.push((leader, None)); + } + } + Some(shard_allocations) +} + #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] pub struct IngestControllerStats { pub num_rebalance_shards_ops: usize, @@ -424,87 +531,64 @@ impl IngestController { unavailable_leaders: &FnvHashSet, model: &ControlPlaneModel, ) -> Option)>> { - let ingesters: Vec = self + // Count of open shards per available ingester node (including the ingester with 0 open + // shards). + let mut per_node_num_open_shards: HashMap = self .ingester_pool .keys() .into_iter() .filter(|ingester| !unavailable_leaders.contains(ingester)) - .sorted_by(|left, right| left.cmp(right)) + .map(|ingester| (ingester, 0)) .collect(); - let num_ingesters = ingesters.len(); + let num_ingesters = per_node_num_open_shards.len(); if num_ingesters == 0 { warn!("failed to allocate {num_shards_to_allocate} shards: no ingesters available"); return None; - } else if self.replication_factor > num_ingesters { + } + + if self.replication_factor > num_ingesters { warn!( "failed to allocate {num_shards_to_allocate} shards: replication factor is \ greater than the number of available ingesters" ); return None; } - let mut leader_follower_pairs = Vec::with_capacity(num_shards_to_allocate); - - let mut num_open_shards: usize = 0; - let mut per_leader_num_open_shards: HashMap<&str, usize> = - HashMap::with_capacity(num_ingesters); for shard in model.all_shards() { if shard.is_open() && !unavailable_leaders.contains(&shard.leader_id) { - num_open_shards += 1; - - *per_leader_num_open_shards - .entry(&shard.leader_id) - .or_default() += 1; - } - } - let mut num_remaining_shards_to_allocate = num_shards_to_allocate; - let num_open_shards_target = num_shards_to_allocate + num_open_shards; - let max_num_shards_to_allocate_per_node = num_open_shards_target / num_ingesters; - - // Allocate at most `max_num_shards_to_allocate_per_node` shards to each ingester. - for (leader_id, follower_id) in ingesters.iter().zip(ingesters.iter().cycle().skip(1)) { - if num_remaining_shards_to_allocate == 0 { - break; - } - let num_open_shards_inner = per_leader_num_open_shards - .get(leader_id.as_str()) - .copied() - .unwrap_or_default(); - - let num_shards_to_allocate_inner = max_num_shards_to_allocate_per_node - .saturating_sub(num_open_shards_inner) - .min(num_remaining_shards_to_allocate); - - for _ in 0..num_shards_to_allocate_inner { - num_remaining_shards_to_allocate -= 1; - - let leader = leader_id.clone(); - let mut follower_opt = None; - - if self.replication_factor > 1 { - follower_opt = Some(follower_id.clone()); + for ingest_node in shard.ingesters() { + if let Some(shard_count) = + per_node_num_open_shards.get_mut(ingest_node.as_str()) + { + *shard_count += 1; + } else { + // The shard is not present in the `per_node_num_open_shards` map. + // This is normal. It just means an ingester is temporarily unavailable, + // either from the control plane view (not present in the indexer pool, + // because as a result of information from + // chitchat), or because it is in the unavailable + // leaders map. + } } - leader_follower_pairs.push((leader, follower_opt)); } } - // Allocate remaining shards one by one. - for (leader_id, follower_id) in ingesters.iter().zip(ingesters.iter().cycle().skip(1)) { - if num_remaining_shards_to_allocate == 0 { - break; - } - num_remaining_shards_to_allocate -= 1; - - let leader = leader_id.clone(); - let mut follower_opt = None; - if self.replication_factor > 1 { - follower_opt = Some(follower_id.clone()); - } - leader_follower_pairs.push((leader, follower_opt)); - } - Some(leader_follower_pairs) + assert!(self.replication_factor == 1 || self.replication_factor == 2); + let leader_follower_pairs: Vec<(&NodeIdRef, Option<&NodeIdRef>)> = allocate_shards( + &per_node_num_open_shards, + num_shards_to_allocate, + self.replication_factor == 2, + )?; + Some( + leader_follower_pairs + .into_iter() + .map(|(leader_id, follower_id)| { + (leader_id.to_owned(), follower_id.map(NodeIdRef::to_owned)) + }) + .collect(), + ) } /// Calls init shards on the leaders hosting newly opened shards. @@ -786,6 +870,7 @@ impl IngestController { model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); return Ok(()); }; + info!("scaling down shard {shard_id} from {leader_id}"); let Some(ingester) = self.ingester_pool.get(&leader_id) else { model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); return Ok(()); @@ -922,24 +1007,24 @@ impl IngestController { } } - let num_open_shards_per_leader_target = num_open_shards / num_ingesters; - // We tolerate an ingester with 10% more shards than the average. - // Let's first identify the list of shards we want to "move". let num_open_shards_per_leader_threshold = - (num_open_shards_per_leader_target * 11).div_ceil(10); + (num_open_shards * 11).div_ceil(10 * num_ingesters); let mut shards_to_move: Vec = Vec::new(); + let mut rng = thread_rng(); for open_shards in per_leader_open_shards.values() { - if open_shards.len() <= num_open_shards_per_leader_threshold { - continue; + if let Some(num_shards_to_move) = open_shards + .len() + .checked_sub(num_open_shards_per_leader_threshold) + { + shards_to_move.extend( + open_shards[..] + .choose_multiple(&mut rng, num_shards_to_move) + .map(|shard_entry| shard_entry.shard.clone()), + ); } - shards_to_move.extend( - open_shards[num_open_shards_per_leader_threshold..] - .iter() - .map(|shard_entry| shard_entry.shard.clone()), - ); } shards_to_move @@ -1103,39 +1188,36 @@ pub(crate) struct RebalanceShardsCallback { pub rebalance_guard: OwnedMutexGuard<()>, } -/// Finds the shard with the highest ingestion rate on the ingester with the most number of open -/// shards. If multiple shards have the same ingestion rate, the shard with the lowest (oldest) +/// Finds a shard on the ingester with the highest number of open +/// shards for this source. +/// +/// If multiple shards are hosted on that ingester, the shard with the lowest (oldest) /// shard ID is chosen. fn find_scale_down_candidate( source_uid: &SourceUid, model: &ControlPlaneModel, ) -> Option<(NodeId, ShardId)> { - let mut per_leader_candidates: HashMap<&String, (usize, &ShardEntry)> = HashMap::new(); + let mut per_leader_shard_entries: HashMap<&String, Vec<&ShardEntry>> = HashMap::new(); + let mut rng = thread_rng(); for shard in model.get_shards_for_source(source_uid)?.values() { if shard.is_open() { - per_leader_candidates + per_leader_shard_entries .entry(&shard.leader_id) - .and_modify(|(num_shards, candidate)| { - *num_shards += 1; - - if shard - .long_term_ingestion_rate - .cmp(&candidate.long_term_ingestion_rate) - .then_with(|| shard.shard_id.cmp(&candidate.shard_id)) - .is_gt() - { - *candidate = shard; - } - }) - .or_insert((1, shard)); + .or_default() + .push(shard); } } - per_leader_candidates + per_leader_shard_entries .into_iter() - .min_by_key(|(_leader_id, (num_shards, _shard))| *num_shards) - .map(|(leader_id, (_num_shards, shard))| { - (leader_id.clone().into(), shard.shard_id().clone()) + // We use a random number to break ties... The HashMap is randomly seeded so this is + // should not make much difference, but we might want to be as explicit as possible. + .max_by_key(|(_leader_id, shard_entries)| (shard_entries.len(), rng.next_u32())) + .map(|(leader_id, shard_entries)| { + ( + leader_id.clone().into(), + shard_entries.choose(&mut rng).unwrap().shard_id().clone(), + ) }) } @@ -1147,6 +1229,7 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; + use itertools::Itertools; use quickwit_actors::Universe; use quickwit_common::setup_logging_for_tests; use quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT; @@ -1552,6 +1635,9 @@ mod tests { let leader_follower_pairs_opt = controller.allocate_shards(0, &FnvHashSet::default(), &model); + + // We have only one node so with a replication factor of 2, we can't + // find any solution. assert!(leader_follower_pairs_opt.is_none()); ingester_pool.insert("test-ingester-2".into(), IngesterServiceClient::mocked()); @@ -1559,57 +1645,56 @@ mod tests { let leader_follower_pairs = controller .allocate_shards(0, &FnvHashSet::default(), &model) .unwrap(); + + // We tried to allocate 0 shards, so an empty vec makes sense. assert!(leader_follower_pairs.is_empty()); let leader_follower_pairs = controller .allocate_shards(1, &FnvHashSet::default(), &model) .unwrap(); + assert_eq!(leader_follower_pairs.len(), 1); - assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); - assert_eq!( - leader_follower_pairs[0].1, - Some(NodeId::from("test-ingester-2")) - ); + + // The leader follower is picked at random: both ingester have the same number of shards. + if leader_follower_pairs[0].0 == "test-ingester-1" { + assert_eq!( + leader_follower_pairs[0].1, + Some(NodeId::from("test-ingester-2")) + ); + } else { + assert_eq!(leader_follower_pairs[0].0, "test-ingester-2"); + assert_eq!( + leader_follower_pairs[0].1, + Some(NodeId::from("test-ingester-1")) + ); + } let leader_follower_pairs = controller .allocate_shards(2, &FnvHashSet::default(), &model) .unwrap(); assert_eq!(leader_follower_pairs.len(), 2); - assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); - assert_eq!( - leader_follower_pairs[0].1, - Some(NodeId::from("test-ingester-2")) - ); - assert_eq!(leader_follower_pairs[1].0, "test-ingester-2"); - assert_eq!( - leader_follower_pairs[1].1, - Some(NodeId::from("test-ingester-1")) - ); + for leader_follower_pair in leader_follower_pairs { + if leader_follower_pair.0 == "test-ingester-1" { + assert_eq!( + leader_follower_pair.1, + Some(NodeId::from("test-ingester-2")) + ); + } else { + assert_eq!(leader_follower_pair.0, "test-ingester-2"); + assert_eq!( + leader_follower_pair.1, + Some(NodeId::from("test-ingester-1")) + ); + } + } let leader_follower_pairs = controller .allocate_shards(3, &FnvHashSet::default(), &model) .unwrap(); assert_eq!(leader_follower_pairs.len(), 3); - assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); - assert_eq!( - leader_follower_pairs[0].1, - Some(NodeId::from("test-ingester-2")) - ); - - assert_eq!(leader_follower_pairs[1].0, "test-ingester-2"); - assert_eq!( - leader_follower_pairs[1].1, - Some(NodeId::from("test-ingester-1")) - ); - - assert_eq!(leader_follower_pairs[2].0, "test-ingester-1"); - assert_eq!( - leader_follower_pairs[2].1, - Some(NodeId::from("test-ingester-2")) - ); - let index_uid = IndexUid::for_test("test-index", 0); + let source_id: SourceId = "test-source".to_string(); let open_shards = vec![Shard { index_uid: Some(index_uid.clone()), @@ -1625,10 +1710,10 @@ mod tests { .allocate_shards(3, &FnvHashSet::default(), &model) .unwrap(); assert_eq!(leader_follower_pairs.len(), 3); - assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); + assert_eq!(leader_follower_pairs[0].0, "test-ingester-2"); assert_eq!( leader_follower_pairs[0].1, - Some(NodeId::from("test-ingester-2")) + Some(NodeId::from("test-ingester-1")) ); assert_eq!(leader_follower_pairs[1].0, "test-ingester-2"); @@ -1667,6 +1752,7 @@ mod tests { .allocate_shards(1, &FnvHashSet::default(), &model) .unwrap(); assert_eq!(leader_follower_pairs.len(), 1); + // Ingester 1 already has two shards, so ingester 2 is picked as leader assert_eq!(leader_follower_pairs[0].0, "test-ingester-2"); assert_eq!( leader_follower_pairs[0].1, @@ -1678,6 +1764,7 @@ mod tests { let leader_follower_pairs = controller .allocate_shards(4, &unavailable_leaders, &model) .unwrap(); + // Ingester 2 is unavailable. Ingester 1 has open shards. Ingester 3 ends up leader. assert_eq!(leader_follower_pairs.len(), 4); assert_eq!(leader_follower_pairs[0].0, "test-ingester-3"); assert_eq!( @@ -1697,10 +1784,10 @@ mod tests { Some(NodeId::from("test-ingester-1")) ); - assert_eq!(leader_follower_pairs[3].0, "test-ingester-1"); + assert_eq!(leader_follower_pairs[3].0, "test-ingester-3"); assert_eq!( leader_follower_pairs[3].1, - Some(NodeId::from("test-ingester-3")) + Some(NodeId::from("test-ingester-1")) ); } @@ -2152,8 +2239,6 @@ mod tests { assert_eq!(request.shard_pkeys.len(), 1); assert_eq!(request.shard_pkeys[0].index_uid(), &index_uid_clone); assert_eq!(request.shard_pkeys[0].source_id, "test-source"); - assert_eq!(request.shard_pkeys[0].shard_id(), ShardId::from(2)); - Err(IngestV2Error::Internal( "failed to close shards".to_string(), )) @@ -2523,7 +2608,7 @@ mod tests { index_uid: index_uid.clone().into(), source_id: source_id.clone(), shard_id: Some(ShardId::from(3)), - shard_state: ShardState::Closed as i32, + shard_state: ShardState::Closed as i32, //< this one is closed leader_id: "test-ingester-0".to_string(), ..Default::default() }, @@ -2552,6 +2637,7 @@ mod tests { ..Default::default() }, ]; + // That's 3 open shards on indexer-1, 2 open shard and one closed shard on indexer-0.. model.insert_shards(&index_uid, &source_id, shards); let shard_infos = BTreeSet::from_iter([ @@ -2594,9 +2680,9 @@ mod tests { ]); model.update_shards(&source_uid, &shard_infos); - let (leader_id, shard_id) = find_scale_down_candidate(&source_uid, &model).unwrap(); - assert_eq!(leader_id, "test-ingester-0"); - assert_eq!(shard_id, ShardId::from(2)); + let (leader_id, _shard_id) = find_scale_down_candidate(&source_uid, &model).unwrap(); + // We pick ingester 1 has it has more open shard + assert_eq!(leader_id, "test-ingester-1"); } #[tokio::test] @@ -3073,4 +3159,138 @@ mod tests { let callback = &callbacks[0]; assert_eq!(callback.closed_shards.len(), 1); } + + // #[track_caller] + fn test_allocate_shards_aux_aux( + shard_counts_map: &HashMap, + num_shards: usize, + replication_enabled: bool, + ) { + let shard_allocations_opt = + super::allocate_shards(shard_counts_map, num_shards, replication_enabled); + if num_shards == 0 { + assert_eq!(shard_allocations_opt, Some(Vec::new())); + return; + } + let num_nodes_required = if replication_enabled { 2 } else { 1 }; + if shard_counts_map.len() < num_nodes_required { + assert!(shard_allocations_opt.is_none()); + return; + } + let shard_allocations = shard_allocations_opt.unwrap(); + let mut total_counts: HashMap<&NodeIdRef, usize> = HashMap::default(); + assert_eq!(shard_allocations.len(), num_shards); + if num_shards == 0 { + return; + } + for (leader, follower_opt) in shard_allocations { + assert_eq!(follower_opt.is_some(), replication_enabled); + *total_counts.entry(leader).or_default() += 1; + if let Some(follower) = follower_opt { + *total_counts.entry(follower).or_default() += 1; + assert_ne!(follower, leader); + } + } + for (shard, count) in shard_counts_map { + if let Some(shard_count) = total_counts.get_mut(shard.as_ref()) { + *shard_count += *count; + } + } + let (min, max) = total_counts + .values() + .copied() + .minmax() + .into_option() + .unwrap(); + if !replication_enabled { + // If replication is enabled, we can end up being forced to not spread shards as evenly + // as we would wish. For instance, if there are only two nodes initially + // unbalanced. + assert!(min + 1 >= max); + } else { + let (previous_min, previous_max) = shard_counts_map + .values() + .copied() + .minmax() + .into_option() + .unwrap(); + // The algorithm is supposed to reduce the variance. + // Of course sometimes it is not possible. For instance for 3 nodes that are + // perfectly balanced to begin with, if we as for a single shard. + assert!((previous_max - previous_min).max(1) >= (max - min)); + } + } + + fn test_allocate_shards_aux(shard_counts: &[usize]) { + let mut shard_counts_map: HashMap = HashMap::new(); + let shards: Vec = (0..shard_counts.len()) + .map(|i| format!("shard-{}", i)) + .collect(); + for (shard, &shard_count) in shards.into_iter().zip(shard_counts.iter()) { + shard_counts_map.insert(NodeId::from(shard), shard_count); + } + for i in 0..10 { + test_allocate_shards_aux_aux(&shard_counts_map, i, false); + test_allocate_shards_aux_aux(&shard_counts_map, i, true); + } + } + + use proptest::prelude::*; + + proptest! { + #[test] + fn test_proptest_allocate_shards(shard_counts in proptest::collection::vec(0..10usize, 0..10usize)) { + test_allocate_shards_aux(&shard_counts); + } + } + + #[test] + fn test_allocate_shards_prop_test() { + test_allocate_shards_aux(&[]); + test_allocate_shards_aux(&[1]); + test_allocate_shards_aux(&[1, 1]); + test_allocate_shards_aux(&[1, 2]); + test_allocate_shards_aux(&[1, 4]); + test_allocate_shards_aux(&[2, 3, 2]); + test_allocate_shards_aux(&[2, 4, 6]); + test_allocate_shards_aux(&[2, 3, 10]); + } + + #[test] + fn test_allocate_shards_prop_test_bug() { + test_allocate_shards_aux(&[7, 7, 7]); + } + + #[test] + fn test_pick_one() { + let mut shard_counts = BTreeMap::default(); + shard_counts.insert( + 1, + vec![NodeIdRef::from_str("node1"), NodeIdRef::from_str("node2")], + ); + let mut rng = rand::thread_rng(); + let node = pick_one( + &mut shard_counts, + Some(NodeIdRef::from_str("node2")), + &mut rng, + ) + .unwrap(); + assert_eq!(node.as_str(), "node1"); + assert_eq!(shard_counts.len(), 2); + assert_eq!( + &shard_counts.get(&1).unwrap()[..], + &[NodeIdRef::from_str("node2")] + ); + assert_eq!( + &shard_counts.get(&2).unwrap()[..], + &[NodeIdRef::from_str("node1")] + ); + let node = pick_one(&mut shard_counts, None, &mut rng).unwrap(); + assert_eq!(node.as_str(), "node2"); + assert_eq!(shard_counts.len(), 1); + assert_eq!( + &shard_counts.get(&2).unwrap()[..], + &[NodeIdRef::from_str("node1"), NodeIdRef::from_str("node2")] + ); + } } diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index f07f93839c1..29c579cddcd 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -159,7 +159,7 @@ fn remove_shard_from_ingesters_internal( ) { for node in shard.ingesters() { let ingester_shards = ingester_shards - .get_mut(&node) + .get_mut(node) .expect("shard table reached inconsistent state"); let shard_ids = ingester_shards.get_mut(source_uid).unwrap(); let shard_was_removed = shard_ids.remove(shard.shard_id()); @@ -231,11 +231,7 @@ impl ShardTable { for shard_id in shard_ids { let shard_table_entry = self.table_entries.get(source_uid).unwrap(); debug_assert!(shard_table_entry.shard_entries.contains_key(shard_id)); - debug_assert!(shard_sets_in_shard_table.remove(&( - node.clone(), - source_uid, - shard_id - ))); + debug_assert!(shard_sets_in_shard_table.remove(&(node, source_uid, shard_id))); } } } @@ -365,7 +361,7 @@ impl ShardTable { } for shard in &opened_shards { for node in shard.ingesters() { - let ingester_shards = self.ingester_shards.entry(node).or_default(); + let ingester_shards = self.ingester_shards.entry(node.to_owned()).or_default(); let shard_ids = ingester_shards.entry(source_uid.clone()).or_default(); shard_ids.insert(shard.shard_id().clone()); } diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index a60be74653d..f83f4f42c86 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -25,9 +25,8 @@ use quickwit_common::tower::MakeLoadShedError; use self::ingester::{PersistFailureReason, ReplicateFailureReason}; use self::router::IngestFailureReason; -use super::types::NodeId; use super::GrpcServiceError; -use crate::types::{queue_id, DocUid, Position, QueueId, ShardId, SourceUid}; +use crate::types::{queue_id, DocUid, NodeIdRef, Position, QueueId, ShardId, SourceUid}; use crate::{ServiceError, ServiceErrorCode}; pub mod ingester; @@ -95,11 +94,11 @@ impl MakeLoadShedError for IngestV2Error { impl Shard { /// List of nodes that are storing the shard (the leader, and optionally the follower). - pub fn ingesters(&self) -> impl Iterator + '_ { + pub fn ingesters(&self) -> impl Iterator + '_ { [Some(&self.leader_id), self.follower_id.as_ref()] .into_iter() .flatten() - .map(|node_id| NodeId::new(node_id.clone())) + .map(|node_id| NodeIdRef::from_str(node_id)) } pub fn source_uid(&self) -> SourceUid {