Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Oct 15, 2019
1 parent dd75543 commit 1fa9020
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 11 deletions.
1 change: 0 additions & 1 deletion deployer/src/main/resources/canal.properties
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ canal.mq.exchange=
canal.mq.username=
canal.mq.password=
canal.mq.aliyunuid=

canal.mq.database.hash = true
##################################################
######### Kafka Kerberos Info #############
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,11 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
private Producer<String, byte[]> producer;
private MQProperties kafkaProperties;

private boolean databaseHash;

@Override
public void init(MQProperties kafkaProperties) {
super.init(kafkaProperties);

this.kafkaProperties = kafkaProperties;
databaseHash = kafkaProperties.getDatabaseHash();
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaProperties.getServers());
properties.put("acks", kafkaProperties.getAcks());
Expand Down Expand Up @@ -176,7 +173,7 @@ private List<Future> send(MQProperties.CanalDestination canalDestination, String
message.getId(),
canalDestination.getPartitionsNum(),
canalDestination.getPartitionHash(),
databaseHash);
kafkaProperties.getDatabaseHash());
int length = messages.length;
for (int i = 0; i < length; i++) {
Message messagePartition = messages[i];
Expand Down Expand Up @@ -206,7 +203,7 @@ private List<Future> send(MQProperties.CanalDestination canalDestination, String
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
canalDestination.getPartitionsNum(),
canalDestination.getPartitionHash(),
databaseHash);
kafkaProperties.getDatabaseHash());
int length = partitionFlatMessage.length;
for (int i = 0; i < length; i++) {
FlatMessage flatMessagePart = partitionFlatMessage[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,10 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
private MQProperties mqProperties;
private static final String CLOUD_ACCESS_CHANNEL = "cloud";

private boolean databaseHash;

@Override
public void init(MQProperties rocketMQProperties) {
super.init(rocketMQProperties);
this.mqProperties = rocketMQProperties;
this.databaseHash = rocketMQProperties.getDatabaseHash();
RPCHook rpcHook = null;
if (rocketMQProperties.getAliyunAccessKey().length() > 0
&& rocketMQProperties.getAliyunSecretKey().length() > 0) {
Expand Down Expand Up @@ -129,7 +126,7 @@ public void send(final MQProperties.CanalDestination destination, String topicNa
message.getId(),
destination.getPartitionsNum(),
destination.getPartitionHash(),
databaseHash);
mqProperties.getDatabaseHash());
int length = messages.length;

ExecutorTemplate template = new ExecutorTemplate(executor);
Expand Down Expand Up @@ -173,7 +170,7 @@ public void run() {
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
destination.getPartitionsNum(),
destination.getPartitionHash(),
databaseHash);
mqProperties.getDatabaseHash());
int length = partitionFlatMessage.length;
for (int i = 0; i < length; i++) {
partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
Expand Down

0 comments on commit 1fa9020

Please sign in to comment.