Skip to content

Commit

Permalink
flowd-rs: Add edge generation, process generation and thread for grap…
Browse files Browse the repository at this point in the history
…h outports (#207)
  • Loading branch information
ERnsTL committed Sep 1, 2022
1 parent 759a1e1 commit 3c0e569
Showing 1 changed file with 154 additions and 1 deletion.
155 changes: 154 additions & 1 deletion flowd-rs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,8 @@ impl RuntimeRuntimePayload {
}

//fn start(&mut self, graph: &Graph, process_manager: &mut ProcessManager) -> std::result::Result<&NetworkStartedResponsePayload, std::io::Error> {
fn start(&mut self, graph: &Graph, components: &ComponentLibrary) -> std::result::Result<&NetworkStartedResponsePayload, std::io::Error> {
fn start(&mut self, graph: &Graph, components: &ComponentLibrary, graph_inout_arc: Arc<Mutex<GraphInportOutportHolder>>) -> std::result::Result<&NetworkStartedResponsePayload, std::io::Error> {
let mut graph_inout = graph_inout_arc.lock().expect("could not acquire lock for network start()");
//TODO implement
//TODO implement: what to do with the old running processes, stop using signal channel? What if they dont respond?
//TODO implement: what if the name of the node changes? then the process is not found by that name anymore in the process manager
Expand Down Expand Up @@ -1465,6 +1466,13 @@ impl RuntimeRuntimePayload {
//TODO would be nice to know the name of the process
ports_all.try_insert(proc_name.clone(), ProcPorts::default()).expect("preparing edges for process failed: process name already exists");
}
//TODO using graph name as fake process, but does that imply we cannot change the graph name during runtime?
if graph.inports.len() > 0 {
ports_all.try_insert(format!("{}-IN", graph.properties.name), ProcPorts::default()).expect("preparing inport edges for graph failed: process name already exists");
}
if graph.outports.len() > 0 {
ports_all.try_insert(format!("{}-OUT", graph.properties.name), ProcPorts::default()).expect("preparing outport edges for graph failed: process name already exists");
}
// fill keys with connections
for edge in graph.edges.iter() {
if let Some(iip) = &edge.data {
Expand Down Expand Up @@ -1498,6 +1506,40 @@ impl RuntimeRuntimePayload {
}
}
}
for (public_name, edge) in graph.inports.iter() {
// prepare edge
info!("preparing edge from graph {} to {}.{}", public_name, edge.process, edge.port);
let (sink, source) = ProcessEdge::new(PROCESSEDGE_BUFSIZE);

// insert into inports of target process
let targetproc = ports_all.get_mut(&edge.process).expect("graph target assignment process not found");
if let Some(_) = targetproc.inports.insert(edge.port.clone(), source) {
return Err(std::io::Error::new(std::io::ErrorKind::AlreadyExists, String::from("graph target inport insert failed, key exists")));
}
// assign into outports of source process
// source process name = graphname-IN
let sourceproc = ports_all.get_mut(format!("{}-IN", graph.properties.name).as_str()).expect("graph source assignment process not found");
if let Some(_) = sourceproc.outports.insert(public_name.clone(), ProcessEdgeSink { sink: sink, wakeup: None, proc_name: Some(edge.process.clone()) } ) {
return Err(std::io::Error::new(std::io::ErrorKind::AlreadyExists, String::from("graph source inport insert failed, key exists")));
}
}
for (public_name, edge) in graph.outports.iter() {
// prepare edge
info!("preparing edge from {}.{} to graph {}", edge.process, edge.port, public_name);
let (sink, source) = ProcessEdge::new(PROCESSEDGE_BUFSIZE);

// insert into inports of target process
// target process name = graphname-OUT
let targetproc = ports_all.get_mut(format!("{}-OUT", graph.properties.name).as_str()).expect("graph target assignment process not found");
if let Some(_) = targetproc.inports.insert(public_name.clone(), source) {
return Err(std::io::Error::new(std::io::ErrorKind::AlreadyExists, String::from("graph target outport insert failed, key exists")));
}
// assign into outports of source process
let sourceproc = ports_all.get_mut(&edge.process).expect("graph source assignment process not found");
if let Some(_) = sourceproc.outports.insert(edge.port.clone(), ProcessEdgeSink { sink: sink, wakeup: None, proc_name: Some(format!("{}-OUT", graph.properties.name)) } ) {
return Err(std::io::Error::new(std::io::ErrorKind::AlreadyExists, String::from("graph source outport insert failed, key exists")));
}
}

// generate processes and assign prepared connections
let thread_handles: Arc<std::sync::Mutex<HashMap<String, Thread>>> = Arc::new(std::sync::Mutex::new(HashMap::new()));
Expand Down Expand Up @@ -1623,6 +1665,117 @@ impl RuntimeRuntimePayload {
joinhandle: joinhandle,
});
}
// work off graphname-IN and graphname-OUT special processes for graph inports and graph outports###
//TODO the signal channel and joinhandle of the graph outport process/thread could also simply be stored in the processes variable with all other FBP processes
graph_inout.inports = None;
graph_inout.outports = None;
if ports_all.len() > 0 {
if ports_all.contains_key(format!("{}-IN", graph.properties.name).as_str()) {
// target datastructure
let mut outports: HashMap<String, ProcessEdgeSink> = HashMap::new();
// get ports for this special component
let ports_this: ProcPorts = ports_all.remove(format!("{}-IN", graph.properties.name).as_str()).expect("prepared connections for graph inports not found");
// add wakeup handles and sinks of all target processes (translate target proc_name into join_handle)
for (port_name, edge) in ports_this.outports {
// get joinhandle
let thr = thread_handles.lock().expect("acquire lock for graph inport Thread handle replacement").get(edge.proc_name.unwrap().as_str()).expect("target process for graph inport not found").clone();
// insert that port
outports.insert(port_name, ProcessEdgeSink { sink: edge.sink, wakeup: Some(thr), proc_name: None });
}
// save the inports (where we put packets into) as the graph inport channel handles; they are "outport handles" because they are being written into (packet sink)
graph_inout.inports = Some(outports);
}
if ports_all.contains_key(format!("{}-OUT", graph.properties.name).as_str()) {
// get ports for this special component, of interest here are the inports (source channels)
let ports_this: ProcPorts = ports_all.remove(format!("{}-OUT", graph.properties.name).as_str()).expect("prepared connections for graph outports not found");
let mut inports = ports_this.inports;
// prepare process signal channel
let (signalsink, signalsource): (ProcessSignalSink, ProcessSignalSource) = std::sync::mpsc::sync_channel(PROCESSEDGE_SIGNAL_BUFSIZE);
// start thread, will move signalsource, inports
let graph_name = graph.properties.name.clone(); //TODO cannot change graph name during runtime because of this
//TODO optimize; WebSocket is not Copy, but a WebSocket can be re-created from the inner TcpStream, which has a try_clone()
let mut inoutref = graph_inout_arc.clone();
let joinhandle = thread::Builder::new().name(format!("{}-OUT", graph.properties.name)).spawn(move || {
let signals = signalsource;
if inports.len() == 0 {
error!("GraphOutports: no inports found, exiting");
return;
}
//let mut websocket = tungstenite::WebSocket::from_raw_socket(websocket_stream, tungstenite::protocol::Role::Server, None);
debug!("GraphOutports is now run()ning!");
loop {
trace!("GraphOutports: begin of iteration");
// check signals
//TODO optimize, there is also try_recv() and recv_timeout()
if let Ok(ip) = signals.try_recv() {
//TODO optimize string conversions
info!("received signal ip: {}", String::from_utf8(ip.clone()).expect("invalid utf-8"));
// stop signal
if ip == "stop".as_bytes().to_vec() {
info!("GraphOutports: got stop signal, exiting");
break;
}
}
// receive on all inports
for (port_name, inport) in inports.iter_mut() {
//TODO while !inn.is_empty() {
loop {
if let Ok(ip) = inport.pop() {
// output the packet data with newline
debug!("got a packet for graph outport {}", port_name);
trace!("{}", String::from_utf8(ip.clone()).expect("non utf-8 data")); //TODO optimize avoid clone here

// send out to FBP network protocol client
debug!("sending out to client...");
{
let mut websockets = inoutref.lock().unwrap();
for client in websockets.websockets.iter_mut() {
client.1
.write_message(Message::text(
serde_json::to_string(&RuntimePacketResponse::new(RuntimePacketResponsePayload {
port: port_name.clone(), //TODO optimize
event: RuntimePacketEvent::Data,
typ: None, //TODO implement properly, OTOH it is an optional field
schema: None,
graph: graph_name.clone(),
payload: Some(String::from_utf8(ip.clone()).expect("non utf-8 data")), //TODO optimize useless conversions here
}))
.expect("failed to serialize runtime:packet response"),
))
.expect("failed to write message into websocket");
}
}
debug!("done");
} else {
break;
}
}
}
trace!("GraphOutports: -- end of iteration");
thread::park();
}
info!("GraphOutports: exiting");
}).expect("thread start failed");

// store thread handle for wakeup in components
thread_handles.lock().expect("failed to get lock posting graph outport thread handle").insert(format!("{}-OUT", graph.properties.name), joinhandle.thread().clone());
// store process signal channel and join handle so that the other processes writing into this graph outport component can find it
self.processes.insert(format!("{}-OUT", graph.properties.name), Process {
signal: signalsink,
joinhandle: joinhandle,
});//###

// save single joinhandle and signal for that component
//### cannot clone joinhandle
/*
graph_inout.outports = Some(Process {
signal: signalsink,
joinhandle: joinhandle,
})
*/
}
//### self.graph_inout = graph_inout;
}

// sanity check
if ports_all.len() != 0 {
Expand Down

0 comments on commit 3c0e569

Please sign in to comment.