Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RingChannel sync/async/blocking/non-blocking #903

Merged
merged 3 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ async fn main() {
println!("Sending Query '{selector}'...");
let replies = session
.get(&selector)
// // By default get receives replies from a FIFO.
// // Uncomment this line to use a ring channel instead.
// // More information on the ring channel are available in the z_pull example.
.with(zenoh::handlers::RingChannel::default())
.value(value)
.target(target)
.timeout(timeout)
Expand Down
56 changes: 40 additions & 16 deletions examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use clap::Parser;
use std::time::Duration;
use zenoh::{config::Config, handlers::RingBuffer, prelude::r#async::*};
use zenoh::{config::Config, handlers::RingChannel, prelude::r#async::*};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand All @@ -29,32 +29,27 @@ async fn main() {
println!("Declaring Subscriber on '{key_expr}'...");
let subscriber = session
.declare_subscriber(&key_expr)
.with(RingBuffer::new(size))
.with(RingChannel::new(size))
.res()
.await
.unwrap();

println!(
"Pulling data every {:#?} seconds. Press CTRL-C to quit...",
interval
);
println!("Press CTRL-C to quit...");

// Blocking recv. If the ring is empty, wait for the first sample to arrive.
loop {
match subscriber.recv() {
Ok(Some(sample)) => {
// Use .recv() for the synchronous version.
match subscriber.recv_async().await {
Ok(sample) => {
let payload = sample
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
">> [Subscriber] Pulled {} ('{}': '{}')",
">> [Subscriber] Pulled {} ('{}': '{}')... performing a computation of {:#?}",
sample.kind(),
sample.key_expr().as_str(),
payload,
);
}
Ok(None) => {
println!(
">> [Subscriber] Pulled nothing... sleep for {:#?}",
interval
);
tokio::time::sleep(interval).await;
Expand All @@ -65,6 +60,35 @@ async fn main() {
}
}
}

// Non-blocking recv. This can be usually used to implement a polling mechanism.
// loop {
// match subscriber.try_recv() {
// Ok(Some(sample)) => {
// let payload = sample
// .payload()
// .deserialize::<String>()
// .unwrap_or_else(|e| format!("{}", e));
// println!(
// ">> [Subscriber] Pulled {} ('{}': '{}')",
// sample.kind(),
// sample.key_expr().as_str(),
// payload,
// );
// }
// Ok(None) => {
// println!(
// ">> [Subscriber] Pulled nothing... sleep for {:#?}",
// interval
// );
// tokio::time::sleep(interval).await;
// }
// Err(e) => {
// println!(">> [Subscriber] Pull error: {e}");
// return;
// }
// }
// }
}

#[derive(clap::Parser, Clone, PartialEq, Debug)]
Expand All @@ -73,10 +97,10 @@ struct SubArgs {
/// The Key Expression to subscribe to.
key: KeyExpr<'static>,
/// The size of the ringbuffer.
#[arg(long, default_value = "3")]
#[arg(short, long, default_value = "3")]
size: usize,
/// The interval for pulling the ringbuffer.
#[arg(long, default_value = "5.0")]
#[arg(short, long, default_value = "5.0")]
interval: f32,
#[command(flatten)]
common: CommonArgs,
Expand Down
4 changes: 4 additions & 0 deletions examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ async fn main() {
println!("Declaring Queryable on '{key_expr}'...");
let queryable = session
.declare_queryable(&key_expr)
// // By default queryable receives queries from a FIFO.
// // Uncomment this line to use a ring channel instead.
// // More information on the ring channel are available in the z_pull example.
// .with(zenoh::handlers::RingChannel::default())
.complete(complete)
.res()
.await
Expand Down
Loading
Loading