Skip to content

Commit

Permalink
Merge pull request #39 from EspressoSystems/rm/state-refactor
Browse files Browse the repository at this point in the history
state refactor
  • Loading branch information
rob-maron authored Apr 29, 2024
2 parents ea809bd + 93caeed commit b18d932
Show file tree
Hide file tree
Showing 17 changed files with 667 additions and 415 deletions.
117 changes: 117 additions & 0 deletions cdn-broker/src/connections/broadcast/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
//! This is where we define routing for broadcast messages.
mod relational_map;

use std::{collections::HashSet, sync::Arc};

use cdn_proto::{
connection::{Bytes, UserPublicKey},
def::RunDef,
discovery::BrokerIdentifier,
message::Topic,
mnemonic,
};
use tokio::spawn;
use tracing::debug;

use crate::Inner;

use self::relational_map::RelationalMap;

/// Our broadcast map is just two associative (bidirectional, multi) maps:
/// one for brokers and one for users.
pub struct BroadcastMap {
pub users: RelationalMap<UserPublicKey, Topic>,
pub brokers: RelationalMap<BrokerIdentifier, Topic>,

pub previous_subscribed_topics: HashSet<Topic>,
}

/// Default for our map just wraps the items with locks.
impl Default for BroadcastMap {
fn default() -> Self {
Self {
users: RelationalMap::new(),
brokers: RelationalMap::new(),
previous_subscribed_topics: HashSet::new(),
}
}
}

/// The new implementation just uses default
impl BroadcastMap {
pub fn new() -> Self {
Self::default()
}
}

impl<Def: RunDef> Inner<Def> {
/// Send a broadcast message to both users and brokers. First figures out where the message
/// is supposed to go, and then sends it. We have `to_user_only` bounds so we can stop thrashing;
/// if we receive a message from a broker we should only be forwarding it to applicable users.
pub async fn handle_broadcast_message(
self: &Arc<Self>,
mut topics: Vec<Topic>,
message: &Bytes,
to_users_only: bool,
) {
// Deduplicate topics
topics.dedup();

// Aggregate recipients
let mut broker_recipients = HashSet::new();
let mut user_recipients = HashSet::new();

for topic in topics {
// If we can send to brokers, we should do it
if !to_users_only {
broker_recipients.extend(
self.connections
.read()
.await
.broadcast_map
.brokers
.get_keys_by_value(&topic),
);
}
user_recipients.extend(
self.connections
.read()
.await
.broadcast_map
.users
.get_keys_by_value(&topic),
);
}

debug!(
num_brokers = broker_recipients.len(),
num_users = user_recipients.len(),
msg = mnemonic(&**message),
"broadcast",
);

// If we can send to brokers, do so
if !to_users_only {
// Send to all brokers
for broker in broker_recipients {
let self_ = self.clone();
let broker_ = broker.clone();
let message_ = message.clone();
spawn(async move {
let _ = self_.send_to_broker(&broker_, message_).await;
});
}
}

// Send to all aggregated users
for user in user_recipients {
// Send to the corresponding user
let self_ = self.clone();
let message_ = message.clone();
spawn(async move {
let _ = self_.send_to_user(user, message_).await;
});
}
}
}
Original file line number Diff line number Diff line change
@@ -1,40 +1,8 @@
//! This is where we define routing for broadcast messages.
use std::{
collections::{HashMap, HashSet},
hash::Hash,
};

use cdn_proto::{connection::UserPublicKey, discovery::BrokerIdentifier, message::Topic};
use parking_lot::RwLock;

/// Our broadcast map is just two associative (bidirectional, multi) maps:
/// one for brokers and one for users.
pub struct BroadcastMap {
pub users: RwLock<RelationalMap<UserPublicKey, Topic>>,
pub brokers: RwLock<RelationalMap<BrokerIdentifier, Topic>>,

pub previous_subscribed_topics: RwLock<HashSet<Topic>>,
}

/// Default for our map just wraps the items with locks.
impl Default for BroadcastMap {
fn default() -> Self {
Self {
users: RwLock::from(RelationalMap::new()),
brokers: RwLock::from(RelationalMap::new()),
previous_subscribed_topics: RwLock::from(HashSet::new()),
}
}
}

/// The new implementation just uses default
impl BroadcastMap {
pub fn new() -> Self {
Self::default()
}
}

/// A relational, bidirectional multimap that relates keys to a set of values,
/// and values to a set of keys.
pub struct RelationalMap<K: Hash + PartialEq + Eq + Clone, V: Hash + PartialEq + Eq + Clone> {
Expand Down
76 changes: 76 additions & 0 deletions cdn-broker/src/connections/broker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::sync::Arc;

use cdn_proto::{
connection::{protocols::Connection, Bytes},
def::RunDef,
discovery::BrokerIdentifier,
error::{Error, Result},
};
use tokio::spawn;
use tracing::error;

use crate::Inner;

impl<R: RunDef> Inner<R> {
/// Asynchronously send a message to all currently connected brokers. On failure,
/// the broker will be removed.
pub async fn spawn_send_to_brokers(self: &Arc<Self>, message: &Bytes) {
// For each broker,
for connection in &self.connections.read().await.brokers {
// Clone things we will need downstream
let message = message.clone();
let broker_identifier = connection.0.clone();
let connection = connection.1 .0.clone();
let connections = self.connections.clone();

// Spawn a task to send the message
spawn(async move {
if let Err(err) = connection.send_message_raw(message).await {
// If we fail, remove the broker from our map.
error!("failed to send message to broker: {err}");
connections.write().await.remove_broker(&broker_identifier);
};
});
}
}

/// Send a message to a particular broker. If it fails, it removes the broker from all maps.
/// Awaits on message acknowledgement and returns the error if it does
pub async fn send_to_broker(
self: &Arc<Self>,
broker_identifier: &BrokerIdentifier,
message: Bytes,
) -> Result<()> {
// If we are connected to them,
if let Some((connection, _)) = self.connections.read().await.brokers.get(broker_identifier)
{
// Send the message
if let Err(err) = connection.send_message_raw(message).await {
// Remove them if we failed to send it
error!("failed to send message to broker: {err}");
self.connections
.write()
.await
.remove_broker(broker_identifier);

// Return an error
return Err(Error::Connection(
"failed to send message to broker".to_string(),
));
};
} else {
// Remove the broker if they are not connected
self.connections
.write()
.await
.remove_broker(broker_identifier);

// Return an error
return Err(Error::Connection(
"failed to send message to broker".to_string(),
));
}

Ok(())
}
}
10 changes: 0 additions & 10 deletions cdn-broker/src/connections/direct.rs

This file was deleted.

79 changes: 79 additions & 0 deletions cdn-broker/src/connections/direct/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//! This is where we define routing for direct messages.
mod versioned_map;

use std::sync::Arc;

use cdn_proto::{
connection::{Bytes, UserPublicKey},
def::RunDef,
discovery::BrokerIdentifier,
mnemonic,
};
use tokio::spawn;
use tracing::{debug, warn};

use crate::Inner;

use self::versioned_map::VersionedMap;

/// We define the direct map as just a type alias of a `VersionedMap`, which
// deals with version vectors.
pub type DirectMap = VersionedMap<UserPublicKey, BrokerIdentifier, BrokerIdentifier>;

impl<Def: RunDef> Inner<Def> {
/// Send a direct message to either a user or a broker. First figures out where the message
/// is supposed to go, and then sends it. We have `to_user_only` bounds so we can stop thrashing;
/// if we receive a message from a broker we should only be forwarding it to applicable users.
pub async fn handle_direct_message(
self: &Arc<Self>,
user_public_key: UserPublicKey,
message: Bytes,
to_user_only: bool,
) {
// Look up from our map
if let Some(broker_identifier) = self
.connections
.read()
.await
.direct_map
.get(&user_public_key)
{
if *broker_identifier == self.connections.read().await.identity {
// We own the user, send it this way
debug!(
user = mnemonic(&user_public_key),
msg = mnemonic(&*message),
"direct",
);

// Send to the corresponding user
let self_ = self.clone();
spawn(async move {
let _ = self_.send_to_user(user_public_key, message).await;
});
} else {
// If we don't have the stipulation to send it to ourselves only
// This is so we don't thrash between brokers
if !to_user_only {
debug!(
broker = %broker_identifier,
msg = mnemonic(&*message),
"direct",
);

// Asynchronously send to the broker responsible
let self_ = self.clone();
let broker_identifier_ = broker_identifier.clone();
spawn(async move {
let _ = self_.send_to_broker(&broker_identifier_, message).await;
});
}
}
} else {
// Warning if the recipient user did not exist.
// TODO: Add sync in here to prevent forking. This is likely a problem.
warn!(id = mnemonic(&user_public_key), "user did not exist in map");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl<
entry.version += 1;
}

// Set the versional equal to the one we supplied
// Set the version equal to the one we supplied
entry.value = v;
}

Expand Down Expand Up @@ -124,6 +124,24 @@ impl<
}
}

/// Remove all entries from the map where the value equals `V`.
/// Does not count as a local modification.
/// TODO: see if we can remove some `.clone()`s here.
pub fn remove_by_value_no_modify(&mut self, v: &V) {
// Get all the keys that have the value we want to purge
let keys: Vec<K> = self
.underlying_map
.iter()
.filter(|(_, vv)| vv.value == Some(v.clone()))
.map(|(k, _)| k.clone())
.collect();

// Remove all associated keys
for key in keys {
self.underlying_map.remove(&key);
}
}

/// Get the full underlying map, cloning it. Returns our own conflict identity,
/// but does not return any of the locally modified keys.
pub fn get_full(&self) -> Self {
Expand Down Expand Up @@ -308,5 +326,35 @@ pub mod tests {
assert!(map_0.get(&"user0").is_none());
}

// TODO: fuzzy tests for this
#[test]
fn test_purge() {
// Create the map under test
let mut map: VersionedMap<&str, &str, u64> = VersionedMap::new(0);

// Insert "user0" as having "broker0" value
map.insert("user0", "broker0");
assert!(map.get(&"user0") == Some(&"broker0"));

// Insert "user0" as having "broker0" value
map.insert("user1", "broker0");
assert!(map.get(&"user1") == Some(&"broker0"));

// Insert "user2" as having "broker1" value
map.insert("user2", "broker1");
assert!(map.get(&"user2") == Some(&"broker1"));

// Purge all values that are "broker0"
map.remove_by_value_no_modify(&"broker0");

// Expect user0 and user1 to be gone
assert!(map.get(&"user0").is_none());
assert!(map.get(&"user1").is_none());

// Expect user2 to still be present
assert!(map.get(&"user2") == Some(&"broker1"));

// Test that the removes didn't count as local modifications
let diff = map.diff();
assert!(diff.underlying_map.len() == 1);
}
}
Loading

0 comments on commit b18d932

Please sign in to comment.