Skip to content

Commit

Permalink
Add retrier definition and implementation based on Resilience4j
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhiguo committed May 6, 2024
1 parent 46b5cb4 commit 14f42e2
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.jd.live.agent.governance.invoke.retry;

import com.jd.live.agent.governance.policy.service.failover.FailoverPolicy;

import java.util.function.Supplier;

/**
* Retrier
*
* @since 1.0.0
*/
public interface Retrier {

/**
* Execute retry logic
*
* @param supplier Retry logic
* @param <T> Response type
* @return Response
*/
<T> T execute(Supplier<T> supplier);

/**
* Get failover policy
*
* @return policy
*/
FailoverPolicy getPolicy();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.jd.live.agent.governance.invoke.retry;

import com.jd.live.agent.core.extension.annotation.Extensible;
import com.jd.live.agent.governance.policy.service.failover.FailoverPolicy;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
* RetrierFactory
*
* @since 1.0.0
*/
@Extensible("RetrierFactory")
public interface RetrierFactory {

Map<Long, AtomicReference<Retrier>> RETRIERS = new ConcurrentHashMap<>();

default Retrier get(FailoverPolicy policy) {
if (policy == null) {
return null;
}
AtomicReference<Retrier> reference = RETRIERS.computeIfAbsent(policy.getId(), n -> new AtomicReference<>());
Retrier retrier = reference.get();
if (retrier != null && retrier.getPolicy().getVersion() >= policy.getVersion()) {
return retrier;
}
Retrier newLimiter = create(policy);
while (true) {
retrier = reference.get();
if (retrier == null || retrier.getPolicy().getVersion() < policy.getVersion()) {
if (reference.compareAndSet(retrier, newLimiter)) {
retrier = newLimiter;
break;
}
}
}
return retrier;
}

/**
* Create Retrier
*
* @param failoverPolicy Failure retry policy
* @return Retrier
*/
Retrier create(FailoverPolicy failoverPolicy);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
*/
package com.jd.live.agent.governance.policy.service.failover;

import com.jd.live.agent.governance.policy.PolicyId;
import com.jd.live.agent.governance.policy.PolicyInherit.PolicyInheritWithId;
import com.jd.live.agent.governance.policy.service.annotation.Consumer;
import lombok.Getter;
import lombok.Setter;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
* Defines a failover policy that specifies the behavior of a system or component in the event of a failure.
* This includes how many retry attempts should be made and the timeout for each attempt before considering
Expand All @@ -36,7 +41,7 @@
@Getter
@Setter
@Consumer
public class FailoverPolicy implements PolicyInheritWithId<FailoverPolicy> {
public class FailoverPolicy extends PolicyId implements PolicyInheritWithId<FailoverPolicy> {

/**
* The unique identifier of the failover policy. This ID can be used to reference and manage the policy
Expand All @@ -57,6 +62,16 @@ public class FailoverPolicy implements PolicyInheritWithId<FailoverPolicy> {
*/
private Integer timeoutInMilliseconds;

/**
* Collection of retry status codes. This parameter specifies which status codes should be considered retryable.
*/
private Set<Integer> retryableStatusCodes = new HashSet<>(Arrays.asList(500, 502, 503));

/**
* The version of the policy.
*/
private long version;

@Override
public void supplement(FailoverPolicy source) {
if (source == null) {
Expand All @@ -68,5 +83,11 @@ public void supplement(FailoverPolicy source) {
if (timeoutInMilliseconds == null) {
timeoutInMilliseconds = source.timeoutInMilliseconds;
}
if ((retryableStatusCodes == null || retryableStatusCodes.isEmpty()) && source.retryableStatusCodes != null) {
retryableStatusCodes = source.retryableStatusCodes;
}
if (version <= 0) {
version = source.version;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import java.util.Map;

/**
* Provides a base implementation for rate limiting policies. This abstract class defines common attributes
* and functionalities that all specific rate limiting policies should inherit. It includes basic properties
* Provides a base implementation for limiting policies. This abstract class defines common attributes
* and functionalities that all specific limiting policies should inherit. It includes basic properties
* such as policy name, strategy type, conditions under which the policy applies, and versioning information.
* <p>
* The class also outlines the mechanism for rate limiting by specifying an algorithm or a component (e.g.,
* The class also outlines the mechanism for limiting by specifying an algorithm or a component (e.g.,
* FixedWindow, LeakyBucket, TokenBucket, Sentinel, Resilience4j) that implements the actual limiting logic.
* This allows for flexible and extensible design where different rate limiting strategies can be easily
* This allows for flexible and extensible design where different limiting strategies can be easily
* integrated and managed.
* </p>
*
Expand All @@ -42,7 +42,7 @@
public abstract class AbstractLimitPolicy extends PolicyId implements LimitPolicy {

/**
* The name of the rate limiting policy.
* The name of the limiting policy.
*/
private String name;

Expand All @@ -57,20 +57,20 @@ public abstract class AbstractLimitPolicy extends PolicyId implements LimitPolic
protected List<TagCondition> conditions;

/**
* Specifies the algorithm or component used for implementing the rate limiting logic.
* Specifies the algorithm or component used for implementing the limiting logic.
* <p>
* Examples include FixedWindow, LeakyBucket, TokenBucket, Sentinel, Resilience4j, etc.
* </p>
*/
private String strategyType;

/**
* A map of parameters that further customize the action of the rate limiting strategy.
* A map of parameters that further customize the action of the limiting strategy.
*/
private Map<String, String> actionParameters;

/**
* The version of the rate limiting policy.
* The version of the limiting policy.
*/
private long version;

Expand All @@ -81,34 +81,34 @@ public AbstractLimitPolicy() {
}

/**
* Constructs a new rate limiting policy with the specified name.
* Constructs a new limiting policy with the specified name.
*
* @param name the name of the rate limiting policy
* @param name the name of the limiting policy
*/
public AbstractLimitPolicy(String name) {
this.name = name;
}

/**
* Constructs a new rate limiting policy with detailed specifications.
* Constructs a new limiting policy with detailed specifications.
*
* @param name the name of the rate limiting policy
* @param strategyType the strategy type of the rate limiting policy
* @param conditions a list of conditions (tags) for the rate limiting policy
* @param version the version of the rate limiting policy
* @param name the name of the limiting policy
* @param strategyType the strategy type of the limiting policy
* @param conditions a list of conditions (tags) for the limiting policy
* @param version the version of the limiting policy
*/
public AbstractLimitPolicy(String name, String strategyType, List<TagCondition> conditions, long version) {
this(name, strategyType, RelationType.AND, conditions, version);
}

/**
* Constructs a new rate limiting policy with comprehensive specifications including relation type.
* Constructs a new limiting policy with comprehensive specifications including relation type.
*
* @param name the name of the rate limiting policy
* @param strategyType the strategy type of the rate limiting policy
* @param name the name of the limiting policy
* @param strategyType the strategy type of the limiting policy
* @param relationType how conditions are related when evaluating the applicability of the policy
* @param conditions a list of conditions (tags) for the rate limiting policy
* @param version the version of the rate limiting policy
* @param conditions a list of conditions (tags) for the limiting policy
* @param version the version of the limiting policy
*/
public AbstractLimitPolicy(String name, String strategyType, RelationType relationType,
List<TagCondition> conditions, long version) {
Expand All @@ -120,11 +120,11 @@ public AbstractLimitPolicy(String name, String strategyType, RelationType relati
}

/**
* Supplements the current rate limiting policy with another policy's details. This method is used
* Supplements the current limiting policy with another policy's details. This method is used
* to inherit or override attributes from another policy. If the current policy lacks specific attributes,
* they are filled in with the values from the source policy.
*
* @param source the source rate limiting policy to supplement from
* @param source the source limiting policy to supplement from
*/
public void supplement(AbstractLimitPolicy source) {
if (source == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
<artifactId>resilience4j-bulkhead</artifactId>
<version>${resilience4j.version}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>${resilience4j.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.jd.live.agent.implement.flowcontrol.resilience4j.retry;

import com.jd.live.agent.governance.invoke.retry.Retrier;
import com.jd.live.agent.governance.policy.service.failover.FailoverPolicy;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;

import java.time.Duration;
import java.util.function.Supplier;

/**
* Resilience4jRetrier
*
* @since 1.0.0
*/
public class Resilience4jRetrier implements Retrier {

private final FailoverPolicy policy;

private final Retry retry;

public Resilience4jRetrier(FailoverPolicy policy) {
this.policy = policy;
RetryConfig config = RetryConfig.custom()
.maxAttempts(policy.getRetry())
.waitDuration(Duration.ofMillis(policy.getTimeoutInMilliseconds()))
// TODO
// .retryOnResult(response -> response.getStatus() == 500)
.failAfterMaxAttempts(true)
.build();
RetryRegistry registry = RetryRegistry.of(config);
retry = registry.retry(policy.getId().toString());
}

/**
* {@inheritDoc}
*/
@Override
public <T> T execute(Supplier<T> supplier) {
return retry.executeSupplier(supplier);
}

/**
* {@inheritDoc}
*/
@Override
public FailoverPolicy getPolicy() {
return policy;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.jd.live.agent.implement.flowcontrol.resilience4j.retry;

import com.jd.live.agent.governance.invoke.retry.Retrier;
import com.jd.live.agent.governance.invoke.retry.RetrierFactory;
import com.jd.live.agent.governance.policy.service.failover.FailoverPolicy;

/**
* Resilience4jRetrierFactory
*
* @since 1.0.0
*/
public class Resilience4jRetrierFactory implements RetrierFactory {

@Override
public Retrier create(FailoverPolicy failoverPolicy) {
return new Resilience4jRetrier(failoverPolicy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.jd.live.agent.implement.flowcontrol.resilience4j.retry.Resilience4jRetrierFactory

0 comments on commit 14f42e2

Please sign in to comment.