Skip to content

Commit

Permalink
fix: allow each port to only be used as part of one virtual link
Browse files Browse the repository at this point in the history
Depending on the type of "port", the port may be bidirectional. This is
the case for "network interfaces".

Fixes #5
  • Loading branch information
dadada committed Apr 8, 2024
1 parent 1e8b725 commit 159b5aa
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 83 deletions.
6 changes: 3 additions & 3 deletions a653rs-router-linux/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl<const MTU: usize> PlatformNetworkInterface for UdpNetworkInterface<MTU> {
fn platform_interface_receive_unchecked(
id: NetworkInterfaceId,
buffer: &'_ mut [u8],
) -> Result<(VirtualLinkId, &'_ [u8]), InterfaceError> {
) -> Result<&'_ [u8], InterfaceError> {
let sock = get_interface(id)?;
match sock.sock.recv(buffer) {
Ok(read) => {
Expand All @@ -20,10 +20,10 @@ impl<const MTU: usize> PlatformNetworkInterface for UdpNetworkInterface<MTU> {
let mut vl_id_buf = [0u8; size_of::<VirtualLinkId>()];
vl_id_buf.copy_from_slice(vl_id);
let vl_id = u32::from_be_bytes(vl_id_buf);
let vl_id = VirtualLinkId::from_u32(vl_id);
let _vl_id = VirtualLinkId::from_u32(vl_id);
let msg = &buffer[vl_id_len..read];
router_trace!("Received message from UDP socket for VL {vl_id}: {:?}", msg);
Ok((vl_id, msg))
Ok(msg)
}
Err(_) => Err(InterfaceError::NoData),
}
Expand Down
9 changes: 3 additions & 6 deletions a653rs-router-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use a653rs::bindings::{
QueuingPortId, Validity,
};
use a653rs_router::prelude::{
CreateNetworkInterfaceId, NetworkInterfaceId, PlatformNetworkInterface, VirtualLinkId,
CreateNetworkInterfaceId, NetworkInterfaceId, PlatformNetworkInterface,
};

#[derive(Debug)]
Expand Down Expand Up @@ -181,11 +181,8 @@ impl PlatformNetworkInterface for DummyNetIntf {
fn platform_interface_receive_unchecked(
_id: NetworkInterfaceId,
buffer: &'_ mut [u8],
) -> Result<
(a653rs_router::prelude::VirtualLinkId, &'_ [u8]),
a653rs_router::prelude::InterfaceError,
> {
Ok((VirtualLinkId::from(1u32), buffer))
) -> Result<&'_ [u8], a653rs_router::prelude::InterfaceError> {
Ok(buffer)
}
}

Expand Down
6 changes: 3 additions & 3 deletions a653rs-router-zynq7000/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ where
fn platform_interface_receive_unchecked(
id: NetworkInterfaceId,
buffer: &'_ mut [u8],
) -> Result<(VirtualLinkId, &'_ [u8]), InterfaceError> {
) -> Result<&'_ [u8], InterfaceError> {
if unsafe { !UART.uart.is_data_ready() } {
return Err(InterfaceError::NoData);
}
Expand Down Expand Up @@ -198,11 +198,11 @@ where
}
}
match UartFrame::<MTU>::decode(&mut buf) {
Ok((vl, pl)) => {
Ok((_vl, pl)) => {
let rpl = &mut buffer[0..pl.len()];
rpl.copy_from_slice(pl);
trace!(end_network_receive, id.0 as u16);
Ok((vl, rpl))
Ok(rpl)
}
_ => {
trace!(end_network_receive, id.0 as u16);
Expand Down
13 changes: 3 additions & 10 deletions a653rs-router/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ impl<H: PlatformNetworkInterface> NetworkInterface<H> {
}

/// Receives data from the interface.
pub fn receive<'a>(
&self,
buf: &'a mut [u8],
) -> Result<(VirtualLinkId, &'a [u8]), InterfaceError> {
pub fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], InterfaceError> {
if buf.len() < self.mtu {
return Err(InterfaceError::InsufficientBuffer);
}
Expand All @@ -81,7 +78,7 @@ pub trait PlatformNetworkInterface {
fn platform_interface_receive_unchecked(
id: NetworkInterfaceId,
buffer: &'_ mut [u8],
) -> Result<(VirtualLinkId, &'_ [u8]), InterfaceError>;
) -> Result<&'_ [u8], InterfaceError>;
}

/// Creates a network interface id.
Expand Down Expand Up @@ -148,11 +145,7 @@ impl InterfaceConfig {
}

impl<H: PlatformNetworkInterface> RouterInput for NetworkInterface<H> {
fn receive<'a>(
&self,
_vl: &VirtualLinkId,
buf: &'a mut [u8],
) -> Result<(VirtualLinkId, &'a [u8]), PortError> {
fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> {
NetworkInterface::receive(self, buf).map_err(|e| {
router_debug!("Failed to receive from network interface: {:?}", e);
PortError::Receive
Expand Down
40 changes: 12 additions & 28 deletions a653rs-router/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,13 @@ pub(crate) struct QueuingIn<H: ApexQueuingPortP4> {
}

impl<H: ApexSamplingPortP4> RouterInput for SamplingIn<H> {
fn receive<'a>(
&self,
vl: &VirtualLinkId,
buf: &'a mut [u8],
) -> Result<(VirtualLinkId, &'a [u8]), PortError> {
fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> {
let buf = buf.validate_read(self.inner.msg_size)?;
let (_val, len) = unsafe {
<H as ApexSamplingPortP4>::read_sampling_message(self.inner.id, buf)
.map_err(|_e| PortError::Receive)
}?;
Ok((*vl, &buf[..(len as usize)]))
Ok(&buf[..(len as usize)])
}

fn mtu(&self) -> PayloadSize {
Expand All @@ -70,18 +66,14 @@ impl<H: ApexSamplingPortP4> RouterInput for SamplingIn<H> {
}

impl<H: ApexQueuingPortP4> RouterInput for QueuingIn<H> {
fn receive<'a>(
&self,
vl: &VirtualLinkId,
buf: &'a mut [u8],
) -> Result<(VirtualLinkId, &'a [u8]), PortError> {
fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> {
let buf = buf.validate_read(self.inner.msg_size)?;
let timeout = Duration::from_micros(10).as_nanos() as ApexSystemTime;
let (val, _overflow) = unsafe {
<H as ApexQueuingPortP4>::receive_queuing_message(self.inner.id, timeout, buf)
.map_err(|_e| PortError::Receive)
}?;
Ok((*vl, &buf[..val as usize]))
Ok(&buf[..val as usize])
}

fn mtu(&self) -> PayloadSize {
Expand Down Expand Up @@ -204,16 +196,12 @@ impl<H: ApexQueuingPortP4> QueuingOut<H> {
}

impl<const M: MessageSize, S: ApexSamplingPortP4> RouterInput for SamplingPortDestination<M, S> {
fn receive<'a>(
&self,
vl: &VirtualLinkId,
buf: &'a mut [u8],
) -> Result<(VirtualLinkId, &'a [u8]), PortError> {
router_bench!(begin_apex_receive, vl.0 as u16);
fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> {
router_bench!(begin_apex_receive, self.id() as u16);
let res = self.receive(buf);
router_bench!(end_apex_receive, vl.0 as u16);
router_bench!(end_apex_receive, self.id() as u16);
let (_val, data) = res.map_err(|_e| PortError::Receive)?;
Ok((*vl, data))
Ok(data)
}

fn mtu(&self) -> PayloadSize {
Expand All @@ -238,17 +226,13 @@ impl<const M: MessageSize, S: ApexSamplingPortP4> RouterOutput for SamplingPortS
impl<const M: MessageSize, const R: MessageRange, Q: ApexQueuingPortP4> RouterInput
for QueuingPortReceiver<M, R, Q>
{
fn receive<'a>(
&self,
vl: &VirtualLinkId,
buf: &'a mut [u8],
) -> Result<(VirtualLinkId, &'a [u8]), PortError> {
fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> {
let timeout = SystemTime::Normal(Duration::from_micros(10));
router_bench!(begin_apex_send, vl.0 as u16);
router_bench!(begin_apex_send, self.id() as u16);
let res = self.receive(buf, timeout);
router_bench!(end_apex_send, vl.0 as u16);
router_bench!(end_apex_send, self.id() as u16);
let (buf, _overflow) = res.map_err(|_e| PortError::Receive)?;
Ok((*vl, buf))
Ok(buf)
}

fn mtu(&self) -> PayloadSize {
Expand Down
74 changes: 41 additions & 33 deletions a653rs-router/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,9 @@ impl<'a, const IN: usize, const OUT: usize> Router<'a, IN, OUT> {
pub trait RouterInput {
/// Receives a message and store it into `buf`.
///
/// The returned `VirtualLinkId` indicates for which virtual link the
/// received data is destined. Usually, this will be the same virtual
/// link as has been specified by `VirtualLinkId`, but when the `Input`
/// multiplexes multiple virtual links, the next received message may be
/// (unexpectedly) for another virtual link. Implementations may choose to
/// indicate this or return an error, if they should do not receive
/// multiple virtual links.
fn receive<'a>(
&self,
vl: &VirtualLinkId,
buf: &'a mut [u8],
) -> Result<(VirtualLinkId, &'a [u8]), PortError>;
/// # Errors
/// May return an error if receiving the message failed.
fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError>;

/// Maximum transfer unit
fn mtu(&self) -> PayloadSize;
Expand Down Expand Up @@ -198,12 +189,11 @@ impl<'a, const I: usize, const O: usize> RouteTable<'a, I, O> {
fn route<const B: usize>(&self, vl: &VirtualLinkId) -> Result<(), Error> {
let buf = &mut [0u8; B];
let input = self.inputs.get(vl).ok_or(RouteError::InvalidVl)?;
// This vl may be different, than the one specified.
let (vl, buf) = input.receive(vl, buf)?;
let buf = input.receive(buf)?;
router_debug!("Received from {vl:?}: {buf:?}");
let outs = self.outputs.get(&vl).ok_or(RouteError::InvalidVl)?;
let outs = self.outputs.get(vl).ok_or(RouteError::InvalidVl)?;
for out in outs.into_iter() {
out.send(&vl, buf).map_err(|e| {
out.send(vl, buf).map_err(|e| {
router_debug!("Failed to route {:?}", vl);
e
})?;
Expand Down Expand Up @@ -261,28 +251,37 @@ impl<'a, const I: usize, const O: usize> RouteTable<'a, I, O> {
}
let mut b = &mut StateBuilder::default();
for (v, cfg) in virtual_links_cfg.into_iter() {
// Check for multiple uses of same source
if virtual_links_cfg
.iter()
.filter(|(_, c)| c.src == cfg.src)
.count()
> 1
{
return Err(RouterConfigError::Source);
}
let inp = inputs.get(&cfg.src).ok_or_else(|| {
router_debug!("Unknown input: {}", cfg.src.deref());
RouterConfigError::Destination
})?;
let input_msg_size = inp.mtu();
let outs: Result<Vec<_, O>, RouterConfigError> = cfg
.dsts
.iter()
.map(|d| {
outputs
.get(d)
.ok_or_else(|| {
router_debug!("Unknown output {}", d.deref());
RouterConfigError::Source
})
.and_then(|outp| {
if outp.mtu() == input_msg_size {
Ok(outp)
} else {
Err(RouterConfigError::Destination)
}
})
// Check for multiple uses of same destination
if virtual_links_cfg
.iter()
.flat_map(|(_, c)| c.dsts.iter())
.filter(|d_name| *d_name == d)
.count()
> 1
{
return Err(RouterConfigError::Destination);
}
outputs.get(d).ok_or_else(|| {
router_debug!("Unknown output {}", d.deref());
RouterConfigError::Source
})
})
.map(|d| d.copied())
.collect();
Expand Down Expand Up @@ -317,18 +316,27 @@ impl<'a, const I: usize, const O: usize> Debug for StateBuilder<'a, I, O> {
}

impl<'a, const I: usize, const O: usize> StateBuilder<'a, I, O> {
pub fn route(
fn route(
&mut self,
vl: &VirtualLinkId,
input: &'a dyn RouterInput,
output: &Vec<&'a dyn RouterOutput, O>,
outputs: &Vec<&'a dyn RouterOutput, O>,
) -> Result<&mut Self, RouterConfigError> {
if self.vls.contains_key(vl) {
return Err(RouterConfigError::VirtualLink);
}

// Check if input and output message sizes match
let input_msg_size = input.mtu();
for outp in outputs.iter() {
if outp.mtu() != input_msg_size {
return Err(RouterConfigError::Destination);
}
}

_ = self
.vls
.insert(*vl, (input, output.clone()))
.insert(*vl, (input, outputs.clone()))
.map_err(|_e| RouterConfigError::Storage)?;
Ok(self)
}
Expand Down

0 comments on commit 159b5aa

Please sign in to comment.