Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update a653rs(-linux) #9

Merged
merged 8 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
575 changes: 208 additions & 367 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,22 @@ default-members = [
]

[workspace.dependencies]
a653rs = { version = "0.5", default-features = false }
a653rs = { version = "0.6", default-features = false }
cobs = { version = "0.2.3", default-features = false }
crc16 = { version = "0.4.0", default-features = false }
heapless = { version = "0.8", default-features = false }
log = "0"
once_cell = { version = "1.19", default-features = false }
postcard = { version = "1.0", default-features = false }
serde = { version = "1.0", default-features = false }
serde_yaml = { version = "0.9", default-feaures = false }
serde_yaml = { version = "0.9", default-features = false }
signal-hook.version = "0.3"
uart_xilinx = "0.2"
volatile-register = "0.2"
# These are not on crates.io
a653rs-linux = { git = "https://github.com/florianhartung/a653rs-linux.git", branch = "queuing-ports", default-features = false }
a653rs-postcard = { git = "https://github.com/DLR-FT/a653rs-postcard.git", branch = "main", default-features = false }
a653rs-xng = { git = "https://github.com/DLR-FT/a653rs-xng.git", branch = "main", default-features = false }
a653rs-linux = { version = "0.2", default-features = false }
a653rs-postcard = { version = "0.4", default-features = false }
a653rs-xng = { version = "0.1", default-features = false }
xng-rs-log = { git = "https://github.com/DLR-FT/xng-rs-log.git", branch = "main", default-features = false }

[profile.dev]
Expand Down
6 changes: 3 additions & 3 deletions a653rs-router-linux/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ extern "C" fn entry_point() {
let mut state = router.router::<INPUTS, OUTPUTS, MTU>(cfg).unwrap();
loop {
let res = state.forward::<MTU, _>(&ApexLinuxPartition);
#[cfg(feauture = "log")]
#[cfg(feature = "log")]
{
use a653rs_router::prelude::Error;
use log::{debug, trace};
Expand All @@ -69,7 +69,7 @@ extern "C" fn entry_point() {
Err(e) => debug!("Failed to forward message: {}", e),
}
}
#[cfg(not(feauture = "log"))]
#[cfg(not(feature = "log"))]
let _res = res;
}
}
Expand All @@ -78,7 +78,7 @@ fn main() {
ApexLogger::install_panic_hook();
#[cfg(feature = "log")]
{
ApexLogger::install_logger(log::LevelFilter::Trace).unwrap();
ApexLogger::install_logger(log::LevelFilter::Debug).unwrap();
}
RouterPartition.run()
}
15 changes: 12 additions & 3 deletions a653rs-router/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{prelude::InterfaceConfig, types::VirtualLinkId};
use a653rs::bindings::{
MessageRange, MessageSize, QueuingDiscipline as ApexQueuingDiscipline, StackSize,
use crate::{ports::PortError, prelude::InterfaceConfig, types::VirtualLinkId};
use a653rs::{
bindings::{MessageRange, MessageSize, QueuingDiscipline as ApexQueuingDiscipline, StackSize},
prelude::Name,
};
use core::{ops::Deref, str::FromStr, time::Duration};
use heapless::{LinearMap, String, Vec};
Expand Down Expand Up @@ -40,6 +41,14 @@ impl From<String<20>> for PortName {
}
}

impl TryInto<Name> for PortName {
type Error = PortError;

fn try_into(self) -> Result<Name, Self::Error> {
Name::from_str(&self).map_err(|_e| PortError::Create)
}
}

/// The name of a network interface or socket address.
///
/// Some examples
Expand Down
2 changes: 1 addition & 1 deletion a653rs-router/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where
entry_point: extern "C" fn(),
) -> Result<Self, Error> {
Ok(Self {
resources: RouterResources::<I, P, IFS, PS>::create::<C>(interfaces, ports)?,
resources: RouterResources::<I, P, IFS, PS>::create::<C>(ctx, interfaces, ports)?,
process: RouterProcess::create(ctx, name, stack_size, entry_point)
.map_err(Error::Process)?,
})
Expand Down
242 changes: 9 additions & 233 deletions a653rs-router/src/ports.rs
Original file line number Diff line number Diff line change
@@ -1,201 +1,12 @@
use a653rs::{bindings::*, prelude::*};
use core::{fmt::Display, marker::PhantomData, str::FromStr, time::Duration};
use core::{fmt::Display, time::Duration};

use crate::{
config::{QueuingInCfg, QueuingOutCfg, SamplingInCfg, SamplingOutCfg},
network::PayloadSize,
prelude::*,
router::{RouterInput, RouterOutput},
};

#[derive(Debug)]
pub(crate) enum Port<H: ApexQueuingPortP4 + ApexSamplingPortP4> {
SamplingIn(SamplingIn<H>),
SamplingOut(SamplingOut<H>),
QueuingIn(QueuingIn<H>),
QueuingOut(QueuingOut<H>),
}

#[derive(Debug)]
pub(crate) struct PortData {
id: ApexLongInteger,
msg_size: MessageSize,
}

/// Used for accessing a sampling port destination using the bindings
#[derive(Debug)]
pub(crate) struct SamplingIn<H: ApexSamplingPortP4> {
_h: PhantomData<H>,
inner: PortData,
}

/// Used for accessing a sampling port source using the bindings
#[derive(Debug)]
pub(crate) struct SamplingOut<H: ApexSamplingPortP4> {
_h: PhantomData<H>,
inner: PortData,
}

/// Used for accessing a queuing port source using the bindings
#[derive(Debug)]
pub(crate) struct QueuingOut<H: ApexQueuingPortP4> {
_h: PhantomData<H>,
inner: PortData,
}

/// Used for accessing a queuing port destination using the bindings
#[derive(Debug)]
pub(crate) struct QueuingIn<H: ApexQueuingPortP4> {
_h: PhantomData<H>,
inner: PortData,
}

impl<H: ApexSamplingPortP4> RouterInput for SamplingIn<H> {
fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> {
let buf = buf.validate_read(self.inner.msg_size)?;
let (_val, len) = unsafe {
<H as ApexSamplingPortP4>::read_sampling_message(self.inner.id, buf)
.map_err(|_e| PortError::Receive)
}?;
Ok(&buf[..(len as usize)])
}

fn mtu(&self) -> PayloadSize {
self.inner.msg_size as PayloadSize
}
}

impl<H: ApexQueuingPortP4> RouterInput for QueuingIn<H> {
fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> {
let buf = buf.validate_read(self.inner.msg_size)?;
let timeout = Duration::from_micros(10).as_nanos() as ApexSystemTime;
let (val, _overflow) = unsafe {
<H as ApexQueuingPortP4>::receive_queuing_message(self.inner.id, timeout, buf)
.map_err(|_e| PortError::Receive)
}?;
Ok(&buf[..val as usize])
}

fn mtu(&self) -> PayloadSize {
self.inner.msg_size as PayloadSize
}
}

impl<H: ApexQueuingPortP4> RouterOutput for QueuingOut<H> {
fn send(&self, buf: &[u8]) -> Result<(), PortError> {
let buf = buf.validate_write(self.inner.msg_size)?;
let timeout = Duration::from_micros(10).as_nanos() as ApexSystemTime;
<H as ApexQueuingPortP4>::send_queuing_message(self.inner.id, buf, timeout)
.map_err(|_e| PortError::Send)
}

fn mtu(&self) -> PayloadSize {
self.inner.msg_size as PayloadSize
}
}

impl<H: ApexSamplingPortP4> RouterOutput for SamplingOut<H> {
fn send(&self, buf: &[u8]) -> Result<(), PortError> {
let buf = buf.validate_write(self.inner.msg_size)?;
<H as ApexSamplingPortP4>::write_sampling_message(self.inner.id, buf)
.map_err(|_e| PortError::Send)
}

fn mtu(&self) -> PayloadSize {
self.inner.msg_size as PayloadSize
}
}

impl<H: ApexSamplingPortP4> SamplingIn<H> {
pub(crate) fn create(name: PortName, value: SamplingInCfg) -> Result<Self, PortError> {
let id = <H as ApexSamplingPortP4>::create_sampling_port(
Name::from_str(&name)
.map_err(|_e| PortError::Create)?
.into(),
value.msg_size,
PortDirection::Destination,
value.refresh_period.as_nanos() as i64,
)
.map_err(|_e| PortError::Create)?;
Ok(Self {
_h: Default::default(),
inner: PortData {
id,
msg_size: value.msg_size,
},
})
}
}

impl<H: ApexSamplingPortP4> SamplingOut<H> {
pub(crate) fn create(name: PortName, value: SamplingOutCfg) -> Result<Self, PortError> {
let id = <H as ApexSamplingPortP4>::create_sampling_port(
Name::from_str(&name)
.map_err(|_e| PortError::Create)?
.into(),
value.msg_size,
PortDirection::Source,
// Use some non-zero duration.
// While refresh_period is ignored for the source
// It may produce an error if set to zero
SystemTime::Normal(Duration::from_nanos(1)).into(),
)
.map_err(|_e| PortError::Create)?;
Ok(Self {
_h: Default::default(),
inner: PortData {
id,
msg_size: value.msg_size,
},
})
}
}

impl<H: ApexQueuingPortP4> QueuingIn<H> {
pub(crate) fn create(name: PortName, value: QueuingInCfg) -> Result<Self, PortError> {
let id = <H as ApexQueuingPortP4>::create_queuing_port(
Name::from_str(&name)
.map_err(|_e| PortError::Create)?
.into(),
value.msg_size,
value.msg_count,
PortDirection::Destination,
value.discipline.into(),
)
.map_err(|_e| PortError::Create)?;
Ok(Self {
_h: Default::default(),
inner: PortData {
id,
msg_size: value.msg_size,
},
})
}
}

impl<H: ApexQueuingPortP4> QueuingOut<H> {
pub(crate) fn create(name: PortName, value: QueuingOutCfg) -> Result<Self, PortError> {
let id = <H as ApexQueuingPortP4>::create_queuing_port(
Name::from_str(&name)
.map_err(|_e| PortError::Create)?
.into(),
value.msg_size,
value.msg_count,
PortDirection::Source,
value.discipline.into(),
)
.map_err(|_e| PortError::Create)?;
Ok(Self {
_h: Default::default(),
inner: PortData {
id,
msg_size: value.msg_size,
},
})
}
}

impl<const M: MessageSize, S: ApexSamplingPortP4> RouterInput for SamplingPortDestination<M, S> {
impl<S: ApexSamplingPortP4> RouterInput for SamplingPortDestination<S> {
fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> {
router_bench!(begin_apex_receive, self.id() as u16);
let res = self.receive(buf);
Expand All @@ -205,11 +16,11 @@ impl<const M: MessageSize, S: ApexSamplingPortP4> RouterInput for SamplingPortDe
}

fn mtu(&self) -> PayloadSize {
M as PayloadSize
self.size() as PayloadSize
}
}

impl<const M: MessageSize, S: ApexSamplingPortP4> RouterOutput for SamplingPortSource<M, S> {
impl<S: ApexSamplingPortP4> RouterOutput for SamplingPortSource<S> {
fn send(&self, buf: &[u8]) -> Result<(), PortError> {
router_bench!(begin_apex_send, self.id() as u16);
let res = self.send(buf);
Expand All @@ -219,13 +30,11 @@ impl<const M: MessageSize, S: ApexSamplingPortP4> RouterOutput for SamplingPortS
}

fn mtu(&self) -> PayloadSize {
M as PayloadSize
self.size() as PayloadSize
}
}

impl<const M: MessageSize, const R: MessageRange, Q: ApexQueuingPortP4> RouterInput
for QueuingPortReceiver<M, R, Q>
{
impl<Q: ApexQueuingPortP4> RouterInput for QueuingPortReceiver<Q> {
fn receive<'a>(&self, buf: &'a mut [u8]) -> Result<&'a [u8], PortError> {
let timeout = SystemTime::Normal(Duration::from_micros(10));
router_bench!(begin_apex_send, self.id() as u16);
Expand All @@ -236,13 +45,11 @@ impl<const M: MessageSize, const R: MessageRange, Q: ApexQueuingPortP4> RouterIn
}

fn mtu(&self) -> PayloadSize {
M as PayloadSize
self.size()
}
}

impl<const M: MessageSize, const R: MessageRange, Q: ApexQueuingPortP4> RouterOutput
for QueuingPortSender<M, R, Q>
{
impl<Q: ApexQueuingPortP4> RouterOutput for QueuingPortSender<Q> {
fn send(&self, buf: &[u8]) -> Result<(), PortError> {
let timeout = SystemTime::Normal(Duration::from_micros(10));
router_bench!(begin_apex_send, self.id() as u16);
Expand All @@ -253,38 +60,7 @@ impl<const M: MessageSize, const R: MessageRange, Q: ApexQueuingPortP4> RouterOu
}

fn mtu(&self) -> PayloadSize {
M as PayloadSize
}
}

trait BufferExt {
fn validate_read(&mut self, size: MessageSize) -> Result<&mut Self, PortError>;

/// Validate a buffer to be at most as long as the given usize.
/// If not returns [Self] with the length of the passed buffer
fn validate_write(&self, size: MessageSize) -> Result<&Self, PortError>;
}

impl BufferExt for [ApexByte] {
fn validate_read(&mut self, size: MessageSize) -> Result<&mut Self, PortError> {
if usize::try_from(size)
.map(|ss| self.len() < ss)
.unwrap_or(true)
{
return Err(PortError::Receive);
}
Ok(self)
}

fn validate_write(&self, size: MessageSize) -> Result<&Self, PortError> {
if usize::try_from(size)
.map(|ss| self.len() > ss)
.unwrap_or(false)
|| self.is_empty()
{
return Err(PortError::Send);
}
Ok(self)
self.size()
}
}

Expand Down
Loading