Skip to content

Commit

Permalink
flowd-rs: Add GraphInOutPortHolder impl (#207)
Browse files Browse the repository at this point in the history
  • Loading branch information
ERnsTL committed Sep 1, 2022
1 parent 6c60cbd commit 759a1e1
Showing 1 changed file with 29 additions and 2 deletions.
31 changes: 29 additions & 2 deletions flowd-rs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#![feature(map_try_insert)]

use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, RwLock, Mutex};
use std::thread::{self, Thread};
use std::time::Duration;

Expand Down Expand Up @@ -134,12 +134,24 @@ fn main() {
}

//fn handle_client(stream: TcpStream, graph: Arc<RwLock<Graph>>, runtime: Arc<RwLock<RuntimeRuntimePayload>>, components: Arc<RwLock<ComponentLibrary>>, processes: Arc<RwLock<ProcessManager>>) -> Result<()> {
fn handle_client(stream: TcpStream, graph: Arc<RwLock<Graph>>, runtime: Arc<RwLock<RuntimeRuntimePayload>>, components: Arc<RwLock<ComponentLibrary>>) -> Result<()> {
fn handle_client(stream: TcpStream, graph: Arc<RwLock<Graph>>, runtime: Arc<RwLock<RuntimeRuntimePayload>>, components: Arc<RwLock<ComponentLibrary>>, graph_inout: Arc<std::sync::Mutex<GraphInportOutportHolder>>) -> Result<()> {
stream
.set_write_timeout(Some(Duration::SECOND))
.expect("set_write_timeout call failed");
//stream.set_nodelay(true).expect("set_nodelay call failed");

// save stream clone/dup for graph outports process and pack into "cloned" WebSocket
/*
tungstenite::WebSocket::from_raw_socket(
websocket.get_mut().try_clone().expect("clone of tcp stream failed for graph outports handler thread"),
tungstenite::protocol::Role::Server,
None
*/
let peer_addr = stream.peer_addr().expect("could not get peer socketaddr");
{
graph_inout.lock().expect("could not acquire lock for saving TcpStream for graph outport process").websockets.insert(peer_addr, tungstenite::WebSocket::from_raw_socket(stream.try_clone().expect("could not try_clone() TcpStream"), tungstenite::protocol::Role::Server, None));
}

let callback = |req: &Request, mut response: Response| {
debug!("Received a new ws handshake");
debug!("The request's path is: {}", req.uri().path());
Expand Down Expand Up @@ -1741,6 +1753,21 @@ impl RuntimeRuntimePayload {
//TODO runtime: command to connect an outport to a remote runtime as remote subgraph.
}

// runtime state of graph inports and outports
#[derive(Debug)]
struct GraphInportOutportHolder {
// inports
// the edge sinks are stored here because the connection handler in handle_client() needs to send into these
inports: Option<HashMap<String, ProcessEdgeSink>>,

// outports are handled by 1 special component that needs to be signaled and joined on network stop()
// sink and wakeup are given to the processes that write into the graph outport process, so they are not stored here
outports: Option<Process>,

// connected client websockets ready to send responses to connected clients, for graphout process
websockets: HashMap<std::net::SocketAddr, tungstenite::WebSocket<TcpStream>>
}

#[derive(Serialize, Debug)]
enum Capability {
// spec: deprecated. Implies capabilities network:status, network:data, network:control. Does not imply capability network:persist.
Expand Down

0 comments on commit 759a1e1

Please sign in to comment.