Skip to content

Commit

Permalink
feat:upgrade api circuit-break.
Browse files Browse the repository at this point in the history
  • Loading branch information
SkyeBeFreeman committed Sep 13, 2024
1 parent b5ef058 commit f5a1ece
Show file tree
Hide file tree
Showing 89 changed files with 5,139 additions and 2,099 deletions.
12 changes: 10 additions & 2 deletions polaris-assembly/polaris-assembly-factory/pom.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>polaris-assembly</artifactId>
<groupId>com.tencent.polaris</groupId>
Expand Down Expand Up @@ -72,6 +72,14 @@
<artifactId>trace-otel</artifactId>
<version>${project.version}</version>
</dependency>

<!--事件上报插件-->
<dependency>
<groupId>com.tencent.polaris</groupId>
<artifactId>event-logger</artifactId>
<version>${project.version}</version>
</dependency>

<!--测试依赖插件-->
<dependency>
<groupId>com.tencent.polaris</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

public class FunctionalDecoratorRequest extends InvokeContext.RequestContext {

public FunctionalDecoratorRequest(ServiceKey service, String method) {
super(service, method);
public FunctionalDecoratorRequest(ServiceKey service, String protocol, String method, String path) {
super(service, protocol, method, path);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@ public static class RequestContext {

private final ServiceKey service;

private final String protocol;

private final String method;

private final String path;

private ResultToErrorCode resultToErrorCode;

public RequestContext(ServiceKey service, String method){
public RequestContext(ServiceKey service, String protocol, String method, String path) {
this.service = service;
this.protocol = protocol;
this.method = method;
this.path = path;
}

public ServiceKey getSourceService() {
Expand All @@ -53,10 +59,18 @@ public ServiceKey getService() {
return service;
}

public String getProtocol() {
return protocol;
}

public String getMethod() {
return method;
}

public String getPath() {
return path;
}

public ResultToErrorCode getResultToErrorCode() {
return resultToErrorCode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,83 +32,83 @@

public class DefaultInvokeHandler implements InvokeHandler {

private final CircuitBreakAPI circuitBreakAPI;
private final CircuitBreakAPI circuitBreakAPI;

private final InvokeContext.RequestContext requestContext;
private final InvokeContext.RequestContext requestContext;

public DefaultInvokeHandler(InvokeContext.RequestContext requestContext, CircuitBreakAPI circuitBreakAPI) {
this.requestContext = requestContext;
this.circuitBreakAPI = circuitBreakAPI;
}
public DefaultInvokeHandler(InvokeContext.RequestContext requestContext, CircuitBreakAPI circuitBreakAPI) {
this.requestContext = requestContext;
this.circuitBreakAPI = circuitBreakAPI;
}

@Override
public void acquirePermission() {
CheckResult check = commonCheck(requestContext);
if (check != null){
throw new CallAbortedException(check.getRuleName(), check.getFallbackInfo());
}
}
@Override
public void acquirePermission() {
CheckResult check = commonCheck(requestContext);
if (check != null) {
throw new CallAbortedException(check.getRuleName(), check.getFallbackInfo());
}
}

@Override
public void onSuccess(InvokeContext.ResponseContext responseContext) {
long delay = responseContext.getDurationUnit().toMillis(responseContext.getDuration());
ResultToErrorCode resultToErrorCode = requestContext.getResultToErrorCode();
int code = 0;
RetStatus retStatus = RetStatus.RetUnknown;
if (null != resultToErrorCode) {
code = resultToErrorCode.onSuccess(responseContext.getResult());
}
commonReport(requestContext, delay, code, retStatus);
}
@Override
public void onSuccess(InvokeContext.ResponseContext responseContext) {
long delay = responseContext.getDurationUnit().toMillis(responseContext.getDuration());
ResultToErrorCode resultToErrorCode = requestContext.getResultToErrorCode();
int code = 0;
RetStatus retStatus = RetStatus.RetUnknown;
if (null != resultToErrorCode) {
code = resultToErrorCode.onSuccess(responseContext.getResult());
}
commonReport(requestContext, delay, code, retStatus);
}

@Override
public void onError(InvokeContext.ResponseContext responseContext) {
long delay = responseContext.getDurationUnit().toMillis(responseContext.getDuration());
ResultToErrorCode resultToErrorCode = requestContext.getResultToErrorCode();
int code = -1;
RetStatus retStatus = RetStatus.RetUnknown;
if (null != resultToErrorCode) {
code = resultToErrorCode.onError(responseContext.getError());
}
if (responseContext.getError() instanceof CallAbortedException) {
retStatus = RetStatus.RetReject;
}
commonReport(requestContext, delay, code, retStatus);
}
@Override
public void onError(InvokeContext.ResponseContext responseContext) {
long delay = responseContext.getDurationUnit().toMillis(responseContext.getDuration());
ResultToErrorCode resultToErrorCode = requestContext.getResultToErrorCode();
int code = -1;
RetStatus retStatus = RetStatus.RetUnknown;
if (null != resultToErrorCode) {
code = resultToErrorCode.onError(responseContext.getError());
}
if (responseContext.getError() instanceof CallAbortedException) {
retStatus = RetStatus.RetReject;
}
commonReport(requestContext, delay, code, retStatus);
}

private CheckResult commonCheck(InvokeContext.RequestContext requestContext) {
// check service
Resource svcResource = new ServiceResource(requestContext.getService(),
requestContext.getSourceService());
CheckResult check = circuitBreakAPI.check(svcResource);
if (!check.isPass()) {
return check;
}
// check method
if (StringUtils.isNotBlank(requestContext.getMethod())) {
Resource methodResource = new MethodResource(requestContext.getService(),
requestContext.getMethod(), requestContext.getSourceService());
check = circuitBreakAPI.check(methodResource);
if (!check.isPass()) {
return check;
}
}
return null;
}
private CheckResult commonCheck(InvokeContext.RequestContext requestContext) {
// check service
Resource svcResource = new ServiceResource(requestContext.getService(),
requestContext.getSourceService());
CheckResult check = circuitBreakAPI.check(svcResource);
if (!check.isPass()) {
return check;
}
// check method
if (StringUtils.isNotBlank(requestContext.getPath())) {
Resource methodResource = new MethodResource(requestContext.getService(), requestContext.getProtocol(),
requestContext.getMethod(), requestContext.getPath(), requestContext.getSourceService());
check = circuitBreakAPI.check(methodResource);
if (!check.isPass()) {
return check;
}
}
return null;
}

private void commonReport(InvokeContext.RequestContext requestContext, long delayMills, int code, RetStatus retStatus) {
// report service
Resource svcResource = new ServiceResource(requestContext.getService(),
requestContext.getSourceService());
ResourceStat resourceStat = new ResourceStat(svcResource, code, delayMills, retStatus);
circuitBreakAPI.report(resourceStat);
// report method
if (StringUtils.isNotBlank(requestContext.getMethod())) {
Resource methodResource = new MethodResource(requestContext.getService(),
requestContext.getMethod(), requestContext.getSourceService());
resourceStat = new ResourceStat(methodResource, code, delayMills, retStatus);
circuitBreakAPI.report(resourceStat);
}
}
private void commonReport(InvokeContext.RequestContext requestContext, long delayMills, int code, RetStatus retStatus) {
// report service
Resource svcResource = new ServiceResource(requestContext.getService(),
requestContext.getSourceService());
ResourceStat resourceStat = new ResourceStat(svcResource, code, delayMills, retStatus);
circuitBreakAPI.report(resourceStat);
// report method
if (StringUtils.isNotBlank(requestContext.getPath())) {
Resource methodResource = new MethodResource(requestContext.getService(), requestContext.getProtocol(),
requestContext.getMethod(), requestContext.getPath(), requestContext.getSourceService());
resourceStat = new ResourceStat(methodResource, code, delayMills, retStatus);
circuitBreakAPI.report(resourceStat);
}
}

}
11 changes: 9 additions & 2 deletions polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>polaris-circuitbreaker</artifactId>
<groupId>com.tencent.polaris</groupId>
Expand Down Expand Up @@ -95,6 +95,13 @@
<version>${project.version}</version>
</dependency>

<!--依赖事件上报插件-->
<dependency>
<groupId>com.tencent.polaris</groupId>
<artifactId>event-logger</artifactId>
<version>${project.version}</version>
</dependency>

<!--健康检查插件-->
<dependency>
<groupId>com.tencent.polaris</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ public void testMultipleUrlsNoRule() {
configurationImpl.getConsumer().getCircuitBreaker().setCountersExpireInterval(5000);
try (CircuitBreakAPI circuitBreakAPI = CircuitBreakAPIFactory.createCircuitBreakAPIByConfig(configurationImpl)) {
for (int i = 0; i < 50; i++) {
if (i == 1) {
Utils.sleepUninterrupted(5 * 1000);
}
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(
new ServiceKey(NAMESPACE_TEST, SERVICE_CIRCUIT_BREAKER), "/test/" + i);
new ServiceKey(NAMESPACE_TEST, SERVICE_CIRCUIT_BREAKER), "", "", "/test/" + i);
FunctionalDecorator decorator = circuitBreakAPI.makeFunctionalDecorator(makeDecoratorRequest);
int finalI = i;
Consumer<Integer> integerConsumer = decorator.decorateConsumer(num -> {
Expand Down Expand Up @@ -135,7 +138,7 @@ public void testMultipleUrlsMethodRule() {
Utils.sleepUninterrupted(5 * 1000);
}
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(
matchMethodService, "/test1/path/" + i);
matchMethodService, "", "", "/test1/path/" + i);
FunctionalDecorator decorator = circuitBreakAPI.makeFunctionalDecorator(makeDecoratorRequest);
int finalI = i;
Consumer<Integer> integerConsumer = decorator.decorateConsumer(num -> {
Expand Down Expand Up @@ -182,7 +185,7 @@ public void testCircuitBreakerRuleChanged() throws IOException {
method = "/test1/path/" + i;
}
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(
matchMethodDetectService, method);
matchMethodDetectService, "", "", method);
FunctionalDecorator decorator = circuitBreakAPI.makeFunctionalDecorator(makeDecoratorRequest);
int finalI = i;
Consumer<Integer> integerConsumer = decorator.decorateConsumer(num -> {
Expand Down Expand Up @@ -214,7 +217,7 @@ public void testCircuitBreakerRuleChanged() throws IOException {
for (int i = 0; i < 10; i++) {
String method = "/test1/path/" + i;
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(
matchMethodDetectService, method);
matchMethodDetectService, "", "", method);
FunctionalDecorator decorator = circuitBreakAPI.makeFunctionalDecorator(makeDecoratorRequest);
int finalI = i;
Consumer<Integer> integerConsumer = decorator.decorateConsumer(num -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public void testFunctionalDecorator() {
Configuration configuration = TestUtils.configWithEnvAddress();
try (CircuitBreakAPI circuitBreakAPI = CircuitBreakAPIFactory.createCircuitBreakAPIByConfig(configuration)) {
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(
new ServiceKey(NAMESPACE_TEST, SERVICE_CIRCUIT_BREAKER), "'");
new ServiceKey(NAMESPACE_TEST, SERVICE_CIRCUIT_BREAKER), "", "", "");
FunctionalDecorator decorator = circuitBreakAPI.makeFunctionalDecorator(makeDecoratorRequest);
Consumer<Integer> integerConsumer = decorator.decorateConsumer(num -> {
if (num % 2 == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testFaultDetectRuleChanged() throws IOException {
method = "/test1/path/" + i;
}
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(
matchMethodDetectService, method);
matchMethodDetectService, "*", "*", method);
FunctionalDecorator decorator = circuitBreakAPI.makeFunctionalDecorator(makeDecoratorRequest);
int finalI = i;
Consumer<Integer> integerConsumer = decorator.decorateConsumer(num -> {
Expand Down Expand Up @@ -132,7 +132,7 @@ public void testFaultDetectRuleChanged() throws IOException {
method = "/test1/path/" + i;
}
FunctionalDecoratorRequest makeDecoratorRequest = new FunctionalDecoratorRequest(
matchMethodDetectService, method);
matchMethodDetectService, "", "", method);
FunctionalDecorator decorator = circuitBreakAPI.makeFunctionalDecorator(makeDecoratorRequest);
int finalI = i;
Consumer<Integer> integerConsumer = decorator.decorateConsumer(num -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.tencent.polaris.api.plugin.compose.DefaultRouterChainGroup;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.plugin.compose.RouterChainGroup;
import com.tencent.polaris.api.plugin.event.EventReporter;
import com.tencent.polaris.api.plugin.event.FlowEvent;
import com.tencent.polaris.api.plugin.loadbalance.LoadBalancer;
import com.tencent.polaris.api.plugin.registry.LocalRegistry;
import com.tencent.polaris.api.plugin.registry.ResourceFilter;
Expand Down Expand Up @@ -322,4 +324,25 @@ public static void buildFlowControlParam(RequestBaseEntity entity, Configuration
controlParam.setRetryIntervalMs(config.getGlobal().getAPI().getRetryInterval());
}

/**
* 上报流程事件
*
* @param extensions 插件上下文
* @param flowEvent 流程事件
*/
public static void reportFlowEvent(Extensions extensions, FlowEvent flowEvent) {
List<EventReporter> eventReporterList = extensions.getEventReporterList();
if (LOG.isDebugEnabled()) {
LOG.debug("Reporting flow event: {}", flowEvent);
}
for (EventReporter eventReporter : eventReporterList) {
try {
if (!eventReporter.reportEvent(flowEvent)) {
LOG.warn("Report event by {} failed. Flow event detail: {}", eventReporter.getName(), flowEvent);
}
} catch (Throwable throwable) {
LOG.warn("Report event by {} failed. Flow event detail: {}", eventReporter.getName(), flowEvent, throwable);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ global:
enable: false
#描述: 调用链实现
reporter: otel
# 事件上报相关的配置
eventReporter:
#描述: 是否启动事件上报
enable: true
#描述: 事件上报插件名列表
reporters:
- logger
#描述: 监控及日志数据上报相关配置
statReporter:
#描述: 是否启用上报
Expand Down
Loading

0 comments on commit f5a1ece

Please sign in to comment.