diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index dff26829584..fbda5acbacb 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -48,6 +48,8 @@ pub mod tower; pub mod type_map; pub mod uri; +mod socket_addr_legacy_hash; + use std::env; use std::fmt::{Debug, Display}; use std::future::Future; @@ -58,6 +60,7 @@ pub use coolid::new_coolid; pub use kill_switch::KillSwitch; pub use path_hasher::PathHasher; pub use progress::{Progress, ProtectedZoneGuard}; +pub use socket_addr_legacy_hash::SocketAddrLegacyHash; pub use stream_utils::{BoxStream, ServiceStream}; use tracing::{error, info}; diff --git a/quickwit/quickwit-common/src/rendezvous_hasher.rs b/quickwit/quickwit-common/src/rendezvous_hasher.rs index 2d3c24efd3f..aadfd314768 100644 --- a/quickwit/quickwit-common/src/rendezvous_hasher.rs +++ b/quickwit/quickwit-common/src/rendezvous_hasher.rs @@ -43,6 +43,7 @@ mod tests { use std::net::SocketAddr; use super::*; + use crate::SocketAddrLegacyHash; fn test_socket_addr(last_byte: u8) -> SocketAddr { ([127, 0, 0, last_byte], 10_000u16).into() @@ -55,17 +56,38 @@ mod tests { let socket3 = test_socket_addr(3); let socket4 = test_socket_addr(4); - let mut socket_set1 = vec![socket4, socket3, socket1, socket2]; + let legacy_socket1 = SocketAddrLegacyHash(&socket1); + let legacy_socket2 = SocketAddrLegacyHash(&socket2); + let legacy_socket3 = SocketAddrLegacyHash(&socket3); + let legacy_socket4 = SocketAddrLegacyHash(&socket4); + + let mut socket_set1 = vec![ + legacy_socket4, + legacy_socket3, + legacy_socket1, + legacy_socket2, + ]; sort_by_rendez_vous_hash(&mut socket_set1, "key"); - let mut socket_set2 = vec![socket1, socket2, socket4]; + let mut socket_set2 = vec![legacy_socket1, legacy_socket2, legacy_socket4]; sort_by_rendez_vous_hash(&mut socket_set2, "key"); - let mut socket_set3 = vec![socket1, socket4]; + let mut socket_set3 = vec![legacy_socket1, legacy_socket4]; sort_by_rendez_vous_hash(&mut socket_set3, "key"); - assert_eq!(socket_set1, &[socket1, socket3, socket2, socket4]); - assert_eq!(socket_set2, &[socket1, socket2, socket4]); - assert_eq!(socket_set3, &[socket1, socket4]); + assert_eq!( + socket_set1, + &[ + legacy_socket1, + legacy_socket2, + legacy_socket3, + legacy_socket4 + ] + ); + assert_eq!( + socket_set2, + &[legacy_socket1, legacy_socket2, legacy_socket4] + ); + assert_eq!(socket_set3, &[legacy_socket1, legacy_socket4]); } } diff --git a/quickwit/quickwit-common/src/socket_addr_legacy_hash.rs b/quickwit/quickwit-common/src/socket_addr_legacy_hash.rs new file mode 100644 index 00000000000..0adadf8f2e4 --- /dev/null +++ b/quickwit/quickwit-common/src/socket_addr_legacy_hash.rs @@ -0,0 +1,100 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::hash::Hasher; +use std::net::SocketAddr; + +/// Computes the hash of socket addr, the way it was done before Rust 1.81 +/// +/// In +/// rustc change the implementation of Hash for IpAddr v4 and v6. +/// +/// The idea was to not hash an array of bytes but instead interpret it as a register +/// and hash this. +/// +/// This was done for performance reason, but this change the result of the hash function +/// used to compute affinity in quickwit. As a result, the switch would invalidate all +/// existing cache. +/// +/// In order to avoid this, we introduce the following function that reproduces the old +/// behavior. +#[repr(transparent)] +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub struct SocketAddrLegacyHash<'a>(pub &'a SocketAddr); + +impl<'a> std::hash::Hash for SocketAddrLegacyHash<'a> { + fn hash(&self, state: &mut H) { + std::mem::discriminant(self.0).hash(state); + match self.0 { + SocketAddr::V4(socket_addr_v4) => { + socket_addr_v4.ip().octets().hash(state); + socket_addr_v4.port().hash(state); + } + SocketAddr::V6(socket_addr_v6) => { + socket_addr_v6.ip().octets().hash(state); + socket_addr_v6.port().hash(state); + socket_addr_v6.flowinfo().hash(state); + socket_addr_v6.scope_id().hash(state); + } + } + } +} + +#[cfg(test)] +mod tests { + use std::net::SocketAddrV6; + + use super::*; + + fn sample_socket_addr_v4() -> SocketAddr { + "17.12.15.3:1834".parse().unwrap() + } + + fn sample_socket_addr_v6() -> SocketAddr { + let mut socket_addr_v6: SocketAddrV6 = "[fe80::240:63ff:fede:3c19]:8080".parse().unwrap(); + socket_addr_v6.set_scope_id(4047u32); + socket_addr_v6.set_flowinfo(303u32); + socket_addr_v6.into() + } + + fn compute_hash(hashable: impl std::hash::Hash) -> u64 { + // I wish I could have used the sip hasher but we don't have the deps here and I did + // not want to move that code to quickwit-common. + // + // If test break because rust changed its default hasher, we can just update the tests in + // this file with the new values. + let mut hasher = siphasher::sip::SipHasher::default(); + hashable.hash(&mut hasher); + hasher.finish() + } + + #[test] + fn test_legacy_hash_socket_addr_v4() { + let h = compute_hash(SocketAddrLegacyHash(&sample_socket_addr_v4())); + // This value is coming from using rust 1.80 to hash socket addr + assert_eq!(h, 8725442259486497862); + } + + #[test] + fn test_legacy_hash_socket_addr_v6() { + let h = compute_hash(SocketAddrLegacyHash(&sample_socket_addr_v6())); + // This value is coming from using rust 1.80 to hash socket addr + assert_eq!(h, 14277248675058176752); + } +} diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index b8042f03fb7..d32ad92327c 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -746,30 +746,30 @@ mod tests { #[tokio::test] async fn test_put_kv_happy_path() { // 3 servers 1, 2, 3 - // Targeted key has affinity [3, 2, 1]. + // Targeted key has affinity [2, 3, 1]. // // Put on 2 and 3 is successful - // Get succeeds on 3. + // Get succeeds on 2. let mock_search_service_1 = MockSearchService::new(); let mut mock_search_service_2 = MockSearchService::new(); - // Due to the buffered call it is possible for the - // put request to 2 to be emitted too. - mock_search_service_2 - .expect_put_kv() - .returning(|_put_req: quickwit_proto::search::PutKvRequest| {}); - let mut mock_search_service_3 = MockSearchService::new(); - mock_search_service_3.expect_put_kv().once().returning( + mock_search_service_2.expect_put_kv().once().returning( |put_req: quickwit_proto::search::PutKvRequest| { assert_eq!(put_req.key, b"my_key"); assert_eq!(put_req.payload, b"my_payload"); }, ); - mock_search_service_3.expect_get_kv().once().returning( + mock_search_service_2.expect_get_kv().once().returning( |get_req: quickwit_proto::search::GetKvRequest| { assert_eq!(get_req.key, b"my_key"); Some(b"my_payload".to_vec()) }, ); + let mut mock_search_service_3 = MockSearchService::new(); + // Due to the buffered call it is possible for the + // put request to 3 to be emitted too. + mock_search_service_3 + .expect_put_kv() + .returning(|_put_req: quickwit_proto::search::PutKvRequest| {}); let searcher_pool = searcher_pool_for_test([ ("127.0.0.1:1001", mock_search_service_1), ("127.0.0.1:1002", mock_search_service_2), @@ -791,11 +791,11 @@ mod tests { #[tokio::test] async fn test_put_kv_failing_get() { // 3 servers 1, 2, 3 - // Targeted key has affinity [3, 2, 1]. + // Targeted key has affinity [2, 3, 1]. // // Put on 2 and 3 is successful - // Get fails on 3. - // Get succeeds on 2. + // Get fails on 2. + // Get succeeds on 3. let mock_search_service_1 = MockSearchService::new(); let mut mock_search_service_2 = MockSearchService::new(); mock_search_service_2.expect_put_kv().once().returning( @@ -807,7 +807,7 @@ mod tests { mock_search_service_2.expect_get_kv().once().returning( |get_req: quickwit_proto::search::GetKvRequest| { assert_eq!(get_req.key, b"my_key"); - Some(b"my_payload".to_vec()) + None }, ); let mut mock_search_service_3 = MockSearchService::new(); @@ -820,7 +820,7 @@ mod tests { mock_search_service_3.expect_get_kv().once().returning( |get_req: quickwit_proto::search::GetKvRequest| { assert_eq!(get_req.key, b"my_key"); - None + Some(b"my_payload".to_vec()) }, ); let searcher_pool = searcher_pool_for_test([ diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs index eb15513a76c..d739a76eed4 100644 --- a/quickwit/quickwit-search/src/search_job_placer.rs +++ b/quickwit/quickwit-search/src/search_job_placer.rs @@ -27,6 +27,7 @@ use anyhow::bail; use async_trait::async_trait; use quickwit_common::pubsub::EventSubscriber; use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash}; +use quickwit_common::SocketAddrLegacyHash; use quickwit_proto::search::{ReportSplit, ReportSplitsRequest}; use tracing::{info, warn}; @@ -77,7 +78,9 @@ impl EventSubscriber for SearchJobPlacer { for report_split in evt.report_splits { let node_addr = nodes .keys() - .max_by_key(|node_addr| node_affinity(*node_addr, &report_split.split_id)) + .max_by_key(|node_addr| { + node_affinity(SocketAddrLegacyHash(node_addr), &report_split.split_id) + }) // This actually never happens thanks to the if-condition at the // top of this function. .expect("`nodes` should not be empty"); @@ -115,7 +118,7 @@ struct SocketAddrAndClient { impl Hash for SocketAddrAndClient { fn hash(&self, hasher: &mut H) { - self.socket_addr.hash(hasher); + SocketAddrLegacyHash(&self.socket_addr).hash(hasher); } } @@ -174,7 +177,7 @@ impl SearchJobPlacer { all_nodes.len() ); } - let mut candidate_nodes: Vec<_> = all_nodes + let mut candidate_nodes: Vec = all_nodes .into_iter() .map(|(grpc_addr, client)| CandidateNode { grpc_addr, @@ -259,7 +262,7 @@ struct CandidateNode { impl Hash for CandidateNode { fn hash(&self, state: &mut H) { - self.grpc_addr.hash(state); + SocketAddrLegacyHash(&self.grpc_addr).hash(state); } } @@ -432,17 +435,17 @@ mod tests { ( expected_searcher_addr_1, vec![ + SearchJob::for_test("split5", 5), + SearchJob::for_test("split4", 4), SearchJob::for_test("split3", 3), - SearchJob::for_test("split2", 2), - SearchJob::for_test("split1", 1), ], ), ( expected_searcher_addr_2, vec![ SearchJob::for_test("split6", 6), - SearchJob::for_test("split5", 5), - SearchJob::for_test("split4", 4), + SearchJob::for_test("split2", 2), + SearchJob::for_test("split1", 1), ], ), ];