Skip to content

Commit

Permalink
Add batched_sink utility for flushing sink elements together
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Apr 12, 2023
1 parent b6945d8 commit 2fdb583
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 10 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion hydro_cli/src/core/hydroflow_crate/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,6 @@ impl ServerConfig {
}

ServerConfig::MergeSelect(underlying, key) => {
dbg!(underlying);
let key = *key;
underlying
.load_instantiated(
Expand Down
16 changes: 10 additions & 6 deletions hydro_cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ struct SafeCancelToken {
impl SafeCancelToken {
fn safe_cancel(&mut self) {
if let Some(token) = self.cancel_tx.take() {
eprintln!("Received cancellation, cleaning up...");
token.send(()).unwrap();
if token.send(()).is_ok() {
eprintln!("Received cancellation, cleaning up...");
}
} else {
eprintln!("Already received cancellation, please be patient!");
}
Expand All @@ -52,13 +53,16 @@ async def coroutine_to_safely_cancellable(c, cancel_token):
while True:
try:
ok, cancel = await asyncio.shield(c)
is_done = True
except asyncio.CancelledError:
cancel_token.safe_cancel()
is_done = False
if not cancel:
return ok
else:
raise asyncio.CancelledError()
if is_done:
if not cancel:
return ok
else:
raise asyncio.CancelledError()
"#,
"coro_converter",
"coro_converter",
Expand Down
4 changes: 3 additions & 1 deletion hydro_cli_examples/examples/dedalus_sender/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use hydroflow::{
tokio_stream::wrappers::IntervalStream,
util::{
Expand Down Expand Up @@ -31,7 +33,7 @@ async fn main() {
.input repeated `repeat_iter(to_repeat.iter().cloned())`
.input periodic `source_stream(periodic) -> map(|_| ())`
.input peers `repeat_iter(peers.clone()) -> map(|p| (p,))`
.async broadcast `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(broadcast_sink)` `null::<(String,)>()`
.async broadcast `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink_chunked(broadcast_sink, 8, Duration::from_millis(1))` `null::<(String,)>()`
broadcast@n(x) :~ repeated(x), periodic(), peers(n)
"#
Expand Down
3 changes: 2 additions & 1 deletion hydroflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"

[features]
default = [ "async", "macros" ]
async = [ "futures" ]
async = [ "futures", "futures-batch" ]
macros = [ "hydroflow_macro", "hydroflow_datalog" ]
hydroflow_macro = [ "dep:hydroflow_macro" ]
hydroflow_datalog = [ "dep:hydroflow_datalog" ]
Expand Down Expand Up @@ -52,6 +52,7 @@ bincode = "1.3"
byteorder = "1.4.3"
bytes = "1.1.0"
futures = { version = "0.3", optional = true }
futures-batch = { version = "0.6.1", optional = true }
hydroflow_datalog = { optional = true, path = "../hydroflow_datalog" }
hydroflow_lang = { path = "../hydroflow_lang" }
hydroflow_macro = { optional = true, path = "../hydroflow_macro" }
Expand Down
1 change: 1 addition & 0 deletions hydroflow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod util;
pub use bincode;
pub use bytes;
pub use futures;
pub use futures_batch;
pub use pusherator;
pub use rustc_hash;
pub use serde;
Expand Down
33 changes: 32 additions & 1 deletion hydroflow/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ pub mod cli;

use std::net::SocketAddr;
use std::task::{Context, Poll};
use std::time::Duration;

use bincode;
use futures::Stream;
use futures::{Sink, SinkExt, Stream};
use serde::{Deserialize, Serialize};

pub fn unbounded_channel<T>() -> (
Expand Down Expand Up @@ -131,6 +132,36 @@ where
slice.sort_unstable_by(|a, b| f(a).cmp(f(b)))
}

/// Wraps the sink `s` so that items pushed into the returned sink are forwarded to `s`
/// in batches instead of one at a time.
///
/// Items sent to the returned sink are queued on an unbounded Tokio channel. A spawned
/// background task drains that channel through `chunks_timeout(cap, timeout)` and writes
/// each resulting batch to `s` with `send_all`, which also flushes `s` once the batch has
/// been handed over.
///
/// # Arguments
/// * `s` - the underlying sink that ultimately receives every item.
/// * `cap` - chunk capacity passed to `chunks_timeout`.
/// * `timeout` - chunk timeout passed to `chunks_timeout`.
///
/// # Errors
/// The returned sink yields `Err(())` when the item cannot be queued, i.e. when the
/// receiving half of the channel (owned by the background task) is gone.
///
/// # Panics
/// * This function calls `tokio::spawn`, so it must be called from within a Tokio runtime.
/// * The background task panics with "Batched sink failed" if writing a batch to `s` fails.
pub fn batched_sink<I: Send + 'static, S: Sink<I> + Send + 'static>(
    s: S,
    cap: usize,
    timeout: Duration,
) -> impl Sink<I, Error = ()> + Unpin {
    use futures::{stream, StreamExt};
    use futures_batch::ChunksTimeoutStreamExt;

    let (send, recv) = tokio::sync::mpsc::unbounded_channel::<I>();

    tokio::spawn(async move {
        let recv_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(recv);
        let mut batched_recv = recv_stream.chunks_timeout(cap, timeout);
        // Pin the sink on the heap so `SinkExt` methods can be used without `S: Unpin`.
        let mut s = Box::pin(s);

        while let Some(batch) = batched_recv.next().await {
            // `send_all` expects a `TryStream`, so every item is wrapped in `Ok`.
            if s.send_all(&mut stream::iter(batch).map(Ok))
                .await
                .is_err()
            {
                panic!("Batched sink failed")
            }
        }
    });

    // The sender is threaded through `unfold` as its state; queuing an item never waits.
    Box::pin(futures::sink::unfold(send, |send, item| async move {
        send.send(item).map(|_| send).map_err(|_| ())
    }))
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
96 changes: 96 additions & 0 deletions hydroflow_lang/src/graph/ops/dest_sink_chunked.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use super::{make_missing_runtime_msg, FlowProperties, FlowPropertyVal};

use super::{
OperatorConstraints, OperatorInstance, OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1,
};

use quote::quote_spanned;

/// The same as `dest_sink`, but takes two additional parameters controlling
/// when the data is actually flushed.
///
/// > Arguments: an async sink, followed by the chunk size (a `usize`) and the chunk
/// > delay (a `std::time::Duration`).
///
/// Incoming items are queued on an unbounded channel and grouped into chunks by a spawned
/// task using `chunks_timeout(chunk_size, chunk_delay)`. Every item of a chunk is fed to
/// the sink, and the sink is flushed once per chunk rather than once per item.
#[hydroflow_internalmacro::operator_docgen]
pub const DEST_SINK_CHUNKED: OperatorConstraints = OperatorConstraints {
    name: "dest_sink_chunked",
    hard_range_inn: RANGE_1,
    soft_range_inn: RANGE_1,
    hard_range_out: RANGE_0,
    soft_range_out: RANGE_0,
    num_args: 3,
    persistence_args: RANGE_0,
    type_args: RANGE_0,
    is_external_input: false,
    ports_inn: None,
    ports_out: None,
    properties: FlowProperties {
        deterministic: FlowPropertyVal::Preserve,
        monotonic: FlowPropertyVal::Preserve,
        inconsistency_tainted: false,
    },
    input_delaytype_fn: |_| None,
    write_fn: |wc @ &WriteContextArgs {
                   root,
                   hydroflow,
                   op_span,
                   ident,
                   op_name,
                   op_inst: OperatorInstance { arguments, .. },
                   ..
               },
               _| {
        // Operator arguments, in order: the sink, the chunk size, the chunk delay.
        let sink_arg = &arguments[0];
        let chunk_size_arg = &arguments[1];
        let chunk_delay_arg = &arguments[2];

        // Channel endpoints connecting the synchronous dataflow to the async sink task.
        let send_ident = wc.make_ident("item_send");
        let recv_ident = wc.make_ident("item_recv");

        let missing_runtime_msg = make_missing_runtime_msg(op_name);

        let write_prologue = quote_spanned! {op_span=>
            let (#send_ident, #recv_ident) = #root::tokio::sync::mpsc::unbounded_channel();
            {
                /// Function is needed so that `Item` is unambiguous when calling `.flush()`.
                ///
                /// The chunk size and delay are taken as parameters because a nested `fn`
                /// item cannot capture local variables from the enclosing scope.
                async fn sink_feed_flush<Sink, Item>(
                    recv: #root::tokio::sync::mpsc::UnboundedReceiver<Item>,
                    mut sink: Sink,
                    chunk_size: usize,
                    chunk_delay: ::std::time::Duration,
                ) where
                    Sink: ::std::marker::Unpin + #root::futures::Sink<Item>,
                    Sink::Error: ::std::fmt::Debug,
                {
                    use #root::futures::SinkExt;
                    use #root::futures::StreamExt;
                    use #root::futures_batch::ChunksTimeoutStreamExt;

                    let recv_stream = #root::tokio_stream::wrappers::UnboundedReceiverStream::new(recv);
                    let mut batched_recv = Box::pin(recv_stream.chunks_timeout(chunk_size, chunk_delay));

                    while let Some(batch) = batched_recv.next().await {
                        for item in batch {
                            sink.feed(item)
                                .await
                                .expect("Error processing async sink item.");
                        }

                        sink.flush().await.expect("Failed to flush sink.");
                    }
                }
                #hydroflow
                    .spawn_task(sink_feed_flush(#recv_ident, #sink_arg, #chunk_size_arg, #chunk_delay_arg))
                    .expect(#missing_runtime_msg);
            }
        };

        // The pusherator side only enqueues; the spawned task performs the actual writes.
        let write_iterator = quote_spanned! {op_span=>
            let #ident = #root::pusherator::for_each::ForEach::new(|item| {
                #send_ident.send(item).expect("Failed to send async write item for processing.");
            });
        };

        Ok(OperatorWriteOutput {
            write_prologue,
            write_iterator,
            ..Default::default()
        })
    },
};
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ declare_ops![
demux::DEMUX,
dest_file::DEST_FILE,
dest_sink::DEST_SINK,
dest_sink_chunked::DEST_SINK_CHUNKED,
dest_sink_serde::DEST_SINK_SERDE,
difference::DIFFERENCE,
filter::FILTER,
Expand Down

0 comments on commit 2fdb583

Please sign in to comment.