From 9bb26b9cc466677f13a8d7f84bafca52517ced31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E9=BD=90=E7=AB=8B?= Date: Wed, 27 Jul 2022 18:04:17 +0800 Subject: [PATCH] :bug: fix issue 625, support listener_security_protocol_map --- .../kafka/eagle/common/util/KafkaCacheUtils.java | 8 +++++++- .../kafka/eagle/core/factory/KafkaServiceImpl.java | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/efak-common/src/main/java/org/smartloli/kafka/eagle/common/util/KafkaCacheUtils.java b/efak-common/src/main/java/org/smartloli/kafka/eagle/common/util/KafkaCacheUtils.java index 71d5f922..c97d7160 100644 --- a/efak-common/src/main/java/org/smartloli/kafka/eagle/common/util/KafkaCacheUtils.java +++ b/efak-common/src/main/java/org/smartloli/kafka/eagle/common/util/KafkaCacheUtils.java @@ -111,6 +111,7 @@ private static void refreshKafkaMetaData(List targets, Stat stat, S if (SystemConfigUtils.getBooleanProperty(clusterAlias + ".efak.sasl.enable") || SystemConfigUtils.getBooleanProperty(clusterAlias + ".efak.ssl.enable")) { String endpoints = JSON.parseObject(data).getString("endpoints"); List endpointsList = JSON.parseArray(endpoints, String.class); + String protocolMap = JSON.parseObject(data).getString("listener_security_protocol_map"); String host = ""; int port = 0; if (endpointsList.size() > 1) { @@ -122,7 +123,12 @@ private static void refreshKafkaMetaData(List targets, Stat stat, S protocol = KConstants.Kafka.SSL; } for (String endpointsStr : endpointsList) { - if (endpointsStr.contains(protocol)) { + String endpointName = endpointsStr.split("://")[0]; + String realProtocol = ""; + if (!protocolMap.isEmpty()) { + realProtocol = JSON.parseObject(protocolMap).getString(endpointName); + } + if (endpointName.equals(protocol) || realProtocol.equals(protocol)) { String tmp = endpointsStr.split("//")[1]; host = tmp.split(":")[0]; port = Integer.parseInt(tmp.split(":")[1]); diff --git a/efak-core/src/main/java/org/smartloli/kafka/eagle/core/factory/KafkaServiceImpl.java b/efak-core/src/main/java/org/smartloli/kafka/eagle/core/factory/KafkaServiceImpl.java index aa91f34a..1fbf1e6d 100644 --- a/efak-core/src/main/java/org/smartloli/kafka/eagle/core/factory/KafkaServiceImpl.java +++ b/efak-core/src/main/java/org/smartloli/kafka/eagle/core/factory/KafkaServiceImpl.java @@ -254,6 +254,7 @@ public List getAllBrokersInfo(String clusterAlias) { if (SystemConfigUtils.getBooleanProperty(clusterAlias + ".efak.sasl.enable") || SystemConfigUtils.getBooleanProperty(clusterAlias + ".efak.ssl.enable")) { String endpoints = JSON.parseObject(tupleString).getString("endpoints"); List endpointsList = JSON.parseArray(endpoints, String.class); + String protocolMap = JSON.parseObject(tupleString).getString("listener_security_protocol_map"); String host = ""; int port = 0; if (endpointsList.size() > 1) { @@ -265,7 +266,12 @@ public List getAllBrokersInfo(String clusterAlias) { protocol = Kafka.SSL; } for (String endpointsStr : endpointsList) { - if (endpointsStr.contains(protocol)) { + String endpointName = endpointsStr.split("://")[0]; + String realProtocol = ""; + if (!protocolMap.isEmpty()) { + realProtocol = JSON.parseObject(protocolMap).getString(endpointName); + } + if (endpointName.equals(protocol) || realProtocol.equals(protocol)) { String tmp = endpointsStr.split("//")[1]; host = tmp.split(":")[0]; port = Integer.parseInt(tmp.split(":")[1]);