Merge pull request #4 from driessamyn/open-api
Open api
driessamyn authored Feb 12, 2022
2 parents 6a6cf79 + 4bbaffe commit 5179aa7
Showing 16 changed files with 330 additions and 103 deletions.
1 change: 0 additions & 1 deletion buildSrc/build.gradle.kts
@@ -5,7 +5,6 @@ plugins {
}

repositories {
// Use the plugin portal to apply community plugins in convention plugins.
gradlePluginPortal()
}

@@ -22,7 +22,7 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter:5.7.+")
}

version = "0.0.1"
version = "0.0.2"

tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile> {
kotlinOptions {
1 change: 1 addition & 0 deletions examples/simple-json/docker-compose.yaml
@@ -24,6 +24,7 @@ services:
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:19092
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:19092
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
depends_on:
- zookeeper
healthcheck:
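With KAFKA_AUTO_CREATE_TOPICS_ENABLE=false, topics must now be created explicitly before publishing; the publisher script below does this with kafka-topics.sh. For reference, a minimal sketch of the same step from Kotlin using Kafka's AdminClient (the broker address mirrors the compose file's external listener, but is an assumption):

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import java.util.Properties

fun main() {
    val props = Properties().apply {
        // Assumed address: the compose file advertises EXTERNAL://localhost:19092.
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092")
    }
    AdminClient.create(props).use { admin ->
        // Same partition count and replication factor as the publisher script.
        admin.createTopics(listOf(NewTopic("super-heros", 4, 1.toShort()))).all().get()
    }
}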
3 changes: 3 additions & 0 deletions examples/simple-json/publisher/publish_messages.sh
@@ -1,3 +1,6 @@
echo 'Create topic'
kafka-topics.sh --bootstrap-server kafka:9092 --partitions 4 --replication-factor 1 \
--create --topic super-heros
echo 'Create Kafka Messages'
counter=0
while true
6 changes: 5 additions & 1 deletion http/build.gradle.kts
@@ -3,8 +3,12 @@ plugins {
id("com.google.cloud.tools.jib")
}

repositories {
maven { setUrl("https://jitpack.io") }
}

dependencies {
implementation(project(":avro"))
implementation("com.github.papsign:Ktor-OpenAPI-Generator:0.3-beta.2")
}

jib.to.image = "driessamyn/kafkasnoop"
10 changes: 10 additions & 0 deletions http/src/main/kotlin/kafkasnoop/KafkaClientFactory.kt
@@ -1,10 +1,20 @@
package kafkasnoop

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
import java.util.Properties
import java.util.UUID

class KafkaClientFactory(private val props: Properties) {
companion object {
private val logger = LoggerFactory.getLogger(KafkaClientFactory::class.java)
}

fun createConsumer(): KafkaConsumer<ByteArray, ByteArray> {
val consumerId = "kafkasnoop-consumer-${UUID.randomUUID()}"
logger.info("Create consumer $consumerId")
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerId)
return KafkaConsumer<ByteArray, ByteArray>(props)
}
}
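Each createConsumer call now returns a fresh consumer with a unique kafkasnoop-consumer-<uuid> client id, replacing the single static id removed from StartSnoop below. A minimal usage sketch, assuming a broker on localhost:19092:

import kafkasnoop.KafkaClientFactory
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092") // assumed address
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
    }
    val factory = KafkaClientFactory(props)
    // Each consumer gets its own client id; use {} closes it when the scope ends.
    factory.createConsumer().use { consumer ->
        println(consumer.listTopics().keys)
    }
}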
45 changes: 41 additions & 4 deletions http/src/main/kotlin/kafkasnoop/Server.kt
@@ -1,5 +1,12 @@
package kafkasnoop

import com.google.gson.JsonElement
import com.google.gson.JsonPrimitive
import com.google.gson.JsonSerializationContext
import com.google.gson.JsonSerializer
import com.papsign.ktor.openapigen.OpenAPIGen
import com.papsign.ktor.openapigen.route.apiRouting
import com.papsign.ktor.openapigen.route.route
import io.ktor.application.*
import io.ktor.features.*
import io.ktor.gson.*
@@ -8,30 +15,60 @@ import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.websocket.*
import kafkasnoop.routes.messages
import kafkasnoop.routes.messagesWs
import kafkasnoop.routes.openApi
import kafkasnoop.routes.topics
import kotlinx.coroutines.ExperimentalCoroutinesApi
import org.slf4j.LoggerFactory
import java.lang.reflect.Type
import java.time.Instant
import java.time.ZoneId
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

class Server(private val kafkaClientFactory: KafkaClientFactory) {
companion object {
private val logger = LoggerFactory.getLogger(Server::class.java)
}

@ExperimentalCoroutinesApi
fun start(port: Int) {
logger.info("Starting HTTP server")
embeddedServer(Netty, port = port) {
install(ContentNegotiation) {
gson {
setPrettyPrinting()
disableHtmlEscaping()
registerTypeAdapter(
Instant::class.java,
object : JsonSerializer<Instant> {
val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC))

override fun serialize(instant: Instant, typeOfSrc: Type, context: JsonSerializationContext):
JsonElement {
return JsonPrimitive(formatter.format(instant))
}
}
)
}
}
install(WebSockets)
install(OpenAPIGen) {
// basic info
info {
version = "0.0.2"
title = "KafkaSnoop API"
description = "HTTP API for Snooping on Kafka messages"
}
}

routing {
topics(kafkaClientFactory)
messages(kafkaClientFactory)
webSocket {
}
openApi()
messagesWs(kafkaClientFactory)
}
apiRouting {
route("/api").topics(kafkaClientFactory)
route("/api/{topic}").messages(kafkaClientFactory)
}
}.start(wait = true)
}
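The registered type adapter makes Gson render Instant fields as ISO local date-time strings at UTC, rather than its default field-by-field object form. A standalone sketch of the same adapter to illustrate the output:

import com.google.gson.GsonBuilder
import com.google.gson.JsonPrimitive
import com.google.gson.JsonSerializer
import java.time.Instant
import java.time.ZoneId
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

fun main() {
    val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC))
    val gson = GsonBuilder()
        .registerTypeAdapter(
            Instant::class.java,
            JsonSerializer<Instant> { instant, _, _ -> JsonPrimitive(formatter.format(instant)) }
        )
        .create()
    // Prints "2022-02-12T10:15:30.123" rather than Gson's default {"seconds":...,"nanos":...}.
    println(gson.toJson(Instant.ofEpochMilli(1644660930123)))
}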
1 change: 0 additions & 1 deletion http/src/main/kotlin/kafkasnoop/StartSnoop.kt
@@ -31,7 +31,6 @@ class StartSnoop : CliktCommand() {
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.name,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.name,
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to "100",
ConsumerConfig.CLIENT_ID_CONFIG to "kafkasnoop-consumer",
ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed"
) + kafkaCliOptions
).toProperties()
8 changes: 6 additions & 2 deletions http/src/main/kotlin/kafkasnoop/dto/Message.kt
@@ -1,7 +1,11 @@
package kafkasnoop.dto

data class Message(val partition: String, val key: String, val value: String) {
import com.papsign.ktor.openapigen.annotations.Response
import java.time.Instant

@Response("A Kafka Message")
data class Message(val offset: Long, val partition: String, val key: String, val value: String, val timestamp: Instant) {
override fun toString(): String {
return "$partition|$key|$value"
return "$offset|$partition|$key|$value|$timestamp"
}
}
13 changes: 12 additions & 1 deletion http/src/main/kotlin/kafkasnoop/dto/Topic.kt
@@ -1,4 +1,15 @@
package kafkasnoop.dto

data class Partition(val index: Int, val inSyncReplicas: Int, val offlineReplicas: Int)
import com.papsign.ktor.openapigen.annotations.Response

@Response("Kafka Topic Partition")
data class Partition(
val index: Int,
val beginOffset: Long,
val endOffset: Long,
val inSyncReplicas: Int,
val offlineReplicas: Int,
)

@Response("Kafka Topic")
data class Topic(val name: String, val partitions: List<Partition>)
93 changes: 64 additions & 29 deletions http/src/main/kotlin/kafkasnoop/routes/MessageProcessor.kt
@@ -1,50 +1,85 @@
package kafkasnoop.routes

import kafkasnoop.KafkaClientFactory
import kafkasnoop.dto.Message
import kotlinx.coroutines.yield
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import java.time.Duration
import java.time.Instant
import kotlin.math.max

/**
* Message processor
*
* NOTE: changed this to use 1 consumer per partition.
 * According to the docs, assigning multiple partitions to a consumer without a consumer group should
 * be fine, but it appeared unreliable. This probably means something was wrong in my code; I will come back to this.
*/
class MessageProcessor(
private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
private val kafkaClientFactory: KafkaClientFactory,
private val topicName: String,
// todo: take offset from query string & filter partitions
private val startOffset: Long = 0L,
) {
private val partitions: List<TopicPartition>
) : AutoCloseable {
val partitions: List<TopicPartition>
@Volatile
private var isClosed = false
init {
logger.debug("Getting messages for $topicName")

partitions = kafkaConsumer.partitionsFor(topicName).map {
TopicPartition(it.topic(), it.partition())
}
kafkaConsumer.assign(partitions)
val beginningOffsets = kafkaConsumer.beginningOffsets(partitions)

partitions.forEach { p ->
val pOffset = java.lang.Long.max(beginningOffsets[p] ?: 0, startOffset)
logger.info("Beginning offset for partition $p is $pOffset")
kafkaConsumer.seek(p, pOffset)
kafkaClientFactory.createConsumer().use {
partitions = it.partitionsFor(topicName).map {
TopicPartition(it.topic(), it.partition())
}
}
}

fun startProcess(maxMsgCount: Int = Int.MAX_VALUE) =
fun startProcess(partition: TopicPartition, maxMsgCount: Int = Int.MAX_VALUE, minOffset: Long = 0L) =
sequence {
var msgCount = 0
while (msgCount < maxMsgCount) {
partitions.forEach { partition ->
kafkaConsumer
.poll(Duration.ofMillis(100)).records(partition)
.forEach { record ->
kafkaClientFactory.createConsumer().use { kafkaConsumer ->
kafkaConsumer.assign(listOf(partition))
val beginningOffsets = kafkaConsumer.beginningOffsets(partitions)
val endOffsets = kafkaConsumer.endOffsets(partitions)

// default to rewinding by maxMsgCount when one is given, otherwise by 5
val offsetDiff = if (maxMsgCount == Int.MAX_VALUE) 5 else maxMsgCount
logger.debug("Min offset for partition $partition is ${beggingOffsets[partition]}")
logger.debug("Max offset for partition $partition is ${endOffsets[partition]}")
val startOffset = max(endOffsets[partition]?.minus(offsetDiff) ?: 0L, 0L)
val offset = max(startOffset, minOffset)
val messageCount = max(endOffsets.getOrDefault(partition, 0) - offset, maxMsgCount.toLong())
logger.info("Loading $messageCount from $partition starting at $offset")
kafkaConsumer.seek(partition, offset)

var messagesLoaded = 0
var emptyPolls = 0
// TODO: tidy-up this logic.
while (!isClosed && (maxMsgCount == Int.MAX_VALUE || emptyPolls <= 5) && messagesLoaded < messageCount) {
logger.debug("Polling $partition from ${kafkaConsumer.position(partition)}")
val msgs = kafkaConsumer
.poll(Duration.ofMillis(200)).records(partition)
.map { record ->
logger.debug("Found message $partition: ${record.offset()}")
val key = String(record.key(), Charsets.UTF_8)
val value = String(record.value(), Charsets.UTF_8)
val msg = Message(partition.toString(), key, value)
yield(msg)
msgCount += 1
Message(record.offset(), partition.toString(), key, value, Instant.ofEpochMilli(record.timestamp()))
}

if (msgs.isEmpty()) {
emptyPolls += 1
logger.debug("Empty polls: $emptyPolls")
Thread.sleep(200)
} else {
logger.debug("Found ${msgs.count()} on $topicName: ${msgs.groupBy { it.partition }.map { it.key to it.value.maxOf { it.offset } }.toMap()}")
val sortedMsgs = msgs.sortedBy { it.offset }
logger.debug("Found $sortedMsgs on $partition")
yieldAll(sortedMsgs)
emptyPolls = 0
}
messagesLoaded += msgs.count()
logger.debug("Loaded $messagesLoaded out of $messageCount")
}
logger.debug("stopping to process")
}
logger.debug("stopping to process")
}

override fun close() {
isClosed = true
}
}
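A sketch of driving the reworked processor directly, one consumer per partition (broker address and property values are assumptions; the HTTP route in the next file exposes the same flow through query parameters):

import kafkasnoop.KafkaClientFactory
import kafkasnoop.routes.MessageProcessor
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092") // assumed address
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
    }
    MessageProcessor(KafkaClientFactory(props), "super-heros").use { processor ->
        processor.partitions.forEach { partition ->
            // Take at most 5 recent messages from each partition, then move on.
            processor.startProcess(partition, maxMsgCount = 5).forEach(::println)
        }
    }
}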
84 changes: 45 additions & 39 deletions http/src/main/kotlin/kafkasnoop/routes/messages.kt
@@ -1,45 +1,51 @@
package kafkasnoop.routes

import io.ktor.application.*
import io.ktor.http.cio.websocket.*
import io.ktor.response.*
import io.ktor.routing.*
import io.ktor.websocket.*
import com.papsign.ktor.openapigen.annotations.parameters.PathParam
import com.papsign.ktor.openapigen.annotations.parameters.QueryParam
import com.papsign.ktor.openapigen.route.info
import com.papsign.ktor.openapigen.route.path.normal.NormalOpenAPIRoute
import com.papsign.ktor.openapigen.route.path.normal.get
import com.papsign.ktor.openapigen.route.response.respond
import kafkasnoop.KafkaClientFactory

fun Route.messages(kafkaClientFactory: KafkaClientFactory) {

webSocket("/ws/{topic}") {
call.run {
val topicName = call.parameters["topic"] ?: throw IllegalArgumentException("Topic must be provided")

// todo: take partition, limit and offset from query string
val offset = 0L

kafkaClientFactory.createConsumer().use { consumer ->
MessageProcessor(consumer, topicName, offset)
.startProcess().forEach {
send(
Frame.Text(it.toString())
)
}
}
}
}

get("/api/{topic}") {
call.run {
val topicName = call.parameters["topic"] ?: throw IllegalArgumentException("Topic must be provided")

// todo: take partition, limit and offset from query string
val maxMsg = 100
val offset = 0L

kafkaClientFactory.createConsumer().use { consumer ->
val msgs = MessageProcessor(consumer, topicName, offset)
.startProcess(maxMsg).toList()
respond(msgs)
}
import kafkasnoop.dto.Message
import java.time.Instant

private const val MAX_MESSAGES_DEFAULT = 10
private const val MIN_OFFSET_DEFAULT = 0L
data class GetTopicMessagesParams(
@PathParam("Name of the topic")
val topic: String,
@QueryParam("Partition filter (Optional)")
val partition: Int?,
@QueryParam("Maximum number of messages to return per partition - Optional, default: $MAX_MESSAGES_DEFAULT")
val max: Int?,
@QueryParam("Minimum offset to start returning messages from - Optional, default: $MIN_OFFSET_DEFAULT")
val minOffset: Long?
)
fun NormalOpenAPIRoute.messages(kafkaClientFactory: KafkaClientFactory) {
get<GetTopicMessagesParams, List<Message>>(
info("Messages", "Get Messages from given topic"),
example = listOf(
Message(
0,
"topic-parition",
"message-key",
"message-value", Instant.now()
)
)
) { params ->

MessageProcessor(kafkaClientFactory, params.topic).use { processor ->
val msgs = processor.partitions
.filter { null == params.partition || it.partition() == params.partition }
.map { p ->
processor.startProcess(
p,
params.max ?: MAX_MESSAGES_DEFAULT,
params.minOffset ?: MIN_OFFSET_DEFAULT
).toList().sortedBy { it.offset }
}.flatten().sortedByDescending { it.timestamp }
respond(msgs)
}
}
}
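With apiRouting, the endpoint becomes GET /api/{topic} with optional partition, max, and minOffset query parameters. A sketch of calling it using the JDK's built-in HTTP client (host and port are assumptions):

import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

fun main() {
    val client = HttpClient.newHttpClient()
    // Assumed host/port; point this at wherever KafkaSnoop is listening.
    val request = HttpRequest.newBuilder(
        URI.create("http://localhost:8080/api/super-heros?partition=0&max=10&minOffset=0")
    ).GET().build()
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())
    // A pretty-printed JSON list of messages, newest first.
    println(response.body())
}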
