Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Oct 15, 2019
1 parent 65b5325 commit dd75543
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,12 @@ public void run() {
*
* @param partitionsNum 分区数
* @param pkHashConfigs 分区库表主键正则表达式
* @param databaseHash 是否取消根据database进行hash
* @param databaseHash 是否取消根据database进行hash
* @return 分区message数组
*/
@SuppressWarnings("unchecked")
public static Message[] messagePartition(EntryRowData[] datas, long id, Integer partitionsNum, String pkHashConfigs,boolean databaseHash) {
public static Message[] messagePartition(EntryRowData[] datas, long id, Integer partitionsNum,
String pkHashConfigs, boolean databaseHash) {
if (partitionsNum == null) {
partitionsNum = 1;
}
Expand Down Expand Up @@ -281,7 +282,7 @@ public static Message[] messagePartition(EntryRowData[] datas, long id, Integer

for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
int hashCode = 0;
if(databaseHash){
if (databaseHash) {
hashCode = database.hashCode();
}
CanalEntry.EventType eventType = rowChange.getEventType();
Expand Down Expand Up @@ -448,10 +449,11 @@ public static List<FlatMessage> messageConverter(EntryRowData[] datas, long id)
* @param flatMessage flatMessage
* @param partitionsNum 分区数量
* @param pkHashConfigs hash映射
* @param databaseHash 是否取消根据database进行hash
* @param databaseHash 是否取消根据database进行hash
* @return 拆分后的flatMessage数组
*/
public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs,boolean databaseHash) {
public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs,
boolean databaseHash) {
if (partitionsNum == null) {
partitionsNum = 1;
}
Expand Down Expand Up @@ -482,7 +484,7 @@ public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer pa
int idx = 0;
for (Map<String, String> row : flatMessage.getData()) {
int hashCode = 0;
if(databaseHash){
if (databaseHash) {
hashCode = database.hashCode();
}
if (pkNames != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ public class MQProperties {
private String exchange = "";
// 消息发送的并行度
private int parallelThreadSize = 8;
//是否取消根据database进行hash
// 是否取消根据database进行hash
private boolean databaseHash = true;


public static class CanalDestination {

private String canalDestination;
Expand Down Expand Up @@ -349,6 +348,7 @@ public boolean getDatabaseHash() {
public void setDatabaseHash(boolean databaseHash) {
this.databaseHash = databaseHash;
}

@Override
public String toString() {
return "MQProperties [servers=" + servers + ", retries=" + retries + ", batchSize=" + batchSize + ", lingerMs="
Expand All @@ -362,9 +362,7 @@ public String toString() {
+ kerberosEnable + ", kerberosKrb5FilePath=" + kerberosKrb5FilePath + ", kerberosJaasFilePath="
+ kerberosJaasFilePath + ", username=" + username + ", password=" + password + ", vhost=" + vhost
+ ", aliyunUID=" + aliyunUID + ", exchange=" + exchange + ", parallelThreadSize=" + parallelThreadSize
+ ",databaseHash=" + databaseHash +"]";
+ ",databaseHash=" + databaseHash + "]";
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQPro
private Producer<String, byte[]> producer;
private MQProperties kafkaProperties;

private boolean databaseHash;
private boolean databaseHash;

@Override
public void init(MQProperties kafkaProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
import com.alibaba.mq.amqp.utils.UserUtils;
import com.rabbitmq.client.impl.CredentialsProvider;


public class AliyunCredentialsProvider implements CredentialsProvider {


/**
* Access Key ID
*/
Expand All @@ -27,16 +25,14 @@ public class AliyunCredentialsProvider implements CredentialsProvider {
/**
* 资源主账号ID
*/
private final long resourceOwnerId;

private final long resourceOwnerId;

public AliyunCredentialsProvider(final String accessKey, final String accessSecret, final long resourceOwnerId) {
public AliyunCredentialsProvider(final String accessKey, final String accessSecret, final long resourceOwnerId){
this.AliyunAccessKey = accessKey;
this.AliyunAccessSecret = accessSecret;
this.resourceOwnerId = resourceOwnerId;
}


@Override
public String getUsername() {
return UserUtils.getUserName(AliyunAccessKey, resourceOwnerId);
Expand All @@ -51,5 +47,4 @@ public String getPassword() {
return null;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
private MQProperties mqProperties;
private static final String CLOUD_ACCESS_CHANNEL = "cloud";

private boolean databaseHash;
private boolean databaseHash;

@Override
public void init(MQProperties rocketMQProperties) {
Expand Down Expand Up @@ -128,7 +128,8 @@ public void send(final MQProperties.CanalDestination destination, String topicNa
com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
message.getId(),
destination.getPartitionsNum(),
destination.getPartitionHash(),databaseHash);
destination.getPartitionHash(),
databaseHash);
int length = messages.length;

ExecutorTemplate template = new ExecutorTemplate(executor);
Expand Down Expand Up @@ -171,7 +172,8 @@ public void run() {
for (FlatMessage flatMessage : flatMessages) {
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
destination.getPartitionsNum(),
destination.getPartitionHash(),databaseHash);
destination.getPartitionHash(),
databaseHash);
int length = partitionFlatMessage.length;
for (int i = 0; i < length; i++) {
partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
Expand Down

0 comments on commit dd75543

Please sign in to comment.