From 8067bbf817197d070c5d215e3ed6d8023d63ad81 Mon Sep 17 00:00:00 2001 From: sinhaashish Date: Fri, 1 Sep 2023 03:40:16 +0000 Subject: [PATCH] chore(upgrade): add volume node validation Signed-off-by: sinhaashish --- .../src/bin/upgrade-job/upgrade/data_plane.rs | 2 +- .../src/bin/upgrade-job/upgrade/utils.rs | 60 ++++++++++++------- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs b/k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs index 40684505c..805216ee3 100644 --- a/k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs +++ b/k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs @@ -255,7 +255,7 @@ async fn wait_for_rebuild(node_name: &str, rest_client: &RestClientSet) -> Resul let mut result = RebuildResult::default(); loop { - let rebuild = rebuild_result(rest_client, &mut result.discarded_volumes).await?; + let rebuild = rebuild_result(rest_client, &mut result.discarded_volumes, node_name).await?; if rebuild.rebuilding { info!(node.name = %node_name, "Waiting for volume rebuilds to complete"); diff --git a/k8s/upgrade/src/bin/upgrade-job/upgrade/utils.rs b/k8s/upgrade/src/bin/upgrade-job/upgrade/utils.rs index de144d493..efa666779 100644 --- a/k8s/upgrade/src/bin/upgrade-job/upgrade/utils.rs +++ b/k8s/upgrade/src/bin/upgrade-job/upgrade/utils.rs @@ -10,7 +10,7 @@ use kube::{api::ObjectList, ResourceExt}; use openapi::models::{Volume, VolumeStatus}; use semver::{Version, VersionReq}; use snafu::ResultExt; -use std::time::Duration; +use std::{collections::HashSet, time::Duration}; use tracing::{info, warn}; /// Contains the Rebuild Results. @@ -24,6 +24,7 @@ pub(crate) struct RebuildResult { pub(crate) async fn rebuild_result( rest_client: &RestClientSet, stale_volumes: &mut Vec, + node_name: &str, ) -> Result { loop { let unhealthy_volumes = list_unhealthy_volumes(rest_client, stale_volumes).await?; @@ -32,33 +33,50 @@ pub(crate) async fn rebuild_result( } for volume in unhealthy_volumes.iter() { - match replica_rebuild_count(volume.clone()).await { - 0 => { - for _i in 0 .. 11 { - // wait for a minute for any rebuild to start - tokio::time::sleep(Duration::from_secs(60_u64)).await; - let count = replica_rebuild_count(volume.clone()).await; - if count > 0 { - return Ok(RebuildResult { - rebuilding: true, - discarded_volumes: stale_volumes.clone(), - }); + let target = if let Some(target) = volume.state.target.as_ref() { + target + } else { + continue; + }; + + let mut volume_over_nodes = HashSet::new(); + volume_over_nodes.insert(target.node.as_str()); + + for (_, topology) in volume.state.replica_topology.iter() { + if let Some(node) = topology.node.as_ref() { + volume_over_nodes.insert(node); + } + } + + if volume_over_nodes.contains(node_name) { + match replica_rebuild_count(volume) { + 0 => { + for _i in 0 .. 11 { + // wait for a minute for any rebuild to start + tokio::time::sleep(Duration::from_secs(60_u64)).await; + let count = replica_rebuild_count(volume); + if count > 0 { + return Ok(RebuildResult { + rebuilding: true, + discarded_volumes: stale_volumes.clone(), + }); + } } + stale_volumes.push(volume.clone()); + } + _ => { + return Ok(RebuildResult { + rebuilding: true, + discarded_volumes: stale_volumes.to_vec(), + }) } - stale_volumes.push(volume.clone()); - } - _ => { - return Ok(RebuildResult { - rebuilding: true, - discarded_volumes: stale_volumes.clone(), - }) } } } } Ok(RebuildResult { rebuilding: false, - discarded_volumes: stale_volumes.clone(), + discarded_volumes: stale_volumes.to_vec(), }) } @@ -96,7 +114,7 @@ pub(crate) async fn list_unhealthy_volumes( } /// Count of number of replica rebuilding. -pub(crate) async fn replica_rebuild_count(volume: Volume) -> i32 { +pub(crate) fn replica_rebuild_count(volume: &Volume) -> i32 { let mut rebuild_count = 0; if let Some(target) = &volume.state.target { for child in target.children.iter() {