Skip to content

Commit

Permalink
better logging
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Apr 29, 2024
1 parent b18d932 commit 31a2f7f
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 17 deletions.
9 changes: 6 additions & 3 deletions cdn-broker/src/connections/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ impl<R: RunDef> Inner<R> {
if let Err(err) = connection.send_message_raw(message).await {
// If we fail, remove the broker from our map.
error!("failed to send message to broker: {err}");
connections.write().await.remove_broker(&broker_identifier);
connections
.write()
.await
.remove_broker(&broker_identifier, "failed to send message");
};
});
}
Expand All @@ -51,7 +54,7 @@ impl<R: RunDef> Inner<R> {
self.connections
.write()
.await
.remove_broker(broker_identifier);
.remove_broker(broker_identifier, "failed to send message");

// Return an error
return Err(Error::Connection(
Expand All @@ -63,7 +66,7 @@ impl<R: RunDef> Inner<R> {
self.connections
.write()
.await
.remove_broker(broker_identifier);
.remove_broker(broker_identifier, "not connected");

// Return an error
return Err(Error::Connection(
Expand Down
18 changes: 11 additions & 7 deletions cdn-broker/src/connections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<Def: RunDef> Connections<Def> {

// We should remove the users that are different, if they exist locally.
for user in users_to_remove {
self.remove_user(user);
self.remove_user(user, "user connected elsewhere");
}
}

Expand Down Expand Up @@ -127,7 +127,7 @@ impl<Def: RunDef> Connections<Def> {
info!(id = %broker_identifier, "broker connected");

// Remove the old broker if it exists
self.remove_broker(&broker_identifier);
self.remove_broker(&broker_identifier, "already existed");

self.brokers.insert(broker_identifier, (connection, handle));
}
Expand All @@ -146,7 +146,7 @@ impl<Def: RunDef> Connections<Def> {
info!(id = mnemonic(user_public_key), "user connected");

// Remove the old user if it exists
self.remove_user(user_public_key.clone());
self.remove_user(user_public_key.clone(), "already existed");

// Add to our map. Remove the old one if it exists
self.users
Expand All @@ -164,12 +164,12 @@ impl<Def: RunDef> Connections<Def> {

/// Remove a broker from our map by their identifier. Also removes them
/// from our broadcast map, in case they were subscribed to any topics.
pub fn remove_broker(&mut self, broker_identifier: &BrokerIdentifier) {
pub fn remove_broker(&mut self, broker_identifier: &BrokerIdentifier, reason: &str) {
// Remove from broker list, cancelling the previous task if it exists
if let Some(previous_handle) = self.brokers.remove(broker_identifier).map(|(_, h)| h) {
// Decrement the metric for the number of brokers connected
metrics::NUM_BROKERS_CONNECTED.dec();
error!(id = %broker_identifier, "broker disconnected");
error!(id = %broker_identifier, reason = reason, "broker disconnected");

// Cancel the broker's task
previous_handle.abort();
Expand All @@ -186,12 +186,16 @@ impl<Def: RunDef> Connections<Def> {
/// from our broadcast map, in case they were subscribed to any topics, and
/// the versioned vector map. This is so other brokers don't keep trying
/// to send us messages for a disconnected user.
pub fn remove_user(&mut self, user_public_key: UserPublicKey) {
pub fn remove_user(&mut self, user_public_key: UserPublicKey, reason: &str) {
// Remove from user list, returning the previous handle if it exists
if let Some(previous_handle) = self.users.remove(&user_public_key).map(|(_, h)| h) {
// Decrement the metric for the number of users connected
metrics::NUM_USERS_CONNECTED.dec();
warn!(id = mnemonic(&user_public_key), "user disconnected");
warn!(
id = mnemonic(&user_public_key),
reason = reason,
"user disconnected"
);

// Cancel the user's task
previous_handle.abort();
Expand Down
10 changes: 8 additions & 2 deletions cdn-broker/src/connections/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ impl<Def: RunDef> Inner<Def> {
if let Err(err) = connection.send_message_raw(message).await {
// If we fail to send the message, remove the user.
warn!("failed to send message to user: {err}");
connections.write().await.remove_user(user_public_key);
connections
.write()
.await
.remove_user(user_public_key, "failed to send message");

// Return an error
return Err(Error::Connection(
Expand All @@ -37,7 +40,10 @@ impl<Def: RunDef> Inner<Def> {
};
} else {
// Remove the user if they are not connected
self.connections.write().await.remove_user(user_public_key);
self.connections
.write()
.await
.remove_user(user_public_key, "not connected");

// Return an error
return Err(Error::Connection(
Expand Down
8 changes: 4 additions & 4 deletions cdn-broker/src/handlers/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<Def: RunDef> Inner<Def> {
.connections
.write()
.await
.remove_broker(&broker_identifier_);
.remove_broker(&broker_identifier_, "failed to receive message");
};
})
.abort_handle();
Expand All @@ -84,6 +84,9 @@ impl<Def: RunDef> Inner<Def> {
receive_handle,
);

// Notify the broker receive loop that we are initialized
notify_initialized.notify_one();

// Send a full user sync
if let Err(err) = self.full_user_sync(&broker_identifier).await {
error!("failed to perform full user sync: {err}");
Expand All @@ -107,9 +110,6 @@ impl<Def: RunDef> Inner<Def> {
}
}

// Notify the broker receive loop that we are initialized
notify_initialized.notify_one();

// Once we have added the broker, drop the authentication guard
drop(auth_guard);
}
Expand Down
17 changes: 16 additions & 1 deletion cdn-broker/src/handlers/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use cdn_proto::discovery::DiscoveryClient;
use cdn_proto::error::{Error, Result};
use cdn_proto::{connection::auth::broker::BrokerAuth, message::Message, mnemonic};
use tokio::spawn;
use tokio::sync::Notify;
use tokio::time::timeout;
use tracing::{error, warn};

Expand Down Expand Up @@ -38,19 +39,30 @@ impl<Def: RunDef> Inner<Def> {
let public_key = UserPublicKey::from(public_key);
let user_identifier = mnemonic(&public_key);

// Create a notifier for the user receive loop to wait for
let notify_initialized = Arc::new(Notify::new());
let wait_initialized = notify_initialized.clone();

// Clone the necessary data for the broker receive loop
let self_ = self.clone();
let public_key_ = public_key.clone();
let connection_ = connection.clone();

// Spawn the user receive loop
let receive_handle = spawn(async move {
// Wait for the handler to have finished initialization
wait_initialized.notified().await;

// If we error, come back to the callback so we can remove the connection from the list.
if let Err(err) = self_.user_receive_loop(&public_key_, connection_).await {
warn!(id = user_identifier, error = err.to_string(), "user error");

// Remove the user from the map
self_.connections.write().await.remove_user(public_key_);
self_
.connections
.write()
.await
.remove_user(public_key_, "failed to receive message");
};
})
.abort_handle();
Expand All @@ -61,6 +73,9 @@ impl<Def: RunDef> Inner<Def> {
.await
.add_user(&public_key, connection, &topics, receive_handle);

// Notify the user receive loop that we are initialized
notify_initialized.notify_one();

// If we have `strong-consistency` enabled,
#[cfg(feature = "strong-consistency")]
{
Expand Down

0 comments on commit 31a2f7f

Please sign in to comment.