diff --git a/Cargo.lock b/Cargo.lock index 8a1107eff..0fcd91998 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,6 +220,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "ahash" version = "0.8.3" @@ -470,7 +476,7 @@ dependencies = [ "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.7.1", "object", "rustc-demangle", ] @@ -604,6 +610,7 @@ dependencies = [ "snafu", "tempfile", "tokio", + "tokio-retry", "tracing", "tracing-subscriber", "url", @@ -1141,12 +1148,12 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flate2" -version = "1.0.27" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" +checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.8.0", ] [[package]] @@ -1999,6 +2006,15 @@ dependencies = [ "adler", ] +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + [[package]] name = "mio" version = "0.8.11" @@ -4086,10 +4102,12 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "base64 0.21.5", "clap", "console-logger", "constants", "convert_case", + "flate2", "http", "humantime", "hyper", diff --git a/call-home/Cargo.toml b/call-home/Cargo.toml index 12cb1f5ef..12490cdd7 100644 --- a/call-home/Cargo.toml +++ b/call-home/Cargo.toml @@ -47,6 +47,7 @@ once_cell = "1.18.0" bytes = "1.5.0" utils = { 
path = "../dependencies/control-plane/utils/utils-lib" } events-api = { path = "../dependencies/control-plane/utils/dependencies/apis/events" } +tokio-retry = "0.3" # exporter actix-web = { version = "4.4.0", features = ["rustls-0_21"] } diff --git a/call-home/src/bin/callhome/collector/report_models.rs b/call-home/src/bin/callhome/collector/report_models.rs index 7d023a4d7..64d20cd4c 100644 --- a/call-home/src/bin/callhome/collector/report_models.rs +++ b/call-home/src/bin/callhome/collector/report_models.rs @@ -489,19 +489,29 @@ impl Percentiles { #[serde(rename_all = "camelCase")] pub(crate) struct Report { pub(crate) k8s_cluster_id: String, - pub(crate) k8s_node_count: u8, + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) k8s_node_count: Option<u8>, + #[serde(skip_serializing_if = "String::is_empty")] pub(crate) product_name: String, + #[serde(skip_serializing_if = "String::is_empty")] pub(crate) product_version: String, + #[serde(skip_serializing_if = "String::is_empty")] pub(crate) deploy_namespace: String, - pub(crate) storage_node_count: u8, - pub(crate) pools: Pools, - pub(crate) volumes: Volumes, - pub(crate) replicas: Replicas, + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) storage_node_count: Option<u8>, + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) pools: Option<Pools>, + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) volumes: Option<Volumes>, + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) replicas: Option<Replicas>, pub(crate) nexus: Nexus, pub(crate) versions: Versions, pub(crate) storage_nodes: StorageNodes, pub(crate) mayastor_managed_disks: MayastorManagedDisks, pub(crate) storage_media: StorageMedia, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub(crate) logs: Vec<String>, } /// Get maximum value from a vector. 
diff --git a/call-home/src/bin/callhome/main.rs b/call-home/src/bin/callhome/main.rs index eed75596f..d553851d5 100644 --- a/call-home/src/bin/callhome/main.rs +++ b/call-home/src/bin/callhome/main.rs @@ -17,10 +17,20 @@ use collector::report_models::{MayastorManagedDisks, Nexus, StorageMedia, Storag use obs::common::constants::*; use openapi::tower::client::{ApiClient, Configuration}; use sha256::digest; -use std::{collections::HashMap, time}; +use std::{ + collections::{HashMap, VecDeque}, + sync::{Arc, Mutex}, + time::Duration, +}; use tokio::time::sleep; -use tracing::{error, info}; -use tracing_subscriber::EnvFilter; +use tokio_retry::{ + strategy::{jitter, ExponentialBackoff}, + Retry, +}; +use tracing::{error, info, Event, Level, Subscriber}; +use tracing_subscriber::{ + layer::Context, prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Layer, +}; use url::Url; use utils::{package_description, version_info_str}; @@ -49,19 +59,27 @@ impl CliArgs { } } +const ERR_LOG_BUF_CAPACITY: usize = 100; + #[tokio::main] async fn main() { - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .init(); + let logs = Arc::new(Mutex::new(VecDeque::with_capacity(ERR_LOG_BUF_CAPACITY))); + let vec_layer = LogsLayer::new(logs.clone()); + + let subscriber = tracing_subscriber::Registry::default() + .with(vec_layer) + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::from_default_env()); - if let Err(error) = run().await { + tracing::subscriber::set_global_default(subscriber).expect("setting tracing default failed"); + + if let Err(error) = run(logs).await { error!(?error, "failed call-home"); std::process::exit(1); } } -async fn run() -> anyhow::Result<()> { +async fn run(logs: Arc>>) -> anyhow::Result<()> { let args = CliArgs::args(); let version = release_version(); let endpoint = args.endpoint; @@ -91,7 +109,7 @@ async fn run() -> anyhow::Result<()> { // Generate Mayastor REST client. 
let config = Configuration::builder() - .with_timeout(time::Duration::from_secs(30)) + .with_timeout(Duration::from_secs(30)) .with_tracing(true) .build_url(endpoint) .map_err(|error| anyhow::anyhow!("failed to create openapi configuration: {:?}", error))?; @@ -106,6 +124,7 @@ async fn run() -> anyhow::Result<()> { namespace.clone(), version.clone(), aggregator_url.clone(), + logs.clone(), ) .await; @@ -138,7 +157,12 @@ async fn generate_report( deploy_namespace: String, product_version: String, aggregator_url: Option, + logs: Arc>>, ) -> Report { + let retry_strategy = ExponentialBackoff::from_millis(100) + .map(jitter) // add jitter to delays + .take(5); // retry up to 5 times + let mut report = Report { product_name: product(), k8s_cluster_id, @@ -172,7 +196,7 @@ async fn generate_report( let k8s_node_count = k8s_client.get_node_len().await; match k8s_node_count { - Ok(k8s_node_count) => report.k8s_node_count = k8s_node_count as u8, + Ok(k8s_node_count) => report.k8s_node_count = Some(k8s_node_count as u8), Err(err) => { error!("{:?}", err); } @@ -180,50 +204,66 @@ async fn generate_report( // List of disks on each node. 
let mut node_disks = HashMap::new(); - let nodes = http_client.nodes_api().get_nodes(None).await; + let nodes = Retry::spawn(retry_strategy.clone(), || async { + http_client.nodes_api().get_nodes(None).await + }) + .await; match nodes { Ok(nodes) => { let nodes = nodes.into_body(); for node in &nodes { - if let Ok(b_devs_result) = http_client - .block_devices_api() - .get_node_block_devices(&node.id, Some(true)) - .await - { + let b_devs_result = Retry::spawn(retry_strategy.clone(), || async { + http_client + .block_devices_api() + .get_node_block_devices(&node.id, Some(true)) + .await + }) + .await; + if let Ok(b_devs) = b_devs_result { node_disks .entry(node.id.to_string()) .or_insert_with(Vec::new) - .extend(b_devs_result.into_body()); + .extend(b_devs.into_body()); }; } - report.storage_node_count = nodes.len() as u8 + report.storage_node_count = Some(nodes.len() as u8) } Err(err) => { error!("{:?}", err); } }; - let pools = http_client.pools_api().get_pools(None).await; + let pools = Retry::spawn(retry_strategy.clone(), || async { + http_client.pools_api().get_pools(None).await + }) + .await; match pools { - Ok(ref pools) => report.pools = Pools::new(pools.clone().into_body(), event_data.clone()), + Ok(ref pools) => { + report.pools = Some(Pools::new(pools.clone().into_body(), event_data.clone())) + } Err(ref err) => { error!("{:?}", err); } }; - let volumes = list_all_volumes(&http_client) - .await - .map_err(|error| error!("Failed to list all volumes: {error:?}")) - .ok(); + let volumes = Retry::spawn(retry_strategy.clone(), || async { + list_all_volumes(&http_client).await + }) + .await + .map_err(|error| error!("Failed to list all volumes: {error:?}")) + .ok(); if let Some(volumes) = &volumes { - report.volumes = Volumes::new(volumes.clone(), event_data.clone()); + report.volumes = Some(Volumes::new(volumes.clone(), event_data.clone())); } - let replicas = http_client.replicas_api().get_replicas().await; + let replicas = Retry::spawn(retry_strategy, || 
async { + http_client.replicas_api().get_replicas().await + }) + .await; match replicas { Ok(ref replicas) => { - report.replicas = Replicas::new(replicas.clone().into_body().len(), volumes) + report.replicas = Some(Replicas::new(replicas.clone().into_body().len(), volumes)) } Err(ref err) => { error!("{:?}", err); @@ -256,5 +296,91 @@ async fn generate_report( report.storage_media = StorageMedia::new(valid_disks); report.nexus = Nexus::new(event_data); + + let mut logs = logs.lock().unwrap(); + for log in logs.iter() { + let log_string = format!( + "[{} {} {}:{}] {}", + log.timestamp, log.level, log.file, log.line, log.message + ); + report.logs.push(log_string); + } + // Clear logs, as these entries are no longer needed after updating the report. + logs.clear(); report } + +// Define the LogsLayer +struct LogsLayer { + logs: Arc>>, +} + +impl LogsLayer { + fn new(logs: Arc>>) -> Self { + LogsLayer { logs } + } +} + +impl Layer for LogsLayer +where + S: Subscriber, +{ + fn on_event(&self, event: &Event, _ctx: Context) { + let timestamp = chrono::Utc::now().to_rfc3339(); + let level = event.metadata().level(); + let file = event.metadata().file().unwrap_or("").to_string(); + let line = event.metadata().line().unwrap_or(0); + + let mut visitor = LogVisitor::new(); + event.record(&mut visitor); + + let log_entry = LogEntry { + timestamp, + level: level.to_string(), + file, + line, + message: visitor.log, + }; + + // Only capture logs of level ERROR + if level == &Level::ERROR { + let mut logs = self.logs.lock().unwrap(); + log_with_limit(&mut logs, log_entry, ERR_LOG_BUF_CAPACITY); + } + } +} + +struct LogVisitor { + log: String, +} + +impl LogVisitor { + fn new() -> Self { + LogVisitor { log: String::new() } + } +} + +impl tracing::field::Visit for LogVisitor { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.log + .push_str(&format!("{}: {:?};", field.name(), value)); + } +} + +#[derive(Debug, serde::Serialize)] 
+struct LogEntry { + timestamp: String, + level: String, + file: String, + line: u32, + message: String, +} + +/// Ensures the length of a vector is no more than the input limit, and removes elements from +/// the front if it goes over the limit. +fn log_with_limit(logs: &mut VecDeque, message: T, limit: usize) { + while logs.len().ge(&limit) { + logs.pop_front(); + } + logs.push_back(message); +} diff --git a/chart/Chart.yaml b/chart/Chart.yaml index be4989726..3cc21f695 100644 --- a/chart/Chart.yaml +++ b/chart/Chart.yaml @@ -31,6 +31,7 @@ dependencies: - name: etcd repository: https://charts.bitnami.com/bitnami version: 8.6.0 + condition: etcd.enabled - name: jaeger-operator version: 2.50.1 repository: https://jaegertracing.github.io/helm-charts diff --git a/chart/README.md b/chart/README.md index 227c6fa20..f250e265e 100644 --- a/chart/README.md +++ b/chart/README.md @@ -70,10 +70,12 @@ This removes all the Kubernetes components associated with the chart and deletes | agents.​core.​capacity.​thin.​volumeCommitment | When creating replicas for an existing volume, each replica pool must have at least this much free space percentage of the volume size. Example: if this value is 40, the pool has 40GiB free, then the max volume size allowed to be created on the pool is 100GiB. | `"40%"` | | agents.​core.​capacity.​thin.​volumeCommitmentInitial | Same as the `volumeCommitment` argument, but applicable only when creating replicas for a new volume. | `"40%"` | | agents.​core.​logLevel | Log level for the core service | `"info"` | +| agents.​core.​minTimeouts | Enable minimal timeouts | `true` | | agents.​core.​priorityClassName | Set PriorityClass, overrides global. If both local and global are not set, the final deployment manifest has a mayastor custom critical priority class assigned to the pod by default. Refer the `templates/_helpers.tpl` and `templates/mayastor/agents/core/agent-core-deployment.yaml` for more details. 
| `""` | | agents.​core.​rebuild.​maxConcurrent | The maximum number of system-wide rebuilds permitted at any given time. If set to an empty string, there are no limits. | `""` | | agents.​core.​rebuild.​partial.​enabled | Partial rebuild uses a log of missed IO to rebuild replicas which have become temporarily faulted, hence a bit faster, depending on the log size. | `true` | | agents.​core.​rebuild.​partial.​waitPeriod | If a faulted replica comes back online within this time period then it will be rebuilt using the partial rebuild capability. Otherwise, the replica will be fully rebuilt. A blank value "" means internally derived value will be used. | `""` | +| agents.​core.​requestTimeout | Request timeout for core agents Default value is defined in .base.default_req_timeout | `nil` | | agents.​core.​resources.​limits.​cpu | Cpu limits for core agents | `"1000m"` | | agents.​core.​resources.​limits.​memory | Memory limits for core agents | `"128Mi"` | | agents.​core.​resources.​requests.​cpu | Cpu requests for core agents | `"500m"` | @@ -137,6 +139,8 @@ This removes all the Kubernetes components associated with the chart and deletes | etcd.​autoCompactionMode | AutoCompaction Since etcd keeps an exact history of its keyspace, this history should be periodically compacted to avoid performance degradation and eventual storage space exhaustion. Auto compaction mode. Valid values: "periodic", "revision". - 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. 5m). - 'revision' for revision number based retention. | `"revision"` | | etcd.​autoCompactionRetention | Auto compaction retention length. 0 means disable auto compaction. | `100` | | etcd.​clusterDomain | Kubernetes Cluster Domain | `"cluster.local"` | +| etcd.​enabled | Disable when using an external etcd cluster. | `true` | +| etcd.​externalUrl | Url of the external etcd cluster. Note, etcd.enable must be set to false. 
| `""` | | etcd.​extraEnvVars[0] | Raise alarms when backend size exceeds the given quota. |
{
"name":"ETCD_QUOTA_BACKEND_BYTES",
"value":"8589934592"
}
| | etcd.​localpvScConfig.​basePath | Host path where local etcd data is stored in. | `"/var/local/{{ .Release.Name }}/localpv-hostpath/etcd"` | | etcd.​localpvScConfig.​reclaimPolicy | ReclaimPolicy of etcd's localpv hostpath storage class. | `"Delete"` | @@ -167,6 +171,7 @@ This removes all the Kubernetes components associated with the chart and deletes | io_engine.​resources.​requests.​cpu | Cpu requests for the io-engine | `""` | | io_engine.​resources.​requests.​hugepages2Mi | Hugepage size available on the nodes | `"2Gi"` | | io_engine.​resources.​requests.​memory | Memory requests for the io-engine | `"1Gi"` | +| io_engine.​runtimeClassName | Runtime class to use. Defaults to cluster standard | `""` | | io_engine.​target.​nvmf.​iface | NVMF target interface (ip, mac, name or subnet) | `""` | | io_engine.​target.​nvmf.​ptpl | Reservations Persist Through Power Loss State | `true` | | io_engine.​tolerations | Set tolerations, overrides global | `[]` | diff --git a/chart/templates/NOTES.txt b/chart/templates/NOTES.txt index eec7098a5..66576e226 100644 --- a/chart/templates/NOTES.txt +++ b/chart/templates/NOTES.txt @@ -1,4 +1,4 @@ OpenEBS Mayastor has been installed. Check its status by running: $ kubectl get pods -n {{ .Release.Namespace }} -For more information or to view the documentation, visit our website at https://mayastor.gitbook.io/introduction/. +For more information or to view the documentation, visit our website at https://openebs.io/docs/. 
diff --git a/chart/templates/_helpers.tpl b/chart/templates/_helpers.tpl index 69a8fc7c4..a73c3c5bb 100644 --- a/chart/templates/_helpers.tpl +++ b/chart/templates/_helpers.tpl @@ -20,7 +20,7 @@ Usage: */}} {{- define "base_init_core_containers" -}} {{- if .Values.base.initCoreContainers.enabled }} - {{- include "render" (dict "value" .Values.base.initCoreContainers.containers "context" $) | nindent 8 }} + {{- include "render_init_containers" (dict "value" .Values.base.initCoreContainers.containers "context" $) | nindent 8 }} {{- end }} {{- end -}} @@ -31,7 +31,7 @@ Usage: */}} {{- define "base_init_ha_node_containers" -}} {{- if .Values.base.initHaNodeContainers.enabled }} - {{- include "render" (dict "value" .Values.base.initHaNodeContainers.containers "context" $) | nindent 8 }} + {{- include "render_init_containers" (dict "value" .Values.base.initHaNodeContainers.containers "context" $) | nindent 8 }} {{- end }} {{- end -}} @@ -42,7 +42,7 @@ Usage: */}} {{- define "base_init_containers" -}} {{- if .Values.base.initContainers.enabled }} - {{- include "render" (dict "value" .Values.base.initContainers.containers "context" $) | nindent 8 }} + {{- include "render_init_containers" (dict "value" .Values.base.initContainers.containers "context" $) | nindent 8 }} {{- end }} {{- include "jaeger_collector_init_container" . 
}} {{- end -}} @@ -56,7 +56,7 @@ Usage: {{- if .Values.base.jaeger.enabled }} {{- if .Values.base.jaeger.initContainer }} {{- if .Values.base.jaeger.collector }} - {{- include "render" (dict "value" .Values.base.jaeger.collector.initContainer "context" $) | nindent 8 }} + {{- include "render_init_containers" (dict "value" .Values.base.jaeger.collector.initContainer "context" $) | nindent 8 }} {{- else }} - name: jaeger-probe image: busybox:latest @@ -73,7 +73,7 @@ Usage: */}} {{- define "csi_node_init_containers" -}} {{- if (.Values.csi.node.initContainers).enabled }} - {{- include "render" (dict "value" .Values.csi.node.initContainers.containers "context" $) | nindent 8 }} + {{- include "render_init_containers" (dict "value" .Values.csi.node.initContainers.containers "context" $) | nindent 8 }} {{- end }} {{- end -}} @@ -107,7 +107,7 @@ Usage: */}} {{- define "rest_agent_init_container" -}} {{- if .Values.base.initRestContainer.enabled }} - {{- include "render" (dict "value" .Values.base.initRestContainer.initContainer "context" $) | nindent 8 }} + {{- include "render_init_containers" (dict "value" .Values.base.initRestContainer.initContainer "context" $) | nindent 8 }} {{- end }} {{- end -}} @@ -283,8 +283,63 @@ Get the Jaeger URL */}} {{- define "jaeger_url" -}} {{- if $collector := .Values.base.jaeger.collector }} - {{- $collector.name }}:{{ $collector.port }} + {{- $collector.name }}:{{ $collector.port }} {{- else }} - {{- print "jaeger-collector:4317" -}} + {{- print "jaeger-collector:4317" -}} {{- end }} +{{- end -}} + +{{/* + Create a normalized etcd name based on input parameters + */}} +{{- define "etcdUrl" -}} + {{- if eq (.Values.etcd.enabled) false }} + {{- if .Values.etcd.externalUrl }} + {{- .Values.etcd.externalUrl }} + {{- else }} + {{- fail "etcd.externalUrl must be set" }} + {{- end }} + {{- else }} + {{- .Release.Name }}-etcd:{{ .Values.etcd.service.port }} + {{- end }} +{{- end }} + +{{/* + Check if etcd is explicitly enabled/disabled or 
implicitly enabled (for upgrades where enabled key was absent) + */}} +{{- define "etcdEnabled" -}} + {{- if eq (.Values.etcd.enabled) false }} + {{- "false" -}} + {{- else if eq (.Values.etcd.enabled) true }} + {{- "true" -}} + {{- else if .Values.etcd.externalUrl }} + {{- "false" -}} + {{- else }} + {{- "true" -}} + {{- end }} +{{- end }} + +{{/* +Renders init containers. If unset it sets the container image. +*/}} +{{- define "render_init_containers" -}} + {{- $containers := list }} + {{- $image := .context.Values.base.initContainers.image }} + {{- $values_image := .context.Values.image }} + {{- range .value -}} + {{ $container := . }} + {{- if not (hasKey . "imagePullPolicy") }} + {{- $pullPolicy := $image.pullPolicy | default $values_image.pullPolicy }} + {{- $_ := set $container "imagePullPolicy" $pullPolicy }} + {{- end }} + {{- if or (not $image) (not (hasKey . "image")) }} + {{- $registry := $image.registry | default $values_image.registry | default "docker.io" }} + {{- $namespace := $image.namespace | default $values_image.repo }} + {{- $name := $image.name | default "alpine-sh" }} + {{- $tag := $image.tag | default "4.1.0" }} + {{- $_ := set $container "image" (printf "%s/%s/%s:%s" $registry $namespace $name $tag) }} + {{- end }} + {{- $containers = append $containers $container }} + {{- end -}} + {{- tpl ($containers | toYaml) .context }} {{- end -}} \ No newline at end of file diff --git a/chart/templates/etcd/storage/localpv-storageclass.yaml b/chart/templates/etcd/storage/localpv-storageclass.yaml index b145995e5..f0cb431cb 100644 --- a/chart/templates/etcd/storage/localpv-storageclass.yaml +++ b/chart/templates/etcd/storage/localpv-storageclass.yaml @@ -1,4 +1,4 @@ -{{ if and .Values.etcd.localpvScConfig.enabled .Values.etcd.persistence.enabled }} +{{ if and (eq (include "etcdEnabled" .) 
"true") (and .Values.etcd.localpvScConfig.enabled .Values.etcd.persistence.enabled) }} apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: diff --git a/chart/templates/etcd/storage/localpv.yaml b/chart/templates/etcd/storage/localpv.yaml index f690dda9c..27b562c55 100644 --- a/chart/templates/etcd/storage/localpv.yaml +++ b/chart/templates/etcd/storage/localpv.yaml @@ -1,5 +1,5 @@ --- -{{ if and .Values.etcd.persistence.enabled (eq .Values.etcd.persistence.storageClass "manual") }} +{{ if and (eq (include "etcdEnabled" .) "true") (and .Values.etcd.persistence.enabled (eq .Values.etcd.persistence.storageClass "manual")) }} {{- range $index, $end := until (.Values.etcd.replicaCount | int) }} apiVersion: v1 kind: PersistentVolume diff --git a/chart/templates/mayastor/agents/core/agent-core-deployment.yaml b/chart/templates/mayastor/agents/core/agent-core-deployment.yaml index 490270ef5..648d99903 100644 --- a/chart/templates/mayastor/agents/core/agent-core-deployment.yaml +++ b/chart/templates/mayastor/agents/core/agent-core-deployment.yaml @@ -46,8 +46,11 @@ spec: image: "{{ .Values.image.registry }}/{{ .Values.image.repo }}/{{ include "image_prefix" . }}-agent-core:{{ default .Values.image.tag .Values.image.repoTags.controlPlane }}" imagePullPolicy: {{ .Values.image.pullPolicy }} args: - - "-s{{ .Release.Name }}-etcd:{{ .Values.etcd.service.port }}" - - "--request-timeout={{ .Values.base.default_req_timeout }}" + - "--store={{ include "etcdUrl" . }}" + - "--request-timeout={{ default .Values.base.default_req_timeout .Values.agents.core.requestTimeout }}" + {{- if not .Values.agents.core.minTimeouts }} + - "--no-min-timeouts" + {{- end }} - "--cache-period={{ .Values.base.cache_poll_period }}"{{ if .Values.base.jaeger.enabled }} - "--jaeger={{ include "jaeger_url" . 
}}"{{ end }} - "--grpc-server-addr=0.0.0.0:50051" @@ -95,7 +98,7 @@ spec: imagePullPolicy: {{ .Values.image.pullPolicy }} args: - "-g=0.0.0.0:50052" - - "--store=http://{{ .Release.Name }}-etcd:{{ .Values.etcd.service.port }}" + - "--store=http://{{ include "etcdUrl" . }}" - "--core-grpc=https://{{ .Release.Name }}-agent-core:50051"{{ if .Values.base.jaeger.enabled }} - "--jaeger={{ include "jaeger_url" . }}"{{ end }}{{ if .Values.eventing.enabled }} - "--events-url=nats://{{ .Release.Name }}-nats:4222"{{ end }} diff --git a/chart/templates/mayastor/io/io-engine-daemonset.yaml b/chart/templates/mayastor/io/io-engine-daemonset.yaml index e7e566f4f..56cddab48 100644 --- a/chart/templates/mayastor/io/io-engine-daemonset.yaml +++ b/chart/templates/mayastor/io/io-engine-daemonset.yaml @@ -31,6 +31,9 @@ spec: {{- if $pcName := include "priority_class" (dict "template" . "localPriorityClass" .Values.io_engine.priorityClassName) }} priorityClassName: {{ $pcName }} {{- end }} + {{- if .Values.io_engine.runtimeClassName }} + runtimeClassName: {{ .Values.io_engine.runtimeClassName | quote }} + {{- end }} {{- if $tolerations := include "tolerations" (dict "template" . "localTolerations" .Values.io_engine.tolerations) }} tolerations: {{ $tolerations }} {{- end }} @@ -100,7 +103,7 @@ spec: - "-Rhttps://{{ .Release.Name }}-agent-core:50051" - "-y/var/local/{{ .Release.Name }}/io-engine/config.yaml" - "-l{{ include "cpuFlag" . }}" - - "-p={{ .Release.Name }}-etcd:{{ .Values.etcd.service.port }}"{{ if .Values.io_engine.target.nvmf.ptpl }} + - "-p={{ include "etcdUrl" . 
}}"{{ if .Values.io_engine.target.nvmf.ptpl }} - "--ptpl-dir=/var/local/{{ .Release.Name }}/io-engine/ptpl/"{{ end }} - "--api-versions={{ .Values.io_engine.api }}"{{ if .Values.io_engine.target.nvmf.iface }} - "-T={{ .Values.io_engine.target.nvmf.iface }}"{{ end }}{{ if .Values.io_engine.envcontext }} diff --git a/chart/values.yaml b/chart/values.yaml index a9cb380e6..a03269a86 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -71,24 +71,24 @@ base: silenceLevel: initContainers: enabled: true + image: + name: alpine-sh + tag: 4.1.0 + pullPolicy: IfNotPresent containers: - name: agent-core-grpc-probe - image: busybox:latest command: ['sh', '-c', 'trap "exit 1" TERM; until nc -vzw 5 {{ .Release.Name }}-agent-core 50051; do date; echo "Waiting for agent-core-grpc services..."; sleep 1; done;'] - name: etcd-probe - image: busybox:latest command: ['sh', '-c', 'trap "exit 1" TERM; until nc -vzw 5 {{ .Release.Name }}-etcd {{.Values.etcd.service.port}}; do date; echo "Waiting for etcd..."; sleep 1; done;'] initHaNodeContainers: enabled: true containers: - name: agent-cluster-grpc-probe - image: busybox:latest command: ['sh', '-c', 'trap "exit 1" TERM; until nc -vzw 5 {{ .Release.Name }}-agent-core 50052; do date; echo "Waiting for agent-cluster-grpc services..."; sleep 1; done;'] initCoreContainers: enabled: true containers: - name: etcd-probe - image: busybox:latest command: ['sh', '-c', 'trap "exit 1" TERM; until nc -vzw 5 {{ .Release.Name }}-etcd {{.Values.etcd.service.port}}; do date; echo "Waiting for etcd..."; sleep 1; done;'] metrics: @@ -106,20 +106,17 @@ base: port: 6831 initContainer: - name: jaeger-probe - image: busybox:latest command: ['sh', '-c', 'trap "exit 1" TERM; until nc -vzw 5 -u {{.Values.base.jaeger.agent.name}} {{.Values.base.jaeger.agent.port}}; do date; echo "Waiting for jaeger..."; sleep 1; done;'] collector: name: jaeger-collector port: 4317 initContainer: - name: jaeger-probe - image: busybox:latest command: [ 'sh', '-c', 'trap "exit 1" 
TERM; until nc -vzw 5 -u {{.Values.base.jaeger.collector.name}} {{.Values.base.jaeger.collector.port}}; do date; echo "Waiting for jaeger..."; sleep 1; done;' ] initRestContainer: enabled: true initContainer: - name: api-rest-probe - image: busybox:latest command: ['sh', '-c', 'trap "exit 1" TERM; until nc -vzw 5 {{ .Release.Name }}-api-rest 8081; do date; echo "Waiting for REST API endpoint to become available"; sleep 1; done;'] operators: @@ -160,6 +157,11 @@ jaeger-operator: agents: core: + # -- Request timeout for core agents + # Default value is defined in .base.default_req_timeout + requestTimeout: + # -- Enable minimal timeouts + minTimeouts: true # -- Log level for the core service logLevel: info capacity: @@ -367,7 +369,6 @@ csi: enabled: false containers: - name: nvme-tcp-probe - image: busybox:latest command: ['sh', '-c', 'trap "exit 1" TERM; until $(lsmod | grep nvme_tcp &>/dev/null); do [ -z "$WARNED" ] && echo "nvme_tcp module not loaded..."; WARNED=1; sleep 60; done;'] io_engine: @@ -401,7 +402,6 @@ io_engine: # Max qpairs per controller. maxQpairsPerCtrl: "32" - # -- Pass additional arguments to the Environment Abstraction Layer. # Example: --set {product}.envcontext=iova-mode=pa envcontext: "" @@ -437,8 +437,14 @@ io_engine: tolerations: [] # -- Set PriorityClass, overrides global priorityClassName: "" + # -- Runtime class to use. Defaults to cluster standard + runtimeClassName: "" etcd: + # -- Disable when using an external etcd cluster. + enabled: true + # -- (string) Url of the external etcd cluster. Note, etcd.enable must be set to false. + externalUrl: "" # Configuration for etcd's localpv hostpath storage class. 
localpvScConfig: enabled: true @@ -508,9 +514,13 @@ etcd: volumePermissions: # chown the mounted volume; this is required if a statically provisioned hostpath volume is used enabled: true + image: + registry: docker.io + repository: openebs/alpine-bash + tag: 4.1.0 + pullSecrets: [] # extra debug information on logs debug: false - initialClusterState: "new" # -- Pod anti-affinity preset # Ref: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#inter-pod-affinity-and-anti-affinity @@ -578,8 +588,8 @@ loki-stack: runAsUser: 1001 # initContainers to chown the static hostpath PV by 1001 user initContainers: - - command: ["/bin/bash", "-ec", "chown -R 1001:1001 /data"] - image: docker.io/bitnami/bitnami-shell:10 + - command: ["/bin/sh", "-ec", "chown -R 1001:1001 /data"] + image: docker.io/openebs/alpine-sh:4.1.0 imagePullPolicy: IfNotPresent name: volume-permissions securityContext: diff --git a/constants/src/lib.rs b/constants/src/lib.rs index b2c3898e6..d03e3e918 100644 --- a/constants/src/lib.rs +++ b/constants/src/lib.rs @@ -27,8 +27,7 @@ pub fn upgrade_event_reason() -> String { /// Upgrade job container image repository. pub const UPGRADE_JOB_IMAGE_REPO: &str = "openebs"; /// This is the user docs URL for the Umbrella chart. -pub const UMBRELLA_CHART_UPGRADE_DOCS_URL: &str = - "https://openebs.io/docs/user-guides/upgrade#mayastor-upgrade"; +pub const UMBRELLA_CHART_UPGRADE_DOCS_URL: &str = "https://openebs.io/docs/user-guides/upgrade"; /// Defines the default helm chart release name. pub const DEFAULT_RELEASE_NAME: &str = PRODUCT_NAME; @@ -49,3 +48,6 @@ pub const UMBRELLA_CHART_NAME: &str = "openebs"; /// RECEIVER_API_ENDPOINT is the URL to anonymous call-home metrics collection endpoint. pub const CALL_HOME_ENDPOINT: &str = "https://openebs.phonehome.datacore.com/openebs/report"; + +/// Label key containing controller revision hash for a controller resource for DaemonSets. 
+pub const DS_CONTROLLER_REVISION_HASH_LABEL_KEY: &str = "controller-revision-hash"; diff --git a/dependencies/control-plane b/dependencies/control-plane index 246b86977..b928b51ef 160000 --- a/dependencies/control-plane +++ b/dependencies/control-plane @@ -1 +1 @@ -Subproject commit 246b8697753194ef3a69373866eb44c558210b44 +Subproject commit b928b51ef616672dec18c5bf0f1431e669193677 diff --git a/k8s/upgrade/Cargo.toml b/k8s/upgrade/Cargo.toml index c8b38d50c..2c780e341 100644 --- a/k8s/upgrade/Cargo.toml +++ b/k8s/upgrade/Cargo.toml @@ -40,3 +40,5 @@ kube-client = "0.85.0" tempfile = "3.8.0" # Tracing tracing = "0.1.37" +base64 = "0.21.5" +flate2 = "1.0.27" diff --git a/k8s/upgrade/src/bin/upgrade-job/common.rs b/k8s/upgrade/src/bin/upgrade-job/common.rs index 0ff21dadb..01df88a9e 100644 --- a/k8s/upgrade/src/bin/upgrade-job/common.rs +++ b/k8s/upgrade/src/bin/upgrade-job/common.rs @@ -4,8 +4,8 @@ pub(crate) mod constants; /// Contains the error handling tooling. pub(crate) mod error; -/// Contains tools to create Kubernetes API clients. -pub(crate) mod kube_client; +/// Contains tools to work with Kubernetes APIs. +pub(crate) mod kube; /// Contains macros. pub(crate) mod macros; diff --git a/k8s/upgrade/src/bin/upgrade-job/common/constants.rs b/k8s/upgrade/src/bin/upgrade-job/common/constants.rs index 6dee748a1..a4c87a022 100644 --- a/k8s/upgrade/src/bin/upgrade-job/common/constants.rs +++ b/k8s/upgrade/src/bin/upgrade-job/common/constants.rs @@ -16,9 +16,6 @@ pub(crate) const IO_ENGINE_LABEL: &str = "app=io-engine"; /// This is the shared Pod label of the -agent-core Deployment. pub(crate) const AGENT_CORE_LABEL: &str = "app=agent-core"; -/// This is the shared label across the helm chart components which carries the chart version. -pub(crate) use constants::helm_release_version_key; - /// This is the label set on a storage API Node resource when a 'Node Drain' is issued. 
pub fn drain_for_upgrade() -> String { format!("{CORE_CHART_NAME}-upgrade") @@ -29,9 +26,6 @@ pub fn cordon_ana_check() -> String { format!("{CORE_CHART_NAME}-upgrade-nvme-ana-check") } -/// This is the allowed upgrade to-version/to-version-range for the Umbrella chart. -pub(crate) const TO_UMBRELLA_SEMVER: &str = "4.1.1"; - /// This is the user docs URL for the Umbrella chart. pub(crate) const UMBRELLA_CHART_UPGRADE_DOCS_URL: &str = constants::UMBRELLA_CHART_UPGRADE_DOCS_URL; diff --git a/k8s/upgrade/src/bin/upgrade-job/common/error.rs b/k8s/upgrade/src/bin/upgrade-job/common/error.rs index 091066d63..072252df4 100644 --- a/k8s/upgrade/src/bin/upgrade-job/common/error.rs +++ b/k8s/upgrade/src/bin/upgrade-job/common/error.rs @@ -1,7 +1,6 @@ use crate::{ common::constants::{ - helm_release_version_key, product_train, CORE_CHART_NAME, TO_UMBRELLA_SEMVER, - UMBRELLA_CHART_NAME, UMBRELLA_CHART_UPGRADE_DOCS_URL, + product_train, CORE_CHART_NAME, UMBRELLA_CHART_NAME, UMBRELLA_CHART_UPGRADE_DOCS_URL, }, events::event_recorder::EventNote, helm::chart::PromtailConfigClient, @@ -20,10 +19,8 @@ use url::Url; pub(crate) enum Error { /// Error for when the storage REST API URL is parsed. #[snafu(display( - "Failed to parse {} REST API URL {}: {}", + "Failed to parse {} REST API URL {rest_endpoint}: {source}", product_train(), - rest_endpoint, - source ))] RestUrlParse { source: url::ParseError, @@ -31,11 +28,11 @@ pub(crate) enum Error { }, /// Error for when Kubernetes API client generation fails. - #[snafu(display("Failed to generate kubernetes client: {}", source))] + #[snafu(display("Failed to generate kubernetes client: {source}"))] K8sClientGeneration { source: kube_client::Error }, /// Error for a Kubernetes API GET request for a namespace resource fails. 
- #[snafu(display("Failed to GET Kubernetes namespace {}: {}", namespace, source))] + #[snafu(display("Failed to GET Kubernetes namespace '{namespace}': {source}"))] GetNamespace { source: kube::Error, namespace: String, @@ -43,10 +40,8 @@ pub(crate) enum Error { /// Error for when REST API configuration fails. #[snafu(display( - "Failed to configure {} REST API client with endpoint {}: {:?}", + "Failed to configure {} REST API client with endpoint '{rest_endpoint}': {source:?}", product_train(), - rest_endpoint, - source, ))] RestClientConfiguration { #[snafu(source(false))] @@ -56,10 +51,7 @@ pub(crate) enum Error { /// Error for when a Helm command fails. #[snafu(display( - "Failed to run Helm command,\ncommand: {},\nargs: {:?},\ncommand_error: {}", - command, - args, - source + "Failed to run Helm command,\ncommand: {command},\nargs: {args:?},\ncommand_error: {source}", ))] HelmCommand { source: std::io::Error, @@ -68,34 +60,64 @@ pub(crate) enum Error { }, /// Error for when regular expression parsing or compilation fails. - #[snafu(display("Failed to compile regex {}: {}", expression, source))] + #[snafu(display("Failed to compile regex {expression}: {source}"))] RegexCompile { source: regex::Error, expression: String, }, /// Error for when Helm v3.x.y is not present in $PATH. - #[snafu(display("Helm version {} does not start with 'v3.x.y'", version))] + #[snafu(display("Helm version {version} does not start with 'v3.x.y'"))] HelmVersion { version: String }, /// Error for when input Helm release is not found in the input namespace. + #[snafu(display("'deployed' Helm release {name} not found in Namespace '{namespace}'"))] + HelmRelease { name: String, namespace: String }, + + /// Error for when no value for helm storage driver is set. + #[snafu(display("No helm storage driver specified"))] + NoHelmStorageDriver, + + /// Error for when there's too few or too many helm secrets for a release in a namespace. 
#[snafu(display( - "'deployed' Helm release {} not found in Namespace {}", - name, - namespace + "'{count}' is an invalid no. of helm Secrets for release '{release_name}' in namespace '{namespace}'" ))] - HelmRelease { name: String, namespace: String }, + InvalidNoOfHelmSecrets { + release_name: String, + namespace: String, + count: usize, + }, + + /// Error for when there's too few or too many helm configmaps for a release in a namespace. + #[snafu(display( + "'{count}' is an invalid no. of helm ConfigMaps for release '{release_name}' in namespace '{namespace}'" + ))] + InvalidNoOfHelmConfigMaps { + release_name: String, + namespace: String, + count: usize, + }, + + /// Error for when there's no data in helm storage driver. + #[snafu(display("No data in helm {driver}"))] + HelmStorageNoData { driver: &'static str }, + + /// Error for when there's no value for the release key in helm storage data. + #[snafu(display("No value mapped to the 'release' key in helm {driver}"))] + HelmStorageNoReleaseValue { driver: &'static str }, + + /// Error for when the helm storage driver is not supported. + #[snafu(display("'{driver}' is not a supported helm storage driver"))] + UnsupportedStorageDriver { driver: String }, /// Error for when there is a lack of valid input for the Helm chart directory for the chart to /// be upgraded to. - #[snafu(display("No input for {} helm chart's directory path", chart_name))] + #[snafu(display("No input for {chart_name} helm chart's directory path"))] NoInputHelmChartDir { chart_name: String }, /// Error for when the input Pod's owner does not exists. 
#[snafu(display( - ".metadata.ownerReferences empty for Pod {} in {} namespace, while trying to find Pod's Job owner", - pod_name, - pod_namespace + ".metadata.ownerReferences empty for Pod '{pod_name}' in '{pod_namespace}' namespace, while trying to find Pod's Job owner", ))] JobPodOwnerNotFound { pod_name: String, @@ -104,9 +126,7 @@ pub(crate) enum Error { /// Error for when the number of ownerReferences for this Pod is more than 1. #[snafu(display( - "Pod {} in {} namespace has too many owners, while trying to find Pod's Job owner", - pod_name, - pod_namespace + "Pod '{pod_name}' in '{pod_namespace}' namespace has too many owners, while trying to find Pod's Job owner", ))] JobPodHasTooManyOwners { pod_name: String, @@ -115,9 +135,7 @@ pub(crate) enum Error { /// Error for when the owner of this Pod is not a Job. #[snafu(display( - "Pod {} in {} namespace has an owner which is not a Job, while trying to find Pod's Job owner", - pod_name, - pod_namespace + "Pod '{pod_name}' in '{pod_namespace}' namespace has an owner which is not a Job, while trying to find Pod's Job owner", ))] JobPodOwnerIsNotJob { pod_name: String, @@ -125,30 +143,27 @@ pub(crate) enum Error { }, /// Error for when yaml could not be parsed from a slice. - #[snafu(display("Failed to parse YAML {}: {}", input_yaml, source))] + #[snafu(display("Failed to parse YAML {input_yaml}: {source}"))] YamlParseFromSlice { source: serde_yaml::Error, input_yaml: String, }, /// Error for when yaml could not be parsed from a file (Reader). - #[snafu(display("Failed to parse YAML at {}: {}", filepath.display(), source))] + #[snafu(display("Failed to parse YAML at {}: {source}", filepath.display()))] YamlParseFromFile { source: serde_yaml::Error, filepath: PathBuf, }, /// Error for when yaml could not be parsed from bytes. 
- #[snafu(display("Failed to parse unsupported versions yaml: {}", source))] + #[snafu(display("Failed to parse unsupported versions yaml: {source}"))] YamlParseBufferForUnsupportedVersion { source: serde_yaml::Error }, /// Error for when the Helm chart installed in the cluster is not of the umbrella or core /// variant. #[snafu(display( - "Helm chart release {} in Namespace {} has an unsupported chart variant: {}", - release_name, - namespace, - chart_name + "Helm chart release {release_name} in Namespace '{namespace}' has an unsupported chart variant: {chart_name}", ))] DetermineChartVariant { release_name: String, @@ -157,14 +172,14 @@ pub(crate) enum Error { }, /// Error for when the path to a directory cannot be validated. - #[snafu(display("Failed to validate directory path {}: {}", path.display(), source))] + #[snafu(display("Failed to validate directory path {}: {source}", path.display()))] ValidateDirPath { source: std::io::Error, path: PathBuf, }, /// Error for when the path to a file cannot be validated. - #[snafu(display("Failed to validate filepath {}: {}", path.display(), source))] + #[snafu(display("Failed to validate filepath {}: {source}", path.display()))] ValidateFilePath { source: std::io::Error, path: PathBuf, @@ -179,7 +194,7 @@ pub(crate) enum Error { NotAFile { path: PathBuf }, /// Error when reading a file. - #[snafu(display("Failed to read from file {}: {}", filepath.display(), source))] + #[snafu(display("Failed to read from file {}: {source}", filepath.display()))] ReadingFile { source: std::io::Error, filepath: PathBuf, @@ -191,10 +206,7 @@ pub(crate) enum Error { /// Error for when a Kubernetes API request for GET-ing a Pod fails. 
#[snafu(display( - "Failed to GET Kubernetes Pod {} in namespace {}: {}", - pod_name, - pod_namespace, - source + "Failed to GET Kubernetes Pod '{pod_name}' in namespace '{pod_namespace}': {source}", ))] GetPod { source: kube::Error, @@ -202,19 +214,10 @@ pub(crate) enum Error { pod_namespace: String, }, - /// Error for when a Kubernetes API request for GET-ing a list of Nodes filtered by label(s) - /// fails. - #[snafu(display("Failed to list Nodes with label {}: {}", label, source))] - ListNodesWithLabel { source: kube::Error, label: String }, - /// Error for when a Kubernetes API request for GET-ing a list of Pods filtered by label(s) /// and field(s) fails. #[snafu(display( - "Failed to list Pods with label '{}', and field '{}' in namespace {}: {}", - label, - field, - namespace, - source + "Failed to list Pods with label '{label}', and field '{field}' in namespace '{namespace}': {source}", ))] ListPodsWithLabelAndField { source: kube::Error, @@ -223,35 +226,71 @@ pub(crate) enum Error { namespace: String, }, + /// Error for when listing Kubernetes Secrets from a the kubeapi fails. + #[snafu(display( + "Failed to list Secrets with label '{label}', and field '{field}' in namespace '{namespace}': {source}", + ))] + ListSecretsWithLabelAndField { + source: kube::Error, + label: String, + field: String, + namespace: String, + }, + + #[snafu(display( + "Failed to list ConfigMaps with label '{label}', and field '{field}' in namespace '{namespace}': {source}", + ))] + ListConfigMapsWithLabelAndField { + source: kube::Error, + label: String, + field: String, + namespace: String, + }, + + /// Error for when a Kubernetes API request for GET-ing a list of ControllerRevisions + /// filtered by label(s) and field(s) fails. 
+ #[snafu(display( + "Failed to list ControllerRevisions with label '{label}', and field '{field}' in Namespace '{namespace}': {source}", + ))] + ListCtrlRevsWithLabelAndField { + source: kube::Error, + label: String, + field: String, + namespace: String, + }, + + /// Error for when a Kubernetes API request for GET-ing a list of Nodes filtered by label(s) + /// and field(s) fails. + #[snafu(display( + "Failed to list Kubernetes Nodes with label '{label}', and field '{field}': {source}", + ))] + ListNodesWithLabelAndField { + source: kube::Error, + label: String, + field: String, + }, + /// Error for when a Pod does not have a PodSpec struct member. - #[snafu(display("Failed get .spec from Pod {} in Namespace {}", name, namespace))] + #[snafu(display("Failed get .spec from Pod {name} in Namespace '{namespace}'"))] EmptyPodSpec { name: String, namespace: String }, /// Error for when the spec.nodeName of a Pod is empty. - #[snafu(display( - "Failed get .spec.nodeName from Pod {} in Namespace {}", - name, - namespace - ))] + #[snafu(display("Failed get .spec.nodeName from Pod {name} in Namespace '{namespace}'",))] EmptyPodNodeName { name: String, namespace: String }, /// Error for when the metadata.uid of a Pod is empty. - #[snafu(display( - "Failed to get .metadata.uid from Pod {} in Namespace {}", - name, - namespace - ))] + #[snafu(display("Failed to get .metadata.uid from Pod {name} in Namespace '{namespace}'",))] EmptyPodUid { name: String, namespace: String }, /// Error for when an uncordon request for a storage node fails. - #[snafu(display("Failed to uncordon {} Node {}: {}", product_train(), node_id, source))] + #[snafu(display("Failed to uncordon {} Node {node_id}: {source}", product_train()))] StorageNodeUncordon { source: openapi::tower::client::Error, node_id: String, }, /// Error for when an Pod-delete Kubernetes API request fails. 
- #[snafu(display("Failed get delete Pod {} from Node {}: {}", name, node, source))] + #[snafu(display("Failed get delete Pod {name} from Node {node}: {source}"))] PodDelete { source: kube::Error, name: String, @@ -259,60 +298,64 @@ pub(crate) enum Error { }, /// Error for when listing storage nodes fails. - #[snafu(display("Failed to list {} Nodes: {}", product_train(), source))] + #[snafu(display("Failed to list {} Nodes: {source}", product_train()))] ListStorageNodes { source: openapi::tower::client::Error, }, /// Error for when GET-ing a storage node fails. - #[snafu(display("Failed to list {} Node {}: {}", product_train(), node_id, source))] + #[snafu(display("Failed to list {} Node {node_id}: {source}", product_train()))] GetStorageNode { source: openapi::tower::client::Error, node_id: String, }, /// Error for when the storage node's Spec is empty. - #[snafu(display("Failed to get {} Node {}", product_train(), node_id))] + #[snafu(display("Failed to get {} Node {node_id}", product_train()))] EmptyStorageNodeSpec { node_id: String }, /// Error for when a GET request for a list of storage volumes fails. - #[snafu(display("Failed to list {} Volumes: {}", product_train(), source))] + #[snafu(display("Failed to list {} Volumes: {source}", product_train()))] ListStorageVolumes { source: openapi::tower::client::Error, }, /// Error for when a storage node drain request fails. - #[snafu(display("Failed to drain {} Node {}: {}", product_train(), node_id, source))] + #[snafu(display("Failed to drain {} Node {node_id}: {source}", product_train()))] DrainStorageNode { source: openapi::tower::client::Error, node_id: String, }, /// Error for when a storage node cordon request fails. 
- #[snafu(display("Failed to cordon {} Node {}: {}", product_train(), node_id, source))] + #[snafu(display("Failed to cordon {} Node {node_id}: {source}", product_train()))] CordonStorageNode { source: openapi::tower::client::Error, node_id: String, }, /// Error for when the requested YAML key is invalid. - #[snafu(display("Failed to parse YAML path {}", yaml_path))] + #[snafu(display("Failed to parse YAML path {yaml_path}"))] YamlStructure { yaml_path: String }, /// Error for use when converting Vec<> to String. - #[snafu(display("Failed to convert Vec to UTF-8 formatted String: {}", source))] + #[snafu(display("Failed to convert Vec to UTF-8 formatted String: {source}"))] U8VectorToString { source: std::str::Utf8Error }, /// Error when publishing kube-events for the Job object. - #[snafu(display("Failed to publish Event: {}", source))] + #[snafu(display("Failed to publish Event: {source}"))] EventPublish { source: kube_client::Error }, + /// Error for when the 'chart' member of a crate::helm::client::HelmReleaseElement cannot be + /// split at the first occurrence of '-', e.g. -2.1.0-rc8. + #[snafu(display( + "Failed to split helm chart name '{chart_name}', at the first occurrence of '{delimiter}'", + ))] + HelmChartNameSplit { chart_name: String, delimiter: char }, + /// Error for when a Helm list command execution succeeds, but with an error. #[snafu(display( - "`helm list` command return an error,\ncommand: {},\nargs: {:?},\nstd_err: {}", - command, - args, - std_err, + "`helm list` command return an error,\ncommand: {command},\nargs: {args:?},\nstd_err: {std_err}", ))] HelmListCommand { command: String, @@ -322,10 +365,7 @@ pub(crate) enum Error { /// Error for when a Helm version command execution succeeds, but with an error. 
#[snafu(display( - "`helm version` command return an error,\ncommand: {},\nargs: {:?},\nstd_err: {}", - command, - args, - std_err, + "`helm version` command return an error,\ncommand: {command},\nargs: {args:?},\nstd_err: {std_err}", ))] HelmVersionCommand { command: String, @@ -335,10 +375,7 @@ pub(crate) enum Error { /// Error for when a Helm upgrade command execution succeeds, but with an error. #[snafu(display( - "`helm upgrade` command return an error,\ncommand: {},\nargs: {:?},\nstd_err: {}", - command, - args, - std_err, + "`helm upgrade` command return an error,\ncommand: {command},\nargs: {args:?},\nstd_err: {std_err}", ))] HelmUpgradeCommand { command: String, @@ -348,10 +385,7 @@ pub(crate) enum Error { /// Error for when a Helm get values command execution succeeds, but with an error. #[snafu(display( - "`helm get values` command return an error,\ncommand: {},\nargs: {:?},\nstd_err: {}", - command, - args, - std_err, + "`helm get values` command return an error,\ncommand: {command},\nargs: {args:?},\nstd_err: {std_err}", ))] HelmGetValuesCommand { command: String, @@ -361,12 +395,8 @@ pub(crate) enum Error { /// Error for when detected helm chart name is not known helm chart. #[snafu(display( - "'{}' is not a known {} helm chart, only helm charts '{}-' and '{}-' \ - are supported", - chart_name, + "'{chart_name}' is not a known {} helm chart, only helm charts '{CORE_CHART_NAME}-' and '{UMBRELLA_CHART_NAME}-' are supported", product_train(), - CORE_CHART_NAME, - UMBRELLA_CHART_NAME ))] NotAKnownHelmChart { chart_name: String }, @@ -393,7 +423,7 @@ pub(crate) enum Error { HelmUpgradeOptionNamespaceAbsent, /// Error for failures in generating semver::Value from a &str input. 
- #[snafu(display("Failed to parse {} as a valid semver: {}", version_string, source))] + #[snafu(display("Failed to parse {version_string} as a valid semver: {source}"))] SemverParse { source: semver::Error, version_string: String, @@ -404,7 +434,7 @@ pub(crate) enum Error { InvalidUpgradePath, /// Error in serializing crate::event::event_recorder::EventNote to JSON string. - #[snafu(display("Failed to serialize event note {:?}: {}", note, source))] + #[snafu(display("Failed to serialize event note {note:?}: {source}"))] SerializeEventNote { source: serde_json::Error, note: EventNote, @@ -412,9 +442,7 @@ pub(crate) enum Error { /// Error in serializing a helm::chart::PromtailConfigClient to a JSON string. #[snafu(display( - "Failed to serialize .loki-stack.promtail.config.client {:?}: {}", - object, - source + "Failed to serialize .loki-stack.promtail.config.client {object:?}: {source}", ))] SerializePromtailConfigClientToJson { source: serde_json::Error, @@ -423,9 +451,7 @@ pub(crate) enum Error { /// Error in serializing a k8s_openapi::api::core::v1::Container to a JSON string. #[snafu(display( - "Failed to serialize .loki-stack.promtail.initContainer {:?}: {}", - object, - source + "Failed to serialize .loki-stack.promtail.initContainer {object:?}: {source}", ))] SerializePromtailInitContainerToJson { source: serde_json::Error, @@ -435,9 +461,7 @@ pub(crate) enum Error { /// Error in deserializing a promtail helm chart's deprecated extraClientConfig to a /// serde_json::Value. 
#[snafu(display( - "Failed to deserialize .loki-stack.promtail.config.snippets.extraClientConfig to a serde_json::Value {}: {}", - config, - source + "Failed to deserialize .loki-stack.promtail.config.snippets.extraClientConfig to a serde_json::Value {config}: {source}", ))] DeserializePromtailExtraConfig { source: serde_yaml::Error, @@ -446,7 +470,7 @@ pub(crate) enum Error { /// Error in serializing a promtail helm chart's deprecated extraClientConfig, in a /// serde_json::Value, to JSON. - #[snafu(display("Failed to serialize to JSON {:?}: {}", config, source))] + #[snafu(display("Failed to serialize to JSON {config:?}: {source}"))] SerializePromtailExtraConfigToJson { source: serde_json::Error, config: serde_json::Value, @@ -454,18 +478,14 @@ pub(crate) enum Error { /// Error in serializing the deprecated config.snippets.extraClientConfig from the promtail /// helm chart v3.11.0. - #[snafu(display( - "Failed to serialize object to a serde_json::Value {}: {}", - object, - source - ))] + #[snafu(display("Failed to serialize object to a serde_json::Value {object}: {source}",))] SerializePromtailExtraClientConfigToJson { source: serde_json::Error, object: String, }, /// Error for when there are too many io-engine Pods in one single node; - #[snafu(display("Too many io-engine Pods in Node '{}'", node_name))] + #[snafu(display("Too many io-engine Pods in Node '{node_name}'"))] TooManyIoEnginePods { node_name: String }, /// Error for when the thin-provisioning options are absent, but still tried to fetch it. @@ -479,72 +499,23 @@ pub(crate) enum Error { /// Error for the Umbrella chart is not upgraded. 
#[snafu(display( - "The {} helm chart is not upgraded to version {}: Upgrade for helm chart {} is not \ - supported, refer to the instructions at {} to upgrade your release of the {} helm \ - chart to version {}", - UMBRELLA_CHART_NAME, - TO_UMBRELLA_SEMVER, - UMBRELLA_CHART_NAME, - UMBRELLA_CHART_UPGRADE_DOCS_URL, - UMBRELLA_CHART_NAME, - TO_UMBRELLA_SEMVER, + "The '{UMBRELLA_CHART_NAME}' helm chart is not upgraded to a version with '{CORE_CHART_NAME}' dependency helm chart version '{target_version}': Upgrade for helm chart {UMBRELLA_CHART_NAME} is not supported, refer to the instructions at {UMBRELLA_CHART_UPGRADE_DOCS_URL} to upgrade your release of the '{UMBRELLA_CHART_NAME}' helm chart.", ))] - UmbrellaChartNotUpgraded, + UmbrellaChartNotUpgraded { target_version: String }, /// Error for when the helm upgrade for the Core chart does not have a chart directory. #[snafu(display( - "The {} helm chart could not be upgraded as input chart directory is absent", - CORE_CHART_NAME + "The {CORE_CHART_NAME} helm chart could not be upgraded as input chart directory is absent", ))] CoreChartUpgradeNoneChartDir, - /// Error for when the Storage REST API Deployment is absent. - #[snafu(display( - "Found no {} REST API Deployments in the namespace {} with labelSelector {}", - product_train(), - namespace, - label_selector - ))] - NoRestDeployment { - namespace: String, - label_selector: String, - }, - - /// Error for when the CHART_VERSION_LABEL_KEY is missing amongst the labels in a Deployment. - #[snafu(display( - "A label with the key {} was not found for Deployment {} in namespace {}", - helm_release_version_key(), - deployment_name, - namespace - ))] - NoVersionLabelInDeployment { - deployment_name: String, - namespace: String, - }, - - /// Error for when a Kubernetes API request for GET-ing a list of Deployments filtered by - /// label(s) fails. 
- #[snafu(display( - "Failed to list Deployments with label {} in namespace {}: {}", - label_selector, - namespace, - source - ))] - ListDeploymentsWithLabel { - source: kube::Error, - namespace: String, - label_selector: String, - }, - /// Error for when the helm upgrade run is that of an invalid chart configuration. #[snafu(display("Invalid helm upgrade request"))] InvalidHelmUpgrade, /// Error for when the helm upgrade's target version is lower the source version. #[snafu(display( - "Failed to upgrade from {} to {}: upgrade to an earlier-released version is forbidden", - source_version, - target_version + "Failed to upgrade from {source_version} to {target_version}: upgrade to an earlier-released version is forbidden", ))] RollbackForbidden { source_version: String, @@ -553,10 +524,7 @@ pub(crate) enum Error { /// Error for when yq command execution fails. #[snafu(display( - "Failed to run yq command,\ncommand: {},\nargs: {:?},\ncommand_error: {}", - command, - args, - source + "Failed to run yq command,\ncommand: {command},\nargs: {args:?},\ncommand_error: {source}", ))] YqCommandExec { source: std::io::Error, @@ -566,10 +534,7 @@ pub(crate) enum Error { /// Error for when the `yq -V` command returns an error. #[snafu(display( - "`yq -V` command return an error,\ncommand: {},\narg: {},\nstd_err: {}", - command, - arg, - std_err, + "`yq -V` command return an error,\ncommand: {command},\narg: {arg},\nstd_err: {std_err}", ))] YqVersionCommand { command: String, @@ -579,10 +544,7 @@ pub(crate) enum Error { /// Error for when the `yq eq` command returns an error. #[snafu(display( - "`yq ea` command return an error,\ncommand: {},\nargs: {:?},\nstd_err: {}", - command, - args, - std_err, + "`yq ea` command return an error,\ncommand: {command},\nargs: {args:?},\nstd_err: {std_err}", ))] YqMergeCommand { command: String, @@ -595,26 +557,23 @@ pub(crate) enum Error { NotYqV4, /// Error for when temporary file creation fails. 
- #[snafu(display("Failed to create temporary file: {}", source))] + #[snafu(display("Failed to create temporary file: {source}"))] TempFileCreation { source: std::io::Error }, /// Error for when we fail to write to a temporary file. - #[snafu(display("Failed to write to temporary file {}: {}", filepath.display(), source))] + #[snafu(display("Failed to write to temporary file {}: {source}", filepath.display()))] WriteToTempFile { source: std::io::Error, filepath: PathBuf, }, /// Error for when the input yaml key for a string value isn't a valid one. - #[snafu(display("{} is not a valid yaml key for a string value", key))] + #[snafu(display("{key} is not a valid yaml key for a string value"))] NotAValidYamlKeyForStringValue { key: String }, /// Error for when the yq command to update the value of a yaml field returns an error. #[snafu(display( - "`yq` set-value-command returned an error,\ncommand: {},\nargs: {:?},\nstd_err: {}", - command, - args, - std_err, + "`yq` set-value-command returned an error,\ncommand: {command},\nargs: {args:?},\nstd_err: {std_err}", ))] YqSetCommand { command: String, @@ -624,10 +583,7 @@ pub(crate) enum Error { /// Error for when the yq command to delete an object path returns an error. #[snafu(display( - "`yq` delete-object-command returned an error,\ncommand: {},\nargs: {:?},\nstd_err: {}", - command, - args, - std_err, + "`yq` delete-object-command returned an error,\ncommand: {command},\nargs: {args:?},\nstd_err: {std_err}", ))] YqDeleteObjectCommand { command: String, @@ -637,10 +593,7 @@ pub(crate) enum Error { /// Error for when the yq command to append to an array returns an error. 
#[snafu(display( - "`yq` append-to-array-command returned an error,\ncommand: {},\nargs: {:?},\nstd_err: {}", - command, - args, - std_err, + "`yq` append-to-array-command returned an error,\ncommand: {command},\nargs: {args:?},\nstd_err: {std_err}", ))] YqAppendToArrayCommand { command: String, @@ -650,10 +603,7 @@ pub(crate) enum Error { /// Error for when the yq command to append to an object returns an error. #[snafu(display( - "`yq` append-to-object-command returned an error,\ncommand: {},\nargs: {:?},\nstd_err: {}", - command, - args, - std_err, + "`yq` append-to-object-command returned an error,\ncommand: {command},\nargs: {args:?},\nstd_err: {std_err}", ))] YqAppendToObjectCommand { command: String, @@ -670,6 +620,48 @@ pub(crate) enum Error { lower_extent: String, upper_extent: String, }, + + /// Error for when the list of ControllerRevisions for a controller's resource is empty. + #[snafu(display( + "No ControllerRevisions found in namespace '{namespace}' with label selector '{label_selector}' and field selector '{field_selector}'" + ))] + ControllerRevisionListEmpty { + namespace: String, + label_selector: String, + field_selector: String, + }, + + /// Error for when a ControllerRevision doesn't have a label key containing the controller + /// revision hash. + #[snafu(display( + "ControllerRevisions '{name}' in namespace '{namespace}' doesn't have label key '{hash_label_key}'" + ))] + ControllerRevisionDoesntHaveHashLabel { + name: String, + namespace: String, + hash_label_key: String, + }, + + /// Error for when base64 decode fails for helm storage data. + #[snafu(display("Failed to decode helm storage data"))] + Base64DecodeHelmStorage { source: base64::DecodeError }, + + /// Error for when Gzip decompressed data fails copy to byte buffer. 
+ #[snafu(display("Failed to copy gzip decompressed data to byte buffer: {source}"))] + GzipDecoderReadToEnd { source: std::io::Error }, + + /// Error for when Deserializing the JSON stored in a helm storage driver (secret or cm) fails. + #[snafu(display("Failed to deserialize helm storage data from JSON: {source}"))] + DeserializaHelmStorageData { source: serde_json::Error }, + + /// Error for when an expected JSON member in the helm storage data is missing. + #[snafu(display("Couldn't find '{member}' in helm storage data"))] + MissingMemberInHelmStorageData { member: &'static str }, + + /// Error for when helm dependency data in a helm storage driver contains an invalid/missing + /// entry for the CORE_CHART version. + #[snafu(display("Helm release data doesn't have chart version or contains an invalid version for dependency chart '{CORE_CHART_NAME}'"))] + InvalidDependencyVersionInHelmReleaseData, } /// A wrapper type to remove repeated Result returns. diff --git a/k8s/upgrade/src/bin/upgrade-job/common/kube.rs b/k8s/upgrade/src/bin/upgrade-job/common/kube.rs new file mode 100644 index 000000000..526490322 --- /dev/null +++ b/k8s/upgrade/src/bin/upgrade-job/common/kube.rs @@ -0,0 +1,2 @@ +/// This contains tools for working with the Kubernetes REST API. 
+pub(crate) mod client; diff --git a/k8s/upgrade/src/bin/upgrade-job/common/kube/client.rs b/k8s/upgrade/src/bin/upgrade-job/common/kube/client.rs new file mode 100644 index 000000000..c887417d7 --- /dev/null +++ b/k8s/upgrade/src/bin/upgrade-job/common/kube/client.rs @@ -0,0 +1,385 @@ +use crate::common::{ + constants::KUBE_API_PAGE_SIZE, + error::{ + ControllerRevisionDoesntHaveHashLabel, ControllerRevisionListEmpty, + InvalidNoOfHelmConfigMaps, InvalidNoOfHelmSecrets, K8sClientGeneration, + ListConfigMapsWithLabelAndField, ListCtrlRevsWithLabelAndField, ListNodesWithLabelAndField, + ListPodsWithLabelAndField, ListSecretsWithLabelAndField, Result, + }, +}; +use k8s_openapi::{ + api::{ + apps::v1::ControllerRevision, + core::v1::{ConfigMap, Namespace, Node, Pod, Secret}, + }, + apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition, +}; +use kube::{ + api::{Api, ListParams}, + core::PartialObjectMeta, + Client, Resource, ResourceExt, +}; +use serde::de::DeserializeOwned; +use snafu::{ensure, ErrorCompat, IntoError, ResultExt}; + +/// Generate a new kube::Client. +pub(crate) async fn client() -> Result { + Client::try_default().await.context(K8sClientGeneration) +} + +/// Generate the Node api client. +pub(crate) async fn nodes_api() -> Result> { + Ok(Api::all(client().await?)) +} + +/// Generate the Namespace api client. +pub(crate) async fn namespaces_api() -> Result> { + Ok(Api::all(client().await?)) +} + +/// Generate the CustomResourceDefinition api client. +pub(crate) async fn crds_api() -> Result> { + Ok(Api::all(client().await?)) +} + +/// Generate ControllerRevision api client. +pub(crate) async fn controller_revisions_api(namespace: &str) -> Result> { + Ok(Api::namespaced(client().await?, namespace)) +} + +/// Generate the Pod api client. +pub(crate) async fn pods_api(namespace: &str) -> Result> { + Ok(Api::namespaced(client().await?, namespace)) +} + +/// Generate the Secret api client. 
+pub(crate) async fn secrets_api(namespace: &str) -> Result> { + Ok(Api::namespaced(client().await?, namespace)) +} + +/// Generate the Configmap api client. +pub(crate) async fn configmaps_api(namespace: &str) -> Result> { + Ok(Api::namespaced(client().await?, namespace)) +} + +pub(crate) async fn list_pods( + namespace: String, + label_selector: Option, + field_selector: Option, +) -> Result> { + let mut pods: Vec = Vec::with_capacity(KUBE_API_PAGE_SIZE as usize); + + let mut list_params = ListParams::default().limit(KUBE_API_PAGE_SIZE); + if let Some(ref labels) = label_selector { + list_params = list_params.labels(labels); + } + if let Some(ref fields) = field_selector { + list_params = list_params.fields(fields); + } + + let pods_api = pods_api(namespace.as_str()).await?; + + let list_pods_error_ctx = ListPodsWithLabelAndField { + label: label_selector.unwrap_or_default(), + field: field_selector.unwrap_or_default(), + namespace: namespace.clone(), + }; + + paginated_list(pods_api, &mut pods, Some(list_params), list_pods_error_ctx).await?; + + Ok(pods) +} + +/// List Nodes metadata in the kubernetes cluster. +pub(crate) async fn list_nodes_metadata( + label_selector: Option, + field_selector: Option, +) -> Result>> { + let mut nodes: Vec> = Vec::with_capacity(KUBE_API_PAGE_SIZE as usize); + + let mut list_params = ListParams::default().limit(KUBE_API_PAGE_SIZE); + if let Some(ref labels) = label_selector { + list_params = list_params.labels(labels); + } + if let Some(ref fields) = field_selector { + list_params = list_params.fields(fields); + } + + let nodes_api = nodes_api().await?; + + let list_nodes_error_ctx = ListNodesWithLabelAndField { + label: label_selector.unwrap_or_default(), + field: field_selector.unwrap_or_default(), + }; + + paginated_list_metadata( + nodes_api, + &mut nodes, + Some(list_params), + list_nodes_error_ctx, + ) + .await?; + + Ok(nodes) +} + +/// List ControllerRevisions in a Kubernetes namespace. 
+pub(crate) async fn list_controller_revisions( + namespace: String, + label_selector: Option, + field_selector: Option, +) -> Result> { + let mut ctrl_revs: Vec = Vec::with_capacity(KUBE_API_PAGE_SIZE as usize); + + let mut list_params = ListParams::default().limit(KUBE_API_PAGE_SIZE); + if let Some(ref labels) = label_selector { + list_params = list_params.labels(labels); + } + if let Some(ref fields) = field_selector { + list_params = list_params.fields(fields); + } + + let controller_revisions_api = controller_revisions_api(namespace.as_str()).await?; + + let list_ctrl_revs_error_ctx = ListCtrlRevsWithLabelAndField { + label: label_selector.unwrap_or_default(), + field: field_selector.unwrap_or_default(), + namespace: namespace.clone(), + }; + + paginated_list( + controller_revisions_api, + &mut ctrl_revs, + Some(list_params), + list_ctrl_revs_error_ctx, + ) + .await?; + + Ok(ctrl_revs) +} + +/// Returns the controller-revision-hash of the latest revision of a resource's ControllerRevisions. +pub(crate) async fn latest_controller_revision_hash( + namespace: String, + label_selector: Option, + field_selector: Option, + hash_label_key: String, +) -> Result { + let mut ctrl_revs = list_controller_revisions( + namespace.clone(), + label_selector.clone(), + field_selector.clone(), + ) + .await?; + // Fail if ControllerRevisions list is empty. + ensure!( + !ctrl_revs.is_empty(), + ControllerRevisionListEmpty { + namespace: namespace.clone(), + label_selector: label_selector.unwrap_or_default(), + field_selector: field_selector.unwrap_or_default() + } + ); + + // Sort non-ascending by revision no. + ctrl_revs.sort_unstable_by(|a, b| b.revision.cmp(&a.revision)); + + ctrl_revs[0] + .labels() + .get(&hash_label_key) + .map(|s| s.into()) + .ok_or( + ControllerRevisionDoesntHaveHashLabel { + name: ctrl_revs[0].name_unchecked(), + namespace, + hash_label_key, + } + .build(), + ) +} + +/// This returns a list of Secrets based on filtering criteria. 
Returns all if criteria is absent. +pub(crate) async fn list_secrets( + namespace: String, + label_selector: Option, + field_selector: Option, +) -> Result> { + let mut secrets: Vec = Vec::with_capacity(KUBE_API_PAGE_SIZE as usize); + + let mut list_params = ListParams::default().limit(KUBE_API_PAGE_SIZE); + if let Some(ref labels) = label_selector { + list_params = list_params.labels(labels); + } + if let Some(ref fields) = field_selector { + list_params = list_params.fields(fields); + } + + let secrets_api = secrets_api(namespace.as_str()).await?; + + let list_secrets_error = ListSecretsWithLabelAndField { + label: label_selector.unwrap_or_default(), + field: field_selector.unwrap_or_default(), + namespace: namespace.clone(), + }; + + paginated_list( + secrets_api, + &mut secrets, + Some(list_params), + list_secrets_error, + ) + .await?; + + Ok(secrets) +} + +/// This returns a list of ConfigMaps based on filtering criteria. Returns all if criteria is +/// absent. +pub(crate) async fn list_configmaps( + namespace: String, + label_selector: Option, + field_selector: Option, +) -> Result> { + let mut configmaps: Vec = Vec::with_capacity(KUBE_API_PAGE_SIZE as usize); + + let mut list_params = ListParams::default().limit(KUBE_API_PAGE_SIZE); + if let Some(ref labels) = label_selector { + list_params = list_params.labels(labels); + } + if let Some(ref fields) = field_selector { + list_params = list_params.fields(fields); + } + + let configmaps_api = configmaps_api(namespace.as_str()).await?; + + let list_configmaps_error = ListConfigMapsWithLabelAndField { + label: label_selector.unwrap_or_default(), + field: field_selector.unwrap_or_default(), + namespace: namespace.clone(), + }; + + paginated_list( + configmaps_api, + &mut configmaps, + Some(list_params), + list_configmaps_error, + ) + .await?; + + Ok(configmaps) +} + +/// GET the helm release secret for a helm release in a namespace. 
+pub(crate) async fn get_helm_release_secret( + release_name: String, + namespace: String, +) -> Result { + let secrets = list_secrets( + namespace.clone(), + Some(format!("name={release_name},status=deployed")), + Some("type=helm.sh/release.v1".to_string()), + ) + .await?; + let wrong_no_of_secrets = InvalidNoOfHelmSecrets { + release_name, + namespace, + count: secrets.len(), + }; + ensure!(secrets.len() == 1, wrong_no_of_secrets.clone()); + + secrets + .into_iter() + .next() + .ok_or(wrong_no_of_secrets.build()) +} + +/// GET the helm release configmap for a helm release in a namespace. +pub(crate) async fn get_helm_release_configmap( + release_name: String, + namespace: String, +) -> Result { + let cms = list_configmaps( + namespace.clone(), + Some(format!("name={release_name},owner=helm,status=deployed")), + None, + ) + .await?; + let wrong_no_of_cms = InvalidNoOfHelmConfigMaps { + release_name, + namespace, + count: cms.len(), + }; + ensure!(cms.len() == 1, wrong_no_of_cms.clone()); + + cms.into_iter().next().ok_or(wrong_no_of_cms.build()) +} + +async fn paginated_list( + resource_api: Api, + resources: &mut Vec, + list_params: Option, + list_err_ctx: C, +) -> Result<()> +where + K: Resource + Clone + DeserializeOwned + std::fmt::Debug, + C: IntoError + Clone, + E2: std::error::Error + ErrorCompat, + crate::common::error::Error: From, +{ + let mut list_params = list_params.unwrap_or_default().limit(KUBE_API_PAGE_SIZE); + + loop { + let resource_list = resource_api + .list(&list_params) + .await + .context(list_err_ctx.clone())?; + + let maybe_token = resource_list.metadata.continue_.clone(); + + resources.extend(resource_list); + + match maybe_token { + Some(ref token) => { + list_params = list_params.continue_token(token); + } + None => break, + } + } + + Ok(()) +} + +async fn paginated_list_metadata( + resource_api: Api, + resources: &mut Vec>, + list_params: Option, + list_err_ctx: C, +) -> Result<()> +where + K: Resource + Clone + DeserializeOwned + 
std::fmt::Debug, + C: IntoError + Clone, + E2: std::error::Error + ErrorCompat, + crate::common::error::Error: From, +{ + let mut list_params = list_params.unwrap_or_default().limit(KUBE_API_PAGE_SIZE); + + loop { + let resource_list = resource_api + .list_metadata(&list_params) + .await + .context(list_err_ctx.clone())?; + + let maybe_token = resource_list.metadata.continue_.clone(); + + resources.extend(resource_list); + + match maybe_token { + Some(ref token) => { + list_params = list_params.continue_token(token); + } + None => break, + } + } + + Ok(()) +} diff --git a/k8s/upgrade/src/bin/upgrade-job/common/kube_client.rs b/k8s/upgrade/src/bin/upgrade-job/common/kube_client.rs deleted file mode 100644 index da03ca0bf..000000000 --- a/k8s/upgrade/src/bin/upgrade-job/common/kube_client.rs +++ /dev/null @@ -1,89 +0,0 @@ -use crate::common::{ - constants::KUBE_API_PAGE_SIZE, - error::{K8sClientGeneration, ListPodsWithLabelAndField, Result}, -}; -use k8s_openapi::{ - api::{ - apps::v1::Deployment, - core::v1::{Namespace, Node, Pod}, - }, - apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition, -}; -use kube::{ - api::{Api, ListParams}, - Client, -}; -use snafu::ResultExt; - -/// Generate a new kube::Client. -pub(crate) async fn client() -> Result { - Client::try_default().await.context(K8sClientGeneration) -} - -/// Generate the Node api client. -pub(crate) async fn nodes_api() -> Result> { - Ok(Api::all(client().await?)) -} - -/// Generate the Namespace api client. -pub(crate) async fn namespaces_api() -> Result> { - Ok(Api::all(client().await?)) -} - -/// Generate the CustomResourceDefinition api client. -pub(crate) async fn crds_api() -> Result> { - Ok(Api::all(client().await?)) -} - -/// Generate the Pod api client. -pub(crate) async fn pods_api(namespace: &str) -> Result> { - Ok(Api::namespaced(client().await?, namespace)) -} - -/// Generate the Deployment api client. 
-pub(crate) async fn deployments_api(namespace: &str) -> Result> { - Ok(Api::namespaced(client().await?, namespace)) -} - -pub(crate) async fn list_pods( - namespace: String, - label_selector: Option, - field_selector: Option, -) -> Result> { - let mut pods: Vec = Vec::with_capacity(KUBE_API_PAGE_SIZE as usize); - - let mut list_params = ListParams::default().limit(KUBE_API_PAGE_SIZE); - if let Some(ref labels) = label_selector { - list_params = list_params.labels(labels.as_str()); - } - if let Some(ref fields) = field_selector { - list_params = list_params.fields(fields.as_str()); - } - - let list_pods_error_ctx = ListPodsWithLabelAndField { - label: label_selector.unwrap_or_default(), - field: field_selector.unwrap_or_default(), - namespace: namespace.clone(), - }; - - loop { - let pod_list = pods_api(namespace.as_str()) - .await? - .list(&list_params) - .await - .context(list_pods_error_ctx.clone())?; - - let continue_ = pod_list.metadata.continue_.clone(); - - pods = pods.into_iter().chain(pod_list).collect(); - - match continue_ { - Some(token) => { - list_params = list_params.continue_token(token.as_str()); - } - None => break, - } - } - - Ok(pods) -} diff --git a/k8s/upgrade/src/bin/upgrade-job/events/event_recorder.rs b/k8s/upgrade/src/bin/upgrade-job/events/event_recorder.rs index 3556a7542..593d8e4bf 100644 --- a/k8s/upgrade/src/bin/upgrade-job/events/event_recorder.rs +++ b/k8s/upgrade/src/bin/upgrade-job/events/event_recorder.rs @@ -4,7 +4,7 @@ use crate::common::{ EventChannelSend, EventPublish, EventRecorderOptionsAbsent, GetPod, JobPodHasTooManyOwners, JobPodOwnerIsNotJob, JobPodOwnerNotFound, Result, SerializeEventNote, }, - kube_client as KubeClient, + kube::client as KubeClient, }; use k8s_openapi::{api::core::v1::ObjectReference, serde_json}; use kube::runtime::events::{Event, EventType, Recorder}; diff --git a/k8s/upgrade/src/bin/upgrade-job/helm.rs b/k8s/upgrade/src/bin/upgrade-job/helm.rs index e0eb0bdc9..1687be328 100644 --- 
a/k8s/upgrade/src/bin/upgrade-job/helm.rs +++ b/k8s/upgrade/src/bin/upgrade-job/helm.rs @@ -1,14 +1,10 @@ +/// Contains the structs required to deserialize yaml files from the helm charts. +pub(crate) mod chart; /// Contains the HelmReleaseClient. Used for interacting with installed helm chart releases. pub(crate) mod client; - /// Contains helm chart upgrade logic. pub(crate) mod upgrade; - /// Contains validation and logic to generate helm values options for the `helm upgrade` command. pub(crate) mod values; - -/// Contains the structs required to deserialize yaml files from the helm charts. -pub(crate) mod chart; - /// This contains tools for use with yaml files. pub(crate) mod yaml; diff --git a/k8s/upgrade/src/bin/upgrade-job/helm/client.rs b/k8s/upgrade/src/bin/upgrade-job/helm/client.rs index 69cc36b02..f5d9bc194 100644 --- a/k8s/upgrade/src/bin/upgrade-job/helm/client.rs +++ b/k8s/upgrade/src/bin/upgrade-job/helm/client.rs @@ -1,24 +1,115 @@ use crate::{ - common::error::{ - HelmClientNs, HelmCommand, HelmGetValuesCommand, HelmListCommand, HelmRelease, - HelmUpgradeCommand, Result, U8VectorToString, YamlParseFromSlice, + common::{ + error::{ + Base64DecodeHelmStorage, DeserializaHelmStorageData, GzipDecoderReadToEnd, + HelmClientNs, HelmCommand, HelmGetValuesCommand, HelmListCommand, HelmRelease, + HelmStorageNoData, HelmStorageNoReleaseValue, HelmUpgradeCommand, + MissingMemberInHelmStorageData, NoHelmStorageDriver, Result, U8VectorToString, + UnsupportedStorageDriver, YamlParseFromSlice, + }, + kube::client as KubeClient, }, vec_to_strings, }; -use k8s_openapi::serde; +use base64::engine::{general_purpose::STANDARD, Engine as base64_engine}; +use flate2::read::GzDecoder; +use k8s_openapi::kind; use serde::Deserialize; use snafu::{ensure, ResultExt}; -use std::{path::Path, process::Command, str}; +use std::{io::Read, path::Path, process::Command, str}; use tracing::debug; +/// This is used to pick out the .data field from a kubernetes secret or a 
configmap. +macro_rules! extract_data { + ($source:ident) => {{ + let driver = kind(&$source); + $source + .data + .ok_or(HelmStorageNoData { driver }.build())? + .into_iter() + .find_map(|(k, v)| k.eq("release").then_some(v)) + .ok_or(HelmStorageNoReleaseValue { driver }.build()) + }}; +} + +/// This is used to deserialize the JSON data in a helm storage resource (secret or configmap). +#[derive(Debug, Deserialize)] +pub(crate) struct HelmChartRelease { + chart: Option, +} + +/// This is used to deserialize release.chart. +#[derive(Debug, Deserialize)] +pub(crate) struct HelmChartReleaseChart { + metadata: HelmChartReleaseChartMetadata, +} + +/// This is used to deserialize release.chart.metadata. +#[derive(Debug, Deserialize)] +pub(crate) struct HelmChartReleaseChartMetadata { + dependencies: Option>, +} + +/// This is used to deserialize release.chart.metadata.dependency[]. +#[derive(Debug, Deserialize)] +pub(crate) struct HelmChartReleaseChartMetadataDependency { + name: String, + version: Option, +} + +impl HelmChartReleaseChartMetadataDependency { + /// Returns the name of the dependency chart. + pub(crate) fn name(&self) -> &str { + self.name.as_str() + } + + /// Returns the version of the dependency chart. + pub(crate) fn version(self) -> Option { + self.version + } +} + +/// This performs a base64 decode and Gzip Decode for the data extracted from the helm storage. +fn decode_decompress_data(data: impl AsRef<[u8]>) -> Result> { + let data_compressed = + base64_engine::decode(&STANDARD, data).context(Base64DecodeHelmStorage)?; + + let mut gzip_decoder = GzDecoder::new(&data_compressed[..]); + let mut data: Vec = Vec::new(); + gzip_decoder + .read_to_end(&mut data) + .context(GzipDecoderReadToEnd)?; + + Ok(data) +} + +/// Extract list of dependencies from a chart's release_data or fail. 
+fn dependencies_from_release_data( + data: Vec, +) -> Result> { + let release: HelmChartRelease = + serde_json::from_slice(data.as_slice()).context(DeserializaHelmStorageData)?; + + let missing_member_err = |member: &'static str| -> crate::common::error::Error { + MissingMemberInHelmStorageData { member }.build() + }; + + release + .chart + .ok_or(missing_member_err(".chart"))? + .metadata + .dependencies + .ok_or(missing_member_err(".chart.metadata.dependencies")) +} + /// This struct is used to deserialize the output of `helm list -n --deployed -o yaml`. #[derive(Clone, Deserialize)] -pub(crate) struct HelmReleaseElement { +pub(crate) struct HelmListReleaseElement { name: String, chart: String, } -impl HelmReleaseElement { +impl HelmListReleaseElement { /// This is a getter function for the name of the release. pub(crate) fn name(&self) -> &str { self.name.as_str() @@ -34,6 +125,7 @@ impl HelmReleaseElement { #[derive(Default)] pub(crate) struct HelmReleaseClientBuilder { namespace: Option, + storage_driver: Option, } impl HelmReleaseClientBuilder { @@ -48,10 +140,21 @@ impl HelmReleaseClientBuilder { self } + /// Set the storage driver to use with helm commands. + #[must_use] + pub(crate) fn with_storage_driver(mut self, driver: String) -> Self { + self.storage_driver = Some(driver); + self + } + /// Build the HelmReleaseClient. pub(crate) fn build(self) -> Result { - let ns = self.namespace.ok_or(HelmClientNs.build())?; - Ok(HelmReleaseClient { namespace: ns }) + let namespace = self.namespace.ok_or(HelmClientNs.build())?; + let storage_driver = self.storage_driver.ok_or(NoHelmStorageDriver.build())?; + Ok(HelmReleaseClient { + namespace, + storage_driver, + }) } } @@ -60,6 +163,9 @@ impl HelmReleaseClientBuilder { #[derive(Clone)] pub(crate) struct HelmReleaseClient { pub(crate) namespace: String, + /// This is the information that Helm stores on the cluster about the state of a helm release. 
+ /// Ref: https://github.com/helm/helm/blob/v3.15.0/pkg/action/action.go#L383 + pub(crate) storage_driver: String, } impl HelmReleaseClient { @@ -100,6 +206,7 @@ impl HelmReleaseClient { debug!(%command, ?args, "Helm get values command"); let output = Command::new(command) + .env("HELM_DRIVER", self.storage_driver.as_str()) .args(args.clone()) .output() .context(HelmCommand { @@ -125,7 +232,7 @@ impl HelmReleaseClient { pub(crate) fn list_as_yaml( &self, maybe_extra_args: Option>, - ) -> Result> + ) -> Result> where A: ToString, { @@ -145,6 +252,7 @@ impl HelmReleaseClient { debug!(%command, ?args, "Helm list command"); let output = Command::new(command) + .env("HELM_DRIVER", self.storage_driver.as_str()) .args(args.clone()) .output() .context(HelmCommand { @@ -170,6 +278,48 @@ impl HelmReleaseClient { }) } + /// Reads from the helm storage driver and returns a type with info. about dependencies. + pub(crate) async fn get_dependencies( + &self, + release_name: &str, + ) -> Result> { + match self.storage_driver.as_str() { + "" | "secret" | "secrets" => { + debug!("Using helm secret as helm storage"); + let secret = KubeClient::get_helm_release_secret( + release_name.to_string(), + self.namespace.clone(), + ) + .await?; + + let release_data = extract_data!(secret)?; + let decoded_data = decode_decompress_data(release_data.0)?; + let dependencies = dependencies_from_release_data(decoded_data)?; + debug!(data=?dependencies, "Found helm chart release chart metadata dependency in helm secret"); + + Ok(dependencies) + } + "configmap" | "configmaps" => { + debug!("Using helm configmap as helm storage"); + let cm = KubeClient::get_helm_release_configmap( + release_name.to_string(), + self.namespace.clone(), + ) + .await?; + let release_data = extract_data!(cm)?; + let decoded_data = decode_decompress_data(release_data)?; + let dependencies = dependencies_from_release_data(decoded_data)?; + debug!(data=?dependencies, "Found helm chart release chart metadata dependency in 
helm configmap"); + + Ok(dependencies) + } + unsupported_driver => UnsupportedStorageDriver { + driver: unsupported_driver.to_string(), + } + .fail(), + } + } + /// Runs command `helm upgrade -n `. pub(crate) async fn upgrade( &self, @@ -200,6 +350,7 @@ impl HelmReleaseClient { debug!(%command, ?args, "Helm upgrade command"); let output = Command::new(command) + .env("HELM_DRIVER", self.storage_driver.as_str()) .args(args.clone()) .output() .context(HelmCommand { @@ -224,7 +375,7 @@ impl HelmReleaseClient { } /// Fetches info about a Helm release in the Namespace, if it exists. - pub(crate) fn release_info(&self, release_name: A) -> Result + pub(crate) fn release_info(&self, release_name: A) -> Result where A: ToString, { diff --git a/k8s/upgrade/src/bin/upgrade-job/helm/upgrade.rs b/k8s/upgrade/src/bin/upgrade-job/helm/upgrade.rs index b6154ce6e..1c18c8b27 100644 --- a/k8s/upgrade/src/bin/upgrade-job/helm/upgrade.rs +++ b/k8s/upgrade/src/bin/upgrade-job/helm/upgrade.rs @@ -1,10 +1,10 @@ use crate::{ common::{ - constants::{CORE_CHART_NAME, TO_UMBRELLA_SEMVER, UMBRELLA_CHART_NAME}, + constants::{CORE_CHART_NAME, UMBRELLA_CHART_NAME}, error::{ HelmUpgradeOptionNamespaceAbsent, HelmUpgradeOptionReleaseNameAbsent, - InvalidUpgradePath, NoInputHelmChartDir, NotAKnownHelmChart, Result, RollbackForbidden, - UmbrellaChartNotUpgraded, + InvalidUpgradePath, NoHelmStorageDriver, NoInputHelmChartDir, NotAKnownHelmChart, + Result, RollbackForbidden, UmbrellaChartNotUpgraded, }, regex::Regex, }, @@ -14,7 +14,8 @@ use crate::{ values::generate_values_yaml_file, }, upgrade::path::{ - is_valid_for_core_chart, version_from_chart_yaml_file, version_from_rest_deployment_label, + core_version_from_umbrella_release, is_valid_for_core_chart, version_from_chart_yaml_file, + version_from_core_chart_release, }, vec_to_strings, }; @@ -39,10 +40,10 @@ pub(crate) trait HelmUpgrader { async fn dry_run(self: Box) -> Result; /// Return the source helm chart version as a String. 
- fn source_version(&self) -> String; + fn source_version(&self) -> Version; /// Return the target helm chart version as a String. - fn target_version(&self) -> String; + fn target_version(&self) -> Version; } /// This is a builder for the Helm chart upgrade. @@ -54,6 +55,9 @@ pub(crate) struct HelmUpgraderBuilder { skip_upgrade_path_validation: bool, helm_args_set: Option, helm_args_set_file: Option, + helm_storage_driver: Option, + // Defaults to false, which is what we want for the time being. + helm_reset_then_reuse_values: bool, } impl HelmUpgraderBuilder { @@ -112,6 +116,19 @@ impl HelmUpgraderBuilder { self } + /// This is a builder option to add the value of the helm storage driver. + #[must_use] + pub(crate) fn with_helm_storage_driver(mut self, driver: String) -> Self { + self.helm_storage_driver = Some(driver); + self + } + + /// This is a builder option to enable the use of the helm --reset-then-reuse-values flag. + pub(crate) fn with_helm_reset_then_reuse_values(mut self, use_it: bool) -> Self { + self.helm_reset_then_reuse_values = use_it; + self + } + /// This builds the HelmUpgrade object. pub(crate) async fn build(self) -> Result> { // Unwrapping builder inputs. Fails for mandatory inputs. @@ -127,22 +144,23 @@ impl HelmUpgraderBuilder { } .build(), )?; + let storage_driver = self + .helm_storage_driver + .ok_or(NoHelmStorageDriver.build())?; let helm_args_set = self.helm_args_set.unwrap_or_default(); let helm_args_set_file = self.helm_args_set_file.unwrap_or_default(); // Generate HelmReleaseClient. let client = HelmReleaseClient::builder() .with_namespace(namespace.as_str()) + .with_storage_driver(storage_driver) .build()?; - // Get the chart_name from the HelmReleaseElement object for the release specified in CLI - // options. + // Get the chart_name from the HelmListReleaseElement object for the release specified in + // CLI options. 
let helm_release = client.release_info(release_name.as_str())?; let chart = helm_release.chart(); - // The version of the Core helm chart (installed as the parent chart or as a dependent - // chart) which is installed in the cluster. - let source_version = version_from_rest_deployment_label(namespace.as_str()).await?; // source_values from installed helm chart release. let source_values_buf = client.get_values_as_yaml::<&str, String>(release_name.as_str(), None)?; @@ -151,12 +169,10 @@ impl HelmUpgraderBuilder { // The version of the Core chart which we are (maybe) upgrading to. let target_version = version_from_chart_yaml_file(chart_dot_yaml_path)?; - // Check if already upgraded. - let already_upgraded = target_version.eq(&source_version); - // Define regular expression to pick out the chart name from the // - string. - let umbrella_regex = format!(r"^({UMBRELLA_CHART_NAME}-[0-9]+\.[0-9]+\.[0-9]+)$"); + let umbrella_regex = + format!(r"^({UMBRELLA_CHART_NAME}-[0-9]+\.[0-9]+\.[0-9]+(-[a-zA-Z0-9]+(\.[0-9]+)?)*)$"); // Accepts pre-release and release, both. // Q: How do I read this regex? // A: This regular expressions is bounded by the '^' and '$' characters, which means @@ -178,8 +194,22 @@ impl HelmUpgraderBuilder { // Determine chart variant. if Regex::new(umbrella_regex.as_str())?.is_match(chart.as_ref()) { + // The version of the Core helm chart (installed as a dependent chart) which is + // installed in the cluster. + let source_version = + core_version_from_umbrella_release(&client, helm_release.name()).await?; + info!(version=%source_version, "Found version of dependency chart {CORE_CHART_NAME}"); + + // Check if already upgraded. + let already_upgraded = target_version.eq(&source_version); + // Fail if the Umbrella chart isn't already upgraded. 
- ensure!(already_upgraded, UmbrellaChartNotUpgraded); + ensure!( + already_upgraded, + UmbrellaChartNotUpgraded { + target_version: target_version.to_string() + } + ); Ok(Box::new(UmbrellaHelmUpgrader { release_name, @@ -188,12 +218,15 @@ impl HelmUpgraderBuilder { target_version, })) } else if Regex::new(core_regex.as_str())?.is_match(chart) { + // The version of the Core helm chart (installed as the parent chart) + // which is installed in the cluster. + let source_version = version_from_core_chart_release(chart)?; + info!(version=%source_version, "Found version of chart {CORE_CHART_NAME}"); + // Skip upgrade-path validation and allow all upgrades for the Core helm chart, if // the flag is set. if !self.skip_upgrade_path_validation { // Rollbacks not supported. - // TODO: Support same version upgrades. Distinguish data plane Pods by uid - // instead of labels. ensure!( target_version.ge(&source_version), RollbackForbidden { @@ -227,26 +260,30 @@ impl HelmUpgraderBuilder { ) .await?; - // helm upgrade .. -f --set --set-file --atomic - let helm_upgrade_extra_args = vec_to_strings![ - "-f", - upgrade_values_file.path().to_string_lossy(), + let mut helm_upgrade_extra_args: Vec; + if self.helm_reset_then_reuse_values { + // helm upgrade .. --reset-then-reuse-values --set --set-file --atomic + helm_upgrade_extra_args = vec_to_strings!["--reset-then-reuse-values"]; + } else { + // helm upgrade .. -f --set --set-file --atomic + helm_upgrade_extra_args = + vec_to_strings!["-f", upgrade_values_file.path().to_string_lossy()]; + } + helm_upgrade_extra_args.extend(vec_to_strings![ "--set", helm_args_set, "--set-file", helm_args_set_file, "--atomic" - ]; + ]); Ok(Box::new(CoreHelmUpgrader { - already_upgraded, chart_dir, release_name, client, helm_upgrade_extra_args, source_version, target_version, - source_values, upgrade_values_file, })) } else { @@ -259,15 +296,12 @@ impl HelmUpgraderBuilder { /// This is a HelmUpgrader for the core helm chart. 
Unlike the UmbrellaHelmUpgrader, /// this actually can set up a helm upgrade. pub(crate) struct CoreHelmUpgrader { - // TODO: remove this when same version upgrade is implemented. - already_upgraded: bool, chart_dir: PathBuf, release_name: String, client: HelmReleaseClient, helm_upgrade_extra_args: Vec, source_version: Version, target_version: Version, - source_values: CoreValues, #[allow(dead_code)] upgrade_values_file: TempFile, } @@ -278,21 +312,6 @@ impl HelmUpgrader for CoreHelmUpgrader { /// from this method returns a HelmUpgradeRunner which is a Future which runs 'helm upgrade' /// when awaited on. async fn dry_run(self: Box) -> Result { - // TODO: Remove this if block after same-version upgrade is implemented. - if self.already_upgraded { - // Returned HelmUpgradeRunner logs and exits. - return Ok(Box::pin(async move { - info!( - "Skipping helm upgrade, as the version of the installed helm chart \ -is the same as that of this upgrade-job's helm chart" - ); - - let source_values: Box = Box::new(self.source_values); - - Ok(source_values) - })); - } - // Running 'helm upgrade --dry-run'. 
let mut dry_run_extra_args = self.helm_upgrade_extra_args.clone(); dry_run_extra_args.push("--dry-run".to_string()); @@ -334,12 +353,12 @@ is the same as that of this upgrade-job's helm chart" })) } - fn source_version(&self) -> String { - self.source_version.to_string() + fn source_version(&self) -> Version { + self.source_version.clone() } - fn target_version(&self) -> String { - self.target_version.to_string() + fn target_version(&self) -> Version { + self.target_version.clone() } } @@ -357,8 +376,9 @@ impl HelmUpgrader for UmbrellaHelmUpgrader { async fn dry_run(self: Box) -> Result { Ok(Box::pin(async move { info!( - "Verified that {UMBRELLA_CHART_NAME} helm chart release '{}' has version {TO_UMBRELLA_SEMVER}", - self.release_name.as_str() + "Verified that '{UMBRELLA_CHART_NAME}' helm chart release '{}' has dependency '{CORE_CHART_NAME}' of version '{}'", + self.release_name.as_str(), + self.target_version.to_string() ); let final_values_buf = self @@ -371,11 +391,11 @@ impl HelmUpgrader for UmbrellaHelmUpgrader { })) } - fn source_version(&self) -> String { - self.source_version.to_string() + fn source_version(&self) -> Version { + self.source_version.clone() } - fn target_version(&self) -> String { - self.target_version.to_string() + fn target_version(&self) -> Version { + self.target_version.clone() } } diff --git a/k8s/upgrade/src/bin/upgrade-job/helm/values.rs b/k8s/upgrade/src/bin/upgrade-job/helm/values.rs index 6e292a1cf..eb89f83ae 100644 --- a/k8s/upgrade/src/bin/upgrade-job/helm/values.rs +++ b/k8s/upgrade/src/bin/upgrade-job/helm/values.rs @@ -10,7 +10,7 @@ use crate::{ SerializePromtailInitContainerToJson, }, file::write_to_tempfile, - kube_client as KubeClient, + kube::client as KubeClient, }, helm::{ chart::{CoreValues, PromtailConfigClient}, diff --git a/k8s/upgrade/src/bin/upgrade-job/opts.rs b/k8s/upgrade/src/bin/upgrade-job/opts.rs index c65bf38f4..a9083ea0d 100644 --- a/k8s/upgrade/src/bin/upgrade-job/opts.rs +++ 
b/k8s/upgrade/src/bin/upgrade-job/opts.rs @@ -32,7 +32,7 @@ pub(crate) struct CliArgs { skip_data_plane_restart: bool, /// If set then this skips the upgrade path validation. - #[arg(long, default_value_t = false)] + #[arg(long, default_value_t = false, hide = true)] skip_upgrade_path_validation: bool, /// The name of the Kubernetes Job Pod. The Job object will be used to post upgrade event. @@ -41,21 +41,29 @@ pub(crate) struct CliArgs { /// The set values specified by the user for upgrade /// (can specify multiple or separate values with commas: key1=val1,key2=val2). - #[arg(long)] + #[arg(long, default_value = "")] helm_args_set: String, /// The set file values specified by the user for upgrade /// (can specify multiple or separate values with commas: key1=path1,key2=path2). - #[arg(long)] + #[arg(long, default_value = "")] helm_args_set_file: String, /// Formatting style to be used while logging. - #[clap(default_value = FmtStyle::Pretty.as_ref(), short, long)] + #[arg(default_value = FmtStyle::Pretty.as_ref(), short, long)] fmt_style: FmtStyle, /// Use ANSI colors for the logs. - #[clap(long)] + #[arg(long, default_value_t = true)] ansi_colors: bool, + + /// This is the helm storage driver, e.g. secret, configmap, memory, etc. + #[arg(env = "HELM_DRIVER", default_value = "")] + helm_storage_driver: String, + + /// Use helm's --reset-then-reuse-values option instead of using yq to derive the helm values. + #[arg(long, default_value_t = false)] + helm_reset_then_reuse_values: bool, } impl CliArgs { @@ -114,4 +122,14 @@ impl CliArgs { pub(crate) fn helm_args_set_file(&self) -> String { self.helm_args_set_file.clone() } + + /// This is the helm storage driver, e.g.: secret, secrets, configmap, configmaps, memory, sql. + pub(crate) fn helm_storage_driver(&self) -> String { + self.helm_storage_driver.clone() + } + + /// Return true if the --helm-reset-then-reuse-values has been specified. 
+ pub(crate) fn helm_reset_then_reuse_values(&self) -> bool { + self.helm_reset_then_reuse_values + } } diff --git a/k8s/upgrade/src/bin/upgrade-job/opts/validators.rs b/k8s/upgrade/src/bin/upgrade-job/opts/validators.rs index e4117ea8b..d62ead99e 100644 --- a/k8s/upgrade/src/bin/upgrade-job/opts/validators.rs +++ b/k8s/upgrade/src/bin/upgrade-job/opts/validators.rs @@ -7,7 +7,7 @@ use crate::{ RegexCompile, Result, U8VectorToString, ValidateDirPath, ValidateFilePath, YamlParseFromFile, }, - kube_client as KubeClient, + kube::client as KubeClient, rest_client::RestClientSet, }, helm::chart::Chart, diff --git a/k8s/upgrade/src/bin/upgrade-job/upgrade.rs b/k8s/upgrade/src/bin/upgrade-job/upgrade.rs index ab31aedb1..196bb2c85 100644 --- a/k8s/upgrade/src/bin/upgrade-job/upgrade.rs +++ b/k8s/upgrade/src/bin/upgrade-job/upgrade.rs @@ -1,20 +1,18 @@ use crate::{ common::{ constants::{ - helm_release_version_key, product_train, CORE_CHART_NAME, IO_ENGINE_LABEL, - PARTIAL_REBUILD_DISABLE_EXTENTS, + product_train, CORE_CHART_NAME, IO_ENGINE_LABEL, PARTIAL_REBUILD_DISABLE_EXTENTS, }, error::{PartialRebuildNotAllowed, Result}, - kube_client as KubeClient, + kube::client as KubeClient, }, events::event_recorder::{EventAction, EventRecorder}, helm::upgrade::{HelmUpgradeRunner, HelmUpgraderBuilder}, opts::CliArgs, }; +use constants::DS_CONTROLLER_REVISION_HASH_LABEL_KEY; use data_plane::upgrade_data_plane; -use k8s_openapi::api::core::v1::Pod; -use kube::ResourceExt; use semver::Version; use tracing::error; @@ -54,6 +52,8 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<() .with_skip_upgrade_path_validation(opts.skip_upgrade_path_validation()) .with_helm_args_set(opts.helm_args_set()) .with_helm_args_set_file(opts.helm_args_set_file()) + .with_helm_storage_driver(opts.helm_storage_driver()) + .with_helm_reset_then_reuse_values(opts.helm_reset_then_reuse_values()) .build() .await?; @@ -63,8 +63,8 @@ async fn upgrade_product(opts: &CliArgs, event: 
&mut EventRecorder) -> Result<() // Updating the EventRecorder with version values from the HelmUpgrade. // These two operations are thread-safe. The EventRecorder itself is not // shared with any other tokio task. - event.set_source_version(source_version.clone()); - event.set_target_version(target_version.clone()); + event.set_source_version(source_version.to_string()); + event.set_target_version(target_version.to_string()); // Dry-run helm upgrade. let dry_run_result: Result = helm_upgrader.dry_run().await; @@ -108,11 +108,21 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<() // Data plane containers are updated in this step. if !opts.skip_data_plane_restart() { + partial_rebuild_check(&source_version, final_values.partial_rebuild_is_enabled())?; + + let latest_io_engine_ctrl_rev_hash = KubeClient::latest_controller_revision_hash( + opts.namespace(), + Some(IO_ENGINE_LABEL.to_string()), + None, + DS_CONTROLLER_REVISION_HASH_LABEL_KEY.to_string(), + ) + .await?; + let yet_to_upgrade_io_engine_label = format!( - "{IO_ENGINE_LABEL},{}!={}", - helm_release_version_key(), - target_version.as_str() + "{IO_ENGINE_LABEL},{DS_CONTROLLER_REVISION_HASH_LABEL_KEY}!={}", + latest_io_engine_ctrl_rev_hash.as_str() ); + let yet_to_upgrade_io_engine_pods = KubeClient::list_pods( opts.namespace(), Some(yet_to_upgrade_io_engine_label.clone()), @@ -120,11 +130,6 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<() ) .await?; - partial_rebuild_check( - yet_to_upgrade_io_engine_pods.as_slice(), - final_values.partial_rebuild_is_enabled(), - )?; - event .publish_normal( format!("Upgrading {} data-plane", product_train()), @@ -135,7 +140,7 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<() if let Err(error) = upgrade_data_plane( opts.namespace(), opts.rest_endpoint(), - target_version, + latest_io_engine_ctrl_rev_hash, final_values.ha_is_enabled(), yet_to_upgrade_io_engine_label, 
yet_to_upgrade_io_engine_pods, @@ -164,19 +169,9 @@ async fn upgrade_product(opts: &CliArgs, event: &mut EventRecorder) -> Result<() Ok(()) } -fn partial_rebuild_check( - yet_to_upgrade_io_engine_pods: &[Pod], - partial_rebuild_is_enabled: bool, -) -> Result<()> { - let partial_rebuild_disable_required = yet_to_upgrade_io_engine_pods - .iter() - .filter_map(|pod| pod.labels().get(&helm_release_version_key())) - .any(|v| { - let version = - Version::parse(v).expect("failed to parse version from io-engine Pod label"); - version.ge(&PARTIAL_REBUILD_DISABLE_EXTENTS.0) - & version.le(&PARTIAL_REBUILD_DISABLE_EXTENTS.1) - }); +fn partial_rebuild_check(source_version: &Version, partial_rebuild_is_enabled: bool) -> Result<()> { + let partial_rebuild_disable_required = source_version.ge(&PARTIAL_REBUILD_DISABLE_EXTENTS.0) + && source_version.le(&PARTIAL_REBUILD_DISABLE_EXTENTS.1); if partial_rebuild_disable_required && partial_rebuild_is_enabled { error!("Partial rebuild must be disabled for upgrades from {CORE_CHART_NAME} chart versions >= {}, <= {}", PARTIAL_REBUILD_DISABLE_EXTENTS.0, PARTIAL_REBUILD_DISABLE_EXTENTS.1); @@ -190,3 +185,21 @@ fn partial_rebuild_check( Ok(()) } + +#[cfg(test)] +mod tests { + #[test] + fn test_partial_rebuild_check() { + use crate::upgrade::partial_rebuild_check; + use semver::Version; + + let source = Version::new(2, 1, 0); + assert!(matches!(partial_rebuild_check(&source, true), Ok(()))); + let source = Version::new(2, 2, 0); + assert!(partial_rebuild_check(&source, true).is_err()); + let source = Version::new(2, 5, 0); + assert!(partial_rebuild_check(&source, true).is_err()); + let source = Version::new(2, 6, 0); + assert!(matches!(partial_rebuild_check(&source, true), Ok(()))); + } +} diff --git a/k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs b/k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs index ef635f250..7d7200651 100644 --- a/k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs +++ 
b/k8s/upgrade/src/bin/upgrade-job/upgrade/data_plane.rs @@ -1,15 +1,13 @@ use crate::{ common::{ constants::{ - cordon_ana_check, drain_for_upgrade, helm_release_version_key, product_train, - AGENT_CORE_LABEL, IO_ENGINE_LABEL, + cordon_ana_check, drain_for_upgrade, product_train, AGENT_CORE_LABEL, IO_ENGINE_LABEL, }, error::{ DrainStorageNode, EmptyPodNodeName, EmptyPodSpec, EmptyStorageNodeSpec, GetStorageNode, - ListNodesWithLabel, ListStorageNodes, PodDelete, Result, StorageNodeUncordon, - TooManyIoEnginePods, + ListStorageNodes, PodDelete, Result, StorageNodeUncordon, TooManyIoEnginePods, }, - kube_client as KubeClient, + kube::client as KubeClient, rest_client::RestClientSet, }, upgrade::utils::{ @@ -17,11 +15,9 @@ use crate::{ uncordon_storage_node, RebuildResult, }, }; +use constants::DS_CONTROLLER_REVISION_HASH_LABEL_KEY; use k8s_openapi::api::core::v1::{Node, Pod}; -use kube::{ - api::{Api, DeleteParams, ListParams, ObjectList}, - ResourceExt, -}; +use kube::{api::DeleteParams, core::PartialObjectMeta, ResourceExt}; use openapi::models::CordonDrainState; use snafu::ResultExt; use std::time::Duration; @@ -33,7 +29,7 @@ use utils::{csi_node_nvme_ana, API_REST_LABEL, ETCD_LABEL}; pub(crate) async fn upgrade_data_plane( namespace: String, rest_endpoint: String, - upgrade_target_version: String, + latest_io_engine_ctrl_rev_hash: String, ha_is_enabled: bool, yet_to_upgrade_io_engine_label: String, yet_to_upgrade_io_engine_pods: Vec, @@ -82,7 +78,7 @@ pub(crate) async fn upgrade_data_plane( for pod in initial_io_engine_pod_list.iter() { // Validate the control plane pod is up and running before we start. 
- verify_control_plane_is_running(namespace.clone(), &upgrade_target_version).await?; + verify_control_plane_is_running(namespace.clone()).await?; // Fetch the node name on which the io-engine pod is running let node_name = pod @@ -115,14 +111,7 @@ pub(crate) async fn upgrade_data_plane( // Wait for any rebuild to complete wait_for_rebuild(node_name, &rest_client).await?; - if is_node_drainable( - ha_is_enabled, - node_name, - KubeClient::nodes_api().await?, - &rest_client, - ) - .await? - { + if is_node_drainable(ha_is_enabled, node_name, &rest_client).await? { // Issue node drain command if NVMe Ana is enabled. drain_storage_node(node_name, &rest_client).await?; } @@ -131,8 +120,12 @@ pub(crate) async fn upgrade_data_plane( delete_data_plane_pod(node_name, pod, namespace.clone()).await?; // validate the new pod is up and running - verify_data_plane_pod_is_running(node_name, namespace.clone(), &upgrade_target_version) - .await?; + verify_data_plane_pod_is_running( + node_name, + namespace.clone(), + latest_io_engine_ctrl_rev_hash.as_str(), + ) + .await?; // Uncordon the drained node uncordon_drained_storage_node(node_name, &rest_client).await?; @@ -222,12 +215,14 @@ async fn delete_data_plane_pod(node_name: &str, pod: &Pod, namespace: String) -> async fn verify_data_plane_pod_is_running( node_name: &str, namespace: String, - upgrade_target_version: &String, + latest_io_engine_ctrl_rev_hash: &str, ) -> Result<()> { let duration = Duration::from_secs(5_u64); // Validate the new pod is up and running info!(node.name = %node_name, "Waiting for data-plane Pod to come to Ready state"); - while !data_plane_pod_is_running(node_name, namespace.clone(), upgrade_target_version).await? { + while !data_plane_pod_is_running(node_name, namespace.clone(), latest_io_engine_ctrl_rev_hash) + .await? 
+ { sleep(duration).await; } Ok(()) @@ -310,12 +305,11 @@ async fn drain_storage_node(node_id: &str, rest_client: &RestClientSet) -> Resul async fn data_plane_pod_is_running( node: &str, namespace: String, - upgrade_target_version: &String, + latest_io_engine_ctrl_rev_hash: &str, ) -> Result { let node_name_pod_field = format!("spec.nodeName={node}"); let pod_label = format!( - "{IO_ENGINE_LABEL},{}={upgrade_target_version}", - helm_release_version_key() + "{IO_ENGINE_LABEL},{DS_CONTROLLER_REVISION_HASH_LABEL_KEY}={latest_io_engine_ctrl_rev_hash}", ); let pod_list: Vec = @@ -332,12 +326,9 @@ async fn data_plane_pod_is_running( Ok(all_pods_are_ready(pod_list)) } -async fn verify_control_plane_is_running( - namespace: String, - upgrade_target_version: &String, -) -> Result<()> { +async fn verify_control_plane_is_running(namespace: String) -> Result<()> { let duration = Duration::from_secs(3_u64); - while !control_plane_is_running(namespace.clone(), upgrade_target_version).await? { + while !control_plane_is_running(namespace.clone()).await? { sleep(duration).await; } @@ -345,24 +336,13 @@ async fn verify_control_plane_is_running( } /// Validate if control-plane pods are running -- etcd, agent-core, api-rest. 
-async fn control_plane_is_running( - namespace: String, - upgrade_target_version: &String, -) -> Result { - let agent_core_selector_label = format!( - "{AGENT_CORE_LABEL},{}={upgrade_target_version}", - helm_release_version_key() - ); +async fn control_plane_is_running(namespace: String) -> Result { let pod_list: Vec = - KubeClient::list_pods(namespace.clone(), Some(agent_core_selector_label), None).await?; + KubeClient::list_pods(namespace.clone(), Some(AGENT_CORE_LABEL.to_string()), None).await?; let core_is_ready = all_pods_are_ready(pod_list); - let api_rest_selector_label = format!( - "{API_REST_LABEL},{}={upgrade_target_version}", - helm_release_version_key() - ); let pod_list: Vec = - KubeClient::list_pods(namespace.clone(), Some(api_rest_selector_label), None).await?; + KubeClient::list_pods(namespace.clone(), Some(API_REST_LABEL.to_string()), None).await?; let rest_is_ready = all_pods_are_ready(pod_list); let pod_list: Vec = @@ -378,7 +358,6 @@ async fn control_plane_is_running( async fn is_node_drainable( ha_is_enabled: bool, node_name: &str, - k8s_nodes_api: Api, rest_client: &RestClientSet, ) -> Result { if !ha_is_enabled { @@ -387,16 +366,10 @@ async fn is_node_drainable( } let ana_disabled_label = format!("{}=false", csi_node_nvme_ana()); - let ana_disabled_filter = ListParams::default().labels(ana_disabled_label.as_str()); let ana_disabled_nodes = - k8s_nodes_api - .list(&ana_disabled_filter) - .await - .context(ListNodesWithLabel { - label: ana_disabled_label, - })?; - - if ana_disabled_nodes.items.is_empty() { + KubeClient::list_nodes_metadata(Some(ana_disabled_label), None).await?; + + if ana_disabled_nodes.is_empty() { info!( "There are no ANA-incapable nodes in this cluster, it is safe to drain node {}", node_name @@ -416,7 +389,7 @@ async fn is_node_drainable( /// ANA-incapability. 
async fn frontend_nodes_ana_check( node_name: &str, - ana_disabled_nodes: ObjectList, + ana_disabled_nodes: Vec>, rest_client: &RestClientSet, ) -> Result { let volumes = list_all_volumes(rest_client).await?; diff --git a/k8s/upgrade/src/bin/upgrade-job/upgrade/path.rs b/k8s/upgrade/src/bin/upgrade-job/upgrade/path.rs index 00118884b..432f2c9d8 100644 --- a/k8s/upgrade/src/bin/upgrade-job/upgrade/path.rs +++ b/k8s/upgrade/src/bin/upgrade-job/upgrade/path.rs @@ -1,20 +1,17 @@ use crate::{ common::{ - constants::helm_release_version_key, + constants::CORE_CHART_NAME, error::{ - ListDeploymentsWithLabel, NoRestDeployment, NoVersionLabelInDeployment, ReadingFile, - Result, SemverParse, YamlParseBufferForUnsupportedVersion, YamlParseFromFile, + HelmChartNameSplit, InvalidDependencyVersionInHelmReleaseData, ReadingFile, Result, + SemverParse, YamlParseBufferForUnsupportedVersion, YamlParseFromFile, }, - kube_client as KubeClient, }, - helm::chart::Chart, + helm::{chart::Chart, client::HelmReleaseClient}, }; -use kube_client::{api::ListParams, ResourceExt}; use semver::Version; use serde::Deserialize; -use snafu::{ensure, ResultExt}; +use snafu::ResultExt; use std::{fs, path::PathBuf}; -use utils::API_REST_LABEL; /// Validates the upgrade path from 'from' Version to 'to' Version for the Core helm chart. pub(crate) fn is_valid_for_core_chart(from: &Version) -> Result { @@ -37,57 +34,41 @@ pub(crate) fn version_from_chart_yaml_file(path: PathBuf) -> Result { Ok(to_chart.version().clone()) } -/// Generate a semver::Version from the CHART_VERSION_LABEL_KEY label on the Storage REST API -/// Deployment. 
-pub(crate) async fn version_from_rest_deployment_label(ns: &str) -> Result { - let labels = format!("{API_REST_LABEL},{}", helm_release_version_key()); - - let deployments_api = KubeClient::deployments_api(ns).await?; - let mut deploy_list = deployments_api - .list(&ListParams::default().labels(labels.as_str())) - .await - .context(ListDeploymentsWithLabel { - namespace: ns.to_string(), - label_selector: labels.clone(), - })?; - - ensure!( - !deploy_list.items.is_empty(), - NoRestDeployment { - namespace: ns.to_string(), - label_selector: labels - } - ); - - // The most recent one sits on top. - deploy_list - .items - .sort_by_key(|b| std::cmp::Reverse(b.creation_timestamp())); - - // The only ways there could be more than one version of the Storage REST API Pod in the - // namespace are: 1. More than one version of the Storage cluster is deployed, by means of - // multiple helm charts or otherwise This will never come to a stable state, as some - // of the components will be trying to claim the same resources. So, in this case - // the Storage cluster isn't broken because of upgrade-job. Upgrade should - // eventually fail for these cases, because the component containers keep erroring out. - // 2. Helm upgrade is stuck with the older REST API Pod in 'Terminating' state: This scenario is - // more likely than the one above. This may result is more-than-one - // REST API deployments. If the helm upgrade has succeeded already, we'd want to hit - // the 'already_upgraded' case in crate::helm::upgrade. The upgraded version will be - // on the latest-created REST API deployment. - let deploy = &deploy_list.items[0]; - let deploy_version = deploy.labels().get(&helm_release_version_key()).ok_or( - NoVersionLabelInDeployment { - deployment_name: deploy.name_any(), - namespace: ns.to_string(), +/// Generate a semver::Version from the 'chart' member of the Helm chart's ReleaseElement. +/// The output of `helm ls -n -o yaml` is a list of ReleaseElements. 
+pub(crate) fn version_from_core_chart_release(chart_name: &str) -> Result { + let delimiter: char = '-'; + // e.g. -1.2.3-rc.5 -- here the 2nd chunk is the version + let (_, version) = chart_name.split_once(delimiter).ok_or( + HelmChartNameSplit { + chart_name: chart_name.to_string(), + delimiter, } .build(), )?; - Version::parse(deploy_version.as_str()).context(SemverParse { - version_string: deploy_version.clone(), + + Version::parse(version).context(SemverParse { + version_string: version.to_string(), }) } +pub(crate) async fn core_version_from_umbrella_release( + client: &HelmReleaseClient, + release_name: &str, +) -> Result { + let deps = client.get_dependencies(release_name).await?; + deps.into_iter() + .find_map(|dep| { + dep.name() + .eq(CORE_CHART_NAME) + .then_some(dep.version()) + .flatten() + // Parse the String into a semver::Version. + .and_then(|version| Version::parse(version.as_str()).ok()) + }) + .ok_or(InvalidDependencyVersionInHelmReleaseData.build()) +} + /// Struct to deserialize the unsupported version yaml. 
#[derive(Deserialize)] struct UnsupportedVersions { diff --git a/k8s/upgrade/src/plugin/objects.rs b/k8s/upgrade/src/plugin/objects.rs index 6472979a1..6d6ba4b83 100644 --- a/k8s/upgrade/src/plugin/objects.rs +++ b/k8s/upgrade/src/plugin/objects.rs @@ -17,7 +17,7 @@ use k8s_openapi::api::{ }, rbac::v1::{ClusterRole, ClusterRoleBinding, PolicyRule, RoleRef, Subject}, }; -use std::collections::BTreeMap; +use std::{collections::BTreeMap, env}; use kube::core::ObjectMeta; use maplit::btreemap; @@ -76,7 +76,14 @@ pub(crate) fn upgrade_job_cluster_role( PolicyRule { api_groups: Some(vec!["apps"].into_vec()), resources: Some( - vec!["daemonsets", "replicasets", "statefulsets", "deployments"].into_vec(), + vec![ + "controllerrevisions", + "daemonsets", + "replicasets", + "statefulsets", + "deployments", + ] + .into_vec(), ), verbs: vec!["create", "delete", "get", "list", "patch"].into_vec(), ..Default::default() @@ -294,6 +301,9 @@ pub(crate) fn upgrade_job( if args.skip_upgrade_path_validation_for_unsupported_version { job_args.push("--skip-upgrade-path-validation".to_string()); } + if args.reset_then_reuse_values { + job_args.push("--helm-reset-then-reuse-values".to_string()); + } Job { metadata: ObjectMeta { @@ -324,7 +334,7 @@ pub(crate) fn upgrade_job( env: Some(vec![ EnvVar { name: "RUST_LOG".to_string(), - value: Some("info".to_string()), + value: Some(env::var("RUST_LOG").unwrap_or("info".to_string())), ..Default::default() }, EnvVar { @@ -338,6 +348,12 @@ pub(crate) fn upgrade_job( }), ..Default::default() }, + EnvVar { + // Ref: https://github.com/helm/helm/blob/main/cmd/helm/helm.go#L76 + name: "HELM_DRIVER".to_string(), + value: env::var("HELM_DRIVER").ok(), + ..Default::default() + }, ]), liveness_probe: Some(Probe { exec: Some(ExecAction { diff --git a/k8s/upgrade/src/plugin/upgrade.rs b/k8s/upgrade/src/plugin/upgrade.rs index 2556c5f59..b5d4d5abb 100644 --- a/k8s/upgrade/src/plugin/upgrade.rs +++ b/k8s/upgrade/src/plugin/upgrade.rs @@ -121,6 +121,10 @@ pub 
struct UpgradeArgs { /// (can specify multiple or separate values with commas: key1=path1,key2=path2). #[clap(global = true, long)] pub set_file: Vec, + + /// Use helm's --reset-then-reuse-values option. + #[arg(long, default_value_t = false)] + pub reset_then_reuse_values: bool, } impl Default for UpgradeArgs { @@ -143,6 +147,7 @@ impl UpgradeArgs { skip_upgrade_path_validation_for_unsupported_version: false, set: Default::default(), set_file: Default::default(), + reset_then_reuse_values: false, } } /// Upgrade the resources.