Run dispatcher in a separate task (#25)
* working on moving the dispatcher onto its own task

* work in progress - dispatcher task

* work in progress - dispatcher task 3

* create a buffer for the dispatcher and run it in a separate task to improve performance

* small refactor of the create new consumer function

* implement the dispatcher using dispatcher commands (see the overview sketch below)

* remove a unit test from danube-client
danrusei authored Nov 30, 2024
1 parent 60e5ebc commit 1895011
Showing 8 changed files with 364 additions and 334 deletions.
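
The core of this change is an actor-style dispatcher: rather than sharing mutable dispatcher state across broker code paths, the dispatcher now owns its state inside a dedicated Tokio task and is driven through a bounded command channel, which doubles as the buffer mentioned in the commit message. Below is a minimal, self-contained sketch of that pattern, not the commit's exact code: the Command variants, payload type, and buffer size are illustrative stand-ins.

use tokio::sync::mpsc;

// Illustrative command set; the real code uses DispatcherCommand (see the diff below).
#[derive(Debug)]
enum Command {
    Dispatch(Vec<u8>),
    Shutdown,
}

#[tokio::main]
async fn main() {
    // Bounded channel: acts as the dispatcher's buffer and applies backpressure to senders.
    let (tx, mut rx) = mpsc::channel::<Command>(16);

    // The dispatcher task owns its state; callers never lock it directly.
    let handle = tokio::spawn(async move {
        while let Some(cmd) = rx.recv().await {
            match cmd {
                Command::Dispatch(payload) => println!("dispatching {} bytes", payload.len()),
                Command::Shutdown => break,
            }
        }
    });

    // Producers and the broker only talk to the task through the sender handle.
    tx.send(Command::Dispatch(vec![1, 2, 3])).await.unwrap();
    tx.send(Command::Shutdown).await.unwrap();
    handle.await.unwrap();
}
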
1 change: 1 addition & 0 deletions .gitignore
@@ -15,3 +15,4 @@ Cargo.lock

etcd-data/
temp/
.vscode/
86 changes: 27 additions & 59 deletions danube-broker/src/consumer.rs
@@ -2,7 +2,7 @@ use anyhow::Result;
use metrics::counter;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tracing::{info, trace, warn};
use tracing::{trace, warn};

use crate::{
broker_metrics::{CONSUMER_BYTES_OUT_COUNTER, CONSUMER_MSG_OUT_COUNTER},
@@ -11,16 +11,15 @@ use crate::{

/// Represents a consumer connected and associated with a Subscription.
#[allow(dead_code)]
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct Consumer {
consumer_id: u64,
consumer_name: String,
subscription_type: i32,
topic_name: String,
rx_broker: mpsc::Receiver<MessageToSend>,
tx_cons: mpsc::Sender<MessageToSend>,
pub(crate) consumer_id: u64,
pub(crate) consumer_name: String,
pub(crate) subscription_type: i32,
pub(crate) topic_name: String,
pub(crate) tx_cons: mpsc::Sender<MessageToSend>,
// status = true -> consumer OK, status = false -> Close the consumer
status: Arc<Mutex<bool>>,
pub(crate) status: Arc<Mutex<bool>>,
}

#[derive(Debug, Clone)]
@@ -35,7 +34,6 @@ impl Consumer {
consumer_name: &str,
subscription_type: i32,
topic_name: &str,
rx_broker: mpsc::Receiver<MessageToSend>,
tx_cons: mpsc::Sender<MessageToSend>,
status: Arc<Mutex<bool>>,
) -> Self {
@@ -44,65 +42,35 @@
consumer_name: consumer_name.into(),
subscription_type,
topic_name: topic_name.into(),
rx_broker,
tx_cons,
status,
}
}

// The consumer task runs asynchronously, handling message delivery to the gRPC `ReceiverStream`.
pub(crate) async fn run(&mut self) {
while let Some(messages) = self.rx_broker.recv().await {
// Since u8 is exactly 1 byte, the size in bytes will be equal to the number of elements in the vector.
let payload_size = messages.payload.len();
// Send the message to the other channel
if let Err(err) = self.tx_cons.send(messages).await {
// Log the error and handle the channel closure scenario
warn!(
"Failed to send message to consumer with id: {}. Error: {:?}",
self.consumer_id, err
);
pub(crate) async fn send_message(&mut self, message: MessageToSend) -> Result<()> {
// Since u8 is exactly 1 byte, the size in bytes will be equal to the number of elements in the vector.
let payload_size = message.payload.len();
// Send the message to the other channel
if let Err(err) = self.tx_cons.send(message).await {
// Log the error and handle the channel closure scenario
warn!(
"Failed to send message to consumer with id: {}. Error: {:?}",
self.consumer_id, err
);

*self.status.lock().await = false
} else {
trace!("Sending the message over channel to {}", self.consumer_id);
counter!(CONSUMER_MSG_OUT_COUNTER.name, "topic"=> self.topic_name.clone() , "consumer" => self.consumer_id.to_string()).increment(1);
counter!(CONSUMER_BYTES_OUT_COUNTER.name, "topic"=> self.topic_name.clone() , "consumer" => self.consumer_id.to_string()).increment(payload_size as u64);
}
*self.status.lock().await = false
} else {
trace!("Sending the message over channel to {}", self.consumer_id);
counter!(CONSUMER_MSG_OUT_COUNTER.name, "topic"=> self.topic_name.clone() , "consumer" => self.consumer_id.to_string()).increment(1);
counter!(CONSUMER_BYTES_OUT_COUNTER.name, "topic"=> self.topic_name.clone() , "consumer" => self.consumer_id.to_string()).increment(payload_size as u64);
}
info!("Consumer task ended for consumer_id: {}", self.consumer_id);
}

// pub(crate) async fn get_status(&self) -> bool {
// *self.status.lock().await
// }

// Close the consumer if: a. the connection is dropped
// b. all messages were delivered and there are no pending message acks, graceful close connection
#[allow(dead_code)]
pub(crate) fn close(&self) -> Result<()> {
// subscription.remove_consumer(self)
todo!()
// info!("Consumer task ended for consumer_id: {}", self.consumer_id);
Ok(())
}

// Unsubscribe consumer from the Subscription
#[allow(dead_code)]
pub(crate) fn unsubscribe(&self) -> Result<()> {
// subscription.unsubscribe(self)
todo!()
pub(crate) async fn get_status(&self) -> bool {
*self.status.lock().await
}

// acked message from client
#[allow(dead_code)]
pub(crate) fn message_acked(&self) -> Result<()> {
todo!()
}

// closes the consumer from server-side and inform the client through health_check mechanism
// to disconnect consumer
// pub(crate) async fn disconnect(&mut self) -> u64 {
// gauge!(TOPIC_CONSUMERS.name, "topic" => self.topic_name.to_string()).decrement(1);
// *self.status.lock().await = false;
// self.consumer_id
// }
}
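
With this refactor the per-consumer forwarding task is gone: the old run loop that read from rx_broker and re-sent into tx_cons is replaced by a direct send_message call from the dispatcher task, and a failed send on tx_cons flips the shared status flag so the dispatcher stops selecting that consumer. The sketch below is an assumed simplification of that call path (metrics, tracing, and error types removed); it mirrors the new flow rather than reproducing the repository code.

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

#[derive(Debug)]
struct MessageToSend {
    payload: Vec<u8>,
}

// Clone is cheap now that the consumer only holds a Sender and shared state.
#[derive(Clone)]
struct Consumer {
    consumer_id: u64,
    tx_cons: mpsc::Sender<MessageToSend>,
    status: Arc<Mutex<bool>>, // false once the client-side receiver is gone
}

impl Consumer {
    async fn send_message(&self, message: MessageToSend) {
        if self.tx_cons.send(message).await.is_err() {
            // The gRPC stream side dropped its receiver: mark the consumer closed.
            *self.status.lock().await = false;
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx_cons, mut rx_cons) = mpsc::channel(8);
    let consumer = Consumer {
        consumer_id: 1,
        tx_cons,
        status: Arc::new(Mutex::new(true)),
    };

    consumer.send_message(MessageToSend { payload: vec![0u8; 4] }).await;
    let received = rx_cons.recv().await.unwrap();
    println!("consumer {} received {} bytes", consumer.consumer_id, received.payload.len());
}
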
34 changes: 21 additions & 13 deletions danube-broker/src/dispatcher.rs
@@ -1,6 +1,6 @@
use anyhow::Result;

use crate::{consumer::MessageToSend, subscription::ConsumerInfo};
use crate::consumer::{Consumer, MessageToSend};

pub(crate) mod dispatcher_multiple_consumers;
pub(crate) mod dispatcher_single_consumer;
@@ -14,23 +14,32 @@ pub(crate) enum Dispatcher {
MultipleConsumers(DispatcherMultipleConsumers),
}

// Control messages for the dispatcher
enum DispatcherCommand {
AddConsumer(Consumer),
RemoveConsumer(u64),
DisconnectAllConsumers,
DispatchMessage(MessageToSend),
}

impl Dispatcher {
pub(crate) async fn send_messages(&self, messages: MessageToSend) -> Result<()> {
pub(crate) async fn dispatch_message(&self, message: MessageToSend) -> Result<()> {
match self {
Dispatcher::OneConsumer(dispatcher) => Ok(dispatcher.send_messages(messages).await?),
Dispatcher::OneConsumer(dispatcher) => Ok(dispatcher.dispatch_message(message).await?),
Dispatcher::MultipleConsumers(dispatcher) => {
Ok(dispatcher.send_messages(messages).await?)
Ok(dispatcher.dispatch_message(message).await?)
}
}
}
pub(crate) async fn add_consumer(&mut self, consumer: ConsumerInfo) -> Result<()> {
pub(crate) async fn add_consumer(&mut self, consumer: Consumer) -> Result<()> {
match self {
Dispatcher::OneConsumer(dispatcher) => Ok(dispatcher.add_consumer(consumer).await?),
Dispatcher::MultipleConsumers(dispatcher) => {
Ok(dispatcher.add_consumer(consumer).await?)
}
}
}
#[allow(dead_code)]
pub(crate) async fn remove_consumer(&mut self, consumer_id: u64) -> Result<()> {
match self {
Dispatcher::OneConsumer(dispatcher) => {
@@ -42,15 +51,14 @@ impl Dispatcher {
}
}

pub(crate) fn get_consumers(&self) -> &Vec<ConsumerInfo> {
pub(crate) async fn disconnect_all_consumers(&mut self) -> Result<()> {
match self {
Dispatcher::OneConsumer(dispatcher) => dispatcher.get_consumers(),
Dispatcher::MultipleConsumers(dispatcher) => dispatcher.get_consumers(),
Dispatcher::OneConsumer(dispatcher) => {
Ok(dispatcher.disconnect_all_consumers().await?)
}
Dispatcher::MultipleConsumers(dispatcher) => {
Ok(dispatcher.disconnect_all_consumers().await?)
}
}
}
// pub(crate) fn additional_method(&self) {
// if let Dispatcher::OneConsumer(dispatcher) = self {
// dispatcher.additional_method_single();
// }
// }
}
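
The Dispatcher enum remains a thin facade: each public method matches on the variant and forwards to the single- or multiple-consumer implementation, so subscription code stays agnostic of the subscription type. A minimal, dependency-free sketch of that delegation pattern, with illustrative structs standing in for the real dispatchers:

// Illustrative only; the real variants wrap the single- and multiple-consumer dispatchers.
enum Dispatcher {
    OneConsumer(Single),
    MultipleConsumers(Multiple),
}

struct Single;
struct Multiple;

impl Single {
    fn dispatch(&self, msg: &str) {
        println!("exclusive subscription: {msg}");
    }
}

impl Multiple {
    fn dispatch(&self, msg: &str) {
        println!("shared subscription: {msg}");
    }
}

impl Dispatcher {
    // One entry point for callers; the match hides which strategy is in use.
    fn dispatch(&self, msg: &str) {
        match self {
            Dispatcher::OneConsumer(d) => d.dispatch(msg),
            Dispatcher::MultipleConsumers(d) => d.dispatch(msg),
        }
    }
}

fn main() {
    Dispatcher::OneConsumer(Single).dispatch("hello");
    Dispatcher::MultipleConsumers(Multiple).dispatch("world");
}
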
172 changes: 94 additions & 78 deletions danube-broker/src/dispatcher/dispatcher_multiple_consumers.rs
@@ -1,105 +1,121 @@
use anyhow::{anyhow, Result};
use std::sync::atomic::AtomicUsize;
use tracing::trace;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;
use tracing::{trace, warn};

use crate::{consumer::MessageToSend, subscription::ConsumerInfo};
use crate::{
consumer::{Consumer, MessageToSend},
dispatcher::DispatcherCommand,
};

#[derive(Debug)]
pub(crate) struct DispatcherMultipleConsumers {
consumers: Vec<ConsumerInfo>,
index_consumer: AtomicUsize,
control_tx: mpsc::Sender<DispatcherCommand>,
}

impl DispatcherMultipleConsumers {
pub(crate) fn new() -> Self {
DispatcherMultipleConsumers {
consumers: Vec::new(),
index_consumer: AtomicUsize::new(0),
}
}

// manage the addition of consumers to the dispatcher
pub(crate) async fn add_consumer(&mut self, consumer: ConsumerInfo) -> Result<()> {
// checks if adding a new consumer would exceed the maximum allowed consumers for the subscription
self.consumers.push(consumer);

Ok(())
}

// manage the removal of consumers from the dispatcher
pub(crate) async fn remove_consumer(&mut self, consumer_id: u64) -> Result<()> {
// Find the position asynchronously
let pos = {
let mut pos = None;
for (index, x) in self.consumers.iter().enumerate() {
if x.consumer_id == consumer_id {
pos = Some(index);
break;
let (control_tx, mut control_rx) = mpsc::channel(16);

// Spawn the dispatcher task
tokio::spawn(async move {
let mut consumers: Vec<Consumer> = Vec::new();
let mut index_consumer = AtomicUsize::new(0);

loop {
if let Some(command) = control_rx.recv().await {
match command {
DispatcherCommand::AddConsumer(consumer) => {
consumers.push(consumer);
trace!("Consumer added. Total consumers: {}", consumers.len());
}
DispatcherCommand::RemoveConsumer(consumer_id) => {
consumers.retain(|c| c.consumer_id != consumer_id);
trace!("Consumer removed. Total consumers: {}", consumers.len());
}
DispatcherCommand::DisconnectAllConsumers => {
consumers.clear();
trace!("All consumers disconnected.");
}
DispatcherCommand::DispatchMessage(message) => {
if let Err(error) = Self::handle_dispatch_message(
&mut consumers,
&mut index_consumer,
message,
)
.await
{
warn!("Failed to dispatch message: {}", error);
}
}
}
}
}
pos
};

// If a position was found, remove the consumer at that position
if let Some(pos) = pos {
self.consumers.remove(pos);
}
});

Ok(())
DispatcherMultipleConsumers { control_tx }
}

pub(crate) fn get_consumers(&self) -> &Vec<ConsumerInfo> {
&self.consumers
/// Dispatch a message to the active consumer
pub(crate) async fn dispatch_message(&self, message: MessageToSend) -> Result<()> {
self.control_tx
.send(DispatcherCommand::DispatchMessage(message))
.await
.map_err(|err| anyhow!("Failed to dispatch the message {}", err))
}

pub(crate) async fn send_messages(&self, messages: MessageToSend) -> Result<()> {
// Attempt to get an active consumer and send messages
if let Ok(consumer) = self.find_next_active_consumer().await {
//let batch_size = 1; // to be calculated
consumer.tx_broker.send(messages).await?;
trace!(
"Dispatcher is sending the message to consumer: {}",
consumer.consumer_id
);
Ok(())
} else {
Err(anyhow!("There are no active consumers on this dispatcher"))
}
/// Add a new consumer to the dispatcher
pub(crate) async fn add_consumer(&self, consumer: Consumer) -> Result<()> {
self.control_tx
.send(DispatcherCommand::AddConsumer(consumer))
.await
.map_err(|_| anyhow!("Failed to send add consumer command"))
}

async fn find_next_active_consumer(&self) -> Result<ConsumerInfo> {
let num_consumers = self.consumers.len();

for _ in 0..num_consumers {
let consumer = self.get_next_consumer()?;

if !consumer.get_status().await {
continue;
}

return Ok(consumer);
}

return Err(anyhow!("unable to find an active consumer"));
/// Remove a consumer by its ID
#[allow(dead_code)]
pub(crate) async fn remove_consumer(&self, consumer_id: u64) -> Result<()> {
self.control_tx
.send(DispatcherCommand::RemoveConsumer(consumer_id))
.await
.map_err(|_| anyhow!("Failed to send remove consumer command"))
}

pub(crate) fn get_next_consumer(&self) -> Result<ConsumerInfo> {
let num_consumers = self.consumers.len();
/// Disconnect all consumers
pub(crate) async fn disconnect_all_consumers(&self) -> Result<()> {
self.control_tx
.send(DispatcherCommand::DisconnectAllConsumers)
.await
.map_err(|_| anyhow!("Failed to send disconnect all consumers command"))
}

/// Dispatch message helper method
async fn handle_dispatch_message(
consumers: &mut [Consumer],
index_consumer: &mut AtomicUsize,
message: MessageToSend,
) -> Result<()> {
let num_consumers = consumers.len();
if num_consumers == 0 {
return Err(anyhow!("There are no consumers left"));
return Err(anyhow!("No consumers available to dispatch the message"));
}

// Use modulo to ensure index wraps around
let index = self
.index_consumer
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
% num_consumers;
for _ in 0..num_consumers {
let index = index_consumer.fetch_add(1, Ordering::SeqCst) % num_consumers;
let consumer = &mut consumers[index];

if consumer.get_status().await {
consumer.send_message(message).await?;
trace!(
"Dispatcher sent the message to consumer: {}",
consumer.consumer_id
);
return Ok(());
}
}

// Get the consumer at the computed index
self.consumers
.get(index)
.cloned() // Clone the Arc<Mutex<Consumer>> to return
.ok_or_else(|| anyhow!("Unable to find the next consumer"))
Err(anyhow!(
"No active consumers available to handle the message"
))
}
}
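
Inside the dispatcher task, handle_dispatch_message selects consumers round-robin: an AtomicUsize counter is bumped with fetch_add, reduced modulo the number of consumers, and the loop makes at most one full pass so inactive consumers are skipped without spinning forever. The sketch below reduces that selection to plain data, with boolean active flags standing in for Consumer::get_status(); it is a hand-written illustration, not code from the commit.

use std::sync::atomic::{AtomicUsize, Ordering};

// Returns the index of the next active consumer, or None after one full pass.
fn next_active(active: &[bool], index: &AtomicUsize) -> Option<usize> {
    let n = active.len();
    if n == 0 {
        return None;
    }
    for _ in 0..n {
        // fetch_add keeps advancing the shared cursor; modulo wraps it around.
        let i = index.fetch_add(1, Ordering::SeqCst) % n;
        if active[i] {
            return Some(i);
        }
    }
    None
}

fn main() {
    let index = AtomicUsize::new(0);
    let active = [false, true, true];
    // Consumer 0 is inactive, so the rotation alternates between consumers 1 and 2.
    assert_eq!(next_active(&active, &index), Some(1));
    assert_eq!(next_active(&active, &index), Some(2));
    assert_eq!(next_active(&active, &index), Some(1));
    println!("round-robin skips inactive consumers and wraps around");
}
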