diff --git a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java index a2789ac2..d506b5b3 100644 --- a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java +++ b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java @@ -45,6 +45,9 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver { + private static final String ZONE_ID_CONFIG = "zone.id"; + private static final String ZONE_ID_TEMPLATE = "{zone.id}"; + private static final String KAFKA_CLIENT_ID = "client.id"; private Config config; private List producers = Collections.synchronizedList(new ArrayList<>()); @@ -63,6 +66,13 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I Properties commonProperties = new Properties(); commonProperties.load(new StringReader(config.commonConfig)); + if (commonProperties.containsKey(KAFKA_CLIENT_ID)) { + commonProperties.put( + KAFKA_CLIENT_ID, + applyZoneId( + commonProperties.getProperty(KAFKA_CLIENT_ID), System.getProperty(ZONE_ID_CONFIG))); + } + producerProperties = new Properties(); commonProperties.forEach((key, value) -> producerProperties.put(key, value)); producerProperties.load(new StringReader(config.producerConfig)); @@ -151,6 +161,10 @@ public void close() throws Exception { admin.close(); } + private static String applyZoneId(String clientId, String zoneId) { + return clientId.replace(ZONE_ID_TEMPLATE, zoneId); + } + private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()) .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);