Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Commit

Permalink
Added lustre integation test
Browse files Browse the repository at this point in the history
  • Loading branch information
breuhan committed May 17, 2024
1 parent a5ca5c6 commit 6ff4a38
Show file tree
Hide file tree
Showing 11 changed files with 4,896 additions and 114 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ jobs:
set -e
set -o pipefail
cargo llvm-cov --all-features --workspace --codecov --output-path codecov.json
- name: Measure code coverage for lustre tests
run: |
# Fail if any tests fail
set -e
set -o pipefail
cargo llvm-cov --all-features --workspace --codecov --output-path codecov_145.json --package lustre_collector --lib -- lustre_integration_tests::test_lustre_ddn145
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
Expand Down
1 change: 1 addition & 0 deletions cassettes/2.14.0_ddn145/lctl_mgs_fs_output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"command":"lctl","args":["get_param","-N","mgs.*.live.*"],"stdout":"mgs.MGS.live.fs\nmgs.MGS.live.params\n","stderr":""}is.whamcloud.int: nodename nor servname provided, or not known\r\nConnection closed by UNKNOWN port 65535\r\n"}
1 change: 1 addition & 0 deletions cassettes/2.14.0_ddn145/lctl_output.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions cassettes/2.14.0_ddn145/lctl_recovery_status_output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"command":"lctl","args":["get_param","-N","obdfilter.*OST*.recovery_status","mdt.*MDT*.recovery_status"],"stdout":"obdfilter.fs-OST0000.recovery_status\nobdfilter.fs-OST0001.recovery_status\nmdt.fs-MDT0000.recovery_status\n","stderr":""}ection closed by UNKNOWN port 65535\r\n"}
1 change: 1 addition & 0 deletions cassettes/2.14.0_ddn145/lnetctl_show_output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"command":"lnetctl","args":["net","show","-v","4"],"stdout":"net:\n - net type: lo\n local NI(s):\n - nid: 0@lo\n status: up\n statistics:\n send_count: 284847\n recv_count: 284844\n drop_count: 3\n sent_stats:\n put: 284847\n get: 0\n reply: 0\n ack: 0\n hello: 0\n received_stats:\n put: 284827\n get: 0\n reply: 0\n ack: 17\n hello: 0\n dropped_stats:\n put: 3\n get: 0\n reply: 0\n ack: 0\n hello: 0\n health stats:\n fatal_error: 0\n health value: 1000\n interrupts: 0\n dropped: 0\n aborted: 0\n no route: 0\n timeouts: 0\n error: 0\n ping_count: 0\n next_ping: 0\n tunables:\n peer_timeout: 0\n peer_credits: 0\n peer_buffer_credits: 0\n credits: 0\n lnd tunables:\n dev cpt: 0\n CPT: \"[0,1,2,3]\"\n - net type: tcp\n local NI(s):\n - nid: 10.73.20.11@tcp\n status: up\n interfaces:\n 0: bond0\n statistics:\n send_count: 445690\n recv_count: 445689\n drop_count: 3\n sent_stats:\n put: 350290\n get: 95400\n reply: 0\n ack: 0\n hello: 0\n received_stats:\n put: 350269\n get: 47699\n reply: 47701\n ack: 20\n hello: 0\n dropped_stats:\n put: 3\n get: 0\n reply: 0\n ack: 0\n hello: 0\n health stats:\n fatal_error: 0\n health value: 1000\n interrupts: 0\n dropped: 0\n aborted: 0\n no route: 0\n timeouts: 0\n error: 0\n ping_count: 0\n next_ping: 0\n tunables:\n peer_timeout: 180\n peer_credits: 8\n peer_buffer_credits: 0\n credits: 256\n lnd tunables:\n conns_per_peer: 1\n dev cpt: -1\n CPT: \"[0,1,2,3]\"\n","stderr":""}
1 change: 1 addition & 0 deletions cassettes/2.14.0_ddn145/lnetctl_stats_output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"command":"lnetctl","args":["stats","show"],"stdout":"statistics:\n msgs_alloc: 0\n msgs_max: 20\n rst_alloc: 10\n errors: 0\n send_count: 730537\n resend_count: 0\n response_timeout_count: 0\n local_interrupt_count: 0\n local_dropped_count: 0\n local_aborted_count: 0\n local_no_route_count: 0\n local_timeout_count: 0\n local_error_count: 0\n remote_dropped_count: 0\n remote_error_count: 0\n remote_timeout_count: 0\n network_timeout_count: 0\n recv_count: 730533\n route_count: 0\n drop_count: 6\n send_length: 205930288\n recv_length: 192265344\n route_length: 0\n drop_length: 3120\n","stderr":""}
148 changes: 148 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ mod stats_parser;
mod time;
mod top_level_parser;
pub mod types;
pub mod utils;

pub use crate::error::LustreCollectorError;
use crate::mgs::mgs_fs_parser;
use crate::utils::{get_output, CommandMock, CommandMode};
use combine::parser::EasyParser;
pub use lnetctl_parser::parse as parse_lnetctl_output;
pub use lnetctl_parser::parse_lnetctl_stats;
pub use node_stats_parsers::{parse_cpustats_output, parse_meminfo_output};
use std::panic;
use std::path::PathBuf;
use std::thread;
use std::{io, str};
pub use types::*;

Expand Down Expand Up @@ -79,6 +85,148 @@ pub fn parse_recovery_status_output(
check_output(recovery_statuses, state)
}

fn get_lctl_output(mode: &CommandMode, path: &PathBuf) -> Result<Vec<u8>, LustreCollectorError> {
let mock = CommandMock::new("lctl_output")
.with_mode(*mode)
.with_path(path);
let mut args = vec!["get_param".to_string()];
args.extend(parser::params());
get_output("lctl", args, mock)
}

fn get_lctl_mgs_fs_output(
mode: &CommandMode,
path: &PathBuf,
) -> Result<Vec<u8>, LustreCollectorError> {
let mock = CommandMock::new("lctl_mgs_fs_output")
.with_mode(*mode)
.with_path(path);
let mut args = vec!["get_param".to_string(), "-N".to_string()];
args.extend(mgs_fs_parser::params());
get_output("lctl", args, mock)
}

fn get_recovery_status_output(
mode: &CommandMode,
path: &PathBuf,
) -> Result<Vec<u8>, LustreCollectorError> {
let mock = CommandMock::new("lctl_recovery_status_output")
.with_mode(*mode)
.with_path(path);
let mut args: Vec<String> = vec!["get_param".to_string(), "-N".to_string()];
args.extend(recovery_status_parser::params());
get_output("lctl", args, mock)
}

fn get_lnetctl_stats_output(
mode: &CommandMode,
path: &PathBuf,
) -> Result<Vec<u8>, LustreCollectorError> {
let mock = CommandMock::new("lnetctl_stats_output")
.with_mode(*mode)
.with_path(path);
get_output(
"lnetctl",
["stats", "show"]
.into_iter()
.map(|x| x.to_string())
.collect(),
mock,
)
}

fn get_lnetctl_show_output(
mode: &CommandMode,
path: &PathBuf,
) -> Result<Vec<u8>, LustreCollectorError> {
let mock = CommandMock::new("lnetctl_show_output")
.with_mode(*mode)
.with_path(path);
get_output(
"lnetctl",
["net", "show", "-v", "4"]
.into_iter()
.map(|x| x.to_string())
.collect(),
mock,
)
}

pub fn parse(mode: &CommandMode, path: &PathBuf) -> Result<Vec<Record>, LustreCollectorError> {
let mode_clone = *mode;
let path_clone = path.clone();
let handle = thread::spawn(move || -> Result<Vec<Record>, LustreCollectorError> {
let lctl_output = get_lctl_output(&mode_clone, &path_clone)?;

let lctl_record = parse_lctl_output(&lctl_output)?;

Ok(lctl_record)
});
let mode_clone = *mode;
let path_clone = path.clone();
let mgs_fs_handle = thread::spawn(move || -> Result<Vec<Record>, LustreCollectorError> {
let lctl_output = get_lctl_mgs_fs_output(&mode_clone, &path_clone)?;
let lctl_record = parse_mgs_fs_output(&lctl_output)?;

Ok(lctl_record)
});

let mode_clone = *mode;
let path_clone = path.clone();
let lnetctl_stats_handle =
thread::spawn(move || -> Result<Vec<Record>, LustreCollectorError> {
let lnetctl_stats_output = get_lnetctl_stats_output(&mode_clone, &path_clone)?;
let lnetctl_stats_record = parse_lnetctl_stats(str::from_utf8(&lnetctl_stats_output)?)?;

Ok(lnetctl_stats_record)
});

let mode_clone = *mode;
let path_clone = path.clone();
let recovery_status_handle =
thread::spawn(move || -> Result<Vec<Record>, LustreCollectorError> {
let recovery_status_output = get_recovery_status_output(&mode_clone, &path_clone)?;
let recovery_statuses = parse_recovery_status_output(&recovery_status_output)?;

Ok(recovery_statuses)
});

let lnetctl_net_show_output = get_lnetctl_show_output(mode, path)?;

let lnetctl_net_show_stats = str::from_utf8(&lnetctl_net_show_output)
.expect("while converting 'lnetctl net show -v 4' stdout from utf8");

let mut lnet_record = parse_lnetctl_output(lnetctl_net_show_stats)
.expect("while parsing 'lnetctl net show -v 4' stats");

let mut lctl_record = match handle.join() {
Ok(r) => r?,
Err(e) => panic::resume_unwind(e),
};

let mut mgs_fs_record = match mgs_fs_handle.join() {
Ok(r) => r.unwrap_or_default(),
Err(e) => panic::resume_unwind(e),
};

let mut recovery_status_records = match recovery_status_handle.join() {
Ok(r) => r.unwrap_or_default(),
Err(e) => panic::resume_unwind(e),
};

let mut lnetctl_stats_record = match lnetctl_stats_handle.join() {
Ok(r) => r.unwrap_or_default(),
Err(e) => panic::resume_unwind(e),
};

lctl_record.append(&mut lnet_record);
lctl_record.append(&mut mgs_fs_record);
lctl_record.append(&mut recovery_status_records);
lctl_record.append(&mut lnetctl_stats_record);

Ok(lctl_record)
}

#[cfg(test)]
mod tests {
use super::{parse_lctl_output, Record};
Expand Down
130 changes: 16 additions & 114 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@
// license that can be found in the LICENSE file.

use clap::{value_parser, Arg, ValueEnum};
use lustre_collector::{
error::LustreCollectorError, mgs::mgs_fs_parser, parse_lctl_output, parse_lnetctl_output,
parse_lnetctl_stats, parse_mgs_fs_output, parse_recovery_status_output, parser,
recovery_status_parser, types::Record,
};
use lustre_collector::{error::LustreCollectorError, parse, utils::CommandMode};
use std::{
fmt, panic,
process::{Command, ExitCode},
fmt,
path::PathBuf,
process::ExitCode,
str::{self, FromStr},
thread,
};
use tracing::debug;

#[derive(ValueEnum, PartialEq, Debug, Clone, Copy)]
enum Format {
Expand Down Expand Up @@ -43,44 +38,6 @@ impl fmt::Display for Format {
}
}

fn get_lctl_output() -> Result<Vec<u8>, LustreCollectorError> {
let lctl_params = parser::params();

debug!(lctl_params = lctl_params.join(" "));

let r = Command::new("lctl")
.arg("get_param")
.args(lctl_params)
.output()?;

Ok(r.stdout)
}

fn get_lctl_mgs_fs_output() -> Result<Vec<u8>, LustreCollectorError> {
let r = Command::new("lctl")
.arg("get_param")
.arg("-N")
.args(mgs_fs_parser::params())
.output()?;

Ok(r.stdout)
}

fn get_recovery_status_output() -> Result<Vec<u8>, LustreCollectorError> {
let r = Command::new("lctl")
.arg("get_param")
.args(recovery_status_parser::params())
.output()?;

Ok(r.stdout)
}

fn get_lnetctl_stats_output() -> Result<Vec<u8>, LustreCollectorError> {
let r = Command::new("lnetctl").arg("stats").arg("show").output()?;

Ok(r.stdout)
}

fn main() -> ExitCode {
match run() {
Ok(()) => ExitCode::SUCCESS,
Expand All @@ -99,86 +56,31 @@ fn run() -> Result<(), LustreCollectorError> {
.version(env!("CARGO_PKG_VERSION"))
.author("Whamcloud")
.about("Grabs various Lustre statistics for display in JSON or YAML")
.arg(
.args(vec![
Arg::new("format")
.short('f')
.long("format")
.value_parser(value_parser!(Format))
.default_value("json")
.help("Sets the output formatting"),
)
Arg::new("mode")
.short('m')
.long("mode")
.value_parser(value_parser!(CommandMode))
.default_value("none")
.help("Record/Plays the command output for integration testing"),
])
.get_matches();

let format = matches
.get_one::<Format>("format")
.expect("Required argument `format` missing");

let handle = thread::spawn(move || -> Result<Vec<Record>, LustreCollectorError> {
let lctl_output = get_lctl_output()?;

let lctl_record = parse_lctl_output(&lctl_output)?;

Ok(lctl_record)
});

let mgs_fs_handle = thread::spawn(move || -> Result<Vec<Record>, LustreCollectorError> {
let lctl_output = get_lctl_mgs_fs_output()?;
let lctl_record = parse_mgs_fs_output(&lctl_output)?;

Ok(lctl_record)
});

let lnetctl_stats_handle =
thread::spawn(move || -> Result<Vec<Record>, LustreCollectorError> {
let lnetctl_stats_output = get_lnetctl_stats_output()?;
let lnetctl_stats_record = parse_lnetctl_stats(str::from_utf8(&lnetctl_stats_output)?)?;

Ok(lnetctl_stats_record)
});

let recovery_status_handle =
thread::spawn(move || -> Result<Vec<Record>, LustreCollectorError> {
let recovery_status_output = get_recovery_status_output()?;
let recovery_statuses = parse_recovery_status_output(&recovery_status_output)?;

Ok(recovery_statuses)
});

let lnetctl_net_show_output = Command::new("lnetctl")
.args(["net", "show", "-v", "4"])
.output()
.expect("failed to get lnetctl stats");

let lnetctl_net_show_stats = str::from_utf8(&lnetctl_net_show_output.stdout)
.expect("while converting 'lnetctl net show -v 4' stdout from utf8");

let mut lnet_record = parse_lnetctl_output(lnetctl_net_show_stats)
.expect("while parsing 'lnetctl net show -v 4' stats");

let mut lctl_record = match handle.join() {
Ok(r) => r?,
Err(e) => panic::resume_unwind(e),
};

let mut mgs_fs_record = match mgs_fs_handle.join() {
Ok(r) => r.unwrap_or_default(),
Err(e) => panic::resume_unwind(e),
};

let mut recovery_status_records = match recovery_status_handle.join() {
Ok(r) => r.unwrap_or_default(),
Err(e) => panic::resume_unwind(e),
};

let mut lnetctl_stats_record = match lnetctl_stats_handle.join() {
Ok(r) => r.unwrap_or_default(),
Err(e) => panic::resume_unwind(e),
};
let mode = matches
.get_one::<CommandMode>("mode")
.expect("Required argument `mode` missing");

lctl_record.append(&mut lnet_record);
lctl_record.append(&mut mgs_fs_record);
lctl_record.append(&mut recovery_status_records);
lctl_record.append(&mut lnetctl_stats_record);
let lctl_record = parse(mode, &PathBuf::new())?;

let x = match format {
Format::Json => serde_json::to_string(&lctl_record)?,
Expand Down
Loading

0 comments on commit 6ff4a38

Please sign in to comment.