From 53069b0a2278d03a4a62362edd095428f1f43377 Mon Sep 17 00:00:00 2001 From: Eran Ifrah Date: Tue, 19 Nov 2024 17:36:21 +0200 Subject: [PATCH] When no keys are provided, route to a random node to achieve the same behavior as the standalone client (and it will also report a meaningful error to the log) Fixes: https://github.com/valkey-io/valkey-glide/issues/2714 Signed-off-by: Eran Ifrah --- .../redis-rs/redis/src/cluster_async/mod.rs | 14 +++++++++++++- glide-core/redis-rs/redis/src/cluster_routing.rs | 6 +++++- glide-core/tests/test_client.rs | 15 +++++++++++++++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 39e547c85b..19b71a33d8 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1510,12 +1510,24 @@ where convert_result(receiver.await) }; + // Sanity + if receivers.is_empty() { + return Err(RedisError::from(( + ErrorKind::ClientError, + "Client internal error", + "Failed to aggregate results for multi-slot command. Maybe a malformed command?" + .to_string(), + ))); + } + // TODO - once Value::Error will be merged, these will need to be updated to handle this new value. match response_policy { Some(ResponsePolicy::AllSucceeded) => { future::try_join_all(receivers.into_iter().map(get_receiver)) .await - .map(|mut results| results.pop().unwrap()) // unwrap is safe, since at least one function succeeded + .map(|mut results| { + results.pop().unwrap() // unwrap is safe, since at least one function succeeded + }) } Some(ResponsePolicy::OneSucceeded) => future::select_ok( receivers diff --git a/glide-core/redis-rs/redis/src/cluster_routing.rs b/glide-core/redis-rs/redis/src/cluster_routing.rs index 858e725408..eab3bf398a 100644 --- a/glide-core/redis-rs/redis/src/cluster_routing.rs +++ b/glide-core/redis-rs/redis/src/cluster_routing.rs @@ -26,7 +26,7 @@ pub enum LogicalAggregateOp { // Or, omitted due to dead code warnings. ATM this value isn't constructed anywhere } -/// Numerical aggreagting operators. +/// Numerical aggregating operators. #[derive(Debug, Clone, Copy, PartialEq)] pub enum AggregateOp { /// Choose minimal value @@ -512,6 +512,10 @@ where } let mut routes: Vec<(Route, Vec)> = routes.into_iter().collect(); + if routes.is_empty() { + return None; + } + Some(if routes.len() == 1 { RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(routes.pop().unwrap().0)) } else { diff --git a/glide-core/tests/test_client.rs b/glide-core/tests/test_client.rs index 4e197b5c66..ffc672fee6 100644 --- a/glide-core/tests/test_client.rs +++ b/glide-core/tests/test_client.rs @@ -638,6 +638,21 @@ pub(crate) mod shared_client_tests { }); } + #[test] + #[serial_test::serial] + fn test_multi_key_no_args_in_cluster() { + block_on_all(async { + let cluster = cluster::setup_default_cluster().await; + println!("Creating 1st cluster client..."); + let mut c1 = cluster::setup_default_client(&cluster).await; + let result = c1.send_command(&redis::cmd("MSET"), None).await; + assert!(result.is_err()); + let e = result.unwrap_err(); + assert!(e.kind().clone().eq(&redis::ErrorKind::ResponseError)); + assert!(e.to_string().contains("wrong number of arguments")); + }); + } + #[rstest] #[serial_test::serial] #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]