From 887e0e5da56054ebce18b0bfe4561293f8cac87e Mon Sep 17 00:00:00 2001 From: Jarrod Overson Date: Mon, 12 Sep 2022 16:13:39 -0400 Subject: [PATCH] fixed shutdown_gracefully execution in wafl run --- crates/bins/wasmflow/src/commands/run.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/crates/bins/wasmflow/src/commands/run.rs b/crates/bins/wasmflow/src/commands/run.rs index 85633bf0c..891f73893 100644 --- a/crates/bins/wasmflow/src/commands/run.rs +++ b/crates/bins/wasmflow/src/commands/run.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::time::SystemTime; use anyhow::Result; @@ -56,20 +57,20 @@ pub(crate) async fn handle_command(opts: RunCommand) -> Result<()> { inherent_data, }; - let mut channels: Vec> = vec![]; + let mut channels: Vec>> = vec![]; for (name, channel_config) in app_config.channels { debug!("Loading channel {}", name); match wasmflow_runtime::configuration::get_channel_loader(&channel_config.uses) { - Some(loader) => channels.push(loader(context.clone(), channel_config.with)?), + Some(loader) => channels.push(Arc::new(loader(context.clone(), channel_config.with)?)), _ => bail!("could not find channel {}", &channel_config.uses), }; } let mut tasks: Vec>> = vec![]; - for channel in channels { + for channel in &channels { + let task = channel.clone(); let task = tokio::spawn(async move { - channel.run().await?; - // what to do after? + task.run().await?; Ok::<(), anyhow::Error>(()) }); tasks.push(task); @@ -77,12 +78,9 @@ pub(crate) async fn handle_command(opts: RunCommand) -> Result<()> { let (item_resolved, _ready_future_index, _remaining_futures) = futures::future::select_all(tasks).await; - // TODO: Figure out borrow issue here. - // for channel in channels { - // channel.shutdown_gracefully().await; - // } + for channel in channels { + channel.shutdown_gracefully().await?; + } item_resolved? - - // Ok(()) }