diff --git a/http/src/main/kotlin/kafkasnoop/KafkaClientFactory.kt b/http/src/main/kotlin/kafkasnoop/KafkaClientFactory.kt index 8524378..ab1f6a0 100644 --- a/http/src/main/kotlin/kafkasnoop/KafkaClientFactory.kt +++ b/http/src/main/kotlin/kafkasnoop/KafkaClientFactory.kt @@ -3,13 +3,8 @@ package kafkasnoop import org.apache.kafka.clients.consumer.KafkaConsumer import java.util.Properties -class KafkaClientFactory(props: Properties) : AutoCloseable { - private val consumer by lazy { - KafkaConsumer(props) - } - - fun getOrCreateConsumer(): KafkaConsumer = consumer - override fun close() { - consumer.close() +class KafkaClientFactory(private val props: Properties) { + fun createConsumer(): KafkaConsumer { + return KafkaConsumer(props) } } diff --git a/http/src/main/kotlin/kafkasnoop/StartSnoop.kt b/http/src/main/kotlin/kafkasnoop/StartSnoop.kt index 27d9eff..b5b8be3 100644 --- a/http/src/main/kotlin/kafkasnoop/StartSnoop.kt +++ b/http/src/main/kotlin/kafkasnoop/StartSnoop.kt @@ -26,7 +26,6 @@ class StartSnoop : CliktCommand() { val kafkaClientOptions = ( mapOf( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to brokerAddress, - ConsumerConfig.GROUP_ID_CONFIG to "kafkasnoop", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.name, @@ -39,7 +38,7 @@ class StartSnoop : CliktCommand() { logger.info("Starting with $kafkaClientOptions") - KafkaClientFactory(kafkaClientOptions).use { + KafkaClientFactory(kafkaClientOptions).let { Server(it).start(port) } } diff --git a/http/src/main/kotlin/kafkasnoop/dto/Message.kt b/http/src/main/kotlin/kafkasnoop/dto/Message.kt index c4562af..0706109 100644 --- a/http/src/main/kotlin/kafkasnoop/dto/Message.kt +++ b/http/src/main/kotlin/kafkasnoop/dto/Message.kt @@ -1,8 +1,6 @@ package kafkasnoop.dto -data class Message(val key: String, val value: String) - -data class WebSocketMessage(val partition: String, val key: String, val value: String) { +data class Message(val partition: String, val key: String, val value: String) { override fun toString(): String { return "$partition|$key|$value" } diff --git a/http/src/main/kotlin/kafkasnoop/routes/MessageProcessor.kt b/http/src/main/kotlin/kafkasnoop/routes/MessageProcessor.kt index 722dea2..70f4eea 100644 --- a/http/src/main/kotlin/kafkasnoop/routes/MessageProcessor.kt +++ b/http/src/main/kotlin/kafkasnoop/routes/MessageProcessor.kt @@ -1,44 +1,50 @@ package kafkasnoop.routes -import kafkasnoop.KafkaClientFactory import kafkasnoop.dto.Message +import kotlinx.coroutines.yield +import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.TopicPartition import java.time.Duration class MessageProcessor( - private val kafkaClientFactory: KafkaClientFactory, + private val kafkaConsumer: KafkaConsumer, private val topicName: String, // todo: take offset from query string & filter partitions private val startOffset: Long = 0L, ) { - val partitions: List + private val partitions: List init { logger.debug("Getting messages for $topicName") - kafkaClientFactory.getOrCreateConsumer().let { - partitions = it.partitionsFor(topicName).map { - TopicPartition(it.topic(), it.partition()) - } - it.assign(partitions) - val beggingOffsets = it.beginningOffsets(partitions) + partitions = kafkaConsumer.partitionsFor(topicName).map { + TopicPartition(it.topic(), it.partition()) + } + kafkaConsumer.assign(partitions) + val beggingOffsets = kafkaConsumer.beginningOffsets(partitions) - partitions.forEach { p -> - val pOffset = java.lang.Long.max(beggingOffsets[p] ?: 0, startOffset) - logger.info("Begging offset for partition $p is $pOffset") - it.seek(p, pOffset) - } + partitions.forEach { p -> + val pOffset = java.lang.Long.max(beggingOffsets[p] ?: 0, startOffset) + logger.info("Begging offset for partition $p is $pOffset") + kafkaConsumer.seek(p, pOffset) } } - fun getMessages(partition: TopicPartition): List { - val polled = kafkaClientFactory.getOrCreateConsumer() - .poll(Duration.ofMillis(100)).records(partition) - - return polled.map { - // TODO: deserialising - val key = String(it.key(), Charsets.UTF_8) - val value = String(it.value(), Charsets.UTF_8) - Message(key, value) + fun startProcess(maxMsgCount: Int = Int.MAX_VALUE) = + sequence { + var msgCount = 0 + while (msgCount < maxMsgCount) { + partitions.forEach { partition -> + kafkaConsumer + .poll(Duration.ofMillis(100)).records(partition) + .forEach { record -> + val key = String(record.key(), Charsets.UTF_8) + val value = String(record.value(), Charsets.UTF_8) + val msg = Message(partition.toString(), key, value) + yield(msg) + msgCount += 1 + } + } + } + logger.debug("stopping to process") } - } } diff --git a/http/src/main/kotlin/kafkasnoop/routes/messages.kt b/http/src/main/kotlin/kafkasnoop/routes/messages.kt index 5131db5..9d2659b 100644 --- a/http/src/main/kotlin/kafkasnoop/routes/messages.kt +++ b/http/src/main/kotlin/kafkasnoop/routes/messages.kt @@ -6,8 +6,6 @@ import io.ktor.response.* import io.ktor.routing.* import io.ktor.websocket.* import kafkasnoop.KafkaClientFactory -import kafkasnoop.dto.Message -import kafkasnoop.dto.WebSocketMessage fun Route.messages(kafkaClientFactory: KafkaClientFactory) { @@ -18,16 +16,13 @@ fun Route.messages(kafkaClientFactory: KafkaClientFactory) { // todo: take partition, limit and offset from query string val offset = 0L - val msgProcessor = MessageProcessor(kafkaClientFactory, topicName, offset) - - while (true) { - msgProcessor.partitions.forEach { p -> - msgProcessor.getMessages(p).forEach { m -> + kafkaClientFactory.createConsumer().use { consumer -> + MessageProcessor(consumer, topicName, offset) + .startProcess().forEach { send( - Frame.Text(WebSocketMessage(p.toString(), m.key, m.value).toString()) + Frame.Text(it.toString()) ) } - } } } } @@ -40,21 +35,10 @@ fun Route.messages(kafkaClientFactory: KafkaClientFactory) { val maxMsg = 100 val offset = 0L - val msgProcessor = MessageProcessor(kafkaClientFactory, topicName, offset) - - kafkaClientFactory.getOrCreateConsumer().let { consumer -> - val endOffsets = consumer.endOffsets(msgProcessor.partitions) - - val response = msgProcessor.partitions.map { p -> - val messages = mutableListOf() - val latestOffset = endOffsets[p]?.minus(1) ?: 0 - while (messages.count() < maxMsg && consumer.position(p) < latestOffset) { - messages.addAll(msgProcessor.getMessages(p)) - } - - p to messages - } - respond(response.toMap()) + kafkaClientFactory.createConsumer().use { consumer -> + val msgs = MessageProcessor(consumer, topicName, offset) + .startProcess(maxMsg).toList() + respond(msgs) } } } diff --git a/http/src/main/kotlin/kafkasnoop/routes/topics.kt b/http/src/main/kotlin/kafkasnoop/routes/topics.kt index f408eb0..fc3eca0 100644 --- a/http/src/main/kotlin/kafkasnoop/routes/topics.kt +++ b/http/src/main/kotlin/kafkasnoop/routes/topics.kt @@ -12,7 +12,7 @@ fun Route.topics(kafkaClientFactory: KafkaClientFactory) { call.run { respond( kafkaClientFactory - .getOrCreateConsumer().let { + .createConsumer().use { it.listTopics() .map { t -> Topic(