Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ldc #158

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open

Ldc #158

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.qunar.qmq</groupId>
<artifactId>qmq-parent</artifactId>
<version>1.1.39</version>
<version>1.1.40</version>
<packaging>pom</packaging>

<name>qmq</name>
Expand Down
2 changes: 1 addition & 1 deletion qmq-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.39</version>
<version>1.1.40</version>
</parent>

<artifactId>qmq-api</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-backup/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.qunar.qmq</groupId>
<artifactId>qmq-parent</artifactId>
<version>1.1.39</version>
<version>1.1.40</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion qmq-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.39</version>
<version>1.1.40</version>
</parent>

<artifactId>qmq-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,28 @@
*/
package qunar.tc.qmq.consumer;

import java.util.concurrent.Executor;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import qunar.tc.qmq.*;
import qunar.tc.qmq.ListenerHolder;
import qunar.tc.qmq.MessageConsumer;
import qunar.tc.qmq.MessageListener;
import qunar.tc.qmq.PullConsumer;
import qunar.tc.qmq.SubscribeParam;
import qunar.tc.qmq.common.ClientIdProvider;
import qunar.tc.qmq.common.ClientIdProviderFactory;
import qunar.tc.qmq.common.ClientInfo;
import qunar.tc.qmq.common.EnvProvider;
import qunar.tc.qmq.config.NettyClientConfigManager;
import qunar.tc.qmq.consumer.handler.MessageDistributor;
import qunar.tc.qmq.consumer.pull.PullConsumerFactory;
import qunar.tc.qmq.consumer.pull.PullRegister;
import qunar.tc.qmq.netty.client.NettyClient;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Executor;

/**
* @author miao.yang [email protected]
* @date 2012-12-28
Expand All @@ -47,6 +53,7 @@ public class MessageConsumerProvider implements MessageConsumer {

private final PullRegister pullRegister;
private String appCode;
private ClientInfo clientInfo;
private String metaServer;
private int destroyWaitInSeconds;

Expand Down Expand Up @@ -79,6 +86,7 @@ public void init() {
this.pullRegister.setEnvProvider(envProvider);
this.pullRegister.setClientId(clientId);
this.pullRegister.setAppCode(appCode);
this.pullRegister.setClientInfo(clientInfo);
this.pullRegister.init();

distributor = new MessageDistributor(pullRegister);
Expand Down Expand Up @@ -159,6 +167,15 @@ public void setAppCode(String appCode) {
this.appCode = appCode;
}

/**
*
* 客户端信息。用户客户端上报等。
* @param clientInfo
*/
public void setClientInfo(ClientInfo clientInfo) {
this.clientInfo = clientInfo;
}

/**
* 用于发现meta server集群的地址
* 格式: http://<meta server address>/meta/address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@
import static qunar.tc.qmq.common.StatusSource.HEALTHCHECKER;
import static qunar.tc.qmq.common.StatusSource.OPS;

import com.google.common.base.Strings;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.PullConsumer;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.broker.impl.BrokerServiceImpl;
import qunar.tc.qmq.common.ClientInfo;
import qunar.tc.qmq.common.EnvProvider;
import qunar.tc.qmq.common.MapKeyBuilder;
import qunar.tc.qmq.common.StatusSource;
Expand Down Expand Up @@ -70,6 +70,7 @@ public class PullRegister implements ConsumerRegister, ConsumerStateChangedListe
private String clientId;
private String metaServer;
private String appCode;
private ClientInfo clientInfo;
private int destroyWaitInSeconds;

private EnvProvider envProvider;
Expand All @@ -91,6 +92,7 @@ public void init() {
this.metaInfoService.setConsumerStateChangedListener(this);

this.brokerService.setAppCode(appCode);
this.brokerService.setClientInfo(clientInfo);
}

@Override
Expand Down Expand Up @@ -279,6 +281,10 @@ public void setAppCode(String appCode) {
this.appCode = appCode;
}

public void setClientInfo(ClientInfo clientInfo) {
this.clientInfo = clientInfo;
}

@Override
public void destroy() {
for (PullEntry pullEntry : pullEntryMap.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
*/
package qunar.tc.qmq.producer;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.opentracing.Scope;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
Expand All @@ -28,6 +33,7 @@
import qunar.tc.qmq.base.BaseMessage;
import qunar.tc.qmq.common.ClientIdProvider;
import qunar.tc.qmq.common.ClientIdProviderFactory;
import qunar.tc.qmq.common.ClientInfo;
import qunar.tc.qmq.common.EnvProvider;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.metrics.MetricsConstants;
Expand All @@ -38,13 +44,6 @@
import qunar.tc.qmq.producer.tx.MessageTracker;
import qunar.tc.qmq.tracing.TraceUtil;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* @author miao.yang [email protected]
* @date 2013-1-5
Expand All @@ -64,6 +63,7 @@ public class MessageProducerProvider implements MessageProducer {
private final Tracer tracer;

private String appCode;
private ClientInfo clientInfo;
private String metaServer;

private MessageTracker messageTracker;
Expand All @@ -85,6 +85,7 @@ public void init() {

this.routerManager.setMetaServer(this.metaServer);
this.routerManager.setAppCode(appCode);
this.routerManager.setClientInfo(clientInfo);

if (STARTED.compareAndSet(false, true)) {
routerManager.init(clientIdProvider.get());
Expand Down Expand Up @@ -254,6 +255,15 @@ public void setAppCode(String appCode) {
this.appCode = appCode;
}

/**
*
* 客户端信息用户补充appCode 不足
* @param clientInfo
*/
public void setClientInfo(ClientInfo clientInfo) {
this.clientInfo = clientInfo;
}

/**
* 用于发现meta server集群的地址
* 格式: http://<meta server address>/meta/address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@

package qunar.tc.qmq.producer.sender;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.broker.impl.BrokerServiceImpl;
import qunar.tc.qmq.common.ClientInfo;
import qunar.tc.qmq.metainfoclient.MetaInfoService;
import qunar.tc.qmq.tracing.TraceUtil;

import java.util.Map;

/**
* @author zhenyu.nie created on 2017 2017/7/3 14:21
*/
Expand All @@ -39,6 +40,7 @@ public class NettyRouterManager extends AbstractRouterManager {

private String metaServer;
private String appCode;
private ClientInfo clientInfo;

public NettyRouterManager() {
this.metaInfoService = new MetaInfoService();
Expand All @@ -48,6 +50,7 @@ public NettyRouterManager() {
@Override
public void doInit(String clientId) {
this.brokerService.setAppCode(appCode);
this.brokerService.setClientInfo(clientInfo);

this.metaInfoService.setMetaServer(metaServer);
this.metaInfoService.setClientId(clientId);
Expand Down Expand Up @@ -87,4 +90,8 @@ public void setMetaServer(String metaServer) {
public void setAppCode(String appCode) {
this.appCode = appCode;
}

public void setClientInfo(ClientInfo clientInfo) {
this.clientInfo = clientInfo;
}
}
11 changes: 5 additions & 6 deletions qmq-client/src/test/java/qunar/tc/qmq/producer/ConsumerTest.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package qunar.tc.qmq.producer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.ListenerHolder;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.MessageListener;
import qunar.tc.qmq.common.ClientInfo;
import qunar.tc.qmq.consumer.MessageConsumerProvider;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @author xufeng.deng [email protected]
* @since 2019/6/3
Expand All @@ -26,6 +24,7 @@ public static void main(String[] args) throws Exception {
final MessageConsumerProvider provider = new MessageConsumerProvider();
provider.setMetaServer("http://127.0.0.1:8080/meta/address");
provider.setAppCode("consumer_test");
provider.setClientInfo(ClientInfo.of("consumer_test","ldc1"));
provider.init();
provider.online();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package qunar.tc.qmq.producer;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.MessageSendStateListener;
import qunar.tc.qmq.common.ClientInfo;

public class ProducerTest {

Expand All @@ -14,6 +15,7 @@ public class ProducerTest {
public static void main(String[] args) throws Exception {
final MessageProducerProvider provider = new MessageProducerProvider();
provider.setAppCode("producer_test");
provider.setClientInfo(ClientInfo.of("consumer_test","ldc1"));
provider.setMetaServer("http://127.0.0.1:8080/meta/address");
provider.init();

Expand Down
2 changes: 1 addition & 1 deletion qmq-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.39</version>
<version>1.1.40</version>
</parent>

<artifactId>qmq-common</artifactId>
Expand Down
37 changes: 35 additions & 2 deletions qmq-common/src/main/java/qunar/tc/qmq/base/ClientRequestType.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,53 @@
package qunar.tc.qmq.base;

/**
* The enum Client request type.
* @author yunfeng.yang
* @since 2017/10/16
* @since 2017 /10/16
*/
public enum ClientRequestType {
ONLINE(1), HEARTBEAT(2);
/**
*Online client request type.
*/
ONLINE(1),
/**
*Heartbeat client request type.
*/
HEARTBEAT(2);

private int code;

ClientRequestType(int code) {
this.code = code;
}

/**
* Gets code.
*
* @return the code
*/
public int getCode() {
return code;
}

/**
* Is online boolean.
*
* @param code the code
* @return the boolean
*/
public static boolean isOnline(int code){
return HEARTBEAT.code == code;
}

/**
* Is heart beat boolean.
*
* @param code the code
* @return the boolean
*/
public static boolean isHeartBeat(int code){
return HEARTBEAT.code == code;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package qunar.tc.qmq.broker;

import qunar.tc.qmq.common.ClientInfo;
import qunar.tc.qmq.common.ClientType;

/**
Expand All @@ -32,4 +33,6 @@ public interface BrokerService {
void refresh(ClientType clientType, String subject, String group);

void setAppCode(String appCode);

void setClientInfo(ClientInfo clientInfo);
}
Loading