From 2c79e778720d68bc2bc070b8ebf77d3b60d13542 Mon Sep 17 00:00:00 2001 From: Lucian Carata Date: Fri, 1 Nov 2024 15:07:17 +0000 Subject: [PATCH] fix(dataflow): make each replica use unique subscription names (#6021) Following #6020, it was no longer possible to have multiple replicas of dataflow-engine subscribing simultaneously to the scheduler, because all were connecting with the same subscriber name, and a lock was added per name, first waiting the disconnection of the old subscriber before allowing a new one to progress. We update the dataflow-engine code so that each replica connects with its own hostname as the subscriber name. If the hostname can not be determined, we subscribe with the name seldon-dataflow-engine- followed by the canonical string representation of a UUID v4. The subscriber name can also be explicitly controlled by passing the --dataflow-replica-id argument or the DATAFLOW_REPLICA_ID environment variable, wich will take precedence, in that order, to setting the value as the hostname. --- .../src/main/kotlin/io/seldon/dataflow/Cli.kt | 23 +++++++++++++- .../main/kotlin/io/seldon/dataflow/Main.kt | 4 ++- .../src/main/resources/local.properties | 1 + .../test/kotlin/io/seldon/dataflow/CliTest.kt | 31 +++++++++++++++++++ 4 files changed, 57 insertions(+), 2 deletions(-) diff --git a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt index 1e5485aaea..94a5e393a5 100644 --- a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt +++ b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt @@ -11,6 +11,7 @@ package io.seldon.dataflow import com.natpryce.konfig.CommandLineOption import com.natpryce.konfig.Configuration +import com.natpryce.konfig.ConfigurationMap import com.natpryce.konfig.ConfigurationProperties import com.natpryce.konfig.EnvironmentVariables import com.natpryce.konfig.Key @@ -25,6 +26,8 @@ import io.klogging.Level import io.klogging.noCoLogger import io.seldon.dataflow.kafka.security.KafkaSaslMechanisms import io.seldon.dataflow.kafka.security.KafkaSecurityProtocols +import java.net.InetAddress +import java.util.UUID object Cli { private const val ENV_VAR_PREFIX = "SELDON_" @@ -34,6 +37,7 @@ object Cli { val logLevelApplication = Key("log.level.app", enumType(*Level.values())) val logLevelKafka = Key("log.level.kafka", enumType(*Level.values())) val namespace = Key("pod.namespace", stringType) + val dataflowReplicaId = Key("dataflow.replica.id", stringType) // Seldon components val upstreamHost = Key("upstream.host", stringType) @@ -75,6 +79,7 @@ object Cli { logLevelApplication, logLevelKafka, namespace, + dataflowReplicaId, upstreamHost, upstreamPort, kafkaBootstrapServers, @@ -105,10 +110,26 @@ object Cli { fun configWith(rawArgs: Array): Configuration { val fromProperties = ConfigurationProperties.fromResource("local.properties") + val fromSystem = getSystemConfig() val fromEnv = EnvironmentVariables(prefix = ENV_VAR_PREFIX) val fromArgs = parseArguments(rawArgs) - return fromArgs overriding fromEnv overriding fromProperties + return fromArgs overriding fromEnv overriding fromSystem overriding fromProperties + } + + private fun getSystemConfig(): Configuration { + val dataflowIdPair = this.dataflowReplicaId to getNewDataflowId() + return ConfigurationMap(dataflowIdPair) + } + + fun getNewDataflowId(assignRandomUuid: Boolean = false): String { + if (!assignRandomUuid) { + try { + return InetAddress.getLocalHost().hostName + } catch (_: Exception) { + } + } + return "seldon-dataflow-engine-" + UUID.randomUUID().toString() } private fun parseArguments(rawArgs: Array): Configuration { diff --git a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt index 8d4a899eaa..b064a974f5 100644 --- a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt +++ b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt @@ -102,9 +102,11 @@ object Main { describeRetries = config[Cli.topicDescribeRetries], describeRetryDelayMillis = config[Cli.topicDescribeRetryDelayMillis], ) + val subscriberId = config[Cli.dataflowReplicaId] + val subscriber = PipelineSubscriber( - "seldon-dataflow-engine", + subscriberId, kafkaProperties, kafkaAdminProperties, kafkaStreamsParams, diff --git a/scheduler/data-flow/src/main/resources/local.properties b/scheduler/data-flow/src/main/resources/local.properties index 46a7218380..68b3bd408d 100644 --- a/scheduler/data-flow/src/main/resources/local.properties +++ b/scheduler/data-flow/src/main/resources/local.properties @@ -1,5 +1,6 @@ log.level.app=INFO log.level.kafka=WARN +dataflow.replica.id=seldon-dataflow-engine kafka.bootstrap.servers=localhost:9092 kafka.consumer.prefix= kafka.security.protocol=PLAINTEXT diff --git a/scheduler/data-flow/src/test/kotlin/io/seldon/dataflow/CliTest.kt b/scheduler/data-flow/src/test/kotlin/io/seldon/dataflow/CliTest.kt index 9011ff3d4c..52a97fa4b5 100644 --- a/scheduler/data-flow/src/test/kotlin/io/seldon/dataflow/CliTest.kt +++ b/scheduler/data-flow/src/test/kotlin/io/seldon/dataflow/CliTest.kt @@ -16,9 +16,15 @@ import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.Arguments.arguments import org.junit.jupiter.params.provider.MethodSource import strikt.api.expectCatching +import strikt.api.expectThat +import strikt.assertions.hasLength import strikt.assertions.isEqualTo +import strikt.assertions.isNotEqualTo import strikt.assertions.isSuccess +import strikt.assertions.startsWith +import java.util.UUID import java.util.stream.Stream +import kotlin.test.Test internal class CliTest { @DisplayName("Passing auth mechanism via cli argument") @@ -36,6 +42,31 @@ internal class CliTest { .isEqualTo(expectedMechanism) } + @Test + fun `should handle dataflow replica id`() { + val cliDefault = Cli.configWith(arrayOf()) + val testReplicaId = "dataflow-id-1" + val cli = Cli.configWith(arrayOf("--dataflow-replica-id", testReplicaId)) + + expectThat(cliDefault[Cli.dataflowReplicaId]) { + isNotEqualTo("seldon-dataflow-engine") + } + expectThat(cli[Cli.dataflowReplicaId]) { + isEqualTo(testReplicaId) + } + + // test random Uuid (v4) + val expectedReplicaIdPrefix = "seldon-dataflow-engine-" + val uuidStringLength = 36 + val randomReplicaUuid = Cli.getNewDataflowId(true) + expectThat(randomReplicaUuid) { + startsWith(expectedReplicaIdPrefix) + hasLength(expectedReplicaIdPrefix.length + uuidStringLength) + } + expectCatching { UUID.fromString(randomReplicaUuid.removePrefix(expectedReplicaIdPrefix)) } + .isSuccess() + } + companion object { @JvmStatic private fun saslMechanisms(): Stream {