Skip to content

Commit

Permalink
Use dedicated runtime for KafkaSinkConnector to ensure true parallelism.
Browse files Browse the repository at this point in the history
Fix thread parallelism issue where tasks were running on main thread due to
actix-tokio-runtime being single-threaded.
  • Loading branch information
hippalus committed Jan 2, 2025
1 parent aade3a8 commit 9ba40b5
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 15 deletions.
12 changes: 12 additions & 0 deletions src/connectors/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use thiserror::Error;
use tokio::runtime;
use tokio::runtime::Builder;

pub mod config;
pub mod processor;
Expand Down Expand Up @@ -74,3 +76,13 @@ impl ConnectorError {
)
}
}

pub fn build_runtime(worker_threads: usize, thread_name: &str) -> anyhow::Result<runtime::Runtime> {
Builder::new_multi_thread()
.enable_all()
.thread_name(thread_name)
.worker_threads(worker_threads)
.max_blocking_threads(worker_threads)
.build()
.map_err(|e| anyhow::anyhow!(e))
}
12 changes: 12 additions & 0 deletions src/connectors/kafka/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ pub struct KafkaConfig {
)]
pub client_id: String,

#[arg(
long = "partition-listener-concurrency",
env = "P_KAFKA_PARTITION_LISTENER_CONCURRENCY",
value_name = "concurrency",
required = false,
default_value_t = 1,
help = "Number of parallel threads for Kafka partition listeners. Each partition gets processed on a dedicated thread."
)]
pub partition_listener_concurrency: usize,

#[command(flatten)]
#[group(id = "consumer", required = false)]
pub consumer: Option<ConsumerConfig>,
Expand Down Expand Up @@ -846,6 +856,8 @@ impl Default for KafkaConfig {
// Common configuration with standard broker port
bootstrap_servers: "localhost:9092".to_string(),
client_id: "parseable-connect".to_string(),
// Single threaded listener for all assigned partitions
partition_listener_concurrency: 1,
// Component-specific configurations with production-ready defaults
consumer: Some(ConsumerConfig::default()),
producer: Some(ProducerConfig::default()),
Expand Down
42 changes: 27 additions & 15 deletions src/connectors/kafka/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,65 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use crate::connectors::common::build_runtime;
use crate::connectors::common::processor::Processor;
use crate::connectors::kafka::consumer::KafkaStreams;
use crate::connectors::kafka::processor::StreamWorker;
use crate::connectors::kafka::ConsumerRecord;
use anyhow::Result;
use futures_util::StreamExt;
use rdkafka::consumer::Consumer;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tracing::{error, info};

pub struct KafkaSinkConnector<P>
where
P: Processor<Vec<ConsumerRecord>, ()>,
{
kafka_streams: KafkaStreams,
worker: Arc<StreamWorker<P>>,
streams: KafkaStreams,
stream_processor: Arc<StreamWorker<P>>,
runtime: Runtime,
}

impl<P> KafkaSinkConnector<P>
where
P: Processor<Vec<ConsumerRecord>, ()> + Send + Sync + 'static,
{
pub fn new(kafka_streams: KafkaStreams, processor: P) -> Self {
let worker = Arc::new(StreamWorker::new(
let consumer = kafka_streams.consumer();
let stream_processor = Arc::new(StreamWorker::new(
Arc::new(processor),
kafka_streams.consumer(),
Arc::clone(&consumer),
));

let runtime = build_runtime(
consumer.context().config.partition_listener_concurrency,
"kafka-sink-worker",
)
.expect("Failed to build runtime");
let _ = runtime.enter();

Self {
kafka_streams,
worker,
streams: kafka_streams,
stream_processor,
runtime,
}
}

pub async fn run(self) -> Result<()> {
self.kafka_streams
self.streams
.partitioned()
.map(|partition_queue| {
let worker = Arc::clone(&self.worker);
let tp = partition_queue.topic_partition().clone();
tokio::spawn(async move {
partition_queue
.run_drain(|record_stream| async {
.map(|partition_stream| {
let worker = Arc::clone(&self.stream_processor);
let tp = partition_stream.topic_partition().clone();
self.runtime.spawn(async move {
partition_stream
.run_drain(|partition_records| async {
info!("Starting task for partition: {:?}", tp);

worker
.process_partition(tp.clone(), record_stream)
.process_partition(tp.clone(), partition_records)
.await
.unwrap();
})
Expand Down

0 comments on commit 9ba40b5

Please sign in to comment.