From 159b5aab17675d9afdf65d49b3e3bc92542e04ab Mon Sep 17 00:00:00 2001 From: Tim Schubert Date: Mon, 8 Apr 2024 20:24:26 +0200 Subject: [PATCH] fix: allow each port to only be used as part of one virtual link Depending on the type of "port", the port may be bidirectional. This is the case for "network interfaces". Fixes #5 --- a653rs-router-linux/src/network.rs | 6 +-- a653rs-router-tests/src/lib.rs | 9 ++-- a653rs-router-zynq7000/src/network.rs | 6 +-- a653rs-router/src/network.rs | 13 ++--- a653rs-router/src/ports.rs | 40 +++++---------- a653rs-router/src/router.rs | 74 +++++++++++++++------------ 6 files changed, 65 insertions(+), 83 deletions(-) diff --git a/a653rs-router-linux/src/network.rs b/a653rs-router-linux/src/network.rs index 96a10a8..cceab63 100644 --- a/a653rs-router-linux/src/network.rs +++ b/a653rs-router-linux/src/network.rs @@ -11,7 +11,7 @@ impl PlatformNetworkInterface for UdpNetworkInterface { fn platform_interface_receive_unchecked( id: NetworkInterfaceId, buffer: &'_ mut [u8], - ) -> Result<(VirtualLinkId, &'_ [u8]), InterfaceError> { + ) -> Result<&'_ [u8], InterfaceError> { let sock = get_interface(id)?; match sock.sock.recv(buffer) { Ok(read) => { @@ -20,10 +20,10 @@ impl PlatformNetworkInterface for UdpNetworkInterface { let mut vl_id_buf = [0u8; size_of::()]; vl_id_buf.copy_from_slice(vl_id); let vl_id = u32::from_be_bytes(vl_id_buf); - let vl_id = VirtualLinkId::from_u32(vl_id); + let _vl_id = VirtualLinkId::from_u32(vl_id); let msg = &buffer[vl_id_len..read]; router_trace!("Received message from UDP socket for VL {vl_id}: {:?}", msg); - Ok((vl_id, msg)) + Ok(msg) } Err(_) => Err(InterfaceError::NoData), } diff --git a/a653rs-router-tests/src/lib.rs b/a653rs-router-tests/src/lib.rs index 9b14169..309b221 100644 --- a/a653rs-router-tests/src/lib.rs +++ b/a653rs-router-tests/src/lib.rs @@ -39,7 +39,7 @@ use a653rs::bindings::{ QueuingPortId, Validity, }; use a653rs_router::prelude::{ - CreateNetworkInterfaceId, NetworkInterfaceId, PlatformNetworkInterface, VirtualLinkId, + CreateNetworkInterfaceId, NetworkInterfaceId, PlatformNetworkInterface, }; #[derive(Debug)] @@ -181,11 +181,8 @@ impl PlatformNetworkInterface for DummyNetIntf { fn platform_interface_receive_unchecked( _id: NetworkInterfaceId, buffer: &'_ mut [u8], - ) -> Result< - (a653rs_router::prelude::VirtualLinkId, &'_ [u8]), - a653rs_router::prelude::InterfaceError, - > { - Ok((VirtualLinkId::from(1u32), buffer)) + ) -> Result<&'_ [u8], a653rs_router::prelude::InterfaceError> { + Ok(buffer) } } diff --git a/a653rs-router-zynq7000/src/network.rs b/a653rs-router-zynq7000/src/network.rs index 4484d70..81d5d04 100644 --- a/a653rs-router-zynq7000/src/network.rs +++ b/a653rs-router-zynq7000/src/network.rs @@ -165,7 +165,7 @@ where fn platform_interface_receive_unchecked( id: NetworkInterfaceId, buffer: &'_ mut [u8], - ) -> Result<(VirtualLinkId, &'_ [u8]), InterfaceError> { + ) -> Result<&'_ [u8], InterfaceError> { if unsafe { !UART.uart.is_data_ready() } { return Err(InterfaceError::NoData); } @@ -198,11 +198,11 @@ where } } match UartFrame::::decode(&mut buf) { - Ok((vl, pl)) => { + Ok((_vl, pl)) => { let rpl = &mut buffer[0..pl.len()]; rpl.copy_from_slice(pl); trace!(end_network_receive, id.0 as u16); - Ok((vl, rpl)) + Ok(rpl) } _ => { trace!(end_network_receive, id.0 as u16); diff --git a/a653rs-router/src/network.rs b/a653rs-router/src/network.rs index bdaa9a3..9538821 100644 --- a/a653rs-router/src/network.rs +++ b/a653rs-router/src/network.rs @@ -55,10 +55,7 @@ impl NetworkInterface { } /// Receives data from the interface. - pub fn receive<'a>( - &self, - buf: &'a mut [u8], - ) -> Result<(VirtualLinkId, &'a [u8]), InterfaceError> { + pub fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], InterfaceError> { if buf.len() < self.mtu { return Err(InterfaceError::InsufficientBuffer); } @@ -81,7 +78,7 @@ pub trait PlatformNetworkInterface { fn platform_interface_receive_unchecked( id: NetworkInterfaceId, buffer: &'_ mut [u8], - ) -> Result<(VirtualLinkId, &'_ [u8]), InterfaceError>; + ) -> Result<&'_ [u8], InterfaceError>; } /// Creates a network interface id. @@ -148,11 +145,7 @@ impl InterfaceConfig { } impl RouterInput for NetworkInterface { - fn receive<'a>( - &self, - _vl: &VirtualLinkId, - buf: &'a mut [u8], - ) -> Result<(VirtualLinkId, &'a [u8]), PortError> { + fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> { NetworkInterface::receive(self, buf).map_err(|e| { router_debug!("Failed to receive from network interface: {:?}", e); PortError::Receive diff --git a/a653rs-router/src/ports.rs b/a653rs-router/src/ports.rs index 553d864..24e2726 100644 --- a/a653rs-router/src/ports.rs +++ b/a653rs-router/src/ports.rs @@ -51,17 +51,13 @@ pub(crate) struct QueuingIn { } impl RouterInput for SamplingIn { - fn receive<'a>( - &self, - vl: &VirtualLinkId, - buf: &'a mut [u8], - ) -> Result<(VirtualLinkId, &'a [u8]), PortError> { + fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> { let buf = buf.validate_read(self.inner.msg_size)?; let (_val, len) = unsafe { ::read_sampling_message(self.inner.id, buf) .map_err(|_e| PortError::Receive) }?; - Ok((*vl, &buf[..(len as usize)])) + Ok(&buf[..(len as usize)]) } fn mtu(&self) -> PayloadSize { @@ -70,18 +66,14 @@ impl RouterInput for SamplingIn { } impl RouterInput for QueuingIn { - fn receive<'a>( - &self, - vl: &VirtualLinkId, - buf: &'a mut [u8], - ) -> Result<(VirtualLinkId, &'a [u8]), PortError> { + fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> { let buf = buf.validate_read(self.inner.msg_size)?; let timeout = Duration::from_micros(10).as_nanos() as ApexSystemTime; let (val, _overflow) = unsafe { ::receive_queuing_message(self.inner.id, timeout, buf) .map_err(|_e| PortError::Receive) }?; - Ok((*vl, &buf[..val as usize])) + Ok(&buf[..val as usize]) } fn mtu(&self) -> PayloadSize { @@ -204,16 +196,12 @@ impl QueuingOut { } impl RouterInput for SamplingPortDestination { - fn receive<'a>( - &self, - vl: &VirtualLinkId, - buf: &'a mut [u8], - ) -> Result<(VirtualLinkId, &'a [u8]), PortError> { - router_bench!(begin_apex_receive, vl.0 as u16); + fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> { + router_bench!(begin_apex_receive, self.id() as u16); let res = self.receive(buf); - router_bench!(end_apex_receive, vl.0 as u16); + router_bench!(end_apex_receive, self.id() as u16); let (_val, data) = res.map_err(|_e| PortError::Receive)?; - Ok((*vl, data)) + Ok(data) } fn mtu(&self) -> PayloadSize { @@ -238,17 +226,13 @@ impl RouterOutput for SamplingPortS impl RouterInput for QueuingPortReceiver { - fn receive<'a>( - &self, - vl: &VirtualLinkId, - buf: &'a mut [u8], - ) -> Result<(VirtualLinkId, &'a [u8]), PortError> { + fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> { let timeout = SystemTime::Normal(Duration::from_micros(10)); - router_bench!(begin_apex_send, vl.0 as u16); + router_bench!(begin_apex_send, self.id() as u16); let res = self.receive(buf, timeout); - router_bench!(end_apex_send, vl.0 as u16); + router_bench!(end_apex_send, self.id() as u16); let (buf, _overflow) = res.map_err(|_e| PortError::Receive)?; - Ok((*vl, buf)) + Ok(buf) } fn mtu(&self) -> PayloadSize { diff --git a/a653rs-router/src/router.rs b/a653rs-router/src/router.rs index 350d691..6f2d1fd 100644 --- a/a653rs-router/src/router.rs +++ b/a653rs-router/src/router.rs @@ -153,18 +153,9 @@ impl<'a, const IN: usize, const OUT: usize> Router<'a, IN, OUT> { pub trait RouterInput { /// Receives a message and store it into `buf`. /// - /// The returned `VirtualLinkId` indicates for which virtual link the - /// received data is destined. Usually, this will be the same virtual - /// link as has been specified by `VirtualLinkId`, but when the `Input` - /// multiplexes multiple virtual links, the next received message may be - /// (unexpectedly) for another virtual link. Implementations may choose to - /// indicate this or return an error, if they should do not receive - /// multiple virtual links. - fn receive<'a>( - &self, - vl: &VirtualLinkId, - buf: &'a mut [u8], - ) -> Result<(VirtualLinkId, &'a [u8]), PortError>; + /// # Errors + /// May return an error if receiving the message failed. + fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError>; /// Maximum transfer unit fn mtu(&self) -> PayloadSize; @@ -198,12 +189,11 @@ impl<'a, const I: usize, const O: usize> RouteTable<'a, I, O> { fn route(&self, vl: &VirtualLinkId) -> Result<(), Error> { let buf = &mut [0u8; B]; let input = self.inputs.get(vl).ok_or(RouteError::InvalidVl)?; - // This vl may be different, than the one specified. - let (vl, buf) = input.receive(vl, buf)?; + let buf = input.receive(buf)?; router_debug!("Received from {vl:?}: {buf:?}"); - let outs = self.outputs.get(&vl).ok_or(RouteError::InvalidVl)?; + let outs = self.outputs.get(vl).ok_or(RouteError::InvalidVl)?; for out in outs.into_iter() { - out.send(&vl, buf).map_err(|e| { + out.send(vl, buf).map_err(|e| { router_debug!("Failed to route {:?}", vl); e })?; @@ -261,28 +251,37 @@ impl<'a, const I: usize, const O: usize> RouteTable<'a, I, O> { } let mut b = &mut StateBuilder::default(); for (v, cfg) in virtual_links_cfg.into_iter() { + // Check for multiple uses of same source + if virtual_links_cfg + .iter() + .filter(|(_, c)| c.src == cfg.src) + .count() + > 1 + { + return Err(RouterConfigError::Source); + } let inp = inputs.get(&cfg.src).ok_or_else(|| { router_debug!("Unknown input: {}", cfg.src.deref()); RouterConfigError::Destination })?; - let input_msg_size = inp.mtu(); let outs: Result, RouterConfigError> = cfg .dsts .iter() .map(|d| { - outputs - .get(d) - .ok_or_else(|| { - router_debug!("Unknown output {}", d.deref()); - RouterConfigError::Source - }) - .and_then(|outp| { - if outp.mtu() == input_msg_size { - Ok(outp) - } else { - Err(RouterConfigError::Destination) - } - }) + // Check for multiple uses of same destination + if virtual_links_cfg + .iter() + .flat_map(|(_, c)| c.dsts.iter()) + .filter(|d_name| *d_name == d) + .count() + > 1 + { + return Err(RouterConfigError::Destination); + } + outputs.get(d).ok_or_else(|| { + router_debug!("Unknown output {}", d.deref()); + RouterConfigError::Source + }) }) .map(|d| d.copied()) .collect(); @@ -317,18 +316,27 @@ impl<'a, const I: usize, const O: usize> Debug for StateBuilder<'a, I, O> { } impl<'a, const I: usize, const O: usize> StateBuilder<'a, I, O> { - pub fn route( + fn route( &mut self, vl: &VirtualLinkId, input: &'a dyn RouterInput, - output: &Vec<&'a dyn RouterOutput, O>, + outputs: &Vec<&'a dyn RouterOutput, O>, ) -> Result<&mut Self, RouterConfigError> { if self.vls.contains_key(vl) { return Err(RouterConfigError::VirtualLink); } + + // Check if input and output message sizes match + let input_msg_size = input.mtu(); + for outp in outputs.iter() { + if outp.mtu() != input_msg_size { + return Err(RouterConfigError::Destination); + } + } + _ = self .vls - .insert(*vl, (input, output.clone())) + .insert(*vl, (input, outputs.clone())) .map_err(|_e| RouterConfigError::Storage)?; Ok(self) }