Skip to content

Commit

Permalink
feat:将速率限制改为了服务端获取
Browse files Browse the repository at this point in the history
  • Loading branch information
15911075183ma committed Dec 18, 2023
1 parent 84c2911 commit bdfa411
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.dongtai.iast.common.state.AgentState;
import io.dongtai.iast.common.state.State;
import io.dongtai.iast.common.state.StateCause;
import io.dongtai.iast.common.utils.limit.InterfaceRateLimiterUtil;
import io.dongtai.log.DongTaiLog;
import io.dongtai.log.ErrorCode;

Expand Down Expand Up @@ -172,19 +171,6 @@ private static void install(final Instrumentation inst) {
if (!AGENT_STATE.isException()) {
AGENT_STATE.setState(State.RUNNING);
}
String rateCaps = System.getProperty("rate.caps");
if (rateCaps != null && !rateCaps.isEmpty()){
try {
InterfaceRateLimiterUtil.initializeInstance(Long.parseLong(rateCaps));
} catch (NumberFormatException e) {
// 当出现异常时,降级为默认速率
rateCaps = IastProperties.getInstance().cfg.getProperty("rate.caps");
DongTaiLog.error("Interface Rate Limiter Initialization Failure Reason:{} \n " +
"the default rate will be used {}",e.getMessage(),rateCaps);
InterfaceRateLimiterUtil.initializeInstance(Long.parseLong(rateCaps));

}
}
} else {
DongTaiLog.error(ErrorCode.AGENT_REGISTER_INFO_INVALID);
AGENT_STATE.setState(State.EXCEPTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.dongtai.iast.common.config.ConfigKey;
import io.dongtai.iast.common.constants.AgentConstant;
import io.dongtai.iast.common.constants.ApiPath;
import io.dongtai.iast.common.utils.limit.InterfaceRateLimiterUtil;
import io.dongtai.log.DongTaiLog;
import io.dongtai.log.ErrorCode;

Expand Down Expand Up @@ -48,6 +49,37 @@ private void updateConfig() {
if (logLevel != null) {
DongTaiLog.setLevel(DongTaiLog.parseLevel(logLevel));
}

//获取是否开启qps限流
Boolean enableQpsRate = ConfigBuilder.getInstance().get(ConfigKey.ENABLE_QPS_RATE_LIMIT);
if (enableQpsRate) {
int qpsRateLimit = ConfigBuilder.getInstance().get(ConfigKey.QPS_RATE_LIMIT);
if (qpsRateLimit <= 0){
DongTaiLog.error("qpsRateLimit the value cannot be less than 0");
qpsRateLimit = 100;
DongTaiLog.error("qpsRateLimit revert to 100");

}
int tokenBucketPoolSize = ConfigBuilder.getInstance().get(ConfigKey.TOKEN_BUCKET_POOL_SIZE);
if (tokenBucketPoolSize <= 0){
DongTaiLog.error("tokenBucketPoolSize the value cannot be less than 0");
tokenBucketPoolSize=5000;
DongTaiLog.error("tokenBucketPoolSize revert to 5000");

}
//判断是否已经开启,如果已经开启,则更新数据
if (InterfaceRateLimiterUtil.getRateLimiterState()) {
InterfaceRateLimiterUtil.updateTheData(qpsRateLimit, tokenBucketPoolSize);
} else {
//初始化令牌池工具,设置池大小和速率
InterfaceRateLimiterUtil.initializeInstance(qpsRateLimit, tokenBucketPoolSize);
}
}else {
if(InterfaceRateLimiterUtil.getRateLimiterState()){
InterfaceRateLimiterUtil.turnOffTheRateLimiter();
}
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ private ConfigBuilder() {
Config.<String>create(ConfigKey.LOGGER_LEVEL));
this.configMap.put(ConfigKey.VALIDATED_SINK,
Config.<Boolean>create(ConfigKey.VALIDATED_SINK).setDefaultValue(false));
this.configMap.put(ConfigKey.ENABLE_QPS_RATE_LIMIT,
Config.<Boolean>create(ConfigKey.ENABLE_QPS_RATE_LIMIT).setDefaultValue(false));
this.configMap.put(ConfigKey.QPS_RATE_LIMIT,
Config.<Integer>create(ConfigKey.QPS_RATE_LIMIT).setDefaultValue(100));
this.configMap.put(ConfigKey.TOKEN_BUCKET_POOL_SIZE,
Config.<Integer>create(ConfigKey.TOKEN_BUCKET_POOL_SIZE).setDefaultValue(5000));
}

public static ConfigBuilder getInstance() {
Expand Down Expand Up @@ -68,6 +74,9 @@ public void update(JSONObject config) {
updateBool(config, ConfigKey.JsonKey.JSON_ENABLE_LOGGER);
updateString(config, ConfigKey.JsonKey.JSON_LOGGER_LEVEL);
updateBool(config, ConfigKey.JsonKey.JSON_VALIDATED_SINK);
updateBool(config, ConfigKey.JsonKey.JSON_ENABLE_QPS_RATE_LIMIT);
updateInt(config, ConfigKey.JsonKey.JSON_QPS_RATE_LIMIT);
updateInt(config, ConfigKey.JsonKey.JSON_TOKEN_BUCKET_POOL_SIZE);
updateRequestDenyList(config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ public enum ConfigKey {
ENABLE_LOGGER,
LOGGER_LEVEL,
VALIDATED_SINK,
ENABLE_QPS_RATE_LIMIT,
QPS_RATE_LIMIT,
TOKEN_BUCKET_POOL_SIZE,
;

public enum JsonKey {
Expand All @@ -20,6 +23,9 @@ public enum JsonKey {
JSON_ENABLE_LOGGER("enable_log", ENABLE_LOGGER),
JSON_LOGGER_LEVEL("log_level", LOGGER_LEVEL),
JSON_VALIDATED_SINK("report_validated_sink", VALIDATED_SINK),
JSON_ENABLE_QPS_RATE_LIMIT("enable_qps_rate_limit", ENABLE_QPS_RATE_LIMIT),
JSON_QPS_RATE_LIMIT("qps_rate_limit", QPS_RATE_LIMIT),
JSON_TOKEN_BUCKET_POOL_SIZE("token_bucket_pool_size", TOKEN_BUCKET_POOL_SIZE),
;

private final String key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@
*/
public class InterfaceRateLimiter {

private final InterfaceRateLimiterSoftReferenceHashMap<String, Bucket> buckets = new InterfaceRateLimiterSoftReferenceHashMap<>();
private final InterfaceRateLimiterSoftReferenceHashMap<String> buckets = new InterfaceRateLimiterSoftReferenceHashMap<>();

private int theNumberOfTokenBucketPools;


//最高速率
private final long rateCaps;
private long rateCaps;


private InterfaceRateLimiter(long rateCaps) {
private InterfaceRateLimiter(long rateCaps, int theNumberOfTokenBucketPools) {
this.rateCaps = rateCaps;
this.theNumberOfTokenBucketPools = theNumberOfTokenBucketPools;
}

public static InterfaceRateLimiter getInstance(long rateCaps) {
return new InterfaceRateLimiter(rateCaps);
public static InterfaceRateLimiter getInstance(long rateCaps, int theNumberOfTokenBucketPools) {
return new InterfaceRateLimiter(rateCaps, theNumberOfTokenBucketPools);
}

/**
Expand All @@ -43,8 +46,8 @@ public boolean whetherItPassesOrNot(String interfaceName) {
if (bucket == null) {
return true;
}
}else {
if (buckets.size() >= 5000){
} else {
if (buckets.size() >= theNumberOfTokenBucketPools) {
return true;
}
bucket = Bucket.builder().addLimit(Bandwidth.simple(rateCaps, Duration.ofSeconds(1))).build();
Expand All @@ -55,4 +58,24 @@ public boolean whetherItPassesOrNot(String interfaceName) {
return probe.isConsumed();
}

/**
* 更新速率限制器的配置
* @param rateCaps 速率
* @param theNumberOfTokenBucketPools 桶池大小
* 只有当值发生改变时,才会开启更新,否则不执行操作
*/
public void updateTheData(long rateCaps, int theNumberOfTokenBucketPools) {
if (this.rateCaps != rateCaps){
this.rateCaps = rateCaps;
buckets.updateTheData(this.rateCaps);
}
if (this.theNumberOfTokenBucketPools != theNumberOfTokenBucketPools) {
this.theNumberOfTokenBucketPools = theNumberOfTokenBucketPools;
//需要更新桶池大小时,判断已有的是否超过最大限制,如果超过,则删除到指定大小
if (theNumberOfTokenBucketPools - buckets.size() < 0){
int i = buckets.size() - theNumberOfTokenBucketPools;
buckets.deleteTheElement(i);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,59 @@
package io.dongtai.iast.common.utils.limit;

import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;

import java.lang.ref.SoftReference;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author mazepeng
* @date 2023/12/7 16:56
*/
public class InterfaceRateLimiterSoftReferenceHashMap<K,V> {
public class InterfaceRateLimiterSoftReferenceHashMap<K> {

private final ConcurrentHashMap<K, SoftReference<V>> map = new ConcurrentHashMap<>();
private final ConcurrentHashMap<K, SoftReference<Bucket>> map = new ConcurrentHashMap<>();

public void put(K key, V value) {
public void put(K key, Bucket value) {
map.put(key, new SoftReference<>(value));
}

public V get(K key) {
SoftReference<V> softReference = map.get(key);
/**
* 更新令牌桶的速率限制
* @param rateCaps 速率
*/
public void updateTheData(long rateCaps){
map.replaceAll((key, value) -> {
Bucket build = Bucket.builder().addLimit(Bandwidth.simple(rateCaps, Duration.ofSeconds(1))).build();
return new SoftReference<>(build);
});
}

/**
* 删除指定个桶
* @param theNumberOfTokenBucketPools 删除的数量
*/
public void deleteTheElement(int theNumberOfTokenBucketPools){
AtomicInteger atomicInteger = new AtomicInteger(theNumberOfTokenBucketPools);
for (Iterator<Map.Entry<K, SoftReference<Bucket>>> iterator = map.entrySet().iterator(); ; iterator.hasNext()) {
Map.Entry<K, SoftReference<Bucket>> next = iterator.next();

int i = atomicInteger.decrementAndGet();
if (i >= 0) {
iterator.remove();
} else {
// 满足终止条件时,跳出循环
break;
}
}
}

public Bucket get(K key) {
SoftReference<Bucket> softReference = map.get(key);
if (softReference != null) {
return softReference.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,22 @@ public static boolean getRateLimiterState(){
return turnOnTheRateLimiter;
}

public static void turnOffTheRateLimiter(boolean state){
turnOnTheRateLimiter = true;
/**
* 关闭速率限制器,并将限制器置为空释放空间
*/
public static void turnOffTheRateLimiter(){
turnOnTheRateLimiter = false;
instance = null;
}

/**
* 初始化速率限制
*/
public static void initializeInstance(long rateCaps) {
public static void initializeInstance(long rateCaps,int theNumberOfTokenBucketPools) {
if (instance == null) {
synchronized (InterfaceRateLimiterUtil.class) {
if (instance == null) {
instance = InterfaceRateLimiter.getInstance(rateCaps);
instance = InterfaceRateLimiter.getInstance(rateCaps,theNumberOfTokenBucketPools);
turnOnTheRateLimiter = true;
}
}
Expand All @@ -56,5 +60,13 @@ public static boolean whetherItPassesOrNot(String interfaceName){
return instance.whetherItPassesOrNot(interfaceName);
}

/**
* 更新令牌桶的设置
* @param rateCaps 速率
* @param theNumberOfTokenBucketPools 令牌桶池上限
*/
public static void updateTheData(long rateCaps,int theNumberOfTokenBucketPools) {
instance.updateTheData(rateCaps,theNumberOfTokenBucketPools);
}
}

0 comments on commit bdfa411

Please sign in to comment.