Skip to content

Commit

Permalink
Merge pull request #26 from asynchronics/feature/cbor-instead-of-msgpack
Browse files Browse the repository at this point in the history
Replace MessagePack by CBOR
  • Loading branch information
sbarral authored Jun 19, 2024
2 parents 4039d96 + 8ec5cd9 commit cb7caa1
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 66 deletions.
4 changes: 2 additions & 2 deletions asynchronix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ autotests = false

[features]
# Remote procedure call API.
rpc = ["dep:rmp-serde", "dep:serde", "dep:tonic", "dep:prost", "dep:prost-types", "dep:bytes"]
rpc = ["dep:ciborium", "dep:serde", "dep:tonic", "dep:prost", "dep:prost-types", "dep:bytes"]
# This feature forces protobuf/gRPC code (re-)generation.
rpc-codegen = ["dep:tonic-build"]
# gRPC service.
Expand Down Expand Up @@ -57,7 +57,7 @@ tai-time = "0.3"
bytes = { version = "1", default-features = false, optional = true }
prost = { version = "0.12", optional = true }
prost-types = { version = "0.12", optional = true }
rmp-serde = { version = "1.1", optional = true }
ciborium = { version = "0.2.2", optional = true }
serde = { version = "1", optional = true }

# gRPC service dependencies.
Expand Down
13 changes: 8 additions & 5 deletions asynchronix/src/registry/event_sink_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;

use rmp_serde::encode::Error as RmpEncodeError;
use ciborium;
use serde::Serialize;

use crate::ports::EventSinkStream;

type SerializationError = ciborium::ser::Error<std::io::Error>;

/// A registry that holds all sources and sinks meant to be accessed through
/// remote procedure calls.
#[derive(Default)]
Expand Down Expand Up @@ -58,7 +60,7 @@ pub(crate) trait EventSinkStreamAny: Send + 'static {
fn close(&mut self);

/// Encode and collect all events in a vector.
fn collect(&mut self) -> Result<Vec<Vec<u8>>, RmpEncodeError>;
fn collect(&mut self) -> Result<Vec<Vec<u8>>, SerializationError>;
}

impl<E> EventSinkStreamAny for E
Expand All @@ -78,10 +80,11 @@ where
self.close();
}

fn collect(&mut self) -> Result<Vec<Vec<u8>>, RmpEncodeError> {
fn collect(&mut self) -> Result<Vec<Vec<u8>>, SerializationError> {
self.__try_fold(Vec::new(), |mut encoded_events, event| {
rmp_serde::to_vec_named(&event).map(|encoded_event| {
encoded_events.push(encoded_event);
let mut buffer = Vec::new();
ciborium::into_writer(&event, &mut buffer).map(|_| {
encoded_events.push(buffer);

encoded_events
})
Expand Down
53 changes: 30 additions & 23 deletions asynchronix/src/registry/event_source_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ use std::collections::HashMap;
use std::fmt;
use std::time::Duration;

use rmp_serde::decode::Error as RmpDecodeError;
use ciborium;
use serde::de::DeserializeOwned;

use crate::ports::EventSource;
use crate::simulation::{Action, ActionKey};

type DeserializationError = ciborium::de::Error<std::io::Error>;

/// A registry that holds all sources and sinks meant to be accessed through
/// remote procedure calls.
#[derive(Default)]
Expand Down Expand Up @@ -50,41 +52,43 @@ impl fmt::Debug for EventSourceRegistry {
}
}

/// A type-erased `EventSource` that operates on MessagePack-encoded serialized
/// events.
/// A type-erased `EventSource` that operates on CBOR-encoded serialized events.
pub(crate) trait EventSourceAny: Send + 'static {
/// Returns an action which, when processed, broadcasts an event to all
/// connected input ports.
///
/// The argument is expected to conform to the serde MessagePack encoding.
fn event(&mut self, msgpack_arg: &[u8]) -> Result<Action, RmpDecodeError>;
/// The argument is expected to conform to the serde CBOR encoding.
fn event(&mut self, serialized_arg: &[u8]) -> Result<Action, DeserializationError>;

/// Returns a cancellable action and a cancellation key; when processed, the
/// action broadcasts an event to all connected input ports.
///
/// The argument is expected to conform to the serde MessagePack encoding.
fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError>;
/// The argument is expected to conform to the serde CBOR encoding.
fn keyed_event(
&mut self,
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError>;

/// Returns a periodically recurring action which, when processed,
/// broadcasts an event to all connected input ports.
///
/// The argument is expected to conform to the serde MessagePack encoding.
/// The argument is expected to conform to the serde CBOR encoding.
fn periodic_event(
&mut self,
period: Duration,
msgpack_arg: &[u8],
) -> Result<Action, RmpDecodeError>;
serialized_arg: &[u8],
) -> Result<Action, DeserializationError>;

/// Returns a cancellable, periodically recurring action and a cancellation
/// key; when processed, the action broadcasts an event to all connected
/// input ports.
///
/// The argument is expected to conform to the serde MessagePack encoding.
/// The argument is expected to conform to the serde CBOR encoding.
fn keyed_periodic_event(
&mut self,
period: Duration,
msgpack_arg: &[u8],
) -> Result<(Action, ActionKey), RmpDecodeError>;
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError>;

/// Human-readable name of the event type, as returned by
/// `any::type_name()`.
Expand All @@ -95,25 +99,28 @@ impl<T> EventSourceAny for EventSource<T>
where
T: DeserializeOwned + Clone + Send + 'static,
{
fn event(&mut self, msgpack_arg: &[u8]) -> Result<Action, RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.event(arg))
fn event(&mut self, serialized_arg: &[u8]) -> Result<Action, DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.event(arg))
}
fn keyed_event(&mut self, msgpack_arg: &[u8]) -> Result<(Action, ActionKey), RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_event(arg))
fn keyed_event(
&mut self,
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.keyed_event(arg))
}
fn periodic_event(
&mut self,
period: Duration,
msgpack_arg: &[u8],
) -> Result<Action, RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.periodic_event(period, arg))
serialized_arg: &[u8],
) -> Result<Action, DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.periodic_event(period, arg))
}
fn keyed_periodic_event(
&mut self,
period: Duration,
msgpack_arg: &[u8],
) -> Result<(Action, ActionKey), RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| self.keyed_periodic_event(period, arg))
serialized_arg: &[u8],
) -> Result<(Action, ActionKey), DeserializationError> {
ciborium::from_reader(serialized_arg).map(|arg| self.keyed_periodic_event(period, arg))
}
fn event_type_name(&self) -> &'static str {
std::any::type_name::<T>()
Expand Down
31 changes: 17 additions & 14 deletions asynchronix/src/registry/query_source_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;

use rmp_serde::decode::Error as RmpDecodeError;
use rmp_serde::encode::Error as RmpEncodeError;
use ciborium;
use serde::de::DeserializeOwned;
use serde::Serialize;

use crate::ports::{QuerySource, ReplyReceiver};
use crate::simulation::Action;

type DeserializationError = ciborium::de::Error<std::io::Error>;
type SerializationError = ciborium::ser::Error<std::io::Error>;

/// A registry that holds all sources and sinks meant to be accessed through
/// remote procedure calls.
#[derive(Default)]
Expand Down Expand Up @@ -52,18 +54,18 @@ impl fmt::Debug for QuerySourceRegistry {
}
}

/// A type-erased `QuerySource` that operates on MessagePack-encoded serialized
/// queries and returns MessagePack-encoded replies.
/// A type-erased `QuerySource` that operates on CBOR-encoded serialized queries
/// and returns CBOR-encoded replies.
pub(crate) trait QuerySourceAny: Send + 'static {
/// Returns an action which, when processed, broadcasts a query to all
/// connected replier ports.
///
///
/// The argument is expected to conform to the serde MessagePack encoding.
/// The argument is expected to conform to the serde CBOR encoding.
fn query(
&mut self,
msgpack_arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), RmpDecodeError>;
arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), DeserializationError>;

/// Human-readable name of the request type, as returned by
/// `any::type_name()`.
Expand All @@ -81,9 +83,9 @@ where
{
fn query(
&mut self,
msgpack_arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), RmpDecodeError> {
rmp_serde::from_read(msgpack_arg).map(|arg| {
arg: &[u8],
) -> Result<(Action, Box<dyn ReplyReceiverAny>), DeserializationError> {
ciborium::from_reader(arg).map(|arg| {
let (action, reply_recv) = self.query(arg);
let reply_recv: Box<dyn ReplyReceiverAny> = Box::new(reply_recv);

Expand All @@ -100,20 +102,21 @@ where
}
}

/// A type-erased `ReplyReceiver` that returns MessagePack-encoded replies..
/// A type-erased `ReplyReceiver` that returns CBOR-encoded replies.
pub(crate) trait ReplyReceiverAny {
/// Take the replies, if any, encode them and collect them in a vector.
fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, RmpEncodeError>>;
fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, SerializationError>>;
}

impl<R: Serialize + 'static> ReplyReceiverAny for ReplyReceiver<R> {
fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, RmpEncodeError>> {
fn take_collect(&mut self) -> Option<Result<Vec<Vec<u8>>, SerializationError>> {
let replies = self.take()?;

let encoded_replies = (move || {
let mut encoded_replies = Vec::new();
for reply in replies {
let encoded_reply = rmp_serde::to_vec_named(&reply)?;
let mut encoded_reply = Vec::new();
ciborium::into_writer(&reply, &mut encoded_reply)?;
encoded_replies.push(encoded_reply);
}

Expand Down
42 changes: 23 additions & 19 deletions asynchronix/src/rpc/services/controller_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl ControllerService {
..
} => move || -> Result<Option<KeyRegistryId>, Error> {
let source_name = &request.source_name;
let msgpack_event = &request.event;
let event = &request.event;
let with_key = request.with_key;
let period = request
.period
Expand Down Expand Up @@ -188,23 +188,24 @@ impl ControllerService {
))?;

let (action, action_key) = match (with_key, period) {
(false, None) => source.event(msgpack_event).map(|action| (action, None)),
(false, None) => source.event(event).map(|action| (action, None)),
(false, Some(period)) => source
.periodic_event(period, msgpack_event)
.periodic_event(period, event)
.map(|action| (action, None)),
(true, None) => source
.keyed_event(msgpack_event)
.keyed_event(event)
.map(|(action, key)| (action, Some(key))),
(true, Some(period)) => source
.keyed_periodic_event(period, msgpack_event)
.keyed_periodic_event(period, event)
.map(|(action, key)| (action, Some(key))),
}
.map_err(|_| {
.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the event could not be deserialized as type '{}'",
source.event_type_name()
"the event could not be deserialized as type '{}': {}",
source.event_type_name(),
e
),
)
})?;
Expand Down Expand Up @@ -296,19 +297,20 @@ impl ControllerService {
..
} => move || -> Result<(), Error> {
let source_name = &request.source_name;
let msgpack_event = &request.event;
let event = &request.event;

let source = event_source_registry.get_mut(source_name).ok_or(to_error(
ErrorCode::SourceNotFound,
"no source is registered with the name '{}'".to_string(),
))?;

let event = source.event(msgpack_event).map_err(|_| {
let event = source.event(event).map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the event could not be deserialized as type '{}'",
source.event_type_name()
"the event could not be deserialized as type '{}': {}",
source.event_type_name(),
e
),
)
})?;
Expand Down Expand Up @@ -340,19 +342,20 @@ impl ControllerService {
..
} => move || -> Result<Vec<Vec<u8>>, Error> {
let source_name = &request.source_name;
let msgpack_request = &request.request;
let request = &request.request;

let source = query_source_registry.get_mut(source_name).ok_or(to_error(
ErrorCode::SourceNotFound,
"no source is registered with the name '{}'".to_string(),
))?;

let (query, mut promise) = source.query(msgpack_request).map_err(|_| {
let (query, mut promise) = source.query(request).map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the request could not be deserialized as type '{}'",
source.request_type_name()
"the request could not be deserialized as type '{}': {}",
source.request_type_name(),
e
),
)
})?;
Expand All @@ -364,12 +367,13 @@ impl ControllerService {
"a reply to the query was expected but none was available".to_string(),
))?;

replies.map_err(|_| {
replies.map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the reply could not be serialized as type '{}'",
source.reply_type_name()
"the reply could not be serialized as type '{}': {}",
source.reply_type_name(),
e
),
)
})
Expand Down
7 changes: 4 additions & 3 deletions asynchronix/src/rpc/services/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ impl MonitorService {
format!("no sink is registered with the name '{}'", sink_name),
))?;

sink.collect().map_err(|_| {
sink.collect().map_err(|e| {
to_error(
ErrorCode::InvalidMessage,
format!(
"the event could not be serialized from type '{}'",
sink.event_type_name()
"the event could not be serialized from type '{}': {}",
sink.event_type_name(),
e
),
)
})
Expand Down

0 comments on commit cb7caa1

Please sign in to comment.