Skip to content

Commit

Permalink
[feat] Redis键前缀移入FullRedis,作为增强功能
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Mar 17, 2024
1 parent 7c6c06c commit b06fc61
Show file tree
Hide file tree
Showing 17 changed files with 590 additions and 495 deletions.
3 changes: 2 additions & 1 deletion NewLife.Redis.Extensions/DependencyInjectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public static IServiceCollection AddRedis(this IServiceCollection services, Acti
/// <param name="setupAction"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
[Obsolete("=>AddRedis")]
public static IServiceCollection AddPrefixedRedis(this IServiceCollection services, Action<RedisOptions> setupAction)
{
if (services == null)
Expand All @@ -121,7 +122,7 @@ public static IServiceCollection AddPrefixedRedis(this IServiceCollection servic

services.AddOptions();
services.Configure(setupAction);
services.AddSingleton(sp => new PrefixedRedis(sp, sp.GetRequiredService<IOptions<RedisOptions>>().Value));
services.AddSingleton(sp => new FullRedis(sp, sp.GetRequiredService<IOptions<RedisOptions>>().Value));

return services;
}
Expand Down
81 changes: 62 additions & 19 deletions NewLife.Redis/FullRedis.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public static FullRedis Create(String config)
#endregion

#region 属性
/// <summary>键前缀</summary>
public String? Prefix { get; set; }

/// <summary>自动检测集群节点。默认true</summary>
public Boolean AutoDetect { get; set; } = true;

Expand All @@ -61,17 +64,17 @@ public FullRedis(String server, String password, Int32 db) : base(server, passwo
/// <param name="options"></param>
public FullRedis(RedisOptions options)
{
Name = options.InstanceName;
if (!options.InstanceName.IsNullOrEmpty())
Name = options.InstanceName;

Server = options.Server;
Password = options.Password;
Db = options.Db;
Timeout = options.Timeout;
Prefix = options.Prefix;

if (!options.Configuration.IsNullOrEmpty())
Init(options.Configuration);
else
{
Server = options.Server;
Password = options.Password;
Db = options.Db;
Timeout = options.Timeout;
}
}

/// <summary>按照配置服务实例化Redis,用于NETCore依赖注入</summary>
Expand Down Expand Up @@ -209,6 +212,11 @@ public IPool<RedisClient> GetPool(IRedisNode node)
});
}

/// <summary>获取经前缀处理后的键名</summary>
/// <param name="key"></param>
/// <returns></returns>
public String GetKey(String key) => !Prefix.IsNullOrEmpty() ? key.EnsureStart(Prefix) : key;

/// <summary>重载执行,支持集群</summary>
/// <typeparam name="T"></typeparam>
/// <param name="key">用于选择集群节点的key</param>
Expand All @@ -219,6 +227,8 @@ public override T Execute<T>(String key, Func<RedisClient, String, T> func, Bool
{
InitCluster();

key = GetKey(key);

// 如果不支持集群,直接返回
if (Cluster == null) return base.Execute(key, func, write);

Expand Down Expand Up @@ -293,10 +303,12 @@ public virtual T ExecuteOnNode<T, TKey>(TKey key, Func<RedisClient, TKey, T> fun
/// <returns></returns>
public virtual T[] Execute<T>(String[] keys, Func<RedisClient, String[], T> func, Boolean write)
{
if (keys == null || keys.Length == 0) return new T[0];
if (keys == null || keys.Length == 0) return [];

InitCluster();

keys = keys.Select(GetKey).ToArray();

// 如果不支持集群,或者只有一个key,直接执行
if (Cluster == null || keys.Length == 1) return [Execute(keys.FirstOrDefault(), (rds, k) => func(rds, keys), write)];

Expand Down Expand Up @@ -373,6 +385,8 @@ public override async Task<T> ExecuteAsync<T>(String key, Func<RedisClient, Stri
{
InitCluster();

key = GetKey(key);

// 如果不支持集群,直接返回
if (Cluster == null) return await base.ExecuteAsync<T>(key, func, write);

Expand Down Expand Up @@ -423,15 +437,32 @@ public override async Task<T> ExecuteAsync<T>(String key, Func<RedisClient, Stri
}
#endregion

#region 子库
/// <summary>为同一服务器创建不同Db的子级库</summary>
/// <param name="db"></param>
/// <returns></returns>
public override Redis CreateSub(Int32 db)
{
var rds = (base.CreateSub(db) as FullRedis)!;
rds.AutoDetect = AutoDetect;
rds.Prefix = Prefix;

return rds;
}
#endregion

#region 基础操作
/// <summary>批量移除缓存项</summary>
/// <param name="keys">键集合</param>
public override Int32 Remove(params String[] keys)
{
if (keys == null || keys.Length == 0) return 0;

keys = keys.Select(GetKey).ToArray();
if (keys.Length == 1) return base.Remove(keys[0]);

InitCluster();

if (Cluster != null)
{
return Execute(keys, (rds, ks) => rds.Execute<Int32>("DEL", ks), true).Sum();
Expand All @@ -453,6 +484,8 @@ public override IDictionary<String, T> GetAll<T>(IEnumerable<String> keys)
if (keys == null || !keys.Any()) return new Dictionary<String, T>();

var keys2 = keys.ToArray();

keys2 = keys2.Select(GetKey).ToArray();
if (keys2.Length == 1 || Cluster == null) return base.GetAll<T>(keys2);

//Execute(keys.FirstOrDefault(), (rds, k) => rds.GetAll<T>(keys));
Expand Down Expand Up @@ -493,8 +526,9 @@ public override void SetAll<T>(IDictionary<String, T> values, Int32 expire = -1)
return;
}

var keys = values.Keys.Select(GetKey).ToArray();
//Execute(values.FirstOrDefault().Key, (rds, k) => rds.SetAll(values), true);
var rs = Execute([.. values.Keys], (rds, ks) => rds.SetAll(ks.ToDictionary(e => e, e => values[e])), true);
var rs = Execute(keys, (rds, ks) => rds.SetAll(ks.ToDictionary(e => e, e => values[e])), true);

// 使用管道批量设置过期时间
if (expire > 0)
Expand All @@ -504,9 +538,9 @@ public override void SetAll<T>(IDictionary<String, T> values, Int32 expire = -1)
StartPipeline();
try
{
foreach (var item in values)
foreach (var item in keys)
{
SetExpire(item.Key, ts);
SetExpire(item, ts);
}
}
finally
Expand Down Expand Up @@ -607,6 +641,7 @@ public override void SetAll<T>(IDictionary<String, T> values, Int32 expire = -1)
public virtual Boolean Rename(String key, String newKey, Boolean overwrite = true)
{
var cmd = overwrite ? "RENAME" : "RENAMENX";
newKey = GetKey(newKey);

var rs = Execute(key, (r, k) => r.Execute<String>(cmd, k, newKey), true);
if (rs.IsNullOrEmpty()) return false;
Expand Down Expand Up @@ -643,7 +678,7 @@ IEnumerable<String> Scan(IRedisNode? node = null)
while (count > 0)
{
var p = model.Position;
var rs = Execute(r => r.Execute<Object[]>("SCAN", p, "MATCH", model.Pattern + "", "COUNT", count), node);
var rs = Execute(r => r.Execute<Object[]>("SCAN", p, "MATCH", GetKey(model.Pattern + ""), "COUNT", count), node);
if (rs == null || rs.Length != 2) break;

model.Position = (rs[0] as Packet)?.ToStr().ToInt() ?? 0;
Expand Down Expand Up @@ -681,6 +716,8 @@ IEnumerable<String> Scan(IRedisNode? node = null)
/// <returns></returns>
public virtual Int32 RPUSH<T>(String key, params T[] values)
{
// 这里提前打包参数,需要处理key前缀。其它普通情况由Execute处理
key = GetKey(key);
var args = new List<Object>
{
key
Expand All @@ -699,6 +736,8 @@ public virtual Int32 RPUSH<T>(String key, params T[] values)
/// <returns></returns>
public virtual Int32 LPUSH<T>(String key, params T[] values)
{
// 这里提前打包参数,需要处理key前缀。其它普通情况由Execute处理
key = GetKey(key);
var args = new List<Object>
{
key
Expand All @@ -722,7 +761,7 @@ public virtual Int32 LPUSH<T>(String key, params T[] values)
/// <param name="source">源列表名称</param>
/// <param name="destination">元素后写入的新列表名称</param>
/// <returns></returns>
public virtual T? RPOPLPUSH<T>(String source, String destination) => Execute(source, (rc, k) => rc.Execute<T>("RPOPLPUSH", k, destination), true);
public virtual T? RPOPLPUSH<T>(String source, String destination) => Execute(source, (rc, k) => rc.Execute<T>("RPOPLPUSH", k, GetKey(destination)), true);

/// <summary>
/// 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
Expand All @@ -733,7 +772,7 @@ public virtual Int32 LPUSH<T>(String key, params T[] values)
/// <param name="destination">元素后写入的新列表名称</param>
/// <param name="secTimeout">设置的阻塞时长,单位为秒。设置前请确认该值不能超过FullRedis.Timeout 否则会出现异常</param>
/// <returns></returns>
public virtual T? BRPOPLPUSH<T>(String source, String destination, Int32 secTimeout) => Execute(source, (rc, k) => rc.Execute<T>("BRPOPLPUSH", k, destination, secTimeout), true);
public virtual T? BRPOPLPUSH<T>(String source, String destination, Int32 secTimeout) => Execute(source, (rc, k) => rc.Execute<T>("BRPOPLPUSH", k, GetKey(destination), secTimeout), true);

/// <summary>从列表头部弹出一个元素</summary>
/// <typeparam name="T"></typeparam>
Expand All @@ -755,10 +794,11 @@ public virtual Int32 LPUSH<T>(String key, params T[] values)
var sb = new StringBuilder();
foreach (var item in keys)
{
var key = GetKey(item);
if (sb.Length <= 0)
sb.Append($"{item}");
sb.Append($"{key}");
else
sb.Append($" {item}");
sb.Append($" {key}");
}
var rs = Execute(keys[0], (rc, k) => rc.Execute<String[]>("BRPOP", sb.ToString(), secTimeout), true);
if (rs == null || rs.Length != 2) return null;
Expand Down Expand Up @@ -793,10 +833,11 @@ public virtual Int32 LPUSH<T>(String key, params T[] values)
var sb = new StringBuilder();
foreach (var item in keys)
{
var key = GetKey(item);
if (sb.Length <= 0)
sb.Append($"{item}");
sb.Append($"{key}");
else
sb.Append($" {item}");
sb.Append($" {key}");
}
var rs = Execute(keys[0], (rc, k) => rc.Execute<String[]>("BLPOP", sb.ToString(), secTimeout), true);
if (rs == null || rs.Length != 2) return null;
Expand Down Expand Up @@ -824,6 +865,7 @@ public virtual Int32 LPUSH<T>(String key, params T[] values)
/// <returns></returns>
public virtual Int32 SADD<T>(String key, params T[] members)
{
key = GetKey(key);
var args = new List<Object>
{
key
Expand All @@ -842,6 +884,7 @@ public virtual Int32 SADD<T>(String key, params T[] members)
/// <returns></returns>
public virtual Int32 SREM<T>(String key, params T[] members)
{
key = GetKey(key);
var args = new List<Object>
{
key
Expand Down
112 changes: 54 additions & 58 deletions NewLife.Redis/HyperLogLog.cs
Original file line number Diff line number Diff line change
@@ -1,71 +1,67 @@
using System;
using System.Collections.Generic;
namespace NewLife.Caching;

namespace NewLife.Caching
/// <summary>超级基数估算</summary>
/// <remarks>
/// HyperLogLog可以使用固定且很少的内存(每个HyperLogLog结构需要12K字节再加上key本身的几个字节)来存储集合的唯一元素。
/// 返回的可见集合基数并不是精确值, 而是一个带有 0.81% 标准错误(standard error)的近似值。
/// 例如为了记录一天会执行多少次各不相同的搜索查询, 一个程序可以在每次执行搜索查询时调用一次PFADD, 并通过调用PFCOUNT命令来获取这个记录的近似结果。
/// 注意: 这个命令的一个副作用是可能会导致HyperLogLog内部被更改,出于缓存的目的,它会用8字节的来记录最近一次计算得到基数,所以PFCOUNT命令在技术上是个写命令。
/// </remarks>
public class HyperLogLog : RedisBase
{
/// <summary>超级基数估算</summary>
#region 实例化
/// <summary>实例化超级基数</summary>
/// <param name="redis"></param>
/// <param name="key"></param>
public HyperLogLog(Redis redis, String key) : base(redis, key) { }
#endregion

/// <summary>添加</summary>
/// <remarks>
/// HyperLogLog可以使用固定且很少的内存(每个HyperLogLog结构需要12K字节再加上key本身的几个字节)来存储集合的唯一元素
/// 返回的可见集合基数并不是精确值, 而是一个带有 0.81% 标准错误(standard error)的近似值。
/// 例如为了记录一天会执行多少次各不相同的搜索查询, 一个程序可以在每次执行搜索查询时调用一次PFADD, 并通过调用PFCOUNT命令来获取这个记录的近似结果
/// 注意: 这个命令的一个副作用是可能会导致HyperLogLog内部被更改,出于缓存的目的,它会用8字节的来记录最近一次计算得到基数,所以PFCOUNT命令在技术上是个写命令。
/// 这个命令的一个副作用是它可能会更改这个HyperLogLog的内部来反映在每添加一个唯一的对象时估计的基数(集合的基数)
/// 如果一个HyperLogLog的估计的近似基数在执行命令过程中发了变化, PFADD 返回1,否则返回0,
/// 如果指定的key不存在,这个命令会自动创建一个空的HyperLogLog结构(指定长度和编码的字符串)
/// 如果在调用该命令时仅提供变量名而不指定元素也是可以的,如果这个变量名存在,则不会有任何操作,如果不存在,则会创建一个数据结构(返回1)
/// </remarks>
public class HyperLogLog : RedisBase
/// <param name="items"></param>
/// <returns></returns>
public Int32 Add(params String[] items)
{
#region 实例化
/// <summary>实例化超级基数</summary>
/// <param name="redis"></param>
/// <param name="key"></param>
public HyperLogLog(Redis redis, String key) : base(redis, key) { }
#endregion

/// <summary>添加</summary>
/// <remarks>
/// 这个命令的一个副作用是它可能会更改这个HyperLogLog的内部来反映在每添加一个唯一的对象时估计的基数(集合的基数)。
/// 如果一个HyperLogLog的估计的近似基数在执行命令过程中发了变化, PFADD 返回1,否则返回0,
/// 如果指定的key不存在,这个命令会自动创建一个空的HyperLogLog结构(指定长度和编码的字符串)。
/// 如果在调用该命令时仅提供变量名而不指定元素也是可以的,如果这个变量名存在,则不会有任何操作,如果不存在,则会创建一个数据结构(返回1)
/// </remarks>
/// <param name="items"></param>
/// <returns></returns>
public Int32 Add(params String[] items)
var args = new List<Object>
{
Key
};
foreach (var item in items)
{
var args = new List<Object>
{
Key
};
foreach (var item in items)
{
args.Add(item);
}
return Execute((rc, k) => rc.Execute<Int32>("PFADD", args.ToArray()), true);
args.Add(item);
}
return Execute((rc, k) => rc.Execute<Int32>("PFADD", args.ToArray()), true);
}

/// <summary>近似基数</summary>
/// <remarks>
/// 返回存储在HyperLogLog结构体的该变量的近似基数,如果该变量不存在,则返回0。
/// 当参数为多个key时,返回这些HyperLogLog并集的近似基数,这个值是将所给定的所有key的HyperLoglog结构合并到一个临时的HyperLogLog结构中计算而得到的。
/// </remarks>
public Int32 Count => Execute((rc, k) => rc.Execute<Int32>("PFCOUNT", Key));
/// <summary>近似基数</summary>
/// <remarks>
/// 返回存储在HyperLogLog结构体的该变量的近似基数,如果该变量不存在,则返回0。
/// 当参数为多个key时,返回这些HyperLogLog并集的近似基数,这个值是将所给定的所有key的HyperLoglog结构合并到一个临时的HyperLogLog结构中计算而得到的。
/// </remarks>
public Int32 Count => Execute((rc, k) => rc.Execute<Int32>("PFCOUNT", Key));

/// <summary>合并</summary>
/// <remarks>
/// 将多个 HyperLogLog 合并(merge)为一个 HyperLogLog , 合并后的 HyperLogLog 的基数接近于所有输入 HyperLogLog 的可见集合(observed set)的并集。
/// 合并得出的 HyperLogLog 会被储存在目标变量(第一个参数)里面, 如果该键并不存在, 那么命令在执行之前, 会先为该键创建一个空的。
/// </remarks>
/// <param name="keys"></param>
/// <returns></returns>
public Boolean Merge(params String[] keys)
/// <summary>合并</summary>
/// <remarks>
/// 将多个 HyperLogLog 合并(merge)为一个 HyperLogLog , 合并后的 HyperLogLog 的基数接近于所有输入 HyperLogLog 的可见集合(observed set)的并集。
/// 合并得出的 HyperLogLog 会被储存在目标变量(第一个参数)里面, 如果该键并不存在, 那么命令在执行之前, 会先为该键创建一个空的。
/// </remarks>
/// <param name="keys"></param>
/// <returns></returns>
public Boolean Merge(params String[] keys)
{
var args = new List<Object>
{
Key
};
foreach (var item in keys)
{
var args = new List<Object>
{
Key
};
foreach (var item in keys)
{
args.Add(item);
}
return Execute((rc, k) => rc.Execute<String>("PFMERGE", args.ToArray()), true) == "OK";
args.Add(item);
}
return Execute((rc, k) => rc.Execute<String>("PFMERGE", args.ToArray()), true) == "OK";
}
}
Loading

0 comments on commit b06fc61

Please sign in to comment.