-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2 from driessamyn/consumer-improvement
Consumer improvement
- Loading branch information
Showing
7 changed files
with
47 additions
and
66 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,7 +37,6 @@ jobs: | |
- name: Push Docker image | ||
uses: akhilerm/[email protected] | ||
with: | ||
with: | ||
src: driessamyn/kafkasnoop | ||
dst: | | ||
${{ steps.meta.outputs.tags }} | ||
src: driessamyn/kafkasnoop | ||
dst: | | ||
${{ steps.meta.outputs.tags }} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
54 changes: 30 additions & 24 deletions
54
http/src/main/kotlin/kafkasnoop/routes/MessageProcessor.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,44 +1,50 @@ | ||
package kafkasnoop.routes | ||
|
||
import kafkasnoop.KafkaClientFactory | ||
import kafkasnoop.dto.Message | ||
import kotlinx.coroutines.yield | ||
import org.apache.kafka.clients.consumer.KafkaConsumer | ||
import org.apache.kafka.common.TopicPartition | ||
import java.time.Duration | ||
|
||
class MessageProcessor( | ||
private val kafkaClientFactory: KafkaClientFactory, | ||
private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>, | ||
private val topicName: String, | ||
// todo: take offset from query string & filter partitions | ||
private val startOffset: Long = 0L, | ||
) { | ||
val partitions: List<TopicPartition> | ||
private val partitions: List<TopicPartition> | ||
init { | ||
logger.debug("Getting messages for $topicName") | ||
|
||
kafkaClientFactory.getOrCreateConsumer().let { | ||
partitions = it.partitionsFor(topicName).map { | ||
TopicPartition(it.topic(), it.partition()) | ||
} | ||
it.assign(partitions) | ||
val beggingOffsets = it.beginningOffsets(partitions) | ||
partitions = kafkaConsumer.partitionsFor(topicName).map { | ||
TopicPartition(it.topic(), it.partition()) | ||
} | ||
kafkaConsumer.assign(partitions) | ||
val beggingOffsets = kafkaConsumer.beginningOffsets(partitions) | ||
|
||
partitions.forEach { p -> | ||
val pOffset = java.lang.Long.max(beggingOffsets[p] ?: 0, startOffset) | ||
logger.info("Begging offset for partition $p is $pOffset") | ||
it.seek(p, pOffset) | ||
} | ||
partitions.forEach { p -> | ||
val pOffset = java.lang.Long.max(beggingOffsets[p] ?: 0, startOffset) | ||
logger.info("Begging offset for partition $p is $pOffset") | ||
kafkaConsumer.seek(p, pOffset) | ||
} | ||
} | ||
|
||
fun getMessages(partition: TopicPartition): List<Message> { | ||
val polled = kafkaClientFactory.getOrCreateConsumer() | ||
.poll(Duration.ofMillis(100)).records(partition) | ||
|
||
return polled.map { | ||
// TODO: deserialising | ||
val key = String(it.key(), Charsets.UTF_8) | ||
val value = String(it.value(), Charsets.UTF_8) | ||
Message(key, value) | ||
fun startProcess(maxMsgCount: Int = Int.MAX_VALUE) = | ||
sequence { | ||
var msgCount = 0 | ||
while (msgCount < maxMsgCount) { | ||
partitions.forEach { partition -> | ||
kafkaConsumer | ||
.poll(Duration.ofMillis(100)).records(partition) | ||
.forEach { record -> | ||
val key = String(record.key(), Charsets.UTF_8) | ||
val value = String(record.value(), Charsets.UTF_8) | ||
val msg = Message(partition.toString(), key, value) | ||
yield(msg) | ||
msgCount += 1 | ||
} | ||
} | ||
} | ||
logger.debug("stopping to process") | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters