Skip to content

Commit

Permalink
chore(bors): merge pull request #570
Browse files Browse the repository at this point in the history
570: Backport fixes to release/2.7 r=tiagolobocastro a=tiagolobocastro

    build: update controller dep
    
    Signed-off-by: Tiago Castro <[email protected]>



Co-authored-by: mayastor-bors <[email protected]>
Co-authored-by: Niladri Halder <[email protected]>
Co-authored-by: Mike Beaumont <[email protected]>
Co-authored-by: Oleksandr Shalbanov <[email protected]>
Co-authored-by: Tiago Castro <[email protected]>
  • Loading branch information
5 people committed Nov 27, 2024
2 parents a66e394 + 14a93f0 commit 6c67a93
Show file tree
Hide file tree
Showing 34 changed files with 1,274 additions and 581 deletions.
26 changes: 22 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions call-home/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ once_cell = "1.18.0"
bytes = "1.5.0"
utils = { path = "../dependencies/control-plane/utils/utils-lib" }
events-api = { path = "../dependencies/control-plane/utils/dependencies/apis/events" }
tokio-retry = "0.3"

# exporter
actix-web = { version = "4.4.0", features = ["rustls-0_21"] }
Expand Down
20 changes: 15 additions & 5 deletions call-home/src/bin/callhome/collector/report_models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,19 +489,29 @@ impl Percentiles {
#[serde(rename_all = "camelCase")]
pub(crate) struct Report {
pub(crate) k8s_cluster_id: String,
pub(crate) k8s_node_count: u8,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) k8s_node_count: Option<u8>,
#[serde(skip_serializing_if = "String::is_empty")]
pub(crate) product_name: String,
#[serde(skip_serializing_if = "String::is_empty")]
pub(crate) product_version: String,
#[serde(skip_serializing_if = "String::is_empty")]
pub(crate) deploy_namespace: String,
pub(crate) storage_node_count: u8,
pub(crate) pools: Pools,
pub(crate) volumes: Volumes,
pub(crate) replicas: Replicas,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) storage_node_count: Option<u8>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) pools: Option<Pools>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) volumes: Option<Volumes>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) replicas: Option<Replicas>,
pub(crate) nexus: Nexus,
pub(crate) versions: Versions,
pub(crate) storage_nodes: StorageNodes,
pub(crate) mayastor_managed_disks: MayastorManagedDisks,
pub(crate) storage_media: StorageMedia,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub(crate) logs: Vec<String>,
}

/// Get maximum value from a vector.
Expand Down
180 changes: 153 additions & 27 deletions call-home/src/bin/callhome/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,20 @@ use collector::report_models::{MayastorManagedDisks, Nexus, StorageMedia, Storag
use obs::common::constants::*;
use openapi::tower::client::{ApiClient, Configuration};
use sha256::digest;
use std::{collections::HashMap, time};
use std::{
collections::{HashMap, VecDeque},
sync::{Arc, Mutex},
time::Duration,
};
use tokio::time::sleep;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
use tokio_retry::{
strategy::{jitter, ExponentialBackoff},
Retry,
};
use tracing::{error, info, Event, Level, Subscriber};
use tracing_subscriber::{
layer::Context, prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Layer,
};
use url::Url;
use utils::{package_description, version_info_str};

Expand Down Expand Up @@ -49,19 +59,27 @@ impl CliArgs {
}
}

/// Maximum number of ERROR-level log entries kept in the in-memory log buffer.
/// It is used both as the buffer's initial capacity and as the limit passed to
/// `log_with_limit`, which evicts the oldest entries first once the limit is reached.
const ERR_LOG_BUF_CAPACITY: usize = 100;

#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
let logs = Arc::new(Mutex::new(VecDeque::with_capacity(ERR_LOG_BUF_CAPACITY)));
let vec_layer = LogsLayer::new(logs.clone());

let subscriber = tracing_subscriber::Registry::default()
.with(vec_layer)
.with(tracing_subscriber::fmt::layer())
.with(EnvFilter::from_default_env());

if let Err(error) = run().await {
tracing::subscriber::set_global_default(subscriber).expect("setting tracing default failed");

if let Err(error) = run(logs).await {
error!(?error, "failed call-home");
std::process::exit(1);
}
}

async fn run() -> anyhow::Result<()> {
async fn run(logs: Arc<Mutex<VecDeque<LogEntry>>>) -> anyhow::Result<()> {
let args = CliArgs::args();
let version = release_version();
let endpoint = args.endpoint;
Expand Down Expand Up @@ -91,7 +109,7 @@ async fn run() -> anyhow::Result<()> {

// Generate Mayastor REST client.
let config = Configuration::builder()
.with_timeout(time::Duration::from_secs(30))
.with_timeout(Duration::from_secs(30))
.with_tracing(true)
.build_url(endpoint)
.map_err(|error| anyhow::anyhow!("failed to create openapi configuration: {:?}", error))?;
Expand All @@ -106,6 +124,7 @@ async fn run() -> anyhow::Result<()> {
namespace.clone(),
version.clone(),
aggregator_url.clone(),
logs.clone(),
)
.await;

Expand Down Expand Up @@ -138,7 +157,12 @@ async fn generate_report(
deploy_namespace: String,
product_version: String,
aggregator_url: Option<Url>,
logs: Arc<Mutex<VecDeque<LogEntry>>>,
) -> Report {
let retry_strategy = ExponentialBackoff::from_millis(100)
.map(jitter) // add jitter to delays
.take(5); // retry up to 5 times

let mut report = Report {
product_name: product(),
k8s_cluster_id,
Expand Down Expand Up @@ -172,58 +196,74 @@ async fn generate_report(

let k8s_node_count = k8s_client.get_node_len().await;
match k8s_node_count {
Ok(k8s_node_count) => report.k8s_node_count = k8s_node_count as u8,
Ok(k8s_node_count) => report.k8s_node_count = Some(k8s_node_count as u8),
Err(err) => {
error!("{:?}", err);
}
};

// List of disks on each node.
let mut node_disks = HashMap::new();
let nodes = http_client.nodes_api().get_nodes(None).await;
let nodes = Retry::spawn(retry_strategy.clone(), || async {
http_client.nodes_api().get_nodes(None).await
})
.await;
match nodes {
Ok(nodes) => {
let nodes = nodes.into_body();
for node in &nodes {
if let Ok(b_devs_result) = http_client
.block_devices_api()
.get_node_block_devices(&node.id, Some(true))
.await
{
let b_devs_result = Retry::spawn(retry_strategy.clone(), || async {
http_client
.block_devices_api()
.get_node_block_devices(&node.id, Some(true))
.await
})
.await;
if let Ok(b_devs) = b_devs_result {
node_disks
.entry(node.id.to_string())
.or_insert_with(Vec::new)
.extend(b_devs_result.into_body());
.extend(b_devs.into_body());
};
}
report.storage_node_count = nodes.len() as u8
report.storage_node_count = Some(nodes.len() as u8)
}
Err(err) => {
error!("{:?}", err);
}
};

let pools = http_client.pools_api().get_pools(None).await;
let pools = Retry::spawn(retry_strategy.clone(), || async {
http_client.pools_api().get_pools(None).await
})
.await;
match pools {
Ok(ref pools) => report.pools = Pools::new(pools.clone().into_body(), event_data.clone()),
Ok(ref pools) => {
report.pools = Some(Pools::new(pools.clone().into_body(), event_data.clone()))
}
Err(ref err) => {
error!("{:?}", err);
}
};

let volumes = list_all_volumes(&http_client)
.await
.map_err(|error| error!("Failed to list all volumes: {error:?}"))
.ok();
let volumes = Retry::spawn(retry_strategy.clone(), || async {
list_all_volumes(&http_client).await
})
.await
.map_err(|error| error!("Failed to list all volumes: {error:?}"))
.ok();

if let Some(volumes) = &volumes {
report.volumes = Volumes::new(volumes.clone(), event_data.clone());
report.volumes = Some(Volumes::new(volumes.clone(), event_data.clone()));
}

let replicas = http_client.replicas_api().get_replicas().await;
let replicas = Retry::spawn(retry_strategy, || async {
http_client.replicas_api().get_replicas().await
})
.await;
match replicas {
Ok(ref replicas) => {
report.replicas = Replicas::new(replicas.clone().into_body().len(), volumes)
report.replicas = Some(Replicas::new(replicas.clone().into_body().len(), volumes))
}
Err(ref err) => {
error!("{:?}", err);
Expand Down Expand Up @@ -256,5 +296,91 @@ async fn generate_report(
report.storage_media = StorageMedia::new(valid_disks);

report.nexus = Nexus::new(event_data);

let mut logs = logs.lock().unwrap();
for log in logs.iter() {
let log_string = format!(
"[{} {} {}:{}] {}",
log.timestamp, log.level, log.file, log.line, log.message
);
report.logs.push(log_string);
}
// Clear logs, as these entries are no longer needed after updating the report.
logs.clear();
report
}

/// A `tracing` layer which copies ERROR-level events into a shared in-memory buffer.
struct LogsLayer {
    /// Buffer of captured entries, shared with the code that later reads them.
    logs: Arc<Mutex<VecDeque<LogEntry>>>,
}

impl LogsLayer {
    /// Creates a layer that appends the entries it captures to `logs`.
    fn new(logs: Arc<Mutex<VecDeque<LogEntry>>>) -> Self {
        Self { logs }
    }
}

impl<S> Layer<S> for LogsLayer
where
S: Subscriber,
{
fn on_event(&self, event: &Event, _ctx: Context<S>) {
let timestamp = chrono::Utc::now().to_rfc3339();
let level = event.metadata().level();
let file = event.metadata().file().unwrap_or("").to_string();
let line = event.metadata().line().unwrap_or(0);

let mut visitor = LogVisitor::new();
event.record(&mut visitor);

let log_entry = LogEntry {
timestamp,
level: level.to_string(),
file,
line,
message: visitor.log,
};

// Only capture logs of level ERROR
if level == &Level::ERROR {
let mut logs = self.logs.lock().unwrap();
log_with_limit(&mut logs, log_entry, ERR_LOG_BUF_CAPACITY);
}
}
}

/// Field visitor which accumulates the fields of an event into a single string.
struct LogVisitor {
    /// Text gathered so far; one `name: value;` fragment per recorded field.
    log: String,
}

impl LogVisitor {
    /// Creates a visitor with an empty message.
    fn new() -> Self {
        Self { log: String::new() }
    }
}

impl tracing::field::Visit for LogVisitor {
    /// Appends the field to the accumulated message as `name: value;`, rendering the value
    /// through its `Debug` implementation.
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        let rendered = format!("{}: {:?};", field.name(), value);
        self.log += rendered.as_str();
    }
}

/// A single captured log event, as stored in the in-memory log buffer.
#[derive(Debug, serde::Serialize)]
struct LogEntry {
// Time at which the event was captured, as an RFC 3339 UTC timestamp.
timestamp: String,
// Textual form of the event's `tracing` level.
level: String,
// Source file reported by the event metadata; empty when not available.
file: String,
// Source line reported by the event metadata; `0` when not available.
line: u32,
// The event's fields concatenated as `name: value;` fragments.
message: String,
}

/// Appends `message` to the back of `logs` while keeping the queue at no more than `limit`
/// entries, evicting from the front (oldest first) when it would otherwise go over.
///
/// A `limit` of zero means nothing may be retained: the queue is emptied and `message` is
/// dropped.
fn log_with_limit<T>(logs: &mut VecDeque<T>, message: T, limit: usize) {
    // Without this guard the eviction loop below would spin forever, since
    // `len() >= 0` stays true even once the queue is empty.
    if limit == 0 {
        logs.clear();
        return;
    }

    // Make room for exactly one new entry.
    while logs.len() >= limit {
        logs.pop_front();
    }
    logs.push_back(message);
}
1 change: 1 addition & 0 deletions chart/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies:
- name: etcd
repository: https://charts.bitnami.com/bitnami
version: 8.6.0
condition: etcd.enabled
- name: jaeger-operator
version: 2.50.1
repository: https://jaegertracing.github.io/helm-charts
Expand Down
Loading

0 comments on commit 6c67a93

Please sign in to comment.