Skip to content

Commit

Permalink
chore(bors): merge pull request #336
Browse files Browse the repository at this point in the history
336: chore(upgrade) : add volume node validation r=sinhaashish a=sinhaashish

<!

Co-authored-by: sinhaashish <[email protected]>
  • Loading branch information
mayastor-bors and sinhaashish committed Sep 15, 2023
2 parents b9aaaf1 + 8067bbf commit feabbb7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
2 changes: 1 addition & 1 deletion k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ async fn wait_for_rebuild(node_name: &str, rest_client: &RestClientSet) -> Resul

let mut result = RebuildResult::default();
loop {
let rebuild = rebuild_result(rest_client, &mut result.discarded_volumes).await?;
let rebuild = rebuild_result(rest_client, &mut result.discarded_volumes, node_name).await?;

if rebuild.rebuilding {
info!(node.name = %node_name, "Waiting for volume rebuilds to complete");
Expand Down
60 changes: 39 additions & 21 deletions k8s/upgrade/src/bin/upgrade-job/upgrade/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use kube::{api::ObjectList, ResourceExt};
use openapi::models::{Volume, VolumeStatus};
use semver::{Version, VersionReq};
use snafu::ResultExt;
use std::time::Duration;
use std::{collections::HashSet, time::Duration};
use tracing::{info, warn};

/// Contains the Rebuild Results.
Expand All @@ -24,6 +24,7 @@ pub(crate) struct RebuildResult {
pub(crate) async fn rebuild_result(
rest_client: &RestClientSet,
stale_volumes: &mut Vec<Volume>,
node_name: &str,
) -> Result<RebuildResult> {
loop {
let unhealthy_volumes = list_unhealthy_volumes(rest_client, stale_volumes).await?;
Expand All @@ -32,33 +33,50 @@ pub(crate) async fn rebuild_result(
}

for volume in unhealthy_volumes.iter() {
match replica_rebuild_count(volume.clone()).await {
0 => {
for _i in 0 .. 11 {
// wait for a minute for any rebuild to start
tokio::time::sleep(Duration::from_secs(60_u64)).await;
let count = replica_rebuild_count(volume.clone()).await;
if count > 0 {
return Ok(RebuildResult {
rebuilding: true,
discarded_volumes: stale_volumes.clone(),
});
let target = if let Some(target) = volume.state.target.as_ref() {
target
} else {
continue;
};

let mut volume_over_nodes = HashSet::new();
volume_over_nodes.insert(target.node.as_str());

for (_, topology) in volume.state.replica_topology.iter() {
if let Some(node) = topology.node.as_ref() {
volume_over_nodes.insert(node);
}
}

if volume_over_nodes.contains(node_name) {
match replica_rebuild_count(volume) {
0 => {
for _i in 0 .. 11 {
// wait for a minute for any rebuild to start
tokio::time::sleep(Duration::from_secs(60_u64)).await;
let count = replica_rebuild_count(volume);
if count > 0 {
return Ok(RebuildResult {
rebuilding: true,
discarded_volumes: stale_volumes.clone(),
});
}
}
stale_volumes.push(volume.clone());
}
_ => {
return Ok(RebuildResult {
rebuilding: true,
discarded_volumes: stale_volumes.to_vec(),
})
}
stale_volumes.push(volume.clone());
}
_ => {
return Ok(RebuildResult {
rebuilding: true,
discarded_volumes: stale_volumes.clone(),
})
}
}
}
}
Ok(RebuildResult {
rebuilding: false,
discarded_volumes: stale_volumes.clone(),
discarded_volumes: stale_volumes.to_vec(),
})
}

Expand Down Expand Up @@ -96,7 +114,7 @@ pub(crate) async fn list_unhealthy_volumes(
}

/// Count of number of replica rebuilding.
pub(crate) async fn replica_rebuild_count(volume: Volume) -> i32 {
pub(crate) fn replica_rebuild_count(volume: &Volume) -> i32 {
let mut rebuild_count = 0;
if let Some(target) = &volume.state.target {
for child in target.children.iter() {
Expand Down

0 comments on commit feabbb7

Please sign in to comment.