Skip to content

Commit

Permalink
Merge pull request #8 from driessamyn/schema-endpoint
Browse files Browse the repository at this point in the history
Schema registry endpoints
  • Loading branch information
driessamyn authored Feb 25, 2022
2 parents 50ef8e1 + 52f81a9 commit fc2d4b3
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class MessageSchemaOptionsImpl :

logger.info("Initialising schemaRegistry with: $schemaPath")

val schemaLoader = SchemaLoader().createFromDir(schemaPath)
SchemaRegistry(schemaLoader.all)
SchemaLoader().createFromDir(schemaPath)
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package kafkasnoop.serialisation.avro

import com.papsign.ktor.openapigen.OpenAPIGen
import com.papsign.ktor.openapigen.route.apiRouting
import io.ktor.application.*
import io.ktor.routing.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.websocket.*
import kafkasnoop.avro.SchemaRegistry
import kafkasnoop.http.installContentNegotiation
import kafkasnoop.serialisation.avro.routes.serialiserApiRoutes
import kafkasnoop.serialisation.avro.routes.serialiserRoutes
import kotlinx.coroutines.ExperimentalCoroutinesApi
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -46,6 +48,11 @@ fun Application.serialisationServer(
}

routing {
// NOTE: openAPI routing wasn't working, I think because of a bug in the binary payload handling
// will come back to this later
serialiserRoutes(schemaRegistry, messageEnvelopeOptions)
}
apiRouting {
serialiserApiRoutes(schemaRegistry)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package kafkasnoop.serialisation.avro.dto

import com.papsign.ktor.openapigen.annotations.Response

/**
 * API response DTO describing the whole in-memory AVRO schema registry:
 * every successfully parsed schema plus the ones that could not be parsed.
 *
 * NOTE(review): this name clashes with kafkasnoop.avro.SchemaRegistry, which
 * forces fully-qualified references at use sites (see the routes) — consider
 * an import alias or a distinct DTO name.
 */
@Response("AVRO Schema Registry")
data class SchemaRegistry(
// Schemas that parsed successfully.
val schemas: List<Schema>,
// Schemas that failed to parse, with the dependencies they were missing.
val failedToParse: List<FailedToParseSchema>
)

/**
 * API response DTO for a single AVRO schema in the registry.
 */
data class Schema(
// Fully-qualified schema name (namespace + name).
val name: String,
// Base64-encoded SHA-256 parsing fingerprint; empty if it could not be computed.
val sha256FingerPrint: String,
// Alternate names declared on the schema.
val aliases: List<String>,
// AVRO schema type name (RECORD, ENUM, ...).
val type: String,
// Field summaries as "fieldName[fieldTypeFullName]"; empty for non-record types.
val fields: List<String>,
// Relative API URL where the full schema definition can be fetched.
val contentUrl: String,
)

data class FailedToParseSchema(val name: String, val dependencies: List<String>)
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package kafkasnoop.serialisation.avro.routes

import com.google.gson.JsonElement
import com.google.gson.JsonParser
import com.papsign.ktor.openapigen.annotations.parameters.PathParam
import com.papsign.ktor.openapigen.route.info
import com.papsign.ktor.openapigen.route.path.normal.NormalOpenAPIRoute
import com.papsign.ktor.openapigen.route.path.normal.get
import com.papsign.ktor.openapigen.route.response.respond
import io.ktor.features.*
import kafkasnoop.avro.SchemaRegistry
import kafkasnoop.serialisation.avro.dto.FailedToParseSchema
import kafkasnoop.serialisation.avro.dto.Schema
import java.net.URLEncoder

/**
 * GET endpoint listing every schema in the registry, plus the schemas that
 * failed to parse, as a [kafkasnoop.serialisation.avro.dto.SchemaRegistry] DTO.
 *
 * @param schemaRegistry the in-memory AVRO schema registry to expose.
 */
fun NormalOpenAPIRoute.schemas(
    schemaRegistry: SchemaRegistry
) {
    get<Unit, kafkasnoop.serialisation.avro.dto.SchemaRegistry>(
        info("Schema Registry", "Get details of all schemas in the registry")
    ) {
        respond(
            kafkasnoop.serialisation.avro.dto.SchemaRegistry(
                schemaRegistry.all.map {
                    // Only RECORD schemas carry fields: Schema.getFields() throws
                    // AvroRuntimeException for every other type (ENUM, FIXED,
                    // primitives, ...), so guard on RECORD instead of just ENUM.
                    val fields = if (it.type == org.apache.avro.Schema.Type.RECORD) {
                        it.fields.map { f -> "${f.name() ?: ""}[${f.schema().fullName}]" }
                    } else {
                        emptyList()
                    }
                    Schema(
                        it.fullName,
                        // Fall back to "" when the fingerprint could not be computed.
                        schemaRegistry.getBase64FingerPrintFor(it.fullName) ?: "",
                        it.aliases.toList(),
                        it.type.name,
                        fields,
                        // Name is URL-encoded so dotted full names survive as one path segment.
                        "/api/schemas/${URLEncoder.encode(it.fullName, Charsets.UTF_8)}",
                    )
                },
                schemaRegistry.failedToParse.map { s -> FailedToParseSchema(s.fullName, s.needs) }
            )
        )
    }
}

data class SchemaNameParam(@PathParam("Schema Full Name") val name: String)

/**
 * GET endpoint returning the full JSON definition of a single schema,
 * looked up by its fully-qualified name.
 *
 * @param schemaRegistry the in-memory AVRO schema registry to query.
 * @throws NotFoundException when no schema with the given name exists (404).
 */
fun NormalOpenAPIRoute.schemaByName(schemaRegistry: SchemaRegistry) {
    get<SchemaNameParam, JsonElement>(
        // Fixed: description was copy-pasted from the registry-listing route.
        info("Schema Content", "Get the full definition of a single schema by its full name")
    ) { param ->
        val schema = schemaRegistry.getByName(param.name)
            ?: throw NotFoundException("Schema with name ${param.name} not found")
        respond(
            // toString(true) pretty-prints the schema; re-parse it so the
            // response body is a JSON document rather than an escaped string.
            JsonParser.parseString(schema.toString(true))
        )
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package kafkasnoop.serialisation.avro.routes

import com.papsign.ktor.openapigen.route.path.normal.NormalOpenAPIRoute
import com.papsign.ktor.openapigen.route.route
import kafkasnoop.avro.SchemaRegistry

/**
 * Mounts the schema-registry REST endpoints:
 * the listing at /api/schemas and single-schema lookup at /api/schemas/{name}.
 *
 * @param schemaRegistry the in-memory AVRO schema registry backing both routes.
 */
fun NormalOpenAPIRoute.serialiserApiRoutes(
    schemaRegistry: SchemaRegistry,
) {
    val basePath = "/api/schemas"
    route(basePath).schemas(schemaRegistry)
    route("$basePath/{name}").schemaByName(schemaRegistry)
}
4 changes: 4 additions & 0 deletions http/snoop-wrap/src/main/kotlin/kafkasnoop/wrap/Server.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import kafkasnoop.routes.snoopApiRoutes
import kafkasnoop.serialisation.ByteToStringService
import kafkasnoop.serialisation.MessageDeserialiser
import kafkasnoop.serialisation.avro.MessageSchemaOptions
import kafkasnoop.serialisation.avro.routes.serialiserApiRoutes
import kafkasnoop.serialisation.avro.routes.serialiserRoutes
import kotlinx.coroutines.ExperimentalCoroutinesApi
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -68,10 +69,13 @@ class Server(
routing {
openApi()
messagesWs(kafkaClientFactory, deserialiser)
// NOTE: openAPI routing wasn't working, I think because of a bug in the binary payload handling
// will come back to this later
serialiserRoutes(schemaRegistry, messageEnvelopeOptions)
}
apiRouting {
snoopApiRoutes(kafkaClientFactory, deserialiser)
serialiserApiRoutes(schemaRegistry)
}
}
}
4 changes: 3 additions & 1 deletion http/snoop/src/main/kotlin/kafkasnoop/dto/Topic.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ data class Partition(
val offlineReplicas: Int,
)

data class ApiUrls(val get: String, val websocket: String)

@Response("Kafka Topic")
data class Topic(val name: String, val partitions: List<Partition>)
data class Topic(val name: String, val partitions: List<Partition>, val urls: ApiUrls)
4 changes: 2 additions & 2 deletions http/snoop/src/main/kotlin/kafkasnoop/routes/apiRoutes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ fun NormalOpenAPIRoute.snoopApiRoutes(
kafkaClientFactory: KafkaClientFactory,
messageDeserialiser: MessageDeserialiser
) {
route("/api").topics(kafkaClientFactory)
route("/api/{topic}").messages(kafkaClientFactory, messageDeserialiser)
route("/api/topics").topics(kafkaClientFactory)
route("/api/topics/{topic}").messages(kafkaClientFactory, messageDeserialiser)
}
15 changes: 12 additions & 3 deletions http/snoop/src/main/kotlin/kafkasnoop/routes/topics.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import com.papsign.ktor.openapigen.route.path.normal.NormalOpenAPIRoute
import com.papsign.ktor.openapigen.route.path.normal.get
import com.papsign.ktor.openapigen.route.response.respond
import kafkasnoop.KafkaClientFactory
import kafkasnoop.dto.ApiUrls
import kafkasnoop.dto.Partition
import kafkasnoop.dto.Topic
import org.apache.kafka.common.TopicPartition
import java.net.URLEncoder

fun NormalOpenAPIRoute.topics(kafkaClientFactory: KafkaClientFactory) {
get<Unit, List<Topic>>(
info("Topics", "Get Topics and Partition Details"),
example = listOf(
Topic(
"topic",
"foo-topic",
listOf(
Partition(
0,
Expand All @@ -23,7 +25,8 @@ fun NormalOpenAPIRoute.topics(kafkaClientFactory: KafkaClientFactory) {
2,
1
)
)
),
ApiUrls("/api/topics/foo-topic", "/ws/topics/foo-topic")
)
)
) {
Expand All @@ -38,6 +41,8 @@ fun NormalOpenAPIRoute.topics(kafkaClientFactory: KafkaClientFactory) {
val endOffsets = it.endOffsets(partitions)
.map { o -> o.key.partition() to o.value }.toMap()

val url = "topics/${URLEncoder.encode(t.key, Charsets.UTF_8)}"

Topic(
t.key,
t.value.map { p ->
Expand All @@ -48,7 +53,11 @@ fun NormalOpenAPIRoute.topics(kafkaClientFactory: KafkaClientFactory) {
p.inSyncReplicas().count(),
p.offlineReplicas().count()
)
}
},
ApiUrls(
"/api/$url",
"/ws/$url",
)
)
}
)
Expand Down
8 changes: 8 additions & 0 deletions lib/avro/src/main/kotlin/kafkasnoop/avro/AvroSchema.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ data class AvroSchema(val fullName: String, val needs: List<String>, val schema:
it.asString
}
.filter { !primitive.contains(it) }
.map {
// if the type doesn't contain . we assume the same namespace as the parent type
// TODO: not sure if this is according to the AVRO spec
if (it.contains('.'))
it
else
"$ns.$it"
}
.toList()
} else {
emptyList()
Expand Down
10 changes: 7 additions & 3 deletions lib/avro/src/main/kotlin/kafkasnoop/avro/SchemaLoader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class SchemaLoader(

// NOTE: there's probably a much more efficient way of doing this.
val ordered = mutableListOf<String>()
var remaining = all.keys
// take those with no dependencies first
var remaining = all.values.sortedBy { it.needs.count() }.map { it.fullName }
while (remaining.isNotEmpty()) {
val newRemaining = mutableListOf<String>()
remaining
Expand All @@ -78,7 +79,7 @@ class SchemaLoader(
)
break
}
remaining = newRemaining.toSet()
remaining = newRemaining
}

val schemaParser = Schema.Parser()
Expand All @@ -90,6 +91,9 @@ class SchemaLoader(
null
}
}.filterNotNull()
return schemaRegistryFactory.create(avroSchemas)
val schemaRegistry = schemaRegistryFactory.create(avroSchemas)
schemaRegistry.failedToParse.addAll(remaining.map { all[it] }.filterNotNull())

return schemaRegistry
}
}
36 changes: 26 additions & 10 deletions lib/avro/src/main/kotlin/kafkasnoop/avro/SchemaRegistry.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,51 @@ class SchemaRegistry(schemas: Collection<Schema>) {
private val encoder: Base64.Encoder = Base64.getEncoder()
}

// Schemas that could not be parsed. NOTE(review): exposed as a public
// MutableList and populated externally (by SchemaLoader) — consider a
// private backing list with a read-only view.
val failedToParse = mutableListOf<AvroSchema>()
// All successfully parsed schemas, keyed by fully-qualified name.
private val schemasMap: Map<String, Schema>
// Per-algorithm cache: Base64 fingerprint -> schema full name.
private val fingerPrintsToSchema = ConcurrentHashMap<String, Map<String, String>>()
// Per-algorithm cache: schema full name -> Base64 fingerprint (inverse of above).
private val schemaToFingerprint = ConcurrentHashMap<String, Map<String, String>>()

init {
schemasMap = schemas.associateBy { it.fullName }
}

// Read-only view of every schema in the registry.
val all: Collection<Schema>
get() = schemasMap.values

/**
 * Returns the Base64-encoded [algoName] fingerprint for the schema named
 * [schemaName], or null when the schema is unknown or its fingerprint
 * could not be computed.
 *
 * NOTE(review): the default here is the literal "SHA-256" while
 * getByFingerPrint defaults to DEFAULT_FINGERPRINT_ALGORITHM — confirm
 * these are the same value and unify.
 */
fun getBase64FingerPrintFor(schemaName: String, algoName: String = "SHA-256"): String? {
    // Lazily build and cache the name -> fingerprint view for this algorithm
    // by inverting the fingerprint -> name cache.
    val nameToPrint = schemaToFingerprint.computeIfAbsent(algoName) {
        fingerPrintsToSchema
            .computeIfAbsent(algoName) { computeFingerPrints(algoName) }
            .entries
            .associate { (print, name) -> name to print }
    }
    return nameToPrint[schemaName]
}

/** Looks up a schema by its fully-qualified name; null when absent. */
fun getByName(name: String): Schema? = schemasMap[name]

fun getByFingerPrint(print: ByteArray, fingerPrintName: String = DEFAULT_FINGERPRINT_ALGORITHM): Schema? {
val cache = fingerPrintsToSchema.computeIfAbsent(fingerPrintName) {
schemasMap.values.map { schema ->
try {
encoder.encodeToString(SchemaNormalization.parsingFingerprint(fingerPrintName, schema)) to schema.fullName
} catch (e: Exception) {
logger.warn("Cannot encode fingerprint for ${schema.fullName}: $e")
null
}
}.filterNotNull().toMap()
/**
 * Resolves a schema from its raw [algoName] parsing fingerprint, or null
 * when no registered schema matches.
 */
fun getByFingerPrint(print: ByteArray, algoName: String = DEFAULT_FINGERPRINT_ALGORITHM): Schema? {
    // Fingerprints for a given algorithm are computed once and cached.
    val printToName = fingerPrintsToSchema.computeIfAbsent(algoName) {
        computeFingerPrints(algoName)
    }
    return printToName[encoder.encodeToString(print)]?.let { schemasMap[it] }
}

/**
 * Computes the Base64-encoded [algoName] parsing fingerprint of every
 * registered schema, returning a fingerprint -> full-name map. Schemas whose
 * fingerprint cannot be computed are logged and omitted.
 */
private fun computeFingerPrints(algoName: String): Map<String, String> =
    schemasMap.values
        .mapNotNull { schema ->
            try {
                val print = SchemaNormalization.parsingFingerprint(algoName, schema)
                encoder.encodeToString(print) to schema.fullName
            } catch (e: Exception) {
                logger.warn("Cannot encode fingerprint for ${schema.fullName}: $e")
                null
            }
        }
        .toMap()

/** Creates a deserialiser backed by this registry. */
fun getDeserialiser(): Deserialiser = Deserialiser(this)
Expand Down

0 comments on commit fc2d4b3

Please sign in to comment.