diff --git a/k8s/upgrade/src/bin/upgrade-job/common/constants.rs b/k8s/upgrade/src/bin/upgrade-job/common/constants.rs
index a2fea91a0..850a320e8 100644
--- a/k8s/upgrade/src/bin/upgrade-job/common/constants.rs
+++ b/k8s/upgrade/src/bin/upgrade-job/common/constants.rs
@@ -1,3 +1,5 @@
+use semver::Version;
+
 /// This is the name of the project that is being upgraded.
 pub(crate) const PRODUCT: &str = "Mayastor";
 
@@ -30,6 +32,14 @@ pub(crate) const TO_UMBRELLA_SEMVER: &str = "4.0.0";
 pub(crate) const UMBRELLA_CHART_UPGRADE_DOCS_URL: &str =
     "https://openebs.io/docs/user-guides/upgrade#mayastor-upgrade";
 
+/// This is the limit for the number of objects we want to collect over the network from
+/// the kubernetes api.
+pub(crate) const KUBE_API_PAGE_SIZE: u32 = 500;
+
+/// The Core chart version limits for requiring partial rebuild to be disabled for upgrade.
+pub(crate) const PARTIAL_REBUILD_DISABLE_EXTENTS: (Version, Version) =
+    (Version::new(2, 2, 0), Version::new(2, 5, 0));
+
 /// Version value for the earliest possible 2.0 release.
 pub(crate) const TWO_DOT_O_RC_ONE: &str = "2.0.0-rc.1";
 
diff --git a/k8s/upgrade/src/bin/upgrade-job/common/error.rs b/k8s/upgrade/src/bin/upgrade-job/common/error.rs
index bb71b0fe7..f079c2511 100644
--- a/k8s/upgrade/src/bin/upgrade-job/common/error.rs
+++ b/k8s/upgrade/src/bin/upgrade-job/common/error.rs
@@ -202,20 +202,6 @@ pub(crate) enum Error {
         pod_namespace: String,
     },
 
-    /// Error for when a Kubernetes API request for GET-ing a list of Pods filtered by label(s)
-    /// fails.
-    #[snafu(display(
-        "Failed to list Pods with label {} in namespace {}: {}",
-        label,
-        namespace,
-        source
-    ))]
-    ListPodsWithLabel {
-        source: kube::Error,
-        label: String,
-        namespace: String,
-    },
-
     /// Error for when a Kubernetes API request for GET-ing a list of Nodes filtered by label(s)
     /// fails.
     #[snafu(display("Failed to list Nodes with label {}: {}", label, source))]
@@ -491,23 +477,6 @@ pub(crate) enum Error {
     #[snafu(display("Failed to send Event over the channel"))]
     EventChannelSend,
 
-    /// Error for when the no value for version label is found on the helm chart.
-    #[snafu(display(
-        "Failed to get the value of the {} label in Pod {} in Namespace {}",
-        CHART_VERSION_LABEL_KEY,
-        pod_name,
-        namespace
-    ))]
-    HelmChartVersionLabelHasNoValue { pod_name: String, namespace: String },
-
-    /// Error for when a pod does not have Namespace set on it.
-    #[snafu(display(
-        "Found None when trying to get Namespace for Pod {}, context: {}",
-        pod_name,
-        context
-    ))]
-    NoNamespaceInPod { pod_name: String, context: String },
-
     /// Error for the Umbrella chart is not upgraded.
     #[snafu(display(
         "The {} helm chart is not upgraded to version {}: Upgrade for helm chart {} is not \
@@ -694,6 +663,13 @@ pub(crate) enum Error {
     #[snafu(display("failed to list CustomResourceDefinitions: {source}"))]
     ListCrds { source: kube::Error },
+
+    #[snafu(display("Partial rebuild must be disabled for upgrades from {chart_name} chart versions >= {lower_extent}, <= {upper_extent}"))]
+    PartialRebuildNotAllowed {
+        chart_name: String,
+        lower_extent: String,
+        upper_extent: String,
+    },
 }
 
 /// A wrapper type to remove repeated Result returns.
diff --git a/k8s/upgrade/src/bin/upgrade-job/common/kube_client.rs b/k8s/upgrade/src/bin/upgrade-job/common/kube_client.rs
index 8877c10fd..da03ca0bf 100644
--- a/k8s/upgrade/src/bin/upgrade-job/common/kube_client.rs
+++ b/k8s/upgrade/src/bin/upgrade-job/common/kube_client.rs
@@ -1,4 +1,7 @@
-use crate::common::error::{K8sClientGeneration, Result};
+use crate::common::{
+    constants::KUBE_API_PAGE_SIZE,
+    error::{K8sClientGeneration, ListPodsWithLabelAndField, Result},
+};
 use k8s_openapi::{
     api::{
         apps::v1::Deployment,
@@ -6,7 +9,10 @@ use k8s_openapi::{
     },
     apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
 };
-use kube::{api::Api, Client};
+use kube::{
+    api::{Api, ListParams},
+    Client,
+};
 use snafu::ResultExt;
 
 /// Generate a new kube::Client.
@@ -38,3 +44,46 @@ pub(crate) async fn pods_api(namespace: &str) -> Result<Api<Pod>> {
 pub(crate) async fn deployments_api(namespace: &str) -> Result<Api<Deployment>> {
     Ok(Api::namespaced(client().await?, namespace))
 }
+
+pub(crate) async fn list_pods(
+    namespace: String,
+    label_selector: Option<String>,
+    field_selector: Option<String>,
+) -> Result<Vec<Pod>> {
+    let mut pods: Vec<Pod> = Vec::with_capacity(KUBE_API_PAGE_SIZE as usize);
+
+    let mut list_params = ListParams::default().limit(KUBE_API_PAGE_SIZE);
+    if let Some(ref labels) = label_selector {
+        list_params = list_params.labels(labels.as_str());
+    }
+    if let Some(ref fields) = field_selector {
+        list_params = list_params.fields(fields.as_str());
+    }
+
+    let list_pods_error_ctx = ListPodsWithLabelAndField {
+        label: label_selector.unwrap_or_default(),
+        field: field_selector.unwrap_or_default(),
+        namespace: namespace.clone(),
+    };
+
+    loop {
+        let pod_list = pods_api(namespace.as_str())
+            .await?
+            .list(&list_params)
+            .await
+            .context(list_pods_error_ctx.clone())?;
+
+        let continue_ = pod_list.metadata.continue_.clone();
+
+        pods = pods.into_iter().chain(pod_list).collect();
+
+        match continue_ {
+            Some(token) => {
+                list_params = list_params.continue_token(token.as_str());
+            }
+            None => break,
+        }
+    }
+
+    Ok(pods)
+}
diff --git a/k8s/upgrade/src/bin/upgrade-job/helm/chart.rs b/k8s/upgrade/src/bin/upgrade-job/helm/chart.rs
index 0da2efe7b..0855f8b2d 100644
--- a/k8s/upgrade/src/bin/upgrade-job/helm/chart.rs
+++ b/k8s/upgrade/src/bin/upgrade-job/helm/chart.rs
@@ -31,6 +31,8 @@ impl Chart {
 pub(crate) trait HelmValuesCollection {
     /// This is a getter for state of the 'ha' feature (enabled/disabled).
     fn ha_is_enabled(&self) -> bool;
+    /// This is a getter for the partial-rebuild toggle value.
+    fn partial_rebuild_is_enabled(&self) -> bool;
 }
 
 /// UmbrellaValues is used to deserialize the helm values.yaml for the Umbrella chart. The Core
@@ -56,6 +58,10 @@ impl HelmValuesCollection for UmbrellaValues {
     fn ha_is_enabled(&self) -> bool {
         self.core.ha_is_enabled()
     }
+
+    fn partial_rebuild_is_enabled(&self) -> bool {
+        self.core.partial_rebuild_is_enabled()
+    }
 }
 
 /// This is used to deserialize the values.yaml of the Core chart.
@@ -114,14 +120,13 @@ impl HelmValuesCollection for CoreValues {
     fn ha_is_enabled(&self) -> bool {
         self.agents.ha_is_enabled()
     }
-}
 
-impl CoreValues {
-    /// This is a getter for state of the 'ha' feature (enabled/disabled).
-    pub(crate) fn ha_is_enabled(&self) -> bool {
-        self.agents.ha_is_enabled()
+    fn partial_rebuild_is_enabled(&self) -> bool {
+        self.agents.partial_rebuild_is_enabled()
     }
+}
+
+impl CoreValues {
     /// This is a getter for the container image tag of the Core chart.
     pub(crate) fn image_tag(&self) -> &str {
         self.image.tag()
@@ -302,6 +307,7 @@ impl CoreValues {
 /// This is used to deserialize the yaml object agents.
 #[derive(Deserialize)]
 struct Agents {
+    core: Core,
     ha: Ha,
 }
 
@@ -310,6 +316,10 @@ impl Agents {
     fn ha_is_enabled(&self) -> bool {
         self.ha.enabled()
     }
+
+    fn partial_rebuild_is_enabled(&self) -> bool {
+        self.core.partial_rebuild_is_enabled()
+    }
 }
 
 /// This is used to deserialize the yaml object base.
@@ -326,6 +336,57 @@ impl Base {
     }
 }
 
+/// This is used to deserialize the yaml object 'agents.core'.
+#[derive(Deserialize)]
+struct Core {
+    #[serde(default)]
+    rebuild: Rebuild,
+}
+
+impl Core {
+    fn partial_rebuild_is_enabled(&self) -> bool {
+        self.rebuild.partial_is_enabled()
+    }
+}
+
+/// This is used to deserialize the yaml object 'agents.core.rebuild'.
+#[derive(Default, Deserialize)]
+struct Rebuild {
+    partial: RebuildPartial,
+}
+
+impl Rebuild {
+    fn partial_is_enabled(&self) -> bool {
+        self.partial.enabled()
+    }
+}
+
+/// This is used to deserialize the yaml object 'agents.core.rebuild.partial'.
+#[derive(Deserialize)]
+struct RebuildPartial {
+    enabled: bool,
+}
+
+impl Default for RebuildPartial {
+    /// We've never shipped with partial rebuild set to off. Also, for a good while after
+    /// the feature was introduced, it was enabled without an option to disable it. So we
+    /// assume that partial rebuild is enabled if the YAML object for Rebuild is missing.
+    /// The Rebuild type will be deserialized with a default value if it's absent from the
+    /// helm values.
+    ///
+    /// #[serde(default)]
+    /// rebuild: Rebuild,
+    fn default() -> Self {
+        Self { enabled: true }
+    }
+}
+
+impl RebuildPartial {
+    fn enabled(&self) -> bool {
+        self.enabled
+    }
+}
+
 /// This is used to deserialize the yaml object 'agents.ha'.
 #[derive(Deserialize)]
 struct Ha {
diff --git a/k8s/upgrade/src/bin/upgrade-job/helm/upgrade.rs b/k8s/upgrade/src/bin/upgrade-job/helm/upgrade.rs
index a65e0cb62..b6154ce6e 100644
--- a/k8s/upgrade/src/bin/upgrade-job/helm/upgrade.rs
+++ b/k8s/upgrade/src/bin/upgrade-job/helm/upgrade.rs
@@ -27,7 +27,8 @@ use tracing::info;
 
 /// HelmUpgradeRunner is returned after an upgrade is validated and dry-run-ed. Running
 /// it carries out helm upgrade.
-pub(crate) type HelmUpgradeRunner = Pin<Box<dyn Future<Output = Result<()>>>>;
+pub(crate) type HelmUpgradeRunner =
+    Pin<Box<dyn Future<Output = Result<Box<dyn HelmValuesCollection>>>>>;
 
 /// A trait object of type HelmUpgrader is either CoreHelmUpgrader or an UmbrellaHelmUpgrader.
 /// They either deal with upgrading the Core helm chart or the Umbrella helm chart respectively.
@@ -42,9 +43,6 @@ pub(crate) trait HelmUpgrader {
 
     /// Return the target helm chart version as a String.
     fn target_version(&self) -> String;
-
-    /// Returns a deserialized struct with tools to gather information about the source helm values.
-    fn source_values(&self) -> &dyn HelmValuesCollection;
 }
 
 /// This is a builder for the Helm chart upgrade.
@@ -183,14 +181,11 @@ impl HelmUpgraderBuilder {
             // Fail if the Umbrella chart isn't already upgraded.
             ensure!(already_upgraded, UmbrellaChartNotUpgraded);
 
-            // Deserialize umbrella chart values yaml.
-            let source_values = UmbrellaValues::try_from(source_values_buf.as_slice())?;
-
             Ok(Box::new(UmbrellaHelmUpgrader {
                 release_name,
+                client,
                 source_version,
                 target_version,
-                source_values,
             }))
         } else if Regex::new(core_regex.as_str())?.is_match(chart) {
             // Skip upgrade-path validation and allow all upgrades for the Core helm chart, if
@@ -292,7 +287,9 @@ impl HelmUpgrader for CoreHelmUpgrader {
 is the same as that of this upgrade-job's helm chart"
             );
 
-            Ok(())
+            let source_values: Box<dyn HelmValuesCollection> = Box::new(self.source_values);
+
+            Ok(source_values)
         }));
     }
 
@@ -320,14 +317,20 @@ is the same as that of this upgrade-job's helm chart"
             info!("Starting helm upgrade...");
             self.client
                 .upgrade(
-                    self.release_name,
+                    self.release_name.as_str(),
                     self.chart_dir,
                    Some(self.helm_upgrade_extra_args),
                 )
                .await?;
             info!("Helm upgrade successful!");
 
-            Ok(())
+            let final_values_buf = self
+                .client
+                .get_values_as_yaml::<&str, String>(self.release_name.as_str(), None)?;
+            let final_values: Box<dyn HelmValuesCollection> =
+                Box::new(CoreValues::try_from(final_values_buf.as_slice())?);
+
+            Ok(final_values)
         }))
     }
 
@@ -338,19 +341,15 @@ is the same as that of this upgrade-job's helm chart"
     fn target_version(&self) -> String {
         self.target_version.to_string()
     }
-
-    fn source_values(&self) -> &dyn HelmValuesCollection {
-        &self.source_values
-    }
 }
 
 /// This is a HelmUpgrader for the Umbrella chart. This gathers information, and doesn't
 /// set up a helm upgrade or a dry-run in any way.
 pub(crate) struct UmbrellaHelmUpgrader {
     release_name: String,
+    client: HelmReleaseClient,
     source_version: Version,
     target_version: Version,
-    source_values: UmbrellaValues,
 }
 
 #[async_trait]
@@ -362,7 +361,13 @@ impl HelmUpgrader for UmbrellaHelmUpgrader {
                 self.release_name.as_str()
             );
 
-            Ok(())
+            let final_values_buf = self
+                .client
+                .get_values_as_yaml::<&str, String>(self.release_name.as_str(), None)?;
+            let final_values: Box<dyn HelmValuesCollection> =
+                Box::new(UmbrellaValues::try_from(final_values_buf.as_slice())?);
+
+            Ok(final_values)
         }))
     }
 
@@ -373,8 +378,4 @@ impl HelmUpgrader for UmbrellaHelmUpgrader {
     fn target_version(&self) -> String {
         self.target_version.to_string()
     }
-
-    fn source_values(&self) -> &dyn HelmValuesCollection {
-        &self.source_values
-    }
 }
diff --git a/k8s/upgrade/src/bin/upgrade-job/helm/values.rs b/k8s/upgrade/src/bin/upgrade-job/helm/values.rs
index 71ddac761..6e292a1cf 100644
--- a/k8s/upgrade/src/bin/upgrade-job/helm/values.rs
+++ b/k8s/upgrade/src/bin/upgrade-job/helm/values.rs
@@ -1,7 +1,8 @@
 use crate::{
     common::{
         constants::{
-            TWO_DOT_FIVE, TWO_DOT_FOUR, TWO_DOT_ONE, TWO_DOT_O_RC_ONE, TWO_DOT_SIX, TWO_DOT_THREE,
+            KUBE_API_PAGE_SIZE, TWO_DOT_FIVE, TWO_DOT_FOUR, TWO_DOT_ONE, TWO_DOT_O_RC_ONE,
+            TWO_DOT_SIX, TWO_DOT_THREE,
         },
         error::{
             DeserializePromtailExtraConfig, ListCrds, Result, SemverParse,
@@ -489,8 +490,6 @@ fn loki_address_to_clients(
 
 /// Use pre-defined helm chart templating to disable CRD installation if they already exist.
 async fn safe_crd_install(upgrade_values_filepath: &Path, yq: &YqV4) -> Result<()> {
-    const MAX_ENTRIES: u32 = 500;
-
     let mut crd_set_to_helm_toggle: HashMap<Vec<String>, YamlKey> = HashMap::new();
     // These 3 CRDs usually exist together.
     crd_set_to_helm_toggle.insert(
@@ -507,8 +506,8 @@ async fn safe_crd_install(upgrade_values_filepath: &Path, yq: &YqV4) -> Result<()> {
     );
 
     let crds_api = KubeClient::crds_api().await?;
-    let mut all_crd_names: Vec<String> = Vec::with_capacity(MAX_ENTRIES as usize);
-    let mut list_params = ListParams::default().limit(MAX_ENTRIES);
+    let mut all_crd_names: Vec<String> = Vec::with_capacity(KUBE_API_PAGE_SIZE as usize);
+    let mut list_params = ListParams::default().limit(KUBE_API_PAGE_SIZE);
 
     loop {
         let crd_list = crds_api
diff --git a/k8s/upgrade/src/bin/upgrade-job/upgrade.rs b/k8s/upgrade/src/bin/upgrade-job/upgrade.rs
index be4903872..2417ea605 100644
--- a/k8s/upgrade/src/bin/upgrade-job/upgrade.rs
+++ b/k8s/upgrade/src/bin/upgrade-job/upgrade.rs
@@ -1,11 +1,23 @@
 use crate::{
-    common::{constants::PRODUCT, error::Result},
+    common::{
+        constants::{
+            CHART_VERSION_LABEL_KEY, CORE_CHART_NAME, IO_ENGINE_LABEL,
+            PARTIAL_REBUILD_DISABLE_EXTENTS, PRODUCT,
+        },
+        error::{PartialRebuildNotAllowed, Result},
+        kube_client as KubeClient,
+    },
     events::event_recorder::{EventAction, EventRecorder},
     helm::upgrade::{HelmUpgradeRunner, HelmUpgraderBuilder},
     opts::CliArgs,
 };
 use data_plane::upgrade_data_plane;
+use k8s_openapi::api::core::v1::Pod;
+use kube::ResourceExt;
+use semver::Version;
+use tracing::error;
+
 /// Contains the data-plane upgrade logic.
 pub(crate) mod data_plane;
 
@@ -54,9 +66,6 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<()> {
     event.set_source_version(source_version.clone());
     event.set_target_version(target_version.clone());
 
-    // Capture HA state before helm upgrade is consumed.
-    let ha_is_enabled = helm_upgrader.source_values().ha_is_enabled();
-
     // Dry-run helm upgrade.
     let dry_run_result: Result<HelmUpgradeRunner> = helm_upgrader.dry_run().await;
     let run_helm_upgrade = match dry_run_result {
@@ -82,10 +91,13 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<()> {
         .await?;
 
     // Control plane containers are updated in this step.
-    if let Err(error) = run_helm_upgrade.await {
-        event.publish_unrecoverable(&error, false).await;
-        return Err(error);
-    }
+    let final_values = match run_helm_upgrade.await {
+        Ok(values) => values,
+        Err(error) => {
+            event.publish_unrecoverable(&error, false).await;
+            return Err(error);
+        }
+    };
 
     event
         .publish_normal(
@@ -96,6 +108,22 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<()> {
 
     // Data plane containers are updated in this step.
     if !opts.skip_data_plane_restart() {
+        let yet_to_upgrade_io_engine_label = format!(
+            "{IO_ENGINE_LABEL},{CHART_VERSION_LABEL_KEY}!={}",
+            target_version.as_str()
+        );
+        let yet_to_upgrade_io_engine_pods = KubeClient::list_pods(
+            opts.namespace(),
+            Some(yet_to_upgrade_io_engine_label.clone()),
+            None,
+        )
+        .await?;
+
+        partial_rebuild_check(
+            yet_to_upgrade_io_engine_pods.as_slice(),
+            final_values.partial_rebuild_is_enabled(),
+        )?;
+
         event
             .publish_normal(
                 format!("Upgrading {PRODUCT} data-plane"),
@@ -107,7 +135,9 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<()> {
                 opts.namespace(),
                 opts.rest_endpoint(),
                 target_version,
-                ha_is_enabled,
+                final_values.ha_is_enabled(),
+                yet_to_upgrade_io_engine_label,
+                yet_to_upgrade_io_engine_pods,
             )
             .await
         {
@@ -132,3 +162,30 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<()> {
 
     Ok(())
 }
+
+fn partial_rebuild_check(
+    yet_to_upgrade_io_engine_pods: &[Pod],
+    partial_rebuild_is_enabled: bool,
+) -> Result<()> {
+    let partial_rebuild_disable_required = yet_to_upgrade_io_engine_pods
+        .iter()
+        .filter_map(|pod| pod.labels().get(CHART_VERSION_LABEL_KEY))
+        .any(|v| {
+            let version =
+                Version::parse(v).expect("failed to parse version from io-engine Pod label");
+            version.ge(&PARTIAL_REBUILD_DISABLE_EXTENTS.0)
+                & version.le(&PARTIAL_REBUILD_DISABLE_EXTENTS.1)
+        });
+
+    if partial_rebuild_disable_required && partial_rebuild_is_enabled {
+        error!("Partial rebuild must be disabled for upgrades from {CORE_CHART_NAME} chart versions >= {}, <= {}", PARTIAL_REBUILD_DISABLE_EXTENTS.0, PARTIAL_REBUILD_DISABLE_EXTENTS.1);
+        return PartialRebuildNotAllowed {
+            chart_name: CORE_CHART_NAME.to_string(),
+            lower_extent: PARTIAL_REBUILD_DISABLE_EXTENTS.0.to_string(),
+            upper_extent: PARTIAL_REBUILD_DISABLE_EXTENTS.1.to_string(),
+        }
+        .fail();
+    }
+
+    Ok(())
+}
diff --git a/k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs b/k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs
index dc7f9b579..2b38bbb90 100644
--- a/k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs
+++ b/k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs
@@ -6,15 +6,15 @@ use crate::{
         },
         error::{
             DrainStorageNode, EmptyPodNodeName, EmptyPodSpec, EmptyStorageNodeSpec, GetStorageNode,
-            ListNodesWithLabel, ListPodsWithLabel, ListPodsWithLabelAndField, ListStorageNodes,
-            PodDelete, Result, StorageNodeUncordon, TooManyIoEnginePods,
+            ListNodesWithLabel, ListStorageNodes, PodDelete, Result, StorageNodeUncordon,
+            TooManyIoEnginePods,
         },
         kube_client as KubeClient,
         rest_client::RestClientSet,
     },
     upgrade::utils::{
-        all_pods_are_ready, cordon_storage_node, data_plane_is_upgraded, list_all_volumes,
-        rebuild_result, uncordon_storage_node, RebuildResult,
+        all_pods_are_ready, cordon_storage_node, list_all_volumes, rebuild_result,
+        uncordon_storage_node, RebuildResult,
     },
 };
 use k8s_openapi::api::core::v1::{Node, Pod};
@@ -35,34 +35,17 @@ pub(crate) async fn upgrade_data_plane(
     rest_endpoint: String,
     upgrade_target_version: String,
     ha_is_enabled: bool,
+    yet_to_upgrade_io_engine_label: String,
+    yet_to_upgrade_io_engine_pods: Vec<Pod>,
 ) -> Result<()> {
-    // Generate k8s clients.
-    let k8s_pods_api = KubeClient::pods_api(namespace.as_str()).await?;
-
     // This makes data-plane upgrade idempotent.
-    let io_engine_label = format!("{IO_ENGINE_LABEL},{CHART_VERSION_LABEL_KEY}");
-    let io_engine_listparams = ListParams::default().labels(io_engine_label.as_str());
-    let io_engine_pod_list =
-        k8s_pods_api
-            .list(&io_engine_listparams)
-            .await
-            .context(ListPodsWithLabel {
-                label: io_engine_label,
-                namespace: namespace.clone(),
-            })?;
-
-    if data_plane_is_upgraded(&upgrade_target_version, &io_engine_pod_list).await? {
+    if yet_to_upgrade_io_engine_pods.is_empty() {
         info!("Skipping data-plane upgrade: All data-plane Pods are already upgraded");
         return Ok(());
     }
 
     // If here, then there is a need to proceed to data-plane upgrade.
 
-    let yet_to_upgrade_io_engine_label_selector =
-        format!("{IO_ENGINE_LABEL},{CHART_VERSION_LABEL_KEY}!={upgrade_target_version}");
-    let io_engine_listparams =
-        ListParams::default().labels(yet_to_upgrade_io_engine_label_selector.as_str());
-
-    let namespace = namespace.clone();
-
     // Generate storage REST API client.
     let rest_client = RestClientSet::new_with_url(rest_endpoint)?;
 
@@ -84,27 +67,21 @@ pub(crate) async fn upgrade_data_plane(
     }
 
     loop {
-        let initial_io_engine_pod_list: ObjectList<Pod> = k8s_pods_api
-            .list(&io_engine_listparams)
-            .await
-            .context(ListPodsWithLabel {
-                label: yet_to_upgrade_io_engine_label_selector.clone(),
-                namespace: namespace.clone(),
-            })?;
+        let initial_io_engine_pod_list: Vec<Pod> = KubeClient::list_pods(
+            namespace.clone(),
+            Some(yet_to_upgrade_io_engine_label.clone()),
+            None,
+        )
+        .await?;
 
         // Infinite loop exit.
-        if initial_io_engine_pod_list.items.is_empty() {
+        if initial_io_engine_pod_list.is_empty() {
             break;
         }
 
         for pod in initial_io_engine_pod_list.iter() {
             // Validate the control plane pod is up and running before we start.
-            verify_control_plane_is_running(
-                namespace.clone(),
-                &k8s_pods_api,
-                &upgrade_target_version,
-            )
-            .await?;
+            verify_control_plane_is_running(namespace.clone(), &upgrade_target_version).await?;
 
             // Fetch the node name on which the io-engine pod is running
             let node_name = pod
@@ -150,16 +127,11 @@
             }
 
             // restart the data plane pod
-            delete_data_plane_pod(node_name, pod, &k8s_pods_api).await?;
+            delete_data_plane_pod(node_name, pod, namespace.clone()).await?;
 
             // validate the new pod is up and running
-            verify_data_plane_pod_is_running(
-                node_name,
-                namespace.clone(),
-                &upgrade_target_version,
-                &k8s_pods_api,
-            )
-            .await?;
+            verify_data_plane_pod_is_running(node_name, namespace.clone(), &upgrade_target_version)
+                .await?;
 
             // Uncordon the drained node
             uncordon_drained_storage_node(node_name, &rest_client).await?;
@@ -221,7 +193,9 @@ async fn uncordon_drained_storage_node(node_id: &str, rest_client: &RestClientSet) -> Result<()> {
 }
 
 /// Issue delete command on dataplane pods.
-async fn delete_data_plane_pod(node_name: &str, pod: &Pod, k8s_pods_api: &Api<Pod>) -> Result<()> {
+async fn delete_data_plane_pod(node_name: &str, pod: &Pod, namespace: String) -> Result<()> {
+    let k8s_pods_api = KubeClient::pods_api(namespace.as_str()).await?;
+
     // Deleting the io-engine pod
     let pod_name = pod.name_any();
     info!(
@@ -245,19 +219,11 @@ async fn verify_data_plane_pod_is_running(
     node_name: &str,
     namespace: String,
     upgrade_target_version: &String,
-    k8s_pods_api: &Api<Pod>,
 ) -> Result<()> {
     let duration = Duration::from_secs(5_u64);
     // Validate the new pod is up and running
     info!(node.name = %node_name, "Waiting for data-plane Pod to come to Ready state");
-    while !data_plane_pod_is_running(
-        node_name,
-        namespace.clone(),
-        upgrade_target_version,
-        k8s_pods_api,
-    )
-    .await?
-    {
+    while !data_plane_pod_is_running(node_name, namespace.clone(), upgrade_target_version).await? {
         sleep(duration).await;
     }
     Ok(())
 }
@@ -341,29 +307,18 @@ async fn data_plane_pod_is_running(
     node: &str,
     namespace: String,
     upgrade_target_version: &String,
-    k8s_pods_api: &Api<Pod>,
 ) -> Result<bool> {
     let node_name_pod_field = format!("spec.nodeName={node}");
     let pod_label =
         format!("{IO_ENGINE_LABEL},{CHART_VERSION_LABEL_KEY}={upgrade_target_version}");
-    let io_engine_listparam = ListParams::default()
-        .labels(pod_label.as_str())
-        .fields(node_name_pod_field.as_str());
-    let pod_list: ObjectList<Pod> =
-        k8s_pods_api
-            .list(&io_engine_listparam)
-            .await
-            .context(ListPodsWithLabelAndField {
-                label: pod_label,
-                field: node_name_pod_field,
-                namespace: namespace.clone(),
-            })?;
+    let pod_list: Vec<Pod> =
+        KubeClient::list_pods(namespace, Some(pod_label), Some(node_name_pod_field)).await?;
 
-    if pod_list.items.is_empty() {
+    if pod_list.is_empty() {
         return Ok(false);
     }
 
-    if pod_list.items.len() != 1 {
+    if pod_list.len() != 1 {
         return TooManyIoEnginePods { node_name: node }.fail();
     }
 
@@ -372,12 +327,10 @@ async fn data_plane_pod_is_running(
 
 async fn verify_control_plane_is_running(
     namespace: String,
-    k8s_pods_api: &Api<Pod>,
     upgrade_target_version: &String,
 ) -> Result<()> {
     let duration = Duration::from_secs(3_u64);
-    while !control_plane_is_running(namespace.clone(), k8s_pods_api, upgrade_target_version).await?
-    {
+    while !control_plane_is_running(namespace.clone(), upgrade_target_version).await? {
         sleep(duration).await;
     }
 
@@ -387,38 +340,22 @@ async fn verify_control_plane_is_running(
 /// Validate if control-plane pods are running -- etcd, agent-core, api-rest.
 async fn control_plane_is_running(
     namespace: String,
-    k8s_pods_api: &Api<Pod>,
     upgrade_target_version: &String,
 ) -> Result<bool> {
     let agent_core_selector_label =
         format!("{AGENT_CORE_LABEL},{CHART_VERSION_LABEL_KEY}={upgrade_target_version}");
-    let pod_list: ObjectList<Pod> = k8s_pods_api
-        .list(&ListParams::default().labels(agent_core_selector_label.as_str()))
-        .await
-        .context(ListPodsWithLabel {
-            label: AGENT_CORE_LABEL.to_string(),
-            namespace: namespace.clone(),
-        })?;
+    let pod_list: Vec<Pod> =
+        KubeClient::list_pods(namespace.clone(), Some(agent_core_selector_label), None).await?;
     let core_is_ready = all_pods_are_ready(pod_list);
 
     let api_rest_selector_label =
         format!("{API_REST_LABEL},{CHART_VERSION_LABEL_KEY}={upgrade_target_version}");
-    let pod_list: ObjectList<Pod> = k8s_pods_api
-        .list(&ListParams::default().labels(api_rest_selector_label.as_str()))
-        .await
-        .context(ListPodsWithLabel {
-            label: API_REST_LABEL.to_string(),
-            namespace: namespace.clone(),
-        })?;
+    let pod_list: Vec<Pod> =
+        KubeClient::list_pods(namespace.clone(), Some(api_rest_selector_label), None).await?;
     let rest_is_ready = all_pods_are_ready(pod_list);
 
-    let pod_list: ObjectList<Pod> = k8s_pods_api
-        .list(&ListParams::default().labels(ETCD_LABEL))
-        .await
-        .context(ListPodsWithLabel {
-            label: ETCD_LABEL.to_string(),
-            namespace: namespace.clone(),
-        })?;
+    let pod_list: Vec<Pod> =
+        KubeClient::list_pods(namespace, Some(ETCD_LABEL.to_string()), None).await?;
     let etcd_is_ready = all_pods_are_ready(pod_list);
 
     Ok(core_is_ready && rest_is_ready && etcd_is_ready)
diff --git a/k8s/upgrade/src/bin/upgrade-job/upgrade/utils.rs b/k8s/upgrade/src/bin/upgrade-job/upgrade/utils.rs
index 1e283da8c..eb07624d3 100644
--- a/k8s/upgrade/src/bin/upgrade-job/upgrade/utils.rs
+++ b/k8s/upgrade/src/bin/upgrade-job/upgrade/utils.rs
@@ -1,15 +1,14 @@
 use crate::common::{
-    constants::{CHART_VERSION_LABEL_KEY, PRODUCT},
+    constants::PRODUCT,
     error::{
-        CordonStorageNode, EmptyStorageNodeSpec, GetStorageNode, HelmChartVersionLabelHasNoValue,
-        ListStorageVolumes, NoNamespaceInPod, Result, SemverParse, StorageNodeUncordon,
+        CordonStorageNode, EmptyStorageNodeSpec, GetStorageNode, ListStorageVolumes, Result,
+        StorageNodeUncordon,
     },
     rest_client::RestClientSet,
 };
 use k8s_openapi::api::core::v1::Pod;
-use kube::{api::ObjectList, ResourceExt};
+use kube::ResourceExt;
 use openapi::models::{CordonDrainState, Volume, VolumeStatus};
-use semver::Version;
 use snafu::ResultExt;
 use std::{collections::HashSet, time::Duration};
 use tracing::{info, warn};
@@ -139,7 +138,7 @@ pub(crate) fn replica_rebuild_count(volume: &Volume) -> i32 {
 
 /// This function returns 'true' only if all of the containers in the Pods contained in the
 /// ObjectList have their Ready status.condition value set to true.
-pub(crate) fn all_pods_are_ready(pod_list: ObjectList<Pod>) -> bool {
+pub(crate) fn all_pods_are_ready(pod_list: Vec<Pod>) -> bool {
     let not_ready_warning = |pod_name: &String, namespace: &String| {
         warn!(
             "Couldn't verify the ready condition of Pod '{}' in namespace '{}' to be true",
@@ -176,43 +175,6 @@ pub(crate) fn all_pods_are_ready(pod_list: ObjectList<Pod>) -> bool {
     true
 }
 
-/// Checks to see if all of io-engine Pods are already upgraded to the version of the local helm
-/// chart.
-pub(crate) async fn data_plane_is_upgraded(
-    target_version: &str,
-    io_engine_pod_list: &ObjectList<Pod>,
-) -> Result<bool> {
-    let target_version_requirement: Version =
-        Version::parse(target_version).context(SemverParse {
-            version_string: target_version.to_string(),
-        })?;
-
-    for pod in io_engine_pod_list {
-        let version_str = pod.labels().get(CHART_VERSION_LABEL_KEY).ok_or(
-            HelmChartVersionLabelHasNoValue {
-                pod_name: pod.name_any(),
-                namespace: pod.namespace().ok_or(
-                    NoNamespaceInPod {
-                        pod_name: pod.name_any(),
-                        context: "checking to see if data-plane Pods are already upgraded"
-                            .to_string(),
-                    }
-                    .build(),
-                )?,
-            }
-            .build(),
-        )?;
-        let version = Version::parse(version_str).context(SemverParse {
-            version_string: version_str.clone(),
-        })?;
-        if !target_version_requirement.eq(&version) {
-            return Ok(false);
-        }
-    }
-
-    Ok(true)
-}
-
 /// Cordon storage node.
 pub(crate) async fn cordon_storage_node(
     node_id: &str,