Skip to content

Commit

Permalink
Ensures the affinity function is the same as in Quickwit 0.8
Browse files Browse the repository at this point in the history
Closes #5576
  • Loading branch information
fulmicoton committed Dec 10, 2024
1 parent 7ec03f9 commit 0b8d821
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 10 deletions.
3 changes: 3 additions & 0 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub mod tower;
pub mod type_map;
pub mod uri;

mod socket_addr_legacy_hash;

use std::env;
use std::fmt::{Debug, Display};
use std::future::Future;
Expand All @@ -58,6 +60,7 @@ pub use coolid::new_coolid;
pub use kill_switch::KillSwitch;
pub use path_hasher::PathHasher;
pub use progress::{Progress, ProtectedZoneGuard};
pub use socket_addr_legacy_hash::SocketAddrLegacyHash;
pub use stream_utils::{BoxStream, ServiceStream};
use tracing::{error, info};

Expand Down
34 changes: 28 additions & 6 deletions quickwit/quickwit-common/src/rendezvous_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ mod tests {
use std::net::SocketAddr;

use super::*;
use crate::SocketAddrLegacyHash;

fn test_socket_addr(last_byte: u8) -> SocketAddr {
([127, 0, 0, last_byte], 10_000u16).into()
Expand All @@ -55,17 +56,38 @@ mod tests {
let socket3 = test_socket_addr(3);
let socket4 = test_socket_addr(4);

let mut socket_set1 = vec![socket4, socket3, socket1, socket2];
let legacy_socket1 = SocketAddrLegacyHash(&socket1);
let legacy_socket2 = SocketAddrLegacyHash(&socket2);
let legacy_socket3 = SocketAddrLegacyHash(&socket3);
let legacy_socket4 = SocketAddrLegacyHash(&socket4);

let mut socket_set1 = vec![
legacy_socket4,
legacy_socket3,
legacy_socket1,
legacy_socket2,
];
sort_by_rendez_vous_hash(&mut socket_set1, "key");

let mut socket_set2 = vec![socket1, socket2, socket4];
let mut socket_set2 = vec![legacy_socket1, legacy_socket2, legacy_socket4];
sort_by_rendez_vous_hash(&mut socket_set2, "key");

let mut socket_set3 = vec![socket1, socket4];
let mut socket_set3 = vec![legacy_socket1, legacy_socket4];
sort_by_rendez_vous_hash(&mut socket_set3, "key");

assert_eq!(socket_set1, &[socket1, socket3, socket2, socket4]);
assert_eq!(socket_set2, &[socket1, socket2, socket4]);
assert_eq!(socket_set3, &[socket1, socket4]);
assert_eq!(
socket_set1,
&[
legacy_socket1,
legacy_socket2,
legacy_socket3,
legacy_socket4
]
);
assert_eq!(
socket_set2,
&[legacy_socket1, legacy_socket2, legacy_socket4]
);
assert_eq!(socket_set3, &[legacy_socket1, legacy_socket4]);
}
}
100 changes: 100 additions & 0 deletions quickwit/quickwit-common/src/socket_addr_legacy_hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::hash::Hasher;
use std::net::SocketAddr;

/// Wrapper around a [`SocketAddr`] that hashes it the way it was hashed before Rust 1.81.
///
/// In <https://github.com/rust-lang/rust/commit/ba620344301aaa3b2733575a0696cdfd877edbdf>
/// rustc changed the implementation of `Hash` for the v4 and v6 IP address types.
///
/// The idea was to stop hashing the address as an array of bytes and instead interpret it
/// as a register and hash that.
///
/// This was done for performance reasons, but it changes the result of the hash function
/// used to compute affinity in Quickwit. As a result, the switch would invalidate all
/// existing caches.
///
/// To avoid this, this wrapper's `Hash` implementation reproduces the old behavior.
#[repr(transparent)]
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub struct SocketAddrLegacyHash<'a>(pub &'a SocketAddr);

impl std::hash::Hash for SocketAddrLegacyHash<'_> {
    /// Feeds `state` with, in order: the enum discriminant of the wrapped address, the IP
    /// octets hashed as a byte array, the port and, for IPv6 only, the flow info followed
    /// by the scope id.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let addr: &SocketAddr = self.0;
        // The variant tag goes first, followed by the variant's fields in a fixed order.
        std::hash::Hash::hash(&std::mem::discriminant(addr), state);
        match addr {
            SocketAddr::V4(v4) => {
                // The octets are hashed as an array of bytes, not as an integer.
                std::hash::Hash::hash(&v4.ip().octets(), state);
                std::hash::Hash::hash(&v4.port(), state);
            }
            SocketAddr::V6(v6) => {
                std::hash::Hash::hash(&v6.ip().octets(), state);
                std::hash::Hash::hash(&v6.port(), state);
                std::hash::Hash::hash(&v6.flowinfo(), state);
                std::hash::Hash::hash(&v6.scope_id(), state);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::net::SocketAddrV6;

    use super::*;

    /// IPv4 fixture: `17.12.15.3`, port `1834`.
    fn sample_socket_addr_v4() -> SocketAddr {
        "17.12.15.3:1834".parse::<SocketAddr>().unwrap()
    }

    /// IPv6 fixture with a non-zero flow info (`303`) and scope id (`4047`), so that every
    /// field fed to the hasher contributes to the expected value.
    fn sample_socket_addr_v6() -> SocketAddr {
        let ip = "fe80::240:63ff:fede:3c19".parse().unwrap();
        SocketAddr::V6(SocketAddrV6::new(ip, 8080, 303u32, 4047u32))
    }

    /// Hashes `hashable` with `siphasher::sip::SipHasher::default()` and returns the digest.
    fn compute_hash(hashable: impl std::hash::Hash) -> u64 {
        // NOTE(review): the expected values in the tests below are tied to this hasher.
        // If a test breaks because the hasher's output changed (and not because of a change
        // in `SocketAddrLegacyHash`), the values in this file can simply be updated.
        let mut sip_hasher = siphasher::sip::SipHasher::default();
        hashable.hash(&mut sip_hasher);
        sip_hasher.finish()
    }

    #[test]
    fn test_legacy_hash_socket_addr_v4() {
        let socket_addr = sample_socket_addr_v4();
        // This value is coming from using rust 1.80 to hash socket addr
        assert_eq!(
            compute_hash(SocketAddrLegacyHash(&socket_addr)),
            8725442259486497862
        );
    }

    #[test]
    fn test_legacy_hash_socket_addr_v6() {
        let socket_addr = sample_socket_addr_v6();
        // This value is coming from using rust 1.80 to hash socket addr
        assert_eq!(
            compute_hash(SocketAddrLegacyHash(&socket_addr)),
            14277248675058176752
        );
    }
}
11 changes: 7 additions & 4 deletions quickwit/quickwit-search/src/search_job_placer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use anyhow::bail;
use async_trait::async_trait;
use quickwit_common::pubsub::EventSubscriber;
use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash};
use quickwit_common::SocketAddrLegacyHash;
use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};
use tracing::{info, warn};

Expand Down Expand Up @@ -77,7 +78,9 @@ impl EventSubscriber<ReportSplitsRequest> for SearchJobPlacer {
for report_split in evt.report_splits {
let node_addr = nodes
.keys()
.max_by_key(|node_addr| node_affinity(*node_addr, &report_split.split_id))
.max_by_key(|node_addr| {
node_affinity(SocketAddrLegacyHash(node_addr), &report_split.split_id)
})
// This actually never happens thanks to the if-condition at the
// top of this function.
.expect("`nodes` should not be empty");
Expand Down Expand Up @@ -115,7 +118,7 @@ struct SocketAddrAndClient {

impl Hash for SocketAddrAndClient {
fn hash<H: Hasher>(&self, hasher: &mut H) {
self.socket_addr.hash(hasher);
SocketAddrLegacyHash(&self.socket_addr).hash(hasher);
}
}

Expand Down Expand Up @@ -174,7 +177,7 @@ impl SearchJobPlacer {
all_nodes.len()
);
}
let mut candidate_nodes: Vec<_> = all_nodes
let mut candidate_nodes: Vec<CandidateNode> = all_nodes
.into_iter()
.map(|(grpc_addr, client)| CandidateNode {
grpc_addr,
Expand Down Expand Up @@ -259,7 +262,7 @@ struct CandidateNode {

impl Hash for CandidateNode {
fn hash<H: Hasher>(&self, state: &mut H) {
self.grpc_addr.hash(state);
SocketAddrLegacyHash(&self.grpc_addr).hash(state);
}
}

Expand Down

0 comments on commit 0b8d821

Please sign in to comment.