Skip to content

Commit

Permalink
fixed shutdown_gracefully execution in wafl run
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoverson committed Sep 28, 2022
1 parent 93e5050 commit 887e0e5
Showing 1 changed file with 9 additions and 11 deletions.
20 changes: 9 additions & 11 deletions crates/bins/wasmflow/src/commands/run.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use std::time::SystemTime;

use anyhow::Result;
Expand Down Expand Up @@ -56,33 +57,30 @@ pub(crate) async fn handle_command(opts: RunCommand) -> Result<()> {
inherent_data,
};

let mut channels: Vec<Box<dyn Channel + Send + Sync>> = vec![];
let mut channels: Vec<Arc<Box<dyn Channel + Send + Sync>>> = vec![];
for (name, channel_config) in app_config.channels {
debug!("Loading channel {}", name);
match wasmflow_runtime::configuration::get_channel_loader(&channel_config.uses) {
Some(loader) => channels.push(loader(context.clone(), channel_config.with)?),
Some(loader) => channels.push(Arc::new(loader(context.clone(), channel_config.with)?)),
_ => bail!("could not find channel {}", &channel_config.uses),
};
}

let mut tasks: Vec<JoinHandle<Result<(), anyhow::Error>>> = vec![];
for channel in channels {
for channel in &channels {
let task = channel.clone();
let task = tokio::spawn(async move {
channel.run().await?;
// what to do after?
task.run().await?;
Ok::<(), anyhow::Error>(())
});
tasks.push(task);
}

let (item_resolved, _ready_future_index, _remaining_futures) = futures::future::select_all(tasks).await;

// TODO: Figure out borrow issue here.
// for channel in channels {
// channel.shutdown_gracefully().await;
// }
for channel in channels {
channel.shutdown_gracefully().await?;
}

item_resolved?

// Ok(())
}

0 comments on commit 887e0e5

Please sign in to comment.