Improve message processor.
driessamyn committed Feb 5, 2022
1 parent 9a7bd45 commit e6e03ba
Showing 6 changed files with 44 additions and 62 deletions.
11 changes: 3 additions & 8 deletions http/src/main/kotlin/kafkasnoop/KafkaClientFactory.kt
@@ -3,13 +3,8 @@ package kafkasnoop
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import java.util.Properties
 
-class KafkaClientFactory(props: Properties) : AutoCloseable {
-    private val consumer by lazy {
-        KafkaConsumer<ByteArray, ByteArray>(props)
-    }
-
-    fun getOrCreateConsumer(): KafkaConsumer<ByteArray, ByteArray> = consumer
-    override fun close() {
-        consumer.close()
+class KafkaClientFactory(private val props: Properties) {
+    fun createConsumer(): KafkaConsumer<ByteArray, ByteArray> {
+        return KafkaConsumer<ByteArray, ByteArray>(props)
     }
 }
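KafkaClientFactory now hands out a fresh consumer on every call instead of caching one shared instance behind AutoCloseable, so the consumer lifecycle moves to the call site. A minimal caller sketch (hypothetical, not part of this commit; the Properties are assumed to already carry bootstrap servers and ByteArray deserializers):

import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Properties

// Hypothetical caller illustrating the new contract: every createConsumer()
// call builds a brand-new KafkaConsumer from the same Properties, and the
// caller is responsible for closing it, for example with Kotlin's use { }.
fun withFreshConsumer(props: Properties, block: (KafkaConsumer<ByteArray, ByteArray>) -> Unit) {
    val factory = KafkaClientFactory(props)
    factory.createConsumer().use { consumer ->
        block(consumer) // consumer.close() runs when this block exits
    }
}
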
3 changes: 1 addition & 2 deletions http/src/main/kotlin/kafkasnoop/StartSnoop.kt
@@ -26,7 +26,6 @@ class StartSnoop : CliktCommand() {
         val kafkaClientOptions = (
             mapOf(
                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to brokerAddress,
-                ConsumerConfig.GROUP_ID_CONFIG to "kafkasnoop",
                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false",
                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
                 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.name,
@@ -39,7 +38,7 @@ class StartSnoop : CliktCommand() {
 
         logger.info("Starting with $kafkaClientOptions")
 
-        KafkaClientFactory(kafkaClientOptions).use {
+        KafkaClientFactory(kafkaClientOptions).let {
             Server(it).start(port)
         }
     }
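Dropping GROUP_ID_CONFIG fits the new consumer usage: MessageProcessor assigns partitions explicitly with assign() rather than subscribing, and with auto-commit disabled no offsets are ever committed, so no consumer group is needed. A rough sketch of how a map of ConsumerConfig options like the one above can become the Properties that KafkaClientFactory expects (hypothetical helper, not from this commit; the real command also merges user-supplied options in the code elided above):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Hypothetical helper, not part of this commit.
fun snoopFactory(brokerAddress: String): KafkaClientFactory {
    val options = mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to brokerAddress,
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false",
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.name,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.name,
    )
    // Map<String, String>.toProperties() yields the java.util.Properties the factory takes.
    return KafkaClientFactory(options.toProperties())
}
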
4 changes: 1 addition & 3 deletions http/src/main/kotlin/kafkasnoop/dto/Message.kt
@@ -1,8 +1,6 @@
 package kafkasnoop.dto
 
-data class Message(val key: String, val value: String)
-
-data class WebSocketMessage(val partition: String, val key: String, val value: String) {
+data class Message(val partition: String, val key: String, val value: String) {
     override fun toString(): String {
         return "$partition|$key|$value"
     }
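Merging WebSocketMessage into Message means the partition now travels with every message, and the pipe-delimited toString() is what ends up in a WebSocket frame. A small illustration (the values are made up):

fun main() {
    val msg = Message(partition = "some-topic-0", key = "key-1", value = """{"n":42}""")
    // toString() joins the three fields with '|':
    println(msg) // prints: some-topic-0|key-1|{"n":42}
}
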
54 changes: 30 additions & 24 deletions http/src/main/kotlin/kafkasnoop/routes/MessageProcessor.kt
@@ -1,44 +1,50 @@
 package kafkasnoop.routes
 
-import kafkasnoop.KafkaClientFactory
 import kafkasnoop.dto.Message
+import kotlinx.coroutines.yield
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.TopicPartition
 import java.time.Duration
 
 class MessageProcessor(
-    private val kafkaClientFactory: KafkaClientFactory,
+    private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
     private val topicName: String,
     // todo: take offset from query string & filter partitions
     private val startOffset: Long = 0L,
 ) {
-    val partitions: List<TopicPartition>
+    private val partitions: List<TopicPartition>
     init {
         logger.debug("Getting messages for $topicName")
 
-        kafkaClientFactory.getOrCreateConsumer().let {
-            partitions = it.partitionsFor(topicName).map {
-                TopicPartition(it.topic(), it.partition())
-            }
-            it.assign(partitions)
-            val beggingOffsets = it.beginningOffsets(partitions)
+        partitions = kafkaConsumer.partitionsFor(topicName).map {
+            TopicPartition(it.topic(), it.partition())
+        }
+        kafkaConsumer.assign(partitions)
+        val beggingOffsets = kafkaConsumer.beginningOffsets(partitions)
 
-            partitions.forEach { p ->
-                val pOffset = java.lang.Long.max(beggingOffsets[p] ?: 0, startOffset)
-                logger.info("Begging offset for partition $p is $pOffset")
-                it.seek(p, pOffset)
-            }
+        partitions.forEach { p ->
+            val pOffset = java.lang.Long.max(beggingOffsets[p] ?: 0, startOffset)
+            logger.info("Begging offset for partition $p is $pOffset")
+            kafkaConsumer.seek(p, pOffset)
         }
     }
 
-    fun getMessages(partition: TopicPartition): List<Message> {
-        val polled = kafkaClientFactory.getOrCreateConsumer()
-            .poll(Duration.ofMillis(100)).records(partition)
-
-        return polled.map {
-            // TODO: deserialising
-            val key = String(it.key(), Charsets.UTF_8)
-            val value = String(it.value(), Charsets.UTF_8)
-            Message(key, value)
+    fun startProcess(maxMsgCount: Int = Int.MAX_VALUE) =
+        sequence {
+            var msgCount = 0
+            while (msgCount < maxMsgCount) {
+                partitions.forEach { partition ->
+                    kafkaConsumer
+                        .poll(Duration.ofMillis(100)).records(partition)
+                        .forEach { record ->
+                            val key = String(record.key(), Charsets.UTF_8)
+                            val value = String(record.value(), Charsets.UTF_8)
+                            val msg = Message(partition.toString(), key, value)
+                            yield(msg)
+                            msgCount += 1
+                        }
+                }
+            }
+            logger.debug("stopping to process")
         }
-    }
 }
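The processor now takes an externally owned consumer and exposes messages as a lazy Kotlin sequence: nothing is polled until the sequence is iterated, and the loop exits once the yielded count reaches maxMsgCount (the check runs between polling sweeps, so a final batch can overshoot slightly). A rough usage sketch, assuming a factory and topic name are already in hand (hypothetical, not part of this commit):

// Hypothetical usage, not part of this commit.
fun printFirstMessages(kafkaClientFactory: KafkaClientFactory, topicName: String) {
    kafkaClientFactory.createConsumer().use { consumer ->
        // The constructor assigns every partition of the topic and seeks each
        // one to max(beginning offset, startOffset).
        val processor = MessageProcessor(consumer, topicName, startOffset = 0L)

        // Lazily polls each partition in turn; stops once about 10 messages were yielded.
        processor.startProcess(maxMsgCount = 10)
            .forEach { msg -> println(msg) }
    }
}
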
32 changes: 8 additions & 24 deletions http/src/main/kotlin/kafkasnoop/routes/messages.kt
@@ -6,8 +6,6 @@ import io.ktor.response.*
 import io.ktor.routing.*
 import io.ktor.websocket.*
 import kafkasnoop.KafkaClientFactory
-import kafkasnoop.dto.Message
-import kafkasnoop.dto.WebSocketMessage
 
 fun Route.messages(kafkaClientFactory: KafkaClientFactory) {
 
@@ -18,16 +16,13 @@ fun Route.messages(kafkaClientFactory: KafkaClientFactory) {
             // todo: take partition, limit and offset from query string
             val offset = 0L
 
-            val msgProcessor = MessageProcessor(kafkaClientFactory, topicName, offset)
-
-            while (true) {
-                msgProcessor.partitions.forEach { p ->
-                    msgProcessor.getMessages(p).forEach { m ->
+            kafkaClientFactory.createConsumer().use { consumer ->
+                MessageProcessor(consumer, topicName, offset)
+                    .startProcess().forEach {
                         send(
-                            Frame.Text(WebSocketMessage(p.toString(), m.key, m.value).toString())
+                            Frame.Text(it.toString())
                         )
                     }
-                }
             }
         }
     }
@@ -40,21 +35,10 @@ fun Route.messages(kafkaClientFactory: KafkaClientFactory) {
             val maxMsg = 100
             val offset = 0L
 
-            val msgProcessor = MessageProcessor(kafkaClientFactory, topicName, offset)
-
-            kafkaClientFactory.getOrCreateConsumer().let { consumer ->
-                val endOffsets = consumer.endOffsets(msgProcessor.partitions)
-
-                val response = msgProcessor.partitions.map { p ->
-                    val messages = mutableListOf<Message>()
-                    val latestOffset = endOffsets[p]?.minus(1) ?: 0
-                    while (messages.count() < maxMsg && consumer.position(p) < latestOffset) {
-                        messages.addAll(msgProcessor.getMessages(p))
-                    }
-
-                    p to messages
-                }
-                respond(response.toMap())
+            kafkaClientFactory.createConsumer().use { consumer ->
+                val msgs = MessageProcessor(consumer, topicName, offset)
+                    .startProcess(maxMsg).toList()
+                respond(msgs)
             }
         }
     }
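Both routes now share one code path: the WebSocket route streams an unbounded sequence (the Int.MAX_VALUE default) while the REST route bounds it with maxMsg and materialises the result with toList(); in both cases the per-request consumer is closed by use { } when the block ends. Because the sequence is cold, a caller-side limit works just as well, as in this hypothetical helper (not part of this commit):

import kafkasnoop.dto.Message

// Hypothetical helper: the sequence only advances when the caller asks for
// the next element, so take(n) also stops the underlying polling.
fun firstN(processor: MessageProcessor, n: Int): List<Message> =
    processor.startProcess() // defaults to Int.MAX_VALUE
        .take(n)             // caller-side bound
        .toList()
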
2 changes: 1 addition & 1 deletion http/src/main/kotlin/kafkasnoop/routes/topics.kt
@@ -12,7 +12,7 @@ fun Route.topics(kafkaClientFactory: KafkaClientFactory) {
         call.run {
             respond(
                 kafkaClientFactory
-                    .getOrCreateConsumer().let {
+                    .createConsumer().use {
                         it.listTopics()
                             .map { t ->
                                 Topic(
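Same pattern here: the route builds a short-lived consumer per request, and use { } closes it once the topic map has been read. KafkaConsumer.listTopics() returns a map of topic name to partition metadata, which the code elided above turns into the Topic DTO; a stripped-down sketch of the same shape (hypothetical helper, Topic construction omitted):

// Hypothetical helper, not from this commit.
fun topicNames(kafkaClientFactory: KafkaClientFactory): List<String> =
    kafkaClientFactory.createConsumer().use { consumer ->
        consumer.listTopics()   // Map<String, List<PartitionInfo>>
            .keys
            .sorted()
    }
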
