From 2b0f3b131b1640899cc03e3f1b2afc04494665ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E7=9F=B3=E5=A4=B4?= Date: Sat, 30 Nov 2024 23:18:11 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=BA=E4=BA=86=E6=8F=90=E5=8D=87=E6=80=A7?= =?UTF-8?q?=E8=83=BD=EF=BC=8C=E5=9C=A8=E5=90=8C=E6=AD=A5=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E6=97=B6=E8=A7=84=E9=81=BF=E5=8D=A1UI?= =?UTF-8?q?=E4=B8=8A=E4=B8=8B=E6=96=87=EF=BC=8C=E6=89=80=E6=9C=89await?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E6=9C=BA=E9=83=BD=E8=AE=BE=E7=BD=AEConfigure?= =?UTF-8?q?Await(false)=EF=BC=8C=E5=BC=80=E5=90=AFCA2007=E5=B9=B6=E8=A7=86?= =?UTF-8?q?=E4=B8=BA=E7=BC=96=E8=AF=91=E9=94=99=E8=AF=AF=E3=80=82=E5=87=8F?= =?UTF-8?q?=E5=B0=91=E4=B8=8D=E5=BF=85=E8=A6=81=E7=9A=84await=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../NewLife.Redis.Extensions.csproj | 47 +++++++++------- NewLife.Redis/FullRedis.cs | 6 +-- NewLife.Redis/NewLife.Redis.csproj | 5 +- NewLife.Redis/PubSub.cs | 6 +-- .../Queues/MultipleConsumerGroupsQueue.cs | 2 +- NewLife.Redis/Queues/QueueExtensions.cs | 14 ++--- NewLife.Redis/Queues/RedisDelayQueue.cs | 10 ++-- NewLife.Redis/Queues/RedisQueue.cs | 4 +- NewLife.Redis/Queues/RedisReliableQueue.cs | 10 ++-- NewLife.Redis/Queues/RedisStream.cs | 53 +++++++++---------- NewLife.Redis/Redis.cs | 6 +-- NewLife.Redis/RedisBase.cs | 2 +- NewLife.Redis/RedisClient.cs | 50 ++++++++--------- NewLife.Redis/RedisSortedSet.cs | 2 +- NewLife.Redis/RedisStack.cs | 4 +- Test/Test.csproj | 2 +- XUnitTest/RedisTest.cs | 8 +-- XUnitTest/XUnitTest.csproj | 2 +- 18 files changed, 122 insertions(+), 111 deletions(-) diff --git a/NewLife.Redis.Extensions/NewLife.Redis.Extensions.csproj b/NewLife.Redis.Extensions/NewLife.Redis.Extensions.csproj index 269d491..d4ba3c2 100644 --- a/NewLife.Redis.Extensions/NewLife.Redis.Extensions.csproj +++ b/NewLife.Redis.Extensions/NewLife.Redis.Extensions.csproj @@ -1,7 +1,7 @@ - netcoreapp3.1;net5.0;net6.0;net7.0;net8.0;netstandard2.0;netstandard2.1 + netcoreapp3.1;net5.0;net6.0;net7.0;net8.0;net9.0;netstandard2.0;netstandard2.1 新生命Redis扩展 Redis扩展库,便于注入Redis,支持分布式缓存IDistributedCache和数据保护IDataProtection 新生命开发团队 @@ -19,6 +19,9 @@ latest True ..\Doc\newlife.snk + 1701;1702;NU5104;NU1505;NETSDK1138;CS7035 + latest + CA2007 @@ -45,30 +48,36 @@ - - - - + + + + - - - - + + + + - - - - - + + + + + - - - - - + + + + + + + + + + + diff --git a/NewLife.Redis/FullRedis.cs b/NewLife.Redis/FullRedis.cs index ec9d717..4f40ad2 100644 --- a/NewLife.Redis/FullRedis.cs +++ b/NewLife.Redis/FullRedis.cs @@ -400,11 +400,11 @@ public override async Task ExecuteAsync(String key, Func(key, func, write); + if (Cluster == null) return await base.ExecuteAsync(key, func, write).ConfigureAwait(false); var node = Cluster.SelectNode(key, write); //?? throw new XException($"集群[{Name}]没有可用节点"); - if (node == null) return await base.ExecuteAsync(key, func, write); + if (node == null) return await base.ExecuteAsync(key, func, write).ConfigureAwait(false); // 统计性能 var sw = Counter?.StartCount(); @@ -420,7 +420,7 @@ public override async Task ExecuteAsync(String key, Funclatest True ..\Doc\newlife.snk + 1701;1702;NU5104;NU1505;NETSDK1138;CS7035 + latest + CA2007 @@ -53,7 +56,7 @@ - + diff --git a/NewLife.Redis/PubSub.cs b/NewLife.Redis/PubSub.cs index e482c04..e2f2ac2 100644 --- a/NewLife.Redis/PubSub.cs +++ b/NewLife.Redis/PubSub.cs @@ -57,7 +57,7 @@ public async Task SubscribeAsync(Action onMessage, CancellationT client.Reset(); var channels = Key.Split(",", ";").Cast().ToArray(); - await client.ExecuteAsync("SUBSCRIBE", channels); + await client.ExecuteAsync("SUBSCRIBE", channels, cancellationToken).ConfigureAwait(false); while (!cancellationToken.IsCancellationRequested) { @@ -65,11 +65,11 @@ public async Task SubscribeAsync(Action onMessage, CancellationT var source2 = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, source.Token); //var rs = await client.ExecuteAsync(null, new Object[] { new Object() }, source2.Token); - var rs = await client.ReadMoreAsync(source2.Token); + var rs = await client.ReadMoreAsync(source2.Token).ConfigureAwait(false); if (rs != null && rs.Length == 3 && rs[0] == "message") onMessage(rs[1], rs[2]); } - await client.ExecuteAsync("SUBSCRIBE", channels); + await client.ExecuteAsync("SUBSCRIBE", channels, cancellationToken).ConfigureAwait(false); Redis.Pool.Return(client); } diff --git a/NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs b/NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs index 5e5e6d5..03f3160 100644 --- a/NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs +++ b/NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs @@ -214,7 +214,7 @@ private async Task getSubscribe(String subscribeAppName) while (_Cts != null && !_Cts.IsCancellationRequested) { - var msg = await _Queue.TakeMessageAsync(10); + var msg = await _Queue.TakeMessageAsync(10).ConfigureAwait(false); if (msg != null && !msg.Id.IsNullOrEmpty()) { try diff --git a/NewLife.Redis/Queues/QueueExtensions.cs b/NewLife.Redis/Queues/QueueExtensions.cs index 00b722e..95b314d 100644 --- a/NewLife.Redis/Queues/QueueExtensions.cs +++ b/NewLife.Redis/Queues/QueueExtensions.cs @@ -173,7 +173,7 @@ public static async Task ConsumeAsync(this RedisReliableQueue queue, try { // 异步阻塞消费 - mqMsg = await queue.TakeOneAsync(timeout, cancellationToken); + mqMsg = await queue.TakeOneAsync(timeout, cancellationToken).ConfigureAwait(false); if (mqMsg != null) { // 埋点 @@ -197,7 +197,7 @@ public static async Task ConsumeAsync(this RedisReliableQueue queue, } // 处理消息 - if (msg != null) await onMessage(msg, mqMsg, cancellationToken); + if (msg != null) await onMessage(msg, mqMsg, cancellationToken).ConfigureAwait(false); // 确认消息 queue.Acknowledge(mqMsg); @@ -205,7 +205,7 @@ public static async Task ConsumeAsync(this RedisReliableQueue queue, else { // 没有消息,歇一会 - await Task.Delay(1000, cancellationToken); + await Task.Delay(1000, cancellationToken).ConfigureAwait(false); } } catch (ThreadAbortException) { break; } @@ -249,9 +249,9 @@ public static async Task ConsumeAsync(this RedisReliableQueue queue, /// 日志对象 /// 消息标识字段名,用于处理错误重试 /// - public static async Task ConsumeAsync(this RedisReliableQueue queue, Action onMessage, CancellationToken cancellationToken = default, ILog? log = null, String? idField = null) + public static Task ConsumeAsync(this RedisReliableQueue queue, Action onMessage, CancellationToken cancellationToken = default, ILog? log = null, String? idField = null) { - await queue.ConsumeAsync((m, k, t) => { onMessage(m); return Task.FromResult(0); }, cancellationToken, log, idField); + return queue.ConsumeAsync((m, k, t) => { onMessage(m); return Task.FromResult(0); }, cancellationToken, log, idField); } /// 队列消费大循环,处理消息后自动确认 @@ -297,7 +297,7 @@ public static async Task ConsumeAsync(this RedisReliableQueue queue, try { // 异步阻塞消费 - mqMsg = await queue.TakeOneAsync(timeout, cancellationToken); + mqMsg = await queue.TakeOneAsync(timeout, cancellationToken).ConfigureAwait(false); if (mqMsg != null) { // 埋点 @@ -312,7 +312,7 @@ public static async Task ConsumeAsync(this RedisReliableQueue queue, } else // 没有消息,歇一会 - await Task.Delay(1000, cancellationToken); + await Task.Delay(1000, cancellationToken).ConfigureAwait(false); } catch (ThreadAbortException) { break; } catch (ThreadInterruptedException) { break; } diff --git a/NewLife.Redis/Queues/RedisDelayQueue.cs b/NewLife.Redis/Queues/RedisDelayQueue.cs index f6c32a2..7217e31 100644 --- a/NewLife.Redis/Queues/RedisDelayQueue.cs +++ b/NewLife.Redis/Queues/RedisDelayQueue.cs @@ -143,13 +143,13 @@ public Int32 Add(params T[] values) while (!cancellationToken.IsCancellationRequested) { var score = DateTime.UtcNow.ToInt(); - var rs = await _sort.RangeByScoreAsync(0, score, 0, 1, cancellationToken); + var rs = await _sort.RangeByScoreAsync(0, score, 0, 1, cancellationToken).ConfigureAwait(false); if (rs != null && rs.Length > 0 && TryPop(rs[0])) return rs[0]; // 是否需要等待 if (timeout <= 0) break; - await Task.Delay(1000, cancellationToken); + await Task.Delay(1000, cancellationToken).ConfigureAwait(false); timeout--; } @@ -231,7 +231,7 @@ public async Task TransferAsync(IProducerConsumer queue, Action? o { // 异步阻塞消费 var score = DateTime.UtcNow.ToInt(); - var msgs = await _sort.RangeByScoreAsync(0, score, 0, 10, cancellationToken); + var msgs = await _sort.RangeByScoreAsync(0, score, 0, 10, cancellationToken).ConfigureAwait(false); if (msgs != null && msgs.Length > 0) { // 删除消息后直接进入目标队列,无需进入Ack @@ -248,8 +248,10 @@ public async Task TransferAsync(IProducerConsumer queue, Action? o if (list.Count > 0) queue.Add(list.ToArray()); } else + { // 没有消息,歇一会 - await Task.Delay(TransferInterval * 1000, cancellationToken); + await Task.Delay(TransferInterval * 1000, cancellationToken).ConfigureAwait(false); + } } catch (ThreadAbortException) { break; } catch (ThreadInterruptedException) { break; } diff --git a/NewLife.Redis/Queues/RedisQueue.cs b/NewLife.Redis/Queues/RedisQueue.cs index 25c896c..2725dd1 100644 --- a/NewLife.Redis/Queues/RedisQueue.cs +++ b/NewLife.Redis/Queues/RedisQueue.cs @@ -121,11 +121,11 @@ public Int32 Add(params T[] values) /// public async Task TakeOneAsync(Int32 timeout = 0, CancellationToken cancellationToken = default) { - if (timeout < 0) return await ExecuteAsync((rc, k) => rc.ExecuteAsync("RPOP", Key), true); + if (timeout < 0) return await ExecuteAsync((rc, k) => rc.ExecuteAsync("RPOP", Key), true).ConfigureAwait(false); if (timeout > 0 && Redis.Timeout < (timeout + 1) * 1000) Redis.Timeout = (timeout + 1) * 1000; - var rs = await ExecuteAsync((rc, k) => rc.ExecuteAsync("BRPOP", new Object[] { Key, timeout }, cancellationToken), true); + var rs = await ExecuteAsync((rc, k) => rc.ExecuteAsync("BRPOP", [Key, timeout], cancellationToken), true).ConfigureAwait(false); return rs == null || rs.Length < 2 ? default : (T?)Redis.Encoder.Decode(rs[1], typeof(T)); } diff --git a/NewLife.Redis/Queues/RedisReliableQueue.cs b/NewLife.Redis/Queues/RedisReliableQueue.cs index 39804c5..3f7ae51 100644 --- a/NewLife.Redis/Queues/RedisReliableQueue.cs +++ b/NewLife.Redis/Queues/RedisReliableQueue.cs @@ -168,8 +168,8 @@ public Int32 Add(params T[] values) if (timeout > 0 && Redis.Timeout < (timeout + 1) * 1000) Redis.Timeout = (timeout + 1) * 1000; var rs = timeout < 0 ? - await ExecuteAsync((rc, k) => rc.ExecuteAsync("RPOPLPUSH", new Object[] { Key, AckKey }, cancellationToken), true) : - await ExecuteAsync((rc, k) => rc.ExecuteAsync("BRPOPLPUSH", new Object[] { Key, AckKey, timeout }, cancellationToken), true); + await ExecuteAsync((rc, k) => rc.ExecuteAsync("RPOPLPUSH", [Key, AckKey], cancellationToken), true).ConfigureAwait(false) : + await ExecuteAsync((rc, k) => rc.ExecuteAsync("BRPOPLPUSH", [Key, AckKey, timeout], cancellationToken), true).ConfigureAwait(false); if (rs != null) _Status.Consumes++; @@ -332,8 +332,8 @@ public async Task ConsumeAsync(Func> func, In // 取出消息键 var msgId = timeout < 0 ? - await ExecuteAsync((rc, k) => rc.ExecuteAsync("RPOPLPUSH", Key, AckKey), true) : - await ExecuteAsync((rc, k) => rc.ExecuteAsync("BRPOPLPUSH", Key, AckKey, timeout), true); + await ExecuteAsync((rc, k) => rc.ExecuteAsync("RPOPLPUSH", Key, AckKey), true).ConfigureAwait(false) : + await ExecuteAsync((rc, k) => rc.ExecuteAsync("BRPOPLPUSH", Key, AckKey, timeout), true).ConfigureAwait(false); if (msgId.IsNullOrEmpty()) return default; _Status.Consumes++; @@ -347,7 +347,7 @@ await ExecuteAsync((rc, k) => rc.ExecuteAsync("RPOPLPUSH", Key, AckKey), } // 处理消息。如果消息已被删除,此时调用func将受到空引用 - var rs = await func(messge); + var rs = await func(messge).ConfigureAwait(false); // 确认并删除消息 Redis.Remove(msgId); diff --git a/NewLife.Redis/Queues/RedisStream.cs b/NewLife.Redis/Queues/RedisStream.cs index a8c1a7f..459f173 100644 --- a/NewLife.Redis/Queues/RedisStream.cs +++ b/NewLife.Redis/Queues/RedisStream.cs @@ -288,7 +288,7 @@ public IEnumerable Take(Int32 count = 1) /// public async Task TakeOneAsync(Int32 timeout = 0, CancellationToken cancellationToken = default) { - var msg = await TakeMessageAsync(timeout, cancellationToken); + var msg = await TakeMessageAsync(timeout, cancellationToken).ConfigureAwait(false); if (msg == null) return default; return msg.GetBody(); @@ -305,7 +305,7 @@ public IEnumerable Take(Int32 count = 1) /// public async Task TakeMessageAsync(Int32 timeout = 0, CancellationToken cancellationToken = default) { - var rs = await TakeMessagesAsync(1, timeout, cancellationToken); + var rs = await TakeMessagesAsync(1, timeout, cancellationToken).ConfigureAwait(false); return rs?.FirstOrDefault(); } @@ -328,7 +328,7 @@ public IEnumerable Take(Int32 count = 1) // 抢过来的消息,优先处理,可能需要多次消费才能消耗完 if (_claims > 0) { - var rs2 = await ReadGroupAsync(group, Consumer, count, 3_000, "0", cancellationToken); + var rs2 = await ReadGroupAsync(group, Consumer, count, 3_000, "0", cancellationToken).ConfigureAwait(false); if (rs2 != null && rs2.Count > 0) { _claims -= rs2.Count; @@ -347,8 +347,8 @@ public IEnumerable Take(Int32 count = 1) //var id = FromLastOffset ? "$" : ">"; var rs = !group.IsNullOrEmpty() ? - await ReadGroupAsync(group, Consumer, count, t, ">", cancellationToken) : - await ReadAsync(StartId, count, t, cancellationToken); + await ReadGroupAsync(group, Consumer, count, t, ">", cancellationToken).ConfigureAwait(false) : + await ReadAsync(StartId, count, t, cancellationToken).ConfigureAwait(false); if (rs == null || rs.Count == 0) { // id为>时,消费从未传递给消费者的消息 @@ -357,7 +357,7 @@ await ReadGroupAsync(group, Consumer, count, t, ">", cancellationToken) : // 使用消费组时,如果拿不到消息,则尝试当前消费者之前消费但没有确认的消息 if (!group.IsNullOrEmpty()) { - rs = await ReadGroupAsync(group, Consumer, count, 3_000, "0", cancellationToken); + rs = await ReadGroupAsync(group, Consumer, count, 3_000, "0", cancellationToken).ConfigureAwait(false); if (rs == null || rs.Count == 0) return null; XTrace.WriteLine("[{0}]处理历史:{1}", group, rs.Join(",", e => e.Id)); @@ -569,7 +569,7 @@ public IList Range(String startId, String endId, Int32 count = -1) var rs = count > 0 ? Execute((rc, k) => rc.Execute("XRANGE", Key, startId, endId, "COUNT", count), false) : Execute((rc, k) => rc.Execute("XRANGE", Key, startId, endId), false); - if (rs == null) return new Message[0]; + if (rs == null) return []; return Parse(rs); } @@ -630,7 +630,7 @@ XREAD count 3 streams stream_key 0-0 if (vs[1] is Object[] vs2) return Parse(vs2); } - return new Message[0]; + return []; } /// 异步原始独立消费 @@ -661,13 +661,13 @@ public async Task> ReadAsync(String startId, Int32 count, Int32 b args.Add(Key); args.Add(startId); - var rs = await ExecuteAsync((rc, k) => rc.ExecuteAsync("XREAD", args.ToArray(), cancellationToken), true); + var rs = await ExecuteAsync((rc, k) => rc.ExecuteAsync("XREAD", args.ToArray(), cancellationToken), true).ConfigureAwait(false); if (rs != null && rs.Length == 1 && rs[0] is Object[] vs && vs.Length == 2) { if (vs[1] is Object[] vs2) return Parse(vs2); } - return new Message[0]; + return []; } private IList Parse(Object[] vs) @@ -721,7 +721,7 @@ public PendingItem[] Pending(String group, String? startId, String? endId, Int32 var rs = count > 0 ? Execute((rc, k) => rc.Execute("XPENDING", Key, group, startId, endId, count), false) : Execute((rc, k) => rc.Execute("XPENDING", Key, group, startId, endId), false); - if (rs == null) return new PendingItem[0]; + if (rs == null) return []; var list = new List(); foreach (var item in rs.Cast()) @@ -808,7 +808,7 @@ public IList ReadGroup(String group, String consumer, Int32 count, Stri if (vs[1] is Object[] vs2) return Parse(vs2); } - return new Message[0]; + return []; } /// 异步消费组消费 @@ -829,14 +829,14 @@ public async Task> ReadGroupAsync(String group, String consumer, if (id.IsNullOrEmpty()) id = ">"; var rs = count > 0 ? - await ExecuteAsync((rc, k) => rc.ExecuteAsync("XREADGROUP", ["GROUP", group, consumer, "BLOCK", block, "COUNT", count, "STREAMS", Key, id], cancellationToken), true) : - await ExecuteAsync((rc, k) => rc.ExecuteAsync("XREADGROUP", ["GROUP", group, consumer, "BLOCK", block, "STREAMS", Key, id], cancellationToken), true); + await ExecuteAsync((rc, k) => rc.ExecuteAsync("XREADGROUP", ["GROUP", group, consumer, "BLOCK", block, "COUNT", count, "STREAMS", Key, id], cancellationToken), true).ConfigureAwait(false) : + await ExecuteAsync((rc, k) => rc.ExecuteAsync("XREADGROUP", ["GROUP", group, consumer, "BLOCK", block, "STREAMS", Key, id], cancellationToken), true).ConfigureAwait(false); if (rs != null && rs.Length == 1 && rs[0] is Object[] vs && vs.Length == 2) { if (vs[1] is Object[] vs2) return Parse(vs2); } - return new Message[0]; + return []; } #endregion @@ -859,7 +859,7 @@ await ExecuteAsync((rc, k) => rc.ExecuteAsync("XREADGROUP", ["GROUP", public GroupInfo[] GetGroups() { var rs = Execute((rc, k) => rc.Execute("XINFO", "GROUPS", Key), false); - if (rs == null) return new GroupInfo[0]; + if (rs == null) return []; var gs = new GroupInfo[rs.Length]; for (var i = 0; i < rs.Length; i++) @@ -922,7 +922,7 @@ public async Task ConsumeAsync(Func onMessa try { // 异步阻塞消费 - var mqMsg = await TakeMessageAsync(timeout, cancellationToken); + var mqMsg = await TakeMessageAsync(timeout, cancellationToken).ConfigureAwait(false); if (mqMsg != null && !mqMsg.Id.IsNullOrEmpty()) { // 埋点 @@ -943,7 +943,7 @@ public async Task ConsumeAsync(Func onMessa var msg = mqMsg.GetBody(); // 处理消息 - if (msg != null) await onMessage(msg, mqMsg, cancellationToken); + if (msg != null) await onMessage(msg, mqMsg, cancellationToken).ConfigureAwait(false); // 确认消息 Acknowledge(mqMsg.Id); @@ -951,7 +951,7 @@ public async Task ConsumeAsync(Func onMessa else { // 没有消息,歇一会 - await Task.Delay(1000, cancellationToken); + await Task.Delay(1000, cancellationToken).ConfigureAwait(false); } } catch (ThreadAbortException) { break; } @@ -983,7 +983,7 @@ public async Task ConsumeAsync(Func onMessa /// 消息处理。如果处理消息时抛出异常,消息将延迟后回到队列 /// 取消令牌 /// - public async Task ConsumeAsync(Action onMessage, CancellationToken cancellationToken = default) => await ConsumeAsync((m, k, t) => + public Task ConsumeAsync(Action onMessage, CancellationToken cancellationToken = default) => ConsumeAsync((m, k, t) => { onMessage(m); return Task.FromResult(0); @@ -1023,7 +1023,7 @@ public async Task ConsumeAsync(Func onM try { // 异步阻塞消费 - var mqMsgs = await TakeMessagesAsync(batchSize, timeout, cancellationToken); + var mqMsgs = await TakeMessagesAsync(batchSize, timeout, cancellationToken).ConfigureAwait(false); if (mqMsgs != null && mqMsgs.Count > 0) { // 埋点 @@ -1044,7 +1044,7 @@ public async Task ConsumeAsync(Func onM var msgs = mqMsgs.Select(e => e.GetBody()!).ToArray(); // 处理消息 - await onMessage(msgs, mqMsgs.ToArray(), cancellationToken); + await onMessage(msgs, mqMsgs.ToArray(), cancellationToken).ConfigureAwait(false); // 确认消息 Acknowledge(mqMsgs.Select(e => e.Id!).ToArray()); @@ -1052,7 +1052,7 @@ public async Task ConsumeAsync(Func onM else { // 没有消息,歇一会 - await Task.Delay(1000, cancellationToken); + await Task.Delay(1000, cancellationToken).ConfigureAwait(false); } } catch (ThreadAbortException) { break; } @@ -1086,10 +1086,7 @@ public async Task ConsumeAsync(Func onM /// 批大小。默认100 /// 取消令牌 /// - public async Task ConsumeAsync(Func onMessage, Int32 batchSize = 100, CancellationToken cancellationToken = default) => await ConsumeAsync(async (m, k, t) => - { - await onMessage(m); - }, batchSize, cancellationToken); + public Task ConsumeAsync(Func onMessage, Int32 batchSize = 100, CancellationToken cancellationToken = default) => ConsumeAsync((m, k, t) => onMessage(m), batchSize, cancellationToken); /// 队列批量消费大循环,批量处理消息后自动确认 /// 批量消费最大的问题是部分消费成功,需要用户根据实际情况妥善处理 @@ -1097,7 +1094,7 @@ public async Task ConsumeAsync(Func onMessage, Int32 batchSize = 100, /// 批大小。默认100 /// 取消令牌 /// - public async Task ConsumeAsync(Action onMessage, Int32 batchSize = 100, CancellationToken cancellationToken = default) => await ConsumeAsync((m, k, t) => + public Task ConsumeAsync(Action onMessage, Int32 batchSize = 100, CancellationToken cancellationToken = default) => ConsumeAsync((m, k, t) => { onMessage(m); return Task.FromResult(0); diff --git a/NewLife.Redis/Redis.cs b/NewLife.Redis/Redis.cs index 8c44a57..bbb6f63 100644 --- a/NewLife.Redis/Redis.cs +++ b/NewLife.Redis/Redis.cs @@ -547,7 +547,7 @@ public virtual async Task ExecuteAsync(String key, Func 0) rds = StartPipeline(); if (rds != null) { - var rs = await func(rds, key); + var rs = await func(rds, key).ConfigureAwait(false); // 命令数足够,自动提交 if (AutoPipeline > 0 && rds.PipelineCommands >= AutoPipeline) @@ -575,7 +575,7 @@ public virtual async Task ExecuteAsync(String key, Func(String key, [MaybeNullWhen(false)] out T T? v1 = default; var rs1 = Execute(key, (rds, k) => { - var rs2 = rds.TryExecute("GET", new[] { k }, out T? v2); + var rs2 = rds.TryExecute("GET", [k], out T? v2); v1 = v2; return rs2; }); diff --git a/NewLife.Redis/RedisBase.cs b/NewLife.Redis/RedisBase.cs index 240342d..25ebce8 100644 --- a/NewLife.Redis/RedisBase.cs +++ b/NewLife.Redis/RedisBase.cs @@ -40,6 +40,6 @@ public RedisBase(Redis redis, String key) /// /// 是否写入操作 /// - public virtual async Task ExecuteAsync(Func> func, Boolean write = false) => await Redis.ExecuteAsync(Key, func, write); + public virtual Task ExecuteAsync(Func> func, Boolean write = false) => Redis.ExecuteAsync(Key, func, write); #endregion } \ No newline at end of file diff --git a/NewLife.Redis/RedisClient.cs b/NewLife.Redis/RedisClient.cs index 1ff6ba2..80fc47f 100644 --- a/NewLife.Redis/RedisClient.cs +++ b/NewLife.Redis/RedisClient.cs @@ -120,7 +120,7 @@ protected override void Dispose(Boolean disposing) var uri = Server; var addrs = uri.GetAddresses(); DefaultSpan.Current?.AppendTag($"addrs={addrs.Join()} port={uri.Port}"); - await tc.ConnectAsync(addrs, uri.Port); + await tc.ConnectAsync(addrs, uri.Port).ConfigureAwait(false); Client = tc; ns = tc.GetStream(); @@ -130,7 +130,7 @@ protected override void Dispose(Boolean disposing) if (sp != SslProtocols.None) { var sslStream = new SslStream(ns, false, OnCertificateValidationCallback); - await sslStream.AuthenticateAsClientAsync(uri.Host ?? uri.Address + "", [], sp, false); + await sslStream.AuthenticateAsClientAsync(uri.Host ?? uri.Address + "", [], sp, false).ConfigureAwait(false); ns = sslStream; } @@ -272,7 +272,7 @@ protected virtual Int32 GetRequest(Memory memory, String cmd, Object?[]? a // 取巧进行异步操作,只要异步读取到第一个字节,后续同步读取 if (cancellationToken == CancellationToken.None) cancellationToken = new CancellationTokenSource(Timeout > 0 ? Timeout : Host.Timeout).Token; - var n = await ms.ReadAsync(buf, 0, 1, cancellationToken); + var n = await ms.ReadAsync(buf, 0, 1, cancellationToken).ConfigureAwait(false); if (n <= 0) return list; header = (Char)buf[0]; @@ -335,14 +335,14 @@ protected virtual Int32 GetRequest(Memory memory, String cmd, Object?[]? a { var isQuit = cmd == "QUIT"; - var ns = await GetStreamAsync(!isQuit); + var ns = await GetStreamAsync(!isQuit).ConfigureAwait(false); if (ns == null) return null; if (!cmd.IsNullOrEmpty()) { // 验证登录 - await CheckLogin(cmd); - await CheckSelect(cmd); + await CheckLogin(cmd).ConfigureAwait(false); + await CheckSelect(cmd).ConfigureAwait(false); // 估算数据包大小,从内存池借出 var total = GetCommandSize(cmd, args); @@ -355,14 +355,14 @@ protected virtual Int32 GetRequest(Memory memory, String cmd, Object?[]? a var max = Host.MaxMessageSize; if (max > 0 && memory.Length >= max) throw new InvalidOperationException($"命令[{cmd}]的数据包大小[{memory.Length}]超过最大限制[{max}],大key会拖累整个Redis实例,可通过Redis.MaxMessageSize调节。"); - if (memory.Length > 0) await ns.WriteAsync(memory, cancellationToken); + if (memory.Length > 0) await ns.WriteAsync(memory, cancellationToken).ConfigureAwait(false); if (total < MAX_POOL_SIZE) Pool.Shared.Return(buffer); - await ns.FlushAsync(cancellationToken); + await ns.FlushAsync(cancellationToken).ConfigureAwait(false); } - var rs = await GetResponseAsync(ns, 1, cancellationToken); + var rs = await GetResponseAsync(ns, 1, cancellationToken).ConfigureAwait(false); if (isQuit) Logined = false; @@ -374,7 +374,7 @@ private async Task CheckLogin(String? cmd) if (Logined) return; if (cmd.EqualIgnoreCase("Auth")) return; - if (!Host.Password.IsNullOrEmpty() && !(await Auth(Host.UserName, Host.Password))) + if (!Host.Password.IsNullOrEmpty() && !(await Auth(Host.UserName, Host.Password).ConfigureAwait(false))) throw new Exception("登录失败!"); Logined = true; @@ -388,7 +388,7 @@ private async Task CheckSelect(String? cmd) if (_selected == db) return; if (cmd.EqualIgnoreCase("Auth", "Select", "Info")) return; - if (db > 0 && (Host is not FullRedis rds || !rds.Mode.EqualIgnoreCase("cluster", "sentinel"))) await Select(db); + if (db > 0 && (Host is not FullRedis rds || !rds.Mode.EqualIgnoreCase("cluster", "sentinel"))) await Select(db).ConfigureAwait(false); _selected = db; } @@ -635,7 +635,7 @@ public virtual Boolean TryExecute(String cmd, Object?[] args, out TResu using var span = cmd.IsNullOrEmpty() ? null : Host.Tracer?.NewSpan($"redis:{Name}:{act}", args); try { - return await ExecuteCommandAsync(cmd, args, cancellationToken); + return await ExecuteCommandAsync(cmd, args, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { @@ -648,7 +648,7 @@ public virtual Boolean TryExecute(String cmd, Object?[] args, out TResu /// 命令 /// 参数数组 /// - public virtual async Task ExecuteAsync(String cmd, params Object?[] args) => await ExecuteAsync(cmd, args, CancellationToken.None); + public virtual Task ExecuteAsync(String cmd, params Object?[] args) => ExecuteAsync(cmd, args, CancellationToken.None); /// 异步执行命令。返回基本类型、对象、对象数组 /// 命令 @@ -664,7 +664,7 @@ public virtual Boolean TryExecute(String cmd, Object?[] args, out TResu return default; } - var rs = await ExecuteAsync(cmd, args, cancellationToken); + var rs = await ExecuteAsync(cmd, args, cancellationToken).ConfigureAwait(false); if (rs == null) return default; if (rs is TResult rs2) return rs2; if (TryChangeType(rs, typeof(TResult), out var target)) return (TResult?)target; @@ -677,10 +677,10 @@ public virtual Boolean TryExecute(String cmd, Object?[] args, out TResu /// public virtual async Task ReadMoreAsync(CancellationToken cancellationToken) { - var ns = await GetStreamAsync(false); + var ns = await GetStreamAsync(false).ConfigureAwait(false); if (ns == null) return default; - var rss = await GetResponseAsync(ns, 1, cancellationToken); + var rss = await GetResponseAsync(ns, 1, cancellationToken).ConfigureAwait(false); var rs = rss.FirstOrDefault(); //var rs = ExecuteCommand(null, null, null); @@ -769,15 +769,15 @@ public virtual Boolean TryChangeType(Object value, Type type, out Object? target _ps = null; - var ns = await GetStreamAsync(true); + var ns = await GetStreamAsync(true).ConfigureAwait(false); if (ns == null) return null; using var span = Host.Tracer?.NewSpan($"redis:{Name}:Pipeline", null); try { // 验证登录 - await CheckLogin(null); - await CheckSelect(null); + await CheckLogin(null).ConfigureAwait(false); + await CheckSelect(null).ConfigureAwait(false); // 估算数据包大小,从内存池借出 var total = 0; @@ -803,13 +803,13 @@ public virtual Boolean TryChangeType(Object value, Type type, out Object? target span?.SetTag(cmds); // 整体发出 - if (memory.Length > 0) await ns.WriteAsync(memory); + if (memory.Length > 0) await ns.WriteAsync(memory).ConfigureAwait(false); if (total < MAX_POOL_SIZE) Pool.Shared.Return(buffer); if (!requireResult) return new Object[ps.Count]; // 获取响应 - var list = await GetResponseAsync(ns, ps.Count); + var list = await GetResponseAsync(ns, ps.Count).ConfigureAwait(false); for (var i = 0; i < list.Count; i++) { var rs = list[i]; @@ -836,12 +836,12 @@ private class Command(String name, Object?[] args, Type type) #region 基础功能 /// 心跳 /// - public async Task Ping() => await ExecuteAsync("PING") == "PONG"; + public async Task Ping() => await ExecuteAsync("PING").ConfigureAwait(false) == "PONG"; /// 选择Db /// /// - public async Task Select(Int32 db) => await ExecuteAsync("SELECT", db + "") == "OK"; + public async Task Select(Int32 db) => await ExecuteAsync("SELECT", db + "").ConfigureAwait(false) == "OK"; /// 验证密码 /// @@ -850,8 +850,8 @@ private class Command(String name, Object?[] args, Type type) public async Task Auth(String? username, String password) { var rs = username.IsNullOrEmpty() ? - await ExecuteAsync("AUTH", password) : - await ExecuteAsync("AUTH", username, password); + await ExecuteAsync("AUTH", password).ConfigureAwait(false) : + await ExecuteAsync("AUTH", username, password).ConfigureAwait(false); return rs == "OK"; } diff --git a/NewLife.Redis/RedisSortedSet.cs b/NewLife.Redis/RedisSortedSet.cs index 093aff3..711df4c 100644 --- a/NewLife.Redis/RedisSortedSet.cs +++ b/NewLife.Redis/RedisSortedSet.cs @@ -189,7 +189,7 @@ public IDictionary RangeWithScores(Int32 start, Int32 stop) /// 个数 /// 取消令牌 /// - public async Task RangeByScoreAsync(Double min, Double max, Int32 offset, Int32 count, CancellationToken cancellationToken = default) => await ExecuteAsync((r,k) => r.ExecuteAsync("ZRANGEBYSCORE", new Object[] { Key, min, max, "LIMIT", offset, count }, cancellationToken)); + public Task RangeByScoreAsync(Double min, Double max, Int32 offset, Int32 count, CancellationToken cancellationToken = default) => ExecuteAsync((r, k) => r.ExecuteAsync("ZRANGEBYSCORE", [Key, min, max, "LIMIT", offset, count], cancellationToken)); /// 返回指定分数区间的成员分数对,低分到高分排序 /// 低分,包含 diff --git a/NewLife.Redis/RedisStack.cs b/NewLife.Redis/RedisStack.cs index 049de87..d69fda2 100644 --- a/NewLife.Redis/RedisStack.cs +++ b/NewLife.Redis/RedisStack.cs @@ -92,9 +92,9 @@ public IEnumerable Take(Int32 count = 1) /// public async Task TakeOneAsync(Int32 timeout = 0, CancellationToken cancellationToken = default) { - if (timeout < 0) return await ExecuteAsync((rc, k) => rc.ExecuteAsync("RPOP", Key), true); + if (timeout < 0) return await ExecuteAsync((rc, k) => rc.ExecuteAsync("RPOP", Key), true).ConfigureAwait(false); - var rs = await ExecuteAsync((rc, k) => rc.ExecuteAsync("BRPOP", new Object[] { Key, timeout }, cancellationToken), true); + var rs = await ExecuteAsync((rc, k) => rc.ExecuteAsync("BRPOP", [Key, timeout], cancellationToken), true).ConfigureAwait(false); return rs == null || rs.Length < 2 ? default : (T?)Redis.Encoder.Decode(rs[1], typeof(T)); } diff --git a/Test/Test.csproj b/Test/Test.csproj index 8361235..1a7f13f 100644 --- a/Test/Test.csproj +++ b/Test/Test.csproj @@ -16,7 +16,7 @@ - + diff --git a/XUnitTest/RedisTest.cs b/XUnitTest/RedisTest.cs index 605fbc0..193452a 100644 --- a/XUnitTest/RedisTest.cs +++ b/XUnitTest/RedisTest.cs @@ -517,9 +517,9 @@ public void MaxMessageSizeTest() ic.MaxMessageSize = 1028; - var ex = Assert.Throws(() => ic.Set("ttt", Rand.NextString(1029))); - var ex2 = ex.GetTrue() as InvalidOperationException; - Assert.NotNull(ex2); - Assert.Equal("命令[SET]的数据包大小[1060]超过最大限制[1028],大key会拖累整个Redis实例,可通过Redis.MaxMessageSize调节。", ex2.Message); + var ex = Assert.Throws(() => ic.Set("ttt", Rand.NextString(1029))); + //var ex2 = ex.GetTrue() as InvalidOperationException; + Assert.NotNull(ex); + Assert.Equal("命令[SET]的数据包大小[1060]超过最大限制[1028],大key会拖累整个Redis实例,可通过Redis.MaxMessageSize调节。", ex.Message); } } \ No newline at end of file diff --git a/XUnitTest/XUnitTest.csproj b/XUnitTest/XUnitTest.csproj index 2ff451d..f6dcc20 100644 --- a/XUnitTest/XUnitTest.csproj +++ b/XUnitTest/XUnitTest.csproj @@ -10,7 +10,7 @@ - +