diff --git a/deployer/src/main/resources/canal.properties b/deployer/src/main/resources/canal.properties index 9b464324d5..424af91d07 100644 --- a/deployer/src/main/resources/canal.properties +++ b/deployer/src/main/resources/canal.properties @@ -136,7 +136,6 @@ canal.mq.exchange= canal.mq.username= canal.mq.password= canal.mq.aliyunuid= - canal.mq.database.hash = true ################################################## ######### Kafka Kerberos Info ############# diff --git a/server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java b/server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java index 12160eeb9a..0f0381dfc2 100644 --- a/server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java +++ b/server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java @@ -42,14 +42,11 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro private Producer producer; private MQProperties kafkaProperties; - private boolean databaseHash; - @Override public void init(MQProperties kafkaProperties) { super.init(kafkaProperties); this.kafkaProperties = kafkaProperties; - databaseHash = kafkaProperties.getDatabaseHash(); Properties properties = new Properties(); properties.put("bootstrap.servers", kafkaProperties.getServers()); properties.put("acks", kafkaProperties.getAcks()); @@ -176,7 +173,7 @@ private List send(MQProperties.CanalDestination canalDestination, String message.getId(), canalDestination.getPartitionsNum(), canalDestination.getPartitionHash(), - databaseHash); + kafkaProperties.getDatabaseHash()); int length = messages.length; for (int i = 0; i < length; i++) { Message messagePartition = messages[i]; @@ -206,7 +203,7 @@ private List send(MQProperties.CanalDestination canalDestination, String FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage, canalDestination.getPartitionsNum(), canalDestination.getPartitionHash(), - databaseHash); + kafkaProperties.getDatabaseHash()); int length = partitionFlatMessage.length; for (int i = 0; i < length; i++) { FlatMessage flatMessagePart = partitionFlatMessage[i]; diff --git a/server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java b/server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java index 73b1e51436..88a22eaaea 100644 --- a/server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java +++ b/server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java @@ -40,13 +40,10 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ private MQProperties mqProperties; private static final String CLOUD_ACCESS_CHANNEL = "cloud"; - private boolean databaseHash; - @Override public void init(MQProperties rocketMQProperties) { super.init(rocketMQProperties); this.mqProperties = rocketMQProperties; - this.databaseHash = rocketMQProperties.getDatabaseHash(); RPCHook rpcHook = null; if (rocketMQProperties.getAliyunAccessKey().length() > 0 && rocketMQProperties.getAliyunSecretKey().length() > 0) { @@ -129,7 +126,7 @@ public void send(final MQProperties.CanalDestination destination, String topicNa message.getId(), destination.getPartitionsNum(), destination.getPartitionHash(), - databaseHash); + mqProperties.getDatabaseHash()); int length = messages.length; ExecutorTemplate template = new ExecutorTemplate(executor); @@ -173,7 +170,7 @@ public void run() { FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage, destination.getPartitionsNum(), destination.getPartitionHash(), - databaseHash); + mqProperties.getDatabaseHash()); int length = partitionFlatMessage.length; for (int i = 0; i < length; i++) { partitionFlatMessages.get(i).add(partitionFlatMessage[i]);