Skip to content

Commit

Permalink
chore(bors): merge pull request #407
Browse files Browse the repository at this point in the history
407: refactor(kubectl-plugin): tidy up error handling and command execution r=tiagolobocastro a=tiagolobocastro

Separate command execution from error handling.
Make use of common CLI args and an execution trait to remove redundant code. Remove excessive error-mapping code.
todo: make the execution trait even more generic (on the rest-plugin crate side). Currently the CLI args for supportability rely on the fact that clap's `global` allows us to specify "duplicates"; I'm not sure if this is a good idea. todo: build the dump CLI args explicitly?

<!-- -->

Co-authored-by: Tiago Castro <[email protected]>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Jan 24, 2024
2 parents 379c909 + ab912c1 commit c0e3a2f
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 416 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions k8s/plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tokio = { version = "1.33.0" }
anyhow = "1.0.75"
clap = { version = "4.4.6", features = ["color", "derive"] }
humantime = "2.1.0"
async-trait = "0.1.73"
# Tracing
opentelemetry = { version = "0.20.0", features = ["rt-tokio-current-thread"] }
shutdown = { path = "../../dependencies/control-plane/utils/shutdown" }
274 changes: 49 additions & 225 deletions k8s/plugin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,266 +1,89 @@
use crate::resources::GetResourcesK8s;
use anyhow::Result;
use clap::Parser;
use openapi::tower::client::Url;
use opentelemetry::global;
use plugin::{
operations::{
Cordoning, Drain, Get, GetBlockDevices, GetSnapshots, List, ListExt, RebuildHistory,
ReplicaTopology, Scale,
},
resources::{
blockdevice, cordon, drain, node, pool, snapshot, volume, CordonResources, DrainResources,
GetCordonArgs, GetDrainArgs, GetResources, ScaleResources,
},
rest_wrapper::RestClient,
};
use plugin::{rest_wrapper::RestClient, ExecuteOperation};
use resources::Operations;
use upgrade::plugin::{preflight_validations, upgrade::DeleteResources};

use std::{env, path::PathBuf};
use std::{env, ops::Deref};

mod resources;

#[derive(Parser, Debug)]
#[clap(name = utils::package_description!(), version = utils::version_info_str!())]
#[group(skip)]
struct CliArgs {
/// The rest endpoint to connect to.
#[clap(global = true, long, short)]
rest: Option<Url>,

/// Path to kubeconfig file.
#[clap(global = true, long, short = 'k')]
kube_config_path: Option<PathBuf>,

/// The operation to be performed.
#[clap(subcommand)]
operations: Operations,

/// The Output, viz yaml, json.
#[clap(global = true, default_value = plugin::resources::utils::OutputFormat::None.as_ref(), short, long)]
output: plugin::resources::utils::OutputFormat,

/// Trace rest requests to the Jaeger endpoint agent.
#[clap(global = true, long, short)]
jaeger: Option<String>,

/// Timeout for the REST operations.
#[clap(long, short, default_value = "10s")]
timeout: humantime::Duration,

/// Kubernetes namespace of mayastor service
#[clap(global = true, long, short = 'n', default_value = "mayastor")]
namespace: String,
#[clap(flatten)]
args: resources::CliArgs,
}

impl CliArgs {
    /// Parse the process command-line arguments into `CliArgs` via clap's
    /// derived `Parser` implementation. Exits the process with a usage
    /// message on invalid arguments (standard `clap::Parser::parse` behaviour).
    fn args() -> Self {
        CliArgs::parse()
    }
}
impl Deref for CliArgs {
type Target = plugin::CliArgs;

#[tokio::main]
async fn main() {
plugin::init_tracing(CliArgs::args().jaeger.as_ref());

execute(CliArgs::args()).await;

global::shutdown_tracer_provider();
}

async fn execute(cli_args: CliArgs) {
// Initialise the REST client.
if let Err(e) = init_rest(&cli_args).await {
eprintln!("Failed to initialise the REST client. Error {e}");
std::process::exit(1);
fn deref(&self) -> &Self::Target {
&self.args
}
}

// Perform the operations based on the subcommand, with proper output format.
let fut = async move {
let result: std::result::Result<(), Error> = match cli_args.operations {
Operations::Get(resource) => match resource {
GetResourcesK8s::Rest(resource) => match resource {
GetResources::Cordon(get_cordon_resource) => match get_cordon_resource {
GetCordonArgs::Node { id: node_id } => {
cordon::NodeCordon::get(&node_id, &cli_args.output)
.await
.map_err(|e| e.into())
}
GetCordonArgs::Nodes => cordon::NodeCordons::list(&cli_args.output)
.await
.map_err(|e| e.into()),
},
GetResources::Drain(get_drain_resource) => match get_drain_resource {
GetDrainArgs::Node { id: node_id } => {
drain::NodeDrain::get(&node_id, &cli_args.output)
.await
.map_err(|e| e.into())
}
GetDrainArgs::Nodes => drain::NodeDrains::list(&cli_args.output)
.await
.map_err(|e| e.into()),
},
GetResources::Volumes(volume_args) => {
volume::Volumes::list(&cli_args.output, &volume_args)
.await
.map_err(|e| e.into())
}
GetResources::Volume { id } => volume::Volume::get(&id, &cli_args.output)
.await
.map_err(|e| e.into()),
GetResources::VolumeReplicaTopologies(volume_args) => {
volume::Volume::topologies(&cli_args.output, &volume_args)
.await
.map_err(|e| e.into())
}
GetResources::VolumeReplicaTopology { id } => {
volume::Volume::topology(&id, &cli_args.output)
.await
.map_err(|e| e.into())
}
GetResources::Pools => pool::Pools::list(&cli_args.output)
.await
.map_err(|e| e.into()),
GetResources::Pool { id } => pool::Pool::get(&id, &cli_args.output)
.await
.map_err(|e| e.into()),
GetResources::Nodes => node::Nodes::list(&cli_args.output)
.await
.map_err(|e| e.into()),
GetResources::Node(args) => node::Node::get(&args.node_id(), &cli_args.output)
.await
.map_err(|e| e.into()),
GetResources::BlockDevices(bdargs) => {
blockdevice::BlockDevice::get_blockdevices(
&bdargs.node_id(),
&bdargs.all(),
&cli_args.output,
)
.await
.map_err(|e| e.into())
}
GetResources::VolumeSnapshots(snapargs) => {
snapshot::VolumeSnapshots::get_snapshots(
&snapargs.volume(),
&snapargs.snapshot(),
&cli_args.output,
)
.await
.map_err(|e| e.into())
}
GetResources::RebuildHistory { id } => {
volume::Volume::rebuild_history(&id, &cli_args.output)
.await
.map_err(|e| e.into())
}
},
GetResourcesK8s::UpgradeStatus(resources) => resources
.get_upgrade(&cli_args.namespace)
.await
.map_err(|e| e.into()),
},
Operations::Drain(resource) => match resource {
DrainResources::Node(drain_node_args) => node::Node::drain(
&drain_node_args.node_id(),
drain_node_args.label(),
drain_node_args.drain_timeout(),
&cli_args.output,
)
.await
.map_err(|e| e.into()),
},
Operations::Scale(resource) => match resource {
ScaleResources::Volume { id, replica_count } => {
volume::Volume::scale(&id, replica_count, &cli_args.output)
.await
.map_err(|e| e.into())
}
},
Operations::Cordon(resource) => match resource {
CordonResources::Node { id, label } => {
node::Node::cordon(&id, &label, &cli_args.output)
.await
.map_err(|e| e.into())
}
},
Operations::Uncordon(resource) => match resource {
CordonResources::Node { id, label } => {
node::Node::uncordon(&id, &label, &cli_args.output)
.await
.map_err(|e| e.into())
}
},
Operations::Dump(resources) => {
resources
.dump(cli_args.kube_config_path)
.await
.map_err(|e| {
println!("Partially collected dump information: ");
e.into()
})
#[tokio::main]
async fn main() {
let cli_args = CliArgs::args();
let _tracer_flusher = cli_args.init_tracing();

if let Err(error) = cli_args.execute().await {
let mut exit_code = 1;
match error {
Error::RestPlugin(error) => eprintln!("{error}"),
Error::RestClient(error) => {
eprintln!("Failed to initialise the REST client. Error {error}")
}
Operations::Upgrade(resources) => {
match preflight_validations::preflight_check(
&cli_args.namespace,
cli_args.kube_config_path.clone(),
cli_args.timeout,
&resources,
)
.await
{
Ok(_) => {
if resources.dry_run {
resources
.dummy_apply(&cli_args.namespace)
.await
.map_err(|e| e.into())
} else {
resources
.apply(&cli_args.namespace)
.await
.map_err(|e| e.into())
}
}
Err(e) => Err(e.into()),
}
Error::Upgrade(error) => {
eprintln!("{error}");
exit_code = error.into();
}
Error::Generic(error) => eprintln!("{error}"),
}
std::process::exit(exit_code);
}
}

Operations::Delete(resource) => match resource {
DeleteResources::Upgrade(res) => {
res.delete(&cli_args.namespace).await.map_err(|e| e.into())
}
},
};
impl CliArgs {
async fn execute(self) -> Result<(), Error> {
// Initialise the REST client.
init_rest(&self).await?;

if let Err(error) = result {
let mut exit_code = 1;
match error {
Error::RestPlugin(err) => eprintln!("{err}"),
Error::Upgrade(err) => {
eprintln!("{err}");
exit_code = err.into();
}
Error::Generic(err) => eprintln!("{err}"),
tokio::select! {
shutdown = shutdown::Shutdown::wait_sig() => {
Err(anyhow::anyhow!("Interrupted by {shutdown:?}").into())
},
done = self.operations.execute(&self.args) => {
done
}
std::process::exit(exit_code);
};
};

tokio::select! {
_shutdown = shutdown::Shutdown::wait_sig() => {},
_done = fut => {}
}
}
}

/// Initialise the REST client.
async fn init_rest(args: &CliArgs) -> Result<()> {
async fn init_rest(cli_args: &CliArgs) -> Result<(), Error> {
// Use the supplied URL if there is one otherwise obtain one from the kubeconfig file.
match args.rest.clone() {
Some(url) => RestClient::init(url, *args.timeout),
match cli_args.rest.clone() {
Some(url) => RestClient::init(url, *cli_args.timeout).map_err(Error::RestClient),
None => {
let config = kube_proxy::ConfigBuilder::default_api_rest()
.with_kube_config(args.kube_config_path.clone())
.with_timeout(*args.timeout)
.with_target_mod(|t| t.with_namespace(&args.namespace))
.with_kube_config(cli_args.args.kube_config_path.clone())
.with_timeout(*cli_args.timeout)
.with_target_mod(|t| t.with_namespace(&cli_args.args.namespace))
.build()
.await?;
RestClient::init_with_config(config)?;
Expand All @@ -269,9 +92,10 @@ async fn init_rest(args: &CliArgs) -> Result<()> {
}
}

enum Error {
pub enum Error {
Upgrade(upgrade::error::Error),
RestPlugin(plugin::resources::error::Error),
RestClient(anyhow::Error),
Generic(anyhow::Error),
}

Expand Down
Loading

0 comments on commit c0e3a2f

Please sign in to comment.