diff --git a/src/consumer/engine.rs b/src/consumer/engine.rs index 7f8f93d..b9979a5 100644 --- a/src/consumer/engine.rs +++ b/src/consumer/engine.rs @@ -484,34 +484,30 @@ impl ConsumerEngine { } }; - match payload.metadata.num_messages_in_batch { - Some(_) => { - let it = BatchedMessageIterator::new(message.message_id, payload)?; - for (id, payload) in it { - // TODO: Dead letter policy for batched messages - self.send_to_consumer(id, payload).await?; - } - } - None => match (message.redelivery_count, self.dead_letter_policy.as_ref()) { - (Some(redelivery_count), Some(dead_letter_policy)) => { + let payloads = if payload.metadata.num_messages_in_batch.is_some() { + BatchedMessageIterator::new(message.message_id, payload)?.collect() + } else { + vec![(message.message_id, payload)] + }; + for (message_id, payload) in payloads { + match (message.redelivery_count, &self.dead_letter_policy) { + (Some(redelivery_count), Some(dead_letter_policy)) + if redelivery_count as usize >= dead_letter_policy.max_redeliver_count => + { // Send message to Dead Letter Topic and ack message in original topic - if redelivery_count as usize >= dead_letter_policy.max_redeliver_count { - self.client - .send(&dead_letter_policy.dead_letter_topic, payload.data) - .await? - .await - .map_err(|e| { - error!("One shot cancelled {:?}", e); - Error::Custom("DLQ send error".to_string()) - })?; - - self.ack(message.message_id, false); - } else { - self.send_to_consumer(message.message_id, payload).await? - } + self.client + .send(&dead_letter_policy.dead_letter_topic, payload.data) + .await? + .await + .map_err(|e| { + error!("One shot cancelled {:?}", e); + Error::Custom("DLQ send error".to_string()) + })?; + + self.ack(message_id, false); } - _ => self.send_to_consumer(message.message_id, payload).await?, - }, + _ => self.send_to_consumer(message_id, payload).await?, + } } Ok(()) } diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 849071d..b850a8c 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -737,6 +737,101 @@ mod tests { dlq_consumer.ack(&dlq_msg).await.unwrap(); } + #[tokio::test] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] + async fn dead_letter_queue_batched() { + use crate::ProducerOptions; + + let _result = log::set_logger(&TEST_LOGGER); + log::set_max_level(LevelFilter::Debug); + let addr = "pulsar://127.0.0.1:6650"; + + let test_id: u16 = rand::random(); + let topic = format!("dead_letter_queue_batched_test_{test_id}"); + + let dead_letter_topic = format!("{topic}_dlq"); + + let dead_letter_policy = DeadLetterPolicy { + max_redeliver_count: 1, + dead_letter_topic: dead_letter_topic.clone(), + }; + + let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap(); + + println!("creating consumer"); + let mut consumer: Consumer = client + .consumer() + .with_topic(topic.clone()) + .with_subscription("nack") + .with_subscription_type(SubType::Shared) + .with_dead_letter_policy(dead_letter_policy) + .build() + .await + .unwrap(); + + println!("created consumer"); + + println!("creating second consumer that consumes from the DLQ"); + let mut dlq_consumer: Consumer = client + .clone() + .consumer() + .with_topic(dead_letter_topic) + .with_subscription("dead_letter_topic") + .with_subscription_type(SubType::Shared) + .build() + .await + .unwrap(); + + println!("created second consumer"); + + let mut producer = client + .producer() + .with_topic(&topic) + .with_options(ProducerOptions { + batch_size: Some(2), + ..Default::default() + }) + .build() + .await + .unwrap(); + + let messages = vec![ + TestData { + topic: topic.clone(), + msg: rand::random(), + }, + TestData { + topic: topic.clone(), + msg: rand::random(), + }, + ]; + let receipts = producer.send_all(&messages).await.unwrap(); + producer.send_batch().await.unwrap(); + try_join_all(receipts).await.unwrap(); + println!("producer sends done"); + + for message in messages { + let msg = consumer.next().await.unwrap().unwrap(); + println!("got message: {:?}", msg.payload); + assert_eq!( + message, + msg.deserialize().unwrap(), + "we probably received a message from a previous run of the test" + ); + // Nacking message to send it to DLQ + consumer.nack(&msg).await.unwrap(); + + let dlq_msg = dlq_consumer.next().await.unwrap().unwrap(); + println!("got message: {:?}", dlq_msg.payload); + assert_eq!( + message, + dlq_msg.deserialize().unwrap(), + "we probably received a message from a previous run of the test" + ); + dlq_consumer.ack(&dlq_msg).await.unwrap(); + } + } + #[tokio::test] #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn failover() {