Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into anna_v2
Browse files Browse the repository at this point in the history
# Conflicts:
#	Cargo.lock
  • Loading branch information
rohitkulshreshtha committed Oct 29, 2024
2 parents cf64859 + f7e740f commit 206c2a1
Show file tree
Hide file tree
Showing 72 changed files with 5,098 additions and 8,248 deletions.
205 changes: 83 additions & 122 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ members = [
"stageleft_tool",
"topolotree",
"variadics",
"variadics_macro",
"website_playground",
]

Expand Down
6 changes: 3 additions & 3 deletions docs/docs/hydroflow_plus/clusters.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,23 @@ stream.for_each(q!(|x| println!("{}", x)))
## Sending Data
Because clusters represent a set of instances, adding networking requires us to specify _which_ instance(s) to send data to. Clusters provide different types depending on if the source or receiver is a cluster or a process.

Elements in a cluster are identified by a **cluster ID** (a `u32`). To get the IDs of all instances in a cluster, use the `ids` method on cluster, which returns a runtime expression of type `&Vec<u32>` (which can only be used inside `q!()` or as an argument to `source_iter`). All IDs always are ranging from 0 through the length of the IDs vector.
Elements in a cluster are identified by a **cluster ID** (a `ClusterId<C>` where `C` is the typetag of the cluster). To get the IDs of all instances in a cluster, use the `members` method on cluster, which returns a runtime expression of type `&Vec<ClusterId<_>>` (which can only be used inside `q!()` or as an argument to `source_iter`). All IDs always are ranging from 0 through the length of the IDs vector.

This can then be passed into `source_iter` to load the IDs into the graph.
```rust
let stream = process.source_iter(cluster.members()).cloned();
```

### One-to-Many
When sending data from a process to a cluster, the source must be a stream of tuples of the form `(u32, T)` and sends each `T` element to the instance with the matching `u32` ID.
When sending data from a process to a cluster, the source must be a stream of tuples of the form `(ClusterId<_>, T)` and sends each `T` element to the instance with the matching ID.

This is useful for partitioning data across instances. For example, we can partition a stream of elements in a round-robin fashion by using `enumerate` to add a sequence number to each element, then using `send_bincode` to send each element to the instance with the matching sequence number:
```rust
let cluster_ids = cluster.members();
let stream = process.source_iter(q!(vec![123, 456, 789]))
.enumerate()
.map(q!(|(i, x)| (
i % cluster_ids.len() as u32,
ClusterId::from_raw(i % cluster_ids.len() as u32),
x
)))
.send_bincode(&cluster);
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/src/compiled/pull/anti_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ where
pos_state: &'a mut FxHashSet<(Key, V)>,
}

impl<'a, Key, V, Ipos> Iterator for AntiJoin<'a, Key, V, Ipos>
impl<Key, V, Ipos> Iterator for AntiJoin<'_, Key, V, Ipos>
where
Key: Eq + std::hash::Hash + Clone,
V: Eq + std::hash::Hash + Clone,
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/src/compiled/pull/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where
state: &'a mut CrossJoinState<V1, V2>,
}

impl<'a, I1, V1: 'static, I2, V2: 'static> Iterator for CrossJoin<'a, I1, V1, I2, V2>
impl<I1, V1: 'static, I2, V2: 'static> Iterator for CrossJoin<'_, I1, V1, I2, V2>
where
V1: Eq + Clone,
V2: Eq + Clone,
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/src/compiled/pull/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ where
rhs_state: &'a mut RhsState,
}

impl<'a, Key, I1, V1, I2, V2, LhsState, RhsState> Iterator
for SymmetricHashJoin<'a, Key, I1, V1, I2, V2, LhsState, RhsState>
impl<Key, I1, V1, I2, V2, LhsState, RhsState> Iterator
for SymmetricHashJoin<'_, Key, I1, V1, I2, V2, LhsState, RhsState>
where
Key: Eq + std::hash::Hash + Clone,
V1: Clone,
Expand Down
8 changes: 4 additions & 4 deletions hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct Hydroflow<'a> {
/// See [`Self::diagnostics()`].
diagnostics: Option<Vec<Diagnostic<SerdeSpan>>>,
}
impl<'a> Default for Hydroflow<'a> {
impl Default for Hydroflow<'_> {
fn default() -> Self {
let stratum_queues = vec![Default::default()]; // Always initialize stratum #0.
let (event_queue_send, event_queue_recv) = mpsc::unbounded_channel();
Expand All @@ -69,7 +69,7 @@ impl<'a> Default for Hydroflow<'a> {
}

/// Methods for [`TeeingHandoff`] teeing and dropping.
impl<'a> Hydroflow<'a> {
impl Hydroflow<'_> {
/// Tees a [`TeeingHandoff`].
pub fn teeing_handoff_tee<T>(
&mut self,
Expand Down Expand Up @@ -794,7 +794,7 @@ impl<'a> Hydroflow<'a> {
}
}

impl<'a> Hydroflow<'a> {
impl Hydroflow<'_> {
/// Alias for [`Context::request_task`].
pub fn request_task<Fut>(&mut self, future: Fut)
where
Expand All @@ -814,7 +814,7 @@ impl<'a> Hydroflow<'a> {
}
}

impl<'a> Drop for Hydroflow<'a> {
impl Drop for Hydroflow<'_> {
fn drop(&mut self) {
self.abort_tasks();
}
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/src/scheduled/graph_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub trait GraphExt {
W: 'static + Handoff + CanReceive<T>;
}

impl<'a> GraphExt for Hydroflow<'a> {
impl GraphExt for Hydroflow<'_> {
subgraph_ext!(impl add_subgraph_sink, (recv_port: R), ());
subgraph_ext!(
impl add_subgraph_2sink,
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/src/scheduled/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Message {
}
}

impl<'a> Hydroflow<'a> {
impl Hydroflow<'_> {
fn register_read_tcp_stream(&mut self, reader: OwnedReadHalf) -> RecvPort<VecHandoff<Message>> {
let reader = FramedRead::new(reader, LengthDelimitedCodec::new());
let (send_port, recv_port) = self.make_edge("tcp ingress handoff");
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/src/util/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ fn sink_from_fn<T>(mut f: impl FnMut(T)) -> impl Sink<T, Error = Infallible> {
})
}

impl<'context> TransducerBuilderContext<'context> {
impl TransducerBuilderContext<'_> {
/// Create a new inbox on the host with the given interface name. Returns a stream that can
/// be read by the transducer using the source_stream hydroflow operator.
pub fn new_inbox<T: 'static>(
Expand Down
207 changes: 130 additions & 77 deletions hydroflow/src/util/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
#![cfg(not(target_arch = "wasm32"))]

use std::cell::RefCell;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::pin::pin;
use std::rc::Rc;

use futures::{SinkExt, StreamExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::select;
use tokio::task::spawn_local;
use tokio_stream::StreamMap;
use tokio_util::codec::{
BytesCodec, Decoder, Encoder, FramedRead, FramedWrite, LengthDelimitedCodec, LinesCodec,
};
Expand Down Expand Up @@ -74,107 +74,160 @@ pub type TcpFramedSink<T> = Sender<(T, SocketAddr)>;
pub type TcpFramedStream<Codec: Decoder> =
Receiver<Result<(<Codec as Decoder>::Item, SocketAddr), <Codec as Decoder>::Error>>;

// TODO(mingwei): this temporary code should be replaced with a properly thought out networking system.
/// Create a listening tcp socket, and then as new connections come in, receive their data and forward it to a queue.
pub async fn bind_tcp<T: 'static, Codec: 'static + Clone + Decoder + Encoder<T>>(
pub async fn bind_tcp<Item, Codec>(
endpoint: SocketAddr,
codec: Codec,
) -> Result<(TcpFramedSink<T>, TcpFramedStream<Codec>, SocketAddr), std::io::Error> {
) -> Result<(TcpFramedSink<Item>, TcpFramedStream<Codec>, SocketAddr), std::io::Error>
where
Item: 'static,
Codec: 'static + Clone + Decoder + Encoder<Item>,
<Codec as Encoder<Item>>::Error: Debug,
{
let listener = TcpListener::bind(endpoint).await?;

let bound_endpoint = listener.local_addr()?;

let (tx_egress, mut rx_egress) = unsync_channel(None);
let (tx_ingress, rx_ingress) = unsync_channel(None);

let clients = Rc::new(RefCell::new(HashMap::new()));

spawn_local({
let clients = clients.clone();

async move {
while let Some((payload, addr)) = rx_egress.next().await {
let client = clients.borrow_mut().remove(&addr);

if let Some(mut sender) = client {
let _ = SinkExt::send(&mut sender, payload).await;
clients.borrow_mut().insert(addr, sender);
}
}
}
});
let (send_egress, mut recv_egress) = unsync_channel::<(Item, SocketAddr)>(None);
let (send_ingres, recv_ingres) = unsync_channel(None);

spawn_local(async move {
let send_ingress = send_ingres;
// Map of `addr -> peers`, to send messages to.
let mut peers_send = HashMap::new();
// `StreamMap` of `addr -> peers`, to receive messages from. Automatically removes streams
// when they disconnect.
let mut peers_recv = StreamMap::<SocketAddr, FramedRead<OwnedReadHalf, Codec>>::new();

loop {
let (stream, peer_addr) = if let Ok((stream, _)) = listener.accept().await {
if let Ok(peer_addr) = stream.peer_addr() {
(stream, peer_addr)
} else {
continue;
// Calling methods in a loop, futures must be cancel-safe.
select! {
// `biased` means the cases will be prioritized in the order they are listed.
// First we accept any new connections
// This is not strictly neccessary, but lets us do our internal work (send outgoing
// messages) before accepting more work (receiving more messages, accepting new
// clients).
biased;
// Send outgoing messages.
msg_send = recv_egress.next() => {
let Some((payload, peer_addr)) = msg_send else {
// `None` if the send side has been dropped (no more send messages will ever come).
continue;
};
let Some(stream) = peers_send.get_mut(&peer_addr) else {
tracing::warn!("Dropping message to non-connected peer: {}", peer_addr);
continue;
};
if let Err(err) = SinkExt::send(stream, payload).await {
tracing::error!("IO or codec error sending message to peer {}, disconnecting: {:?}", peer_addr, err);
peers_send.remove(&peer_addr); // `Drop` disconnects.
};
}
} else {
continue;
};

let mut tx_ingress = tx_ingress.clone();

let (send, recv) = tcp_framed(stream, codec.clone());

// TODO: Using peer_addr here as the key is a little bit sketchy.
// It's possible that a client could send a message, disconnect, then another client connects from the same IP address (and the same src port), and then the response could be sent to that new client.
// This can be solved by using monotonically increasing IDs for each new client, but would break the similarity with the UDP versions of this function.
clients.borrow_mut().insert(peer_addr, send);

spawn_local({
let clients = clients.clone();
async move {
let mapped = recv.map(|x| Ok(x.map(|x| (x, peer_addr))));
let _ = tx_ingress.send_all(&mut pin!(mapped)).await;

clients.borrow_mut().remove(&peer_addr);
// Receive incoming messages.
msg_recv = peers_recv.next(), if !peers_recv.is_empty() => {
// If `peers_recv` is empty then `next()` will immediately return `None` which
// would cause the loop to spin.
let Some((peer_addr, payload_result)) = msg_recv else {
continue; // => `peers_recv.is_empty()`.
};
if let Err(err) = send_ingress.send(payload_result.map(|payload| (payload, peer_addr))).await {
tracing::error!("Error passing along received message: {:?}", err);
}
}
});
// Accept new clients.
new_peer = listener.accept() => {
let Ok((stream, _addr)) = new_peer else {
continue;
};
let Ok(peer_addr) = stream.peer_addr() else {
continue;
};
let (peer_send, peer_recv) = tcp_framed(stream, codec.clone());

// TODO: Using peer_addr here as the key is a little bit sketchy.
// It's possible that a peer could send a message, disconnect, then another peer connects from the
// same IP address (and the same src port), and then the response could be sent to that new client.
// This can be solved by using monotonically increasing IDs for each new peer, but would break the
// similarity with the UDP versions of this function.
peers_send.insert(peer_addr, peer_send);
peers_recv.insert(peer_addr, peer_recv);
}
}
}
});

Ok((tx_egress, rx_ingress, bound_endpoint))
Ok((send_egress, recv_ingres, bound_endpoint))
}

/// The inverse of [`bind_tcp`].
///
/// When messages enqueued into the returned sender, tcp sockets will be created and connected as
/// necessary to send out the requests. As the responses come back, they will be forwarded to the
/// returned receiver.
pub fn connect_tcp<T: 'static, Codec: 'static + Clone + Decoder + Encoder<T>>(
codec: Codec,
) -> (TcpFramedSink<T>, TcpFramedStream<Codec>) {
let (tx_egress, mut rx_egress) = unsync_channel(None);
let (tx_ingress, rx_ingress) = unsync_channel(None);
pub fn connect_tcp<Item, Codec>(codec: Codec) -> (TcpFramedSink<Item>, TcpFramedStream<Codec>)
where
Item: 'static,
Codec: 'static + Clone + Decoder + Encoder<Item>,
<Codec as Encoder<Item>>::Error: Debug,
{
let (send_egress, mut recv_egress) = unsync_channel(None);
let (send_ingres, recv_ingres) = unsync_channel(None);

spawn_local(async move {
let mut streams = HashMap::new();
let send_ingres = send_ingres;
// Map of `addr -> peers`, to send messages to.
let mut peers_send = HashMap::new();
// `StreamMap` of `addr -> peers`, to receive messages from. Automatically removes streams
// when they disconnect.
let mut peers_recv = StreamMap::new();

while let Some((payload, addr)) = rx_egress.next().await {
let stream = match streams.entry(addr) {
Occupied(entry) => entry.into_mut(),
Vacant(entry) => {
let socket = TcpSocket::new_v4().unwrap();
let stream = socket.connect(addr).await.unwrap();

let (send, recv) = tcp_framed(stream, codec.clone());

let mut tx_ingress = tx_ingress.clone();
spawn_local(async move {
let mapped = recv.map(|x| Ok(x.map(|x| (x, addr))));
let _ = tx_ingress.send_all(&mut pin!(mapped)).await;
});

entry.insert(send)
loop {
// Calling methods in a loop, futures must be cancel-safe.
select! {
// `biased` means the cases will be prioritized in the order they are listed.
// This is not strictly neccessary, but lets us do our internal work (send outgoing
// messages) before accepting more work (receiving more messages).
biased;
// Send outgoing messages.
msg_send = recv_egress.next() => {
let Some((payload, peer_addr)) = msg_send else {
// `None` if the send side has been dropped (no more send messages will ever come).
continue;
};

let stream = match peers_send.entry(peer_addr) {
Occupied(entry) => entry.into_mut(),
Vacant(entry) => {
let socket = TcpSocket::new_v4().unwrap();
let stream = socket.connect(peer_addr).await.unwrap();

let (peer_send, peer_recv) = tcp_framed(stream, codec.clone());

peers_recv.insert(peer_addr, peer_recv);
entry.insert(peer_send)
}
};

if let Err(err) = stream.send(payload).await {
tracing::error!("IO or codec error sending message to peer {}, disconnecting: {:?}", peer_addr, err);
peers_send.remove(&peer_addr); // `Drop` disconnects.
}
}
};

let _ = stream.send(payload).await;
// Receive incoming messages.
msg_recv = peers_recv.next(), if !peers_recv.is_empty() => {
// If `peers_recv` is empty then `next()` will immediately return `None` which
// would cause the loop to spin.
let Some((peer_addr, payload_result)) = msg_recv else {
continue; // => `peers_recv.is_empty()`.
};
if let Err(err) = send_ingres.send(payload_result.map(|payload| (payload, peer_addr))).await {
tracing::error!("Error passing along received message: {:?}", err);
}
}
}
}
});

(tx_egress, rx_ingress)
(send_egress, recv_ingres)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,3 @@ Hello 6
Hello 7
Hello 8
Hello 9

Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,3 @@ digraph {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,3 @@ subgraph sg_4v1 ["sg_4v1 stratum 1"]
9v1
end
end

Loading

0 comments on commit 206c2a1

Please sign in to comment.