Skip to content

Commit

Permalink
Fix rocketmq isProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed Jun 3, 2024
1 parent f082384 commit 10881d7
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 149 deletions.
52 changes: 52 additions & 0 deletions docs/cn/livespace.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
多活治理模型
===

应用多活通常包括同城多活和异地多活,异地多活可采用单元化技术来实现。

## 1.多活空间

一个组合可以有多个多活空间,多活空间构成如下所示:
```
.
└── 多活空间
├── 单元路由变量(*)
├── 单元(*)
│   ├── 分区(*)
├── 单元规则(*)
├── 多活域名(*)
│   ├── 单元子域名(*)
│   ├── 路径(*)
│   │   ├── 业务参数(*)
```
## 2.单元

单元是逻辑的,一般对应一个地域,单元化常用于异地多活场景。
1. 单元化通常按用户维度来进行核心业务和数据拆分,每个单元内拥有自己的数据,尽量在单元内闭环调用;
2. 用户请求尽量就近路由到所属单元进行访问;
3. 当一个单元出现故障的时候,所影响的用户范围减少,其它单元还可以正常工作;
4. 单元间的数据可以双向同步,可以把故障单元的用户,一键调拨到到其它单元,大大减少了RTO。

单元分为中心单元和普通单元,其中:
1. 中心单元,除了承接区域性用户流量,还提供全局、强一致性和数据分析的服务,目前限定有且只有一个中心单元。
2. 普通单元只能承接区域性的流量。

## 2.1 分区

分区是逻辑的,一般对应一个云上的可用区或物理数据中心。
1. 单元化通常按用户维度来进行核心业务和数据拆分,每个单元内拥有自己的数据,尽量在单元内闭环调用;
2. 用户请求尽量就近路由到所属单元进行访问;
3. 当一个单元出现故障的时候,所影响的用户范围减少,其它单元还可以正常工作;
4. 单元间的数据可以双向同步,可以把故障单元的用户,一键调拨到到其它单元,大大减少了RTO。

单元分为中心单元和普通单元,其中:
1. 中心单元,除了承接区域性用户流量,还提供全局、强一致性和数据分析的服务,目前限定有且只有一个中心单元。
2. 普通单元只能承接区域性的流量。

## 3.单元路由变量

单元路由变量是单元间进行流量路由的依据,通常指的用户账号,可以包括多个取值方式,对应在不同业务的入口域名的获取方式。




Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
*/
public class Cargo extends Tag {

/**
* Key for identifying the mq producer.
*/
public static final String KEY_MQ_PRODUCER = "mq-producer";

/**
* Key for identifying the entity that restored the cargo.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class EchoController {
@Resource
private FeignService feignService;

@GetMapping({"/echo-rest/{str}","/echo/{str}"})
@GetMapping({"/echo-rest/{str}", "/echo/{str}"})
public String echoRest(@PathVariable String str, HttpServletRequest request) {
String message = restService.echo(str);
return new EchoResponse("spring-consumer", "header", request::getHeader, message).toString();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import com.jd.live.agent.core.plugin.definition.PluginDefinition;
import com.jd.live.agent.core.plugin.definition.PluginDefinitionAdapter;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.plugin.transmission.rocketmq.v4.interceptor.MQProducerSendInterceptor;
import com.jd.live.agent.plugin.transmission.rocketmq.v4.interceptor.MQProducerStartInterceptor;
import com.jd.live.agent.plugin.transmission.rocketmq.v4.interceptor.MQProducerInterceptor;

@Extension(value = "MQProducerDefinition_v4", order = PluginDefinition.ORDER_TRANSMISSION)
@ConditionalOnProperties(value = {
Expand All @@ -35,8 +34,6 @@ public class MQProducerDefinition extends PluginDefinitionAdapter {

protected static final String TYPE_MQ_PRODUCER = "org.apache.rocketmq.client.producer.MQProducer";

private static final String METHOD_START = "start";

private static final String METHOD_SEND = "send";

private static final String METHOD_REQUEST = "request";
Expand All @@ -47,10 +44,9 @@ public class MQProducerDefinition extends PluginDefinitionAdapter {

public MQProducerDefinition() {
super(MatcherBuilder.isImplement(TYPE_MQ_PRODUCER),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_START), new MQProducerStartInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND), new MQProducerSendInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_REQUEST), new MQProducerSendInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND_ONEWAY), new MQProducerSendInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND_MESSAGE_IN_TRANSACTION), new MQProducerSendInterceptor()));
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND), new MQProducerInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_REQUEST), new MQProducerInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND_ONEWAY), new MQProducerInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND_MESSAGE_IN_TRANSACTION), new MQProducerInterceptor()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext;
import com.jd.live.agent.core.plugin.definition.InterceptorAdaptor;
import com.jd.live.agent.governance.context.RequestContext;
import com.jd.live.agent.governance.context.bag.Cargo;
import org.apache.rocketmq.common.message.Message;

import java.util.Collection;

public class MQProducerSendInterceptor extends InterceptorAdaptor {
public class MQProducerInterceptor extends InterceptorAdaptor {

@SuppressWarnings("unchecked")
@Override
public void onEnter(ExecutableContext ctx) {
Object argument = ctx.getArguments()[0];
RequestContext.setAttribute(Cargo.KEY_MQ_PRODUCER, Boolean.TRUE);
if (argument instanceof Message) {
attachTag((Message) argument);
} else if (argument instanceof Collection) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.jd.live.agent.governance.context.bag.Cargo;
import com.jd.live.agent.governance.context.bag.CargoRequire;
import com.jd.live.agent.governance.context.bag.CargoRequires;
import com.jd.live.agent.plugin.transmission.rocketmq.v4.context.RocketmqContext;
import org.apache.rocketmq.common.message.Message;

import java.util.List;
Expand All @@ -41,7 +40,8 @@ public MessageInterceptor(Application application, List<CargoRequire> requires)

@Override
public void onEnter(ExecutableContext ctx) {
if (!RocketmqContext.isProducer()) {
Boolean isProducer = RequestContext.getAttribute(Cargo.KEY_MQ_PRODUCER);
if (isProducer == null || !isProducer) {
restoreTag((Message) ctx.getTarget());
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import com.jd.live.agent.core.plugin.definition.PluginDefinition;
import com.jd.live.agent.core.plugin.definition.PluginDefinitionAdapter;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.plugin.transmission.rocketmq.v5.interceptor.MQProducerSendInterceptor;
import com.jd.live.agent.plugin.transmission.rocketmq.v5.interceptor.MQProducerStartInterceptor;
import com.jd.live.agent.plugin.transmission.rocketmq.v5.interceptor.MQProducerInterceptor;

@Extension(value = "MQProducerDefinition_v5", order = PluginDefinition.ORDER_TRANSMISSION)
@ConditionalOnProperties(value = {
Expand All @@ -35,8 +34,6 @@ public class MQProducerDefinition extends PluginDefinitionAdapter {

protected static final String TYPE_MQ_PRODUCER = "org.apache.rocketmq.client.producer.MQProducer";

private static final String METHOD_START = "start";

private static final String METHOD_SEND = "send";

private static final String METHOD_REQUEST = "request";
Expand All @@ -47,10 +44,9 @@ public class MQProducerDefinition extends PluginDefinitionAdapter {

public MQProducerDefinition() {
super(MatcherBuilder.isImplement(TYPE_MQ_PRODUCER),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_START), new MQProducerStartInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND), new MQProducerSendInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_REQUEST), new MQProducerSendInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND_ONEWAY), new MQProducerSendInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND_MESSAGE_IN_TRANSACTION), new MQProducerSendInterceptor()));
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND), new MQProducerInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_REQUEST), new MQProducerInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND_ONEWAY), new MQProducerInterceptor()),
new InterceptorDefinitionAdapter(MatcherBuilder.named(METHOD_SEND_MESSAGE_IN_TRANSACTION), new MQProducerInterceptor()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext;
import com.jd.live.agent.core.plugin.definition.InterceptorAdaptor;
import com.jd.live.agent.governance.context.RequestContext;
import com.jd.live.agent.governance.context.bag.Cargo;
import org.apache.rocketmq.common.message.Message;

import java.util.Collection;

public class MQProducerSendInterceptor extends InterceptorAdaptor {
public class MQProducerInterceptor extends InterceptorAdaptor {

@SuppressWarnings("unchecked")
@Override
public void onEnter(ExecutableContext ctx) {
Object argument = ctx.getArguments()[0];
RequestContext.setAttribute(Cargo.KEY_MQ_PRODUCER, Boolean.TRUE);
if (argument instanceof Message) {
attachTag((Message) argument);
} else if (argument instanceof Collection) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.jd.live.agent.governance.context.bag.Cargo;
import com.jd.live.agent.governance.context.bag.CargoRequire;
import com.jd.live.agent.governance.context.bag.CargoRequires;
import com.jd.live.agent.plugin.transmission.rocketmq.v5.context.RocketmqContext;
import org.apache.rocketmq.common.message.Message;

import java.util.List;
Expand All @@ -41,7 +40,8 @@ public MessageInterceptor(Application application, List<CargoRequire> requires)

@Override
public void onEnter(ExecutableContext ctx) {
if (!RocketmqContext.isProducer()) {
Boolean isProducer = RequestContext.getAttribute(Cargo.KEY_MQ_PRODUCER);
if (isProducer == null || !isProducer) {
restoreTag((Message) ctx.getTarget());
}
}
Expand Down

0 comments on commit 10881d7

Please sign in to comment.