diff --git a/k8s/supportability/src/collect/logs/mod.rs b/k8s/supportability/src/collect/logs/mod.rs
index 7e5d0f815..5ec5715df 100644
--- a/k8s/supportability/src/collect/logs/mod.rs
+++ b/k8s/supportability/src/collect/logs/mod.rs
@@ -14,8 +14,12 @@ use crate::collect::{
     utils::log,
 };
 use async_trait::async_trait;
-use k8s_openapi::api::core::v1::Pod;
-use std::{collections::HashSet, iter::Iterator, path::PathBuf};
+use k8s_openapi::api::core::v1::{Node, Pod};
+use std::{
+    collections::{HashMap, HashSet},
+    iter::Iterator,
+    path::PathBuf,
+};
 
 /// Error that can occur while interacting with logs module
 #[derive(Debug)]
@@ -109,95 +113,111 @@ impl LogCollection {
         }))
     }
 
-    async fn get_logging_resources(
+    async fn pod_logging_resources(
         &self,
-        pods: Vec<Pod>,
+        pod: Pod,
+        nodes_map: &HashMap<String, Node>,
     ) -> Result<HashSet<LogResource>, LogError> {
-        let nodes_map = self
-            .k8s_logger_client
-            .get_k8s_clientset()
-            .get_nodes_map()
-            .await?;
         let mut logging_resources = HashSet::new();
-
-        for pod in pods {
-            let service_name = pod
-                .metadata
-                .labels
-                .as_ref()
+        let service_name = pod
+            .metadata
+            .labels
+            .as_ref()
+            .ok_or_else(|| {
+                K8sResourceError::invalid_k8s_resource_value(format!(
+                    "No labels found in pod {:?}",
+                    pod.metadata.name
+                ))
+            })?
+            .get("app")
+            .unwrap_or(&"".to_string())
+            .clone();
+
+        let mut hostname = None;
+        if is_host_name_required(service_name.clone()) {
+            let node_name = pod
+                .spec
+                .clone()
                 .ok_or_else(|| {
                     K8sResourceError::invalid_k8s_resource_value(format!(
-                        "No labels found in pod {:?}",
+                        "Pod spec not found in pod {:?} resource",
                         pod.metadata.name
                     ))
                 })?
-                .get("app")
-                .unwrap_or(&"".to_string())
+                .node_name
+                .as_ref()
+                .ok_or_else(|| {
+                    K8sResourceError::invalid_k8s_resource_value(
+                        "Node name not found in running pod resource".to_string(),
+                    )
+                })?
                 .clone();
-
-            let mut hostname = None;
-            if is_host_name_required(service_name.clone()) {
-                let node_name = pod
-                    .spec
-                    .clone()
+            hostname = Some(
+                nodes_map
+                    .get(node_name.as_str())
                     .ok_or_else(|| {
                         K8sResourceError::invalid_k8s_resource_value(format!(
-                            "Pod spec not found in pod {:?} resource",
-                            pod.metadata.name
+                            "Unable to find node: {} object",
+                            node_name.clone()
                         ))
                     })?
-                    .node_name
+                    .metadata
+                    .labels
                     .as_ref()
                     .ok_or_else(|| {
-                        K8sResourceError::invalid_k8s_resource_value(
-                            "Node name not found in running pod resource".to_string(),
-                        )
+                        K8sResourceError::invalid_k8s_resource_value(format!(
+                            "No labels found in node {}",
+                            node_name.clone()
+                        ))
                     })?
-                    .clone();
-                hostname = Some(
-                    nodes_map
-                        .get(node_name.as_str())
-                        .ok_or_else(|| {
-                            K8sResourceError::invalid_k8s_resource_value(format!(
-                                "Unable to find node: {} object",
-                                node_name.clone()
-                            ))
-                        })?
-                        .metadata
-                        .labels
-                        .as_ref()
-                        .ok_or_else(|| {
-                            K8sResourceError::invalid_k8s_resource_value(format!(
-                                "No labels found in node {}",
-                                node_name.clone()
-                            ))
-                        })?
-                        .get(KUBERNETES_HOST_LABEL_KEY)
-                        .ok_or_else(|| {
-                            K8sResourceError::invalid_k8s_resource_value(format!(
-                                "Hostname not found for node {}",
-                                node_name.clone()
-                            ))
-                        })?
-                        .clone(),
-                );
-            }
-            // Since pod object fetched from Kube-apiserver there will be always
-            // spec associated to pod
-            let containers = pod
-                .spec
-                .ok_or_else(|| {
-                    K8sResourceError::invalid_k8s_resource_value("Pod sepc not found".to_string())
-                })?
-                .containers;
-
-            for container in containers {
-                logging_resources.insert(LogResource {
-                    container_name: container.name,
-                    host_name: hostname.clone(),
-                    label_selector: format!("app={}", service_name.clone()),
-                    service_type: service_name.clone(),
-                });
+                    .get(KUBERNETES_HOST_LABEL_KEY)
+                    .ok_or_else(|| {
+                        K8sResourceError::invalid_k8s_resource_value(format!(
+                            "Hostname not found for node {}",
+                            node_name.clone()
+                        ))
+                    })?
+                    .clone(),
+            );
+        }
+        // Since the pod object is fetched from the kube-apiserver, a spec will
+        // always be associated with the pod
+        let containers = pod
+            .spec
+            .ok_or_else(|| {
+                K8sResourceError::invalid_k8s_resource_value("Pod spec not found".to_string())
+            })?
+            .containers;
+
+        for container in containers {
+            logging_resources.insert(LogResource {
+                container_name: container.name,
+                host_name: hostname.clone(),
+                label_selector: format!("app={}", service_name.clone()),
+                service_type: service_name.clone(),
+            });
+        }
+        Ok(logging_resources)
+    }
+
+    async fn get_logging_resources(
+        &self,
+        pods: Vec<Pod>,
+    ) -> Result<HashSet<LogResource>, LogError> {
+        let nodes_map = self
+            .k8s_logger_client
+            .get_k8s_clientset()
+            .get_nodes_map()
+            .await?;
+        let mut logging_resources = HashSet::new();
+
+        for pod in pods {
+            match self.pod_logging_resources(pod.clone(), &nodes_map).await {
+                Ok(resources) => logging_resources.extend(resources),
+                Err(error) => log(format!(
+                    "Skipping the pod {:?} due to error: {error:?}",
+                    pod.metadata.name
+                )),
            }
        }
        Ok(logging_resources)
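
Net effect of the refactor: the per-pod extraction now lives in `pod_logging_resources`, the node map is fetched once and shared by reference, and a malformed pod (missing labels, spec, or node) is logged and skipped instead of failing the entire log collection. A minimal sketch of the same skip-on-error pattern, outside the supportability codebase (the `Item` type, `collect_one`, and `collect_all` below are hypothetical stand-ins for `Pod`, `pod_logging_resources`, and `get_logging_resources`, not code from this PR):

```rust
use std::collections::HashSet;

// Hypothetical stand-in for a pod-like resource (not part of this PR).
struct Item {
    name: String,
    labels: Option<String>,
}

// Fallible per-item extraction, mirroring pod_logging_resources():
// a missing field becomes an Err instead of aborting the caller.
fn collect_one(item: &Item) -> Result<HashSet<String>, String> {
    let label = item
        .labels
        .as_ref()
        .ok_or_else(|| format!("No labels found in item {:?}", item.name))?;
    Ok(HashSet::from([format!("app={label}")]))
}

// Mirrors the new get_logging_resources(): a failure for one item is
// logged and skipped, and collection continues for the rest.
fn collect_all(items: &[Item]) -> HashSet<String> {
    let mut resources = HashSet::new();
    for item in items {
        match collect_one(item) {
            Ok(found) => resources.extend(found),
            Err(error) => eprintln!("Skipping the item {:?} due to error: {error}", item.name),
        }
    }
    resources
}

fn main() {
    let items = [
        Item { name: "io-engine-abc".into(), labels: Some("io-engine".into()) },
        Item { name: "broken-pod".into(), labels: None },
    ];
    // "broken-pod" only costs a log line; the other resource is still collected.
    assert_eq!(collect_all(&items).len(), 1);
}
```

Keeping the per-item helper fallible preserves `?` ergonomics inside it, while the caller alone decides that one bad pod is worth a log line rather than a hard failure of the whole support bundle.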