Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protocol attachment to payload #923

Merged
merged 5 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,12 +720,12 @@ impl BacktrackableWriter for ZBufWriter<'_> {
#[cfg(feature = "std")]
impl<'a> io::Write for ZBufWriter<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
match <Self as Writer>::write(self, buf) {
Ok(n) => Ok(n.get()),
Err(_) => Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"UnexpectedEof",
)),
Err(_) => Err(io::ErrorKind::UnexpectedEof.into()),
}
}

Expand Down
62 changes: 44 additions & 18 deletions zenoh/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ impl Payload {
Ok(Payload::new(buf))
}

/// Get a [`PayloadWriter`] implementing [`std::io::Write`] trait.
pub fn writer(&mut self) -> PayloadWriter<'_> {
PayloadWriter(self.0.writer())
}

/// Get a [`PayloadReader`] implementing [`std::io::Read`] trait.
pub fn iter<T>(&self) -> PayloadIterator<'_, T>
where
Expand All @@ -104,12 +109,7 @@ impl Payload {
}
}

/// Get a [`PayloadWriter`] implementing [`std::io::Write`] trait.
pub fn writer(&mut self) -> PayloadWriter<'_> {
PayloadWriter(self.0.writer())
}

/// Encode an object of type `T` as a [`Value`] using the [`ZSerde`].
/// Serialize an object of type `T` as a [`Value`] using the [`ZSerde`].
///
/// ```rust
/// use zenoh::payload::Payload;
Expand All @@ -126,7 +126,7 @@ impl Payload {
ZSerde.serialize(t)
}

/// Decode an object of type `T` from a [`Value`] using the [`ZSerde`].
/// Deserialize an object of type `T` from a [`Value`] using the [`ZSerde`].
pub fn deserialize<'a, T>(&'a self) -> ZResult<T>
where
ZSerde: Deserialize<'a, T>,
Expand All @@ -137,7 +137,7 @@ impl Payload {
.map_err(|e| zerror!("{:?}", e).into())
}

/// Decode an object of type `T` from a [`Value`] using the [`ZSerde`].
/// Infallibly deserialize an object of type `T` from a [`Value`] using the [`ZSerde`].
pub fn into<'a, T>(&'a self) -> T
where
ZSerde: Deserialize<'a, T, Error = Infallible>,
Expand Down Expand Up @@ -858,7 +858,7 @@ impl Serialize<&serde_yaml::Value> for ZSerde {

fn serialize(self, t: &serde_yaml::Value) -> Self::Output {
let mut payload = Payload::empty();
serde_yaml::to_writer(payload.0.writer(), t)?;
serde_yaml::to_writer(payload.writer(), t)?;
Ok(payload)
}
}
Expand Down Expand Up @@ -1092,15 +1092,9 @@ impl TryFrom<Payload> for SharedMemoryBuf {
}

// Tuple
impl<A, B> Serialize<(A, B)> for ZSerde
where
A: Into<Payload>,
B: Into<Payload>,
{
type Output = Payload;

fn serialize(self, t: (A, B)) -> Self::Output {
let (a, b) = t;
macro_rules! impl_tuple {
($t:expr) => {{
let (a, b) = $t;

let codec = Zenoh080::new();
let mut buffer: ZBuf = ZBuf::empty();
Expand All @@ -1117,6 +1111,29 @@ where
}

Payload::new(buffer)
}};
}
impl<A, B> Serialize<(A, B)> for ZSerde
where
A: Into<Payload>,
B: Into<Payload>,
{
type Output = Payload;

fn serialize(self, t: (A, B)) -> Self::Output {
impl_tuple!(t)
}
}

impl<A, B> Serialize<&(A, B)> for ZSerde
where
for<'a> &'a A: Into<Payload>,
for<'b> &'b B: Into<Payload>,
{
type Output = Payload;

fn serialize(self, t: &(A, B)) -> Self::Output {
impl_tuple!(t)
}
}

Expand Down Expand Up @@ -1402,5 +1419,14 @@ mod tests {
println!("Deserialize:\t{:?}\n", p);
let o = HashMap::from_iter(p.iter::<(usize, Vec<u8>)>());
assert_eq!(hm, o);

let mut hm: HashMap<String, String> = HashMap::new();
hm.insert(String::from("0"), String::from("a"));
hm.insert(String::from("1"), String::from("b"));
println!("Serialize:\t{:?}", hm);
let p = Payload::from_iter(hm.iter());
println!("Deserialize:\t{:?}\n", p);
let o = HashMap::from_iter(p.iter::<(String, String)>());
assert_eq!(hm, o);
}
}
216 changes: 8 additions & 208 deletions zenoh/src/sample/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,226 +212,26 @@ impl From<Option<DataInfo>> for SourceInfo {
}

mod attachment {
#[zenoh_macros::unstable]
use zenoh_buffers::{
reader::{HasReader, Reader},
writer::HasWriter,
ZBuf, ZBufReader, ZSlice,
};
#[zenoh_macros::unstable]
use zenoh_codec::{RCodec, WCodec, Zenoh080};
use crate::Payload;
#[zenoh_macros::unstable]
use zenoh_protocol::zenoh::ext::AttachmentType;

/// A builder for [`Attachment`]
#[zenoh_macros::unstable]
#[derive(Debug)]
pub struct AttachmentBuilder {
pub(crate) inner: Vec<u8>,
}
#[zenoh_macros::unstable]
impl Default for AttachmentBuilder {
fn default() -> Self {
Self::new()
}
}
#[zenoh_macros::unstable]
impl AttachmentBuilder {
pub fn new() -> Self {
Self { inner: Vec::new() }
}
fn _insert(&mut self, key: &[u8], value: &[u8]) {
let codec = Zenoh080;
let mut writer = self.inner.writer();
codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure
codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure
}
/// Inserts a key-value pair to the attachment.
///
/// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key.
pub fn insert<Key: AsRef<[u8]> + ?Sized, Value: AsRef<[u8]> + ?Sized>(
&mut self,
key: &Key,
value: &Value,
) {
self._insert(key.as_ref(), value.as_ref())
}
pub fn build(self) -> Attachment {
Attachment {
inner: self.inner.into(),
}
}
}
#[zenoh_macros::unstable]
impl From<AttachmentBuilder> for Attachment {
fn from(value: AttachmentBuilder) -> Self {
Attachment {
inner: value.inner.into(),
}
}
}
#[zenoh_macros::unstable]
impl From<AttachmentBuilder> for Option<Attachment> {
fn from(value: AttachmentBuilder) -> Self {
if value.inner.is_empty() {
None
} else {
Some(value.into())
}
}
}
pub type Attachment = Payload;

#[zenoh_macros::unstable]
#[derive(Clone)]
pub struct Attachment {
pub(crate) inner: ZBuf,
}
#[zenoh_macros::unstable]
impl Default for Attachment {
fn default() -> Self {
Self::new()
}
}
#[zenoh_macros::unstable]
impl<const ID: u8> From<Attachment> for AttachmentType<ID> {
fn from(this: Attachment) -> Self {
AttachmentType { buffer: this.inner }
AttachmentType {
buffer: this.into(),
}
}
}

#[zenoh_macros::unstable]
impl<const ID: u8> From<AttachmentType<ID>> for Attachment {
fn from(this: AttachmentType<ID>) -> Self {
Attachment { inner: this.buffer }
}
}
#[zenoh_macros::unstable]
impl Attachment {
pub fn new() -> Self {
Self {
inner: ZBuf::empty(),
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn len(&self) -> usize {
self.iter().count()
}
pub fn iter(&self) -> AttachmentIterator {
self.into_iter()
}
fn _get(&self, key: &[u8]) -> Option<ZSlice> {
self.iter()
.find_map(|(k, v)| (k.as_slice() == key).then_some(v))
}
pub fn get<Key: AsRef<[u8]>>(&self, key: &Key) -> Option<ZSlice> {
self._get(key.as_ref())
}
fn _insert(&mut self, key: &[u8], value: &[u8]) {
let codec = Zenoh080;
let mut writer = self.inner.writer();
codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure
codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure
}
/// Inserts a key-value pair to the attachment.
///
/// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key.
///
/// [`Attachment`] is not very efficient at inserting, so if you wish to perform multiple inserts, it's generally better to [`Attachment::extend`] after performing the inserts on an [`AttachmentBuilder`]
pub fn insert<Key: AsRef<[u8]> + ?Sized, Value: AsRef<[u8]> + ?Sized>(
&mut self,
key: &Key,
value: &Value,
) {
self._insert(key.as_ref(), value.as_ref())
}
fn _extend(&mut self, with: Self) -> &mut Self {
for slice in with.inner.zslices().cloned() {
self.inner.push_zslice(slice);
}
self
}
pub fn extend(&mut self, with: impl Into<Self>) -> &mut Self {
let with = with.into();
self._extend(with)
}
}
#[zenoh_macros::unstable]
pub struct AttachmentIterator<'a> {
reader: ZBufReader<'a>,
}
#[zenoh_macros::unstable]
impl<'a> core::iter::IntoIterator for &'a Attachment {
type Item = (ZSlice, ZSlice);
type IntoIter = AttachmentIterator<'a>;
fn into_iter(self) -> Self::IntoIter {
AttachmentIterator {
reader: self.inner.reader(),
}
}
}
#[zenoh_macros::unstable]
impl core::fmt::Debug for Attachment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{")?;
for (key, value) in self {
let key = key.as_slice();
let value = value.as_slice();
match core::str::from_utf8(key) {
Ok(key) => write!(f, "\"{key}\": ")?,
Err(_) => {
write!(f, "0x")?;
for byte in key {
write!(f, "{byte:02X}")?
}
}
}
match core::str::from_utf8(value) {
Ok(value) => write!(f, "\"{value}\", ")?,
Err(_) => {
write!(f, "0x")?;
for byte in value {
write!(f, "{byte:02X}")?
}
write!(f, ", ")?
}
}
}
write!(f, "}}")
}
}
#[zenoh_macros::unstable]
impl<'a> core::iter::Iterator for AttachmentIterator<'a> {
type Item = (ZSlice, ZSlice);
fn next(&mut self) -> Option<Self::Item> {
let key = Zenoh080.read(&mut self.reader).ok()?;
let value = Zenoh080.read(&mut self.reader).ok()?;
Some((key, value))
}
fn size_hint(&self) -> (usize, Option<usize>) {
(
(self.reader.remaining() != 0) as usize,
Some(self.reader.remaining() / 2),
)
}
}
#[zenoh_macros::unstable]
impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for AttachmentBuilder {
fn from_iter<T: IntoIterator<Item = (&'a [u8], &'a [u8])>>(iter: T) -> Self {
let codec = Zenoh080;
let mut buffer: Vec<u8> = Vec::new();
let mut writer = buffer.writer();
for (key, value) in iter {
codec.write(&mut writer, key).unwrap(); // Infallible, barring allocation failures
codec.write(&mut writer, value).unwrap(); // Infallible, barring allocation failures
}
Self { inner: buffer }
}
}
#[zenoh_macros::unstable]
impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for Attachment {
fn from_iter<T: IntoIterator<Item = (&'a [u8], &'a [u8])>>(iter: T) -> Self {
AttachmentBuilder::from_iter(iter).into()
this.buffer.into()
}
}
}
Expand Down Expand Up @@ -468,7 +268,7 @@ impl TryFrom<u64> for SampleKind {
}

#[zenoh_macros::unstable]
pub use attachment::{Attachment, AttachmentBuilder, AttachmentIterator};
pub use attachment::Attachment;

/// Structure with public fields for sample. It's convenient if it's necessary to decompose a sample into its fields.
pub struct SampleFields {
Expand Down
Loading
Loading