fix(dataflow): wait for Kafka topic creation
Kafka topic creation happens asynchronously. This means that even when
the return value from `createTopics(...)` indicates that a topic has
been created successfully, the topic cannot necessarily be subscribed
to immediately.

Instead of verifying topic status from the `createTopics` return
value, we now repeatedly call `describeTopics` until all of the topics
for the pipeline can be described successfully. This indicates that
each topic has been fully created on _at least_ one broker and can now
be subscribed to.
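
A minimal, self-contained sketch of this wait-for-describe pattern, using the same kotlin-retry and Kafka `Admin` APIs as the change below; the `createAndAwaitTopics` helper, topic settings, retry bounds, and timeouts here are illustrative and not part of the commit:

```kotlin
import com.github.michaelbull.retry.policy.constantDelay
import com.github.michaelbull.retry.policy.limitAttempts
import com.github.michaelbull.retry.policy.plus
import com.github.michaelbull.retry.retry
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.NewTopic
import java.util.concurrent.TimeUnit

// Illustrative helper (not part of this commit): request topic creation, then
// block until every topic can be described, i.e. exists on at least one broker.
suspend fun createAndAwaitTopics(adminClient: Admin, topicNames: List<String>) {
    val newTopics = topicNames.map { name -> NewTopic(name, 1, 1.toShort()) }

    // createTopics returns once the request is accepted; the topics may not yet
    // be visible to consumers, so its result is not treated as "topic ready".
    adminClient.createTopics(newTopics, CreateTopicsOptions().timeoutMs(60_000))

    // Poll describeTopics until it succeeds for all topics; if attempts run out,
    // retry rethrows the last exception (e.g. an UnknownTopicOrPartitionException).
    retry(limitAttempts(60) + constantDelay(delayMillis = 1000L)) {
        adminClient.describeTopics(topicNames)
            .allTopicNames()
            .get(500, TimeUnit.MILLISECONDS)
    }
}
```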
lc525 committed Feb 25, 2024
1 parent 086870f commit db3eea4
Showing 3 changed files with 44 additions and 20 deletions.
@@ -105,7 +105,6 @@ class PipelineSubscriber(
}
}
.collect()
// TODO - error handling?
// TODO - use supervisor job(s) for spawning coroutines?
}

@@ -121,8 +120,20 @@
kafkaConsumerGroupIdPrefix: String,
namespace: String,
) {
logger.info("Create pipeline {pipelineName} version: {pipelineVersion} id: {pipelineId}", metadata.name, metadata.version, metadata.id)
val (pipeline, err) = Pipeline.forSteps(metadata, steps, kafkaProperties, kafkaDomainParams, kafkaConsumerGroupIdPrefix, namespace)
logger.info(
"Create pipeline {pipelineName} version: {pipelineVersion} id: {pipelineId}",
metadata.name,
metadata.version,
metadata.id
)
val (pipeline, err) = Pipeline.forSteps(
metadata,
steps,
kafkaProperties,
kafkaDomainParams,
kafkaConsumerGroupIdPrefix,
namespace
)
if (err != null) {
err.log(logger, Level.ERROR)
client.pipelineUpdateEvent(
@@ -9,12 +9,22 @@ the Change License after the Change Date as each is defined in accordance with t

package io.seldon.dataflow.kafka

import com.github.michaelbull.retry.ContinueRetrying
import com.github.michaelbull.retry.policy.RetryPolicy
import com.github.michaelbull.retry.policy.constantDelay
import com.github.michaelbull.retry.policy.limitAttempts
import com.github.michaelbull.retry.policy.plus
import com.github.michaelbull.retry.retry
import io.seldon.mlops.chainer.ChainerOuterClass.PipelineStepUpdate
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import io.klogging.logger as coLogger

class KafkaAdmin(
@@ -26,6 +36,17 @@ class KafkaAdmin(
suspend fun ensureTopicsExist(
steps: List<PipelineStepUpdate>,
) : Exception? {
val missingTopicRetryPolicy: RetryPolicy<Throwable> = {
when (reason) {
is TimeoutException,
is UnknownTopicOrPartitionException -> ContinueRetrying
else -> {
logger.warn("ignoring exception while waiting for topic creation: ${reason.message}")
ContinueRetrying
}
}
}

try {
steps
.flatMap { step -> step.sourcesList + step.sink + step.triggersList }
@@ -46,12 +67,17 @@
)
}
.run {
adminClient.createTopics(this)
adminClient.createTopics(this, CreateTopicsOptions().timeoutMs(60_000))
}
.values()
.also { topicCreations ->
topicCreations.entries.forEach { creationResult ->
awaitKafkaResult(creationResult)
logger.info("Waiting for kafka topic creation")
// We repeatedly attempt to describe all topics as a way of blocking until they exist at least on
// one broker. This is because the call to createTopics above returns before topics can actually
// be subscribed to.
retry(missingTopicRetryPolicy + limitAttempts(60) + constantDelay(delayMillis = 1000L)) {
logger.debug("Still waiting for all topics to be created...")
adminClient.describeTopics(topicCreations.keys).allTopicNames().get(500, TimeUnit.MILLISECONDS)
}
}
} catch (e: Exception) {
@@ -62,22 +88,10 @@
return e
}

logger.info("All topics created")
return null
}

private suspend fun awaitKafkaResult(result: Map.Entry<String, KafkaFuture<Void>>) {
try {
result.value.get()
logger.info("Topic created ${result.key}")
} catch (e: ExecutionException) {
if (e.cause is TopicExistsException) {
logger.info("Topic already exists ${result.key}")
} else {
throw e
}
}
}

companion object {
private val logger = coLogger(KafkaAdmin::class)
}
@@ -35,7 +35,6 @@ data class PipelineMetadata(
val version: Int,
)


class Pipeline(
private val metadata: PipelineMetadata,
private val topology: Topology,
