Skip to content

Commit

Permalink
Fix spring cloud & rocketmq transmission error.
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed Jun 3, 2024
1 parent 923714f commit 8957154
Show file tree
Hide file tree
Showing 25 changed files with 314 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,8 @@ public boolean isSuccess() {
return throwable == null;
}

public Object getArgument(int index) {
return arguments == null || index < 0 || index >= arguments.length ? null : arguments[index];
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ public Application(String name, String instance, AppService service, Location lo
this.instance = APP_ID;
}

public String getUniqueThreadName() {
return "thread-" + Thread.currentThread().getId() + "@" + instance;
}

/**
* Retrieves metadata value by key.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ private RouteTarget routeLocal(OutboundInvocation<?> invocation,
private UnitRoute getUnitRoute(OutboundInvocation<?> invocation) {
LiveMetadata liveMetadata = invocation.getLiveMetadata();
UnitRule rule = liveMetadata.getUnitRule();
if (rule == null) {
return null;
}
String variable = liveMetadata.getVariable();
UnitFunction func = invocation.getContext().getUnitFunction(rule.getVariableFunction());
return rule.getUnitRoute(variable, func);
Expand Down
2 changes: 1 addition & 1 deletion joylive-demo/joylive-demo-multilive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<artifactId>joylive-demo-multilive</artifactId>

<properties>
<spring-boot.version>2.2.9.RELEASE</spring-boot.version>
<spring-boot.version>2.7.18</spring-boot.version>
</properties>

<dependencyManagement>
Expand Down
16 changes: 12 additions & 4 deletions joylive-demo/joylive-demo-rocketmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,30 @@
<artifactId>joylive-demo-rocketmq</artifactId>

<properties>
<spring-boot.version>2.2.9.RELEASE</spring-boot.version>
<spring-cloud.version>2021.0.9</spring-cloud.version>
<spring-boot.version>2.7.18</spring-boot.version>
<spring-cloud-nacos.version>2021.0.6.0</spring-cloud-nacos.version>
<spring-cloud-loadbalancer.version>3.1.8</spring-cloud-loadbalancer.version>
<rocketmq-spring-boot-starter.version>2.3.0</rocketmq-spring-boot-starter.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>

</dependencyManagement>

<dependencies>
Expand All @@ -50,7 +58,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
<version>${rocketmq-spring-boot-starter.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ public RocketmqConsumer(ConsumerService consumerService) {

@Override
public String onMessage(MessageExt message) {
String msg = consumerService.echo(new String(message.getBody(), StandardCharsets.UTF_8));
Map<String, String> properties = message.getProperties();
return new EchoResponse("spring-rocketmq-consumer", "properties", properties::get, msg).toString();
try {
String msg = consumerService.echo(new String(message.getBody(), StandardCharsets.UTF_8));
return new EchoResponse("spring-rocketmq-consumer", "properties", properties::get, msg).toString();
} catch (Throwable e) {
return new EchoResponse("spring-rocketmq-consumer", "properties", properties::get, e.getMessage()).toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@

import com.jd.live.agent.demo.rocketmq.config.MqConfig;
import com.jd.live.agent.demo.service.HelloService;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

@Service
public class ProducerService implements HelloService {
Expand All @@ -35,7 +33,6 @@ public class ProducerService implements HelloService {

@Override
public String echo(String str) {
MessageExt response = rocketMQTemplate.sendAndReceive(mqConfig.getTopic(), str, MessageExt.class, 10000);
return new String(response.getBody(), StandardCharsets.UTF_8);
return rocketMQTemplate.sendAndReceive(mqConfig.getTopic(), str, String.class, 10000);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
spring.application.name=service-rocketmq
server.port=${SERVER_PORT:${random.int[10000,19999]}}

spring.cloud.nacos.discovery.server-addr=${NACOS_ADDR}
spring.cloud.nacos.discovery.fail-fast=true
Expand All @@ -9,7 +10,7 @@ spring.cloud.nacos.discovery.username=${NACOS_USERNAME}
spring.cloud.nacos.discovery.password=${NACOS_PASSWORD}

rocketmq.topic=service-rocketmq
rocketmq.name-server=192.168.171.128:9876
rocketmq.name-server=${ROCKETMQ_ADDR}
rocketmq.producer.group=service-rocketmq
rocketmq.producer.sendMessageTimeout=3000
rocketmq.producer.retryTimesWhenSendFailed=2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,34 @@
<spring-cloud-loadbalancer.version>3.1.8</spring-cloud-loadbalancer.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
<version>${spring-cloud-loadbalancer.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
Expand All @@ -38,11 +61,6 @@
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>${spring-cloud-openfeign.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
<version>${spring-cloud-loadbalancer.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,23 @@ public class DispatcherServletDefinition extends PluginDefinitionAdapter {

protected static final String TYPE_DISPATCHER_SERVLET = "org.springframework.web.servlet.DispatcherServlet";

private static final String METHOD_DO_SERVICE = "doService";
private static final String METHOD_DO_DISPATCH = "doDispatch";

private static final String[] ARGUMENT_DO_SERVICE = new String[]{
private static final String[] ARGUMENT_DO_DISPATCH = new String[]{
"javax.servlet.http.HttpServletRequest",
"javax.servlet.http.HttpServletResponse"
"javax.servlet.http.HttpServletResponse",
};


@Inject(InvocationContext.COMPONENT_INVOCATION_CONTEXT)
private InvocationContext context;

public DispatcherServletDefinition() {
this.matcher = () -> MatcherBuilder.named(TYPE_DISPATCHER_SERVLET);
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(
MatcherBuilder.named(METHOD_DO_SERVICE).
and(MatcherBuilder.arguments(ARGUMENT_DO_SERVICE)),
MatcherBuilder.named(METHOD_DO_DISPATCH).
and(MatcherBuilder.arguments(ARGUMENT_DO_DISPATCH)),
() -> new DispatcherServletInterceptor(context)
)
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true),
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LANE_ENABLED, matchIfMissing = true)
}, relation = ConditionalRelation.OR)
@ConditionalOnClass(MQClientAPIImplDefinition.TYPE_MQ_CLIENT_API_IMPL)
@ConditionalOnClass(MessageDefinition.TYPE_CLIENT_LOGGER)
public class MQClientAPIImplDefinition extends PluginDefinitionAdapter {

private static final String TYPE_MQ_CLIENT_API_IMPL = "org.apache.rocketmq.client.impl.MQClientAPIImpl";
protected static final String TYPE_MQ_CLIENT_API_IMPL = "org.apache.rocketmq.client.impl.MQClientAPIImpl";

private static final String METHOD_SEND_MESSAGE = "sendMessage";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true),
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LANE_ENABLED, matchIfMissing = true)
}, relation = ConditionalRelation.OR)
@ConditionalOnClass(MQProducerDefinition.TYPE_MQ_PRODUCER)
@ConditionalOnClass(MessageDefinition.TYPE_CLIENT_LOGGER)
public class MQProducerDefinition extends PluginDefinitionAdapter {

private static final String TYPE_MQ_PRODUCER = "org.apache.rocketmq.client.producer.MQProducer";
protected static final String TYPE_MQ_PRODUCER = "org.apache.rocketmq.client.producer.MQProducer";

private static final String METHOD_START = "start";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true),
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LANE_ENABLED, matchIfMissing = true)
}, relation = ConditionalRelation.OR)
@ConditionalOnClass(MessageDefinition.TYPE_MESSAGE)
@ConditionalOnClass(MessageDefinition.TYPE_CLIENT_LOGGER)
public class MessageDefinition extends PluginDefinitionAdapter {

private static final String TYPE_MESSAGE = "org.apache.rocketmq.common.message.Message";
protected static final String TYPE_MESSAGE = "org.apache.rocketmq.common.message.Message";

private static final String METHOD_GET_BODY = "getBody";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,36 @@
import com.jd.live.agent.core.plugin.definition.PluginDefinition;
import com.jd.live.agent.core.plugin.definition.PluginDefinitionAdapter;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.plugin.transmission.rocketmq.v5.interceptor.MQProducerInterceptor;
import com.jd.live.agent.plugin.transmission.rocketmq.v5.interceptor.MQProducerSendInterceptor;
import com.jd.live.agent.plugin.transmission.rocketmq.v5.interceptor.MQProducerStartInterceptor;

@Extension(value = "MQProducerDefinition_v5", order = PluginDefinition.ORDER_TRANSMISSION)
@ConditionalOnProperties(value = {
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true),
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LANE_ENABLED, matchIfMissing = true)
}, relation = ConditionalRelation.OR)
@ConditionalOnClass(MQProducerDefinition.TYPE_MQ_PRODUCER)
@ConditionalOnClass(MessageDefinition.TYPE_ACK_CALLBACK)
public class MQProducerDefinition extends PluginDefinitionAdapter {

private static final String TYPE_MQ_PRODUCER = "org.apache.rocketmq.client.producer.MQProducer";
protected static final String TYPE_MQ_PRODUCER = "org.apache.rocketmq.client.producer.MQProducer";

private static final String METHOD_START = "start";

private static final String METHOD_SEND = "send";

private static final String METHOD_REQUEST = "request";

private static final String METHOD_SEND_MESSAGE_IN_TRANSACTION = "sendMessageInTransaction";

private static final String METHOD_SEND_ONEWAY = "sendOneway";

public MQProducerDefinition() {
super(MatcherBuilder.isImplement(TYPE_MQ_PRODUCER),
new InterceptorDefinitionAdapter(
MatcherBuilder.named(METHOD_START).
and(MatcherBuilder.arguments(0)),
new MQProducerInterceptor()));
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_START), new MQProducerStartInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND), new MQProducerSendInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_REQUEST), new MQProducerSendInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND_ONEWAY), new MQProducerSendInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND_MESSAGE_IN_TRANSACTION), new MQProducerSendInterceptor()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true),
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LANE_ENABLED, matchIfMissing = true)
}, relation = ConditionalRelation.OR)
@ConditionalOnClass(MessageDefinition.TYPE_MESSAGE)
@ConditionalOnClass(MessageDefinition.TYPE_ACK_CALLBACK)
public class MessageDefinition extends PluginDefinitionAdapter {

private static final String TYPE_MESSAGE = "org.apache.rocketmq.common.message.Message";
protected static final String TYPE_MESSAGE = "org.apache.rocketmq.common.message.Message";

private static final String METHOD_GET_BODY = "getBody";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,47 +15,45 @@
*/
package com.jd.live.agent.plugin.transmission.rocketmq.v5.definition;

import com.jd.live.agent.bootstrap.classloader.ResourcerType;
import com.jd.live.agent.core.bytekit.matcher.MatcherBuilder;
import com.jd.live.agent.core.extension.annotation.*;
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.inject.annotation.InjectLoader;
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.core.plugin.definition.InterceptorDefinition;
import com.jd.live.agent.core.plugin.definition.InterceptorDefinitionAdapter;
import com.jd.live.agent.core.plugin.definition.PluginDefinition;
import com.jd.live.agent.core.plugin.definition.PluginDefinitionAdapter;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.plugin.transmission.rocketmq.v5.interceptor.MQClientAPIImplInterceptor;
import com.jd.live.agent.governance.context.bag.CargoRequire;
import com.jd.live.agent.plugin.transmission.rocketmq.v5.interceptor.MessageUtilInterceptor;

@Extension(value = "MQClientAPIImpl_v5", order = PluginDefinition.ORDER_TRANSMISSION)
import java.util.List;

@Injectable
@Extension(value = "MessageUtilDefinition_v5", order = PluginDefinition.ORDER_TRANSMISSION)
@ConditionalOnProperties(value = {
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true),
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LANE_ENABLED, matchIfMissing = true)
}, relation = ConditionalRelation.OR)
@ConditionalOnClass(MessageUtilDefinition.TYPE_MESSAGE_UTIL)
@ConditionalOnClass(MessageDefinition.TYPE_ACK_CALLBACK)
public class MQClientAPIImplDefinition extends PluginDefinitionAdapter {
public class MessageUtilDefinition extends PluginDefinitionAdapter {

private static final String TYPE_MQ_CLIENT_API_IMPL = "org.apache.rocketmq.client.impl.MQClientAPIImpl";
protected static final String TYPE_MESSAGE_UTIL = "org.apache.rocketmq.client.utils.MessageUtil";

private static final String METHOD_SEND_MESSAGE = "sendMessage";
private static final String METHOD_CREATE_REPLY_MESSAGE = "createReplyMessage";

private static final String[] ARGUMENT_SEND_MESSAGE = {
"java.lang.String",
"java.lang.String",
"org.apache.rocketmq.common.message.Message",
"org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader",
"long",
"org.apache.rocketmq.client.impl.CommunicationMode",
"org.apache.rocketmq.client.producer.SendCallback",
"org.apache.rocketmq.client.impl.producer.TopicPublishInfo",
"org.apache.rocketmq.client.impl.factory.MQClientInstance",
"int",
"org.apache.rocketmq.client.hook.SendMessageContext",
"org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl"
};
@Inject
@InjectLoader(ResourcerType.CORE_IMPL)
private List<CargoRequire> requires;

public MQClientAPIImplDefinition() {
super(TYPE_MQ_CLIENT_API_IMPL,
public MessageUtilDefinition() {
this.matcher = () -> MatcherBuilder.named(TYPE_MESSAGE_UTIL);
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(
MatcherBuilder.named(METHOD_SEND_MESSAGE).
and(MatcherBuilder.arguments(ARGUMENT_SEND_MESSAGE)),
new MQClientAPIImplInterceptor()));
MatcherBuilder.named(METHOD_CREATE_REPLY_MESSAGE), () -> new MessageUtilInterceptor(requires))
};
}

}
Loading

0 comments on commit 8957154

Please sign in to comment.