Skip to content

Commit

Permalink
为了提升性能,在同步调用异步时规避卡UI上下文,所有await状态机都设置ConfigureAwait(false),开启CA2007并视…
Browse files Browse the repository at this point in the history
…为编译错误。减少不必要的await状态机
  • Loading branch information
nnhy committed Nov 30, 2024
1 parent 35e7d95 commit 2b0f3b1
Show file tree
Hide file tree
Showing 18 changed files with 122 additions and 111 deletions.
47 changes: 28 additions & 19 deletions NewLife.Redis.Extensions/NewLife.Redis.Extensions.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netcoreapp3.1;net5.0;net6.0;net7.0;net8.0;netstandard2.0;netstandard2.1</TargetFrameworks>
<TargetFrameworks>netcoreapp3.1;net5.0;net6.0;net7.0;net8.0;net9.0;netstandard2.0;netstandard2.1</TargetFrameworks>
<AssemblyTitle>新生命Redis扩展</AssemblyTitle>
<Description>Redis扩展库,便于注入Redis,支持分布式缓存IDistributedCache和数据保护IDataProtection</Description>
<Company>新生命开发团队</Company>
Expand All @@ -19,6 +19,9 @@
<LangVersion>latest</LangVersion>
<SignAssembly>True</SignAssembly>
<AssemblyOriginatorKeyFile>..\Doc\newlife.snk</AssemblyOriginatorKeyFile>
<NoWarn>1701;1702;NU5104;NU1505;NETSDK1138;CS7035</NoWarn>
<AnalysisLevel>latest</AnalysisLevel>
<WarningsAsErrors>CA2007</WarningsAsErrors>
</PropertyGroup>

<PropertyGroup>
Expand All @@ -45,30 +48,36 @@
</PackageReference>
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)'=='netcoreapp3.1'">
<PackageReference Include="Microsoft.AspNetCore.DataProtection" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.0" />
<PackageReference Include="Microsoft.AspNetCore.DataProtection" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)'=='net5.0'">
<PackageReference Include="Microsoft.AspNetCore.DataProtection" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.0" />
<PackageReference Include="Microsoft.AspNetCore.DataProtection" Version="5.0.8" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="5.0.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)'=='net6.0'">
<PackageReference Include="Microsoft.AspNetCore.DataProtection" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.0" />
<ItemGroup Condition="'$(TargetFramework)'=='net6.0' Or '$(TargetFramework)'=='netstandard2.0' Or '$(TargetFramework)'=='netstandard2.1'">
<PackageReference Include="Microsoft.AspNetCore.DataProtection" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)'=='net7.0' Or '$(TargetFramework)'=='netstandard2.0' Or '$(TargetFramework)'=='netstandard2.1'">
<PackageReference Include="Microsoft.AspNetCore.DataProtection" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.0" />
<ItemGroup Condition="'$(TargetFramework)'=='net7.0'">
<PackageReference Include="Microsoft.AspNetCore.DataProtection" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)'=='net8.0'">
<PackageReference Include="Microsoft.AspNetCore.DataProtection" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)'=='net9.0'">
<PackageReference Include="Microsoft.AspNetCore.DataProtection" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0" />
Expand Down
6 changes: 3 additions & 3 deletions NewLife.Redis/FullRedis.cs
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,11 @@ public override async Task<T> ExecuteAsync<T>(String key, Func<RedisClient, Stri
key = GetKey(key);

// 如果不支持集群,直接返回
if (Cluster == null) return await base.ExecuteAsync<T>(key, func, write);
if (Cluster == null) return await base.ExecuteAsync<T>(key, func, write).ConfigureAwait(false);

var node = Cluster.SelectNode(key, write);
//?? throw new XException($"集群[{Name}]没有可用节点");
if (node == null) return await base.ExecuteAsync<T>(key, func, write);
if (node == null) return await base.ExecuteAsync<T>(key, func, write).ConfigureAwait(false);

// 统计性能
var sw = Counter?.StartCount();
Expand All @@ -420,7 +420,7 @@ public override async Task<T> ExecuteAsync<T>(String key, Func<RedisClient, Stri
try
{
client.Reset();
var rs = await func(client, key);
var rs = await func(client, key).ConfigureAwait(false);

return rs;
}
Expand Down
5 changes: 4 additions & 1 deletion NewLife.Redis/NewLife.Redis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
<LangVersion>latest</LangVersion>
<SignAssembly>True</SignAssembly>
<AssemblyOriginatorKeyFile>..\Doc\newlife.snk</AssemblyOriginatorKeyFile>
<NoWarn>1701;1702;NU5104;NU1505;NETSDK1138;CS7035</NoWarn>
<AnalysisLevel>latest</AnalysisLevel>
<WarningsAsErrors>CA2007</WarningsAsErrors>
</PropertyGroup>

<PropertyGroup>
Expand Down Expand Up @@ -53,7 +56,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="NewLife.Core" Version="11.0.2024.1125-beta1216" />
<PackageReference Include="NewLife.Core" Version="11.0.2024.1130-beta0449" />
</ItemGroup>

<ItemGroup>
Expand Down
6 changes: 3 additions & 3 deletions NewLife.Redis/PubSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,19 @@ public async Task SubscribeAsync(Action<String, String> onMessage, CancellationT
client.Reset();

var channels = Key.Split(",", ";").Cast<Object>().ToArray();
await client.ExecuteAsync<String[]>("SUBSCRIBE", channels);
await client.ExecuteAsync<String[]>("SUBSCRIBE", channels, cancellationToken).ConfigureAwait(false);

while (!cancellationToken.IsCancellationRequested)
{
var source = new CancellationTokenSource(Redis.Timeout);
var source2 = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, source.Token);

//var rs = await client.ExecuteAsync<String[]>(null, new Object[] { new Object() }, source2.Token);
var rs = await client.ReadMoreAsync<String[]>(source2.Token);
var rs = await client.ReadMoreAsync<String[]>(source2.Token).ConfigureAwait(false);
if (rs != null && rs.Length == 3 && rs[0] == "message") onMessage(rs[1], rs[2]);
}

await client.ExecuteAsync<String[]>("SUBSCRIBE", channels);
await client.ExecuteAsync<String[]>("SUBSCRIBE", channels, cancellationToken).ConfigureAwait(false);

Redis.Pool.Return(client);
}
Expand Down
2 changes: 1 addition & 1 deletion NewLife.Redis/Queues/MultipleConsumerGroupsQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private async Task getSubscribe(String subscribeAppName)
while (_Cts != null && !_Cts.IsCancellationRequested)
{

var msg = await _Queue.TakeMessageAsync(10);
var msg = await _Queue.TakeMessageAsync(10).ConfigureAwait(false);
if (msg != null && !msg.Id.IsNullOrEmpty())
{
try
Expand Down
14 changes: 7 additions & 7 deletions NewLife.Redis/Queues/QueueExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public static async Task ConsumeAsync<T>(this RedisReliableQueue<String> queue,
try
{
// 异步阻塞消费
mqMsg = await queue.TakeOneAsync(timeout, cancellationToken);
mqMsg = await queue.TakeOneAsync(timeout, cancellationToken).ConfigureAwait(false);
if (mqMsg != null)
{
// 埋点
Expand All @@ -197,15 +197,15 @@ public static async Task ConsumeAsync<T>(this RedisReliableQueue<String> queue,
}

// 处理消息
if (msg != null) await onMessage(msg, mqMsg, cancellationToken);
if (msg != null) await onMessage(msg, mqMsg, cancellationToken).ConfigureAwait(false);

// 确认消息
queue.Acknowledge(mqMsg);
}
else
{
// 没有消息,歇一会
await Task.Delay(1000, cancellationToken);
await Task.Delay(1000, cancellationToken).ConfigureAwait(false);
}
}
catch (ThreadAbortException) { break; }
Expand Down Expand Up @@ -249,9 +249,9 @@ public static async Task ConsumeAsync<T>(this RedisReliableQueue<String> queue,
/// <param name="log">日志对象</param>
/// <param name="idField">消息标识字段名,用于处理错误重试</param>
/// <returns></returns>
public static async Task ConsumeAsync<T>(this RedisReliableQueue<String> queue, Action<T> onMessage, CancellationToken cancellationToken = default, ILog? log = null, String? idField = null)
public static Task ConsumeAsync<T>(this RedisReliableQueue<String> queue, Action<T> onMessage, CancellationToken cancellationToken = default, ILog? log = null, String? idField = null)
{
await queue.ConsumeAsync<T>((m, k, t) => { onMessage(m); return Task.FromResult(0); }, cancellationToken, log, idField);
return queue.ConsumeAsync<T>((m, k, t) => { onMessage(m); return Task.FromResult(0); }, cancellationToken, log, idField);
}

/// <summary>队列消费大循环,处理消息后自动确认</summary>
Expand Down Expand Up @@ -297,7 +297,7 @@ public static async Task ConsumeAsync<T>(this RedisReliableQueue<String> queue,
try
{
// 异步阻塞消费
mqMsg = await queue.TakeOneAsync(timeout, cancellationToken);
mqMsg = await queue.TakeOneAsync(timeout, cancellationToken).ConfigureAwait(false);
if (mqMsg != null)
{
// 埋点
Expand All @@ -312,7 +312,7 @@ public static async Task ConsumeAsync<T>(this RedisReliableQueue<String> queue,
}
else
// 没有消息,歇一会
await Task.Delay(1000, cancellationToken);
await Task.Delay(1000, cancellationToken).ConfigureAwait(false);
}
catch (ThreadAbortException) { break; }
catch (ThreadInterruptedException) { break; }
Expand Down
10 changes: 6 additions & 4 deletions NewLife.Redis/Queues/RedisDelayQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ public Int32 Add(params T[] values)
while (!cancellationToken.IsCancellationRequested)
{
var score = DateTime.UtcNow.ToInt();
var rs = await _sort.RangeByScoreAsync(0, score, 0, 1, cancellationToken);
var rs = await _sort.RangeByScoreAsync(0, score, 0, 1, cancellationToken).ConfigureAwait(false);
if (rs != null && rs.Length > 0 && TryPop(rs[0])) return rs[0];

// 是否需要等待
if (timeout <= 0) break;

await Task.Delay(1000, cancellationToken);
await Task.Delay(1000, cancellationToken).ConfigureAwait(false);
timeout--;
}

Expand Down Expand Up @@ -231,7 +231,7 @@ public async Task TransferAsync(IProducerConsumer<T> queue, Action<Exception>? o
{
// 异步阻塞消费
var score = DateTime.UtcNow.ToInt();
var msgs = await _sort.RangeByScoreAsync(0, score, 0, 10, cancellationToken);
var msgs = await _sort.RangeByScoreAsync(0, score, 0, 10, cancellationToken).ConfigureAwait(false);
if (msgs != null && msgs.Length > 0)
{
// 删除消息后直接进入目标队列,无需进入Ack
Expand All @@ -248,8 +248,10 @@ public async Task TransferAsync(IProducerConsumer<T> queue, Action<Exception>? o
if (list.Count > 0) queue.Add(list.ToArray());
}
else
{
// 没有消息,歇一会
await Task.Delay(TransferInterval * 1000, cancellationToken);
await Task.Delay(TransferInterval * 1000, cancellationToken).ConfigureAwait(false);
}
}
catch (ThreadAbortException) { break; }
catch (ThreadInterruptedException) { break; }
Expand Down
4 changes: 2 additions & 2 deletions NewLife.Redis/Queues/RedisQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ public Int32 Add(params T[] values)
/// <returns></returns>
public async Task<T?> TakeOneAsync(Int32 timeout = 0, CancellationToken cancellationToken = default)
{
if (timeout < 0) return await ExecuteAsync((rc, k) => rc.ExecuteAsync<T>("RPOP", Key), true);
if (timeout < 0) return await ExecuteAsync((rc, k) => rc.ExecuteAsync<T>("RPOP", Key), true).ConfigureAwait(false);

if (timeout > 0 && Redis.Timeout < (timeout + 1) * 1000) Redis.Timeout = (timeout + 1) * 1000;

var rs = await ExecuteAsync((rc, k) => rc.ExecuteAsync<IPacket[]>("BRPOP", new Object[] { Key, timeout }, cancellationToken), true);
var rs = await ExecuteAsync((rc, k) => rc.ExecuteAsync<IPacket[]>("BRPOP", [Key, timeout], cancellationToken), true).ConfigureAwait(false);
return rs == null || rs.Length < 2 ? default : (T?)Redis.Encoder.Decode(rs[1], typeof(T));
}

Expand Down
10 changes: 5 additions & 5 deletions NewLife.Redis/Queues/RedisReliableQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ public Int32 Add(params T[] values)
if (timeout > 0 && Redis.Timeout < (timeout + 1) * 1000) Redis.Timeout = (timeout + 1) * 1000;

var rs = timeout < 0 ?
await ExecuteAsync((rc, k) => rc.ExecuteAsync<T>("RPOPLPUSH", new Object[] { Key, AckKey }, cancellationToken), true) :
await ExecuteAsync((rc, k) => rc.ExecuteAsync<T>("BRPOPLPUSH", new Object[] { Key, AckKey, timeout }, cancellationToken), true);
await ExecuteAsync((rc, k) => rc.ExecuteAsync<T>("RPOPLPUSH", [Key, AckKey], cancellationToken), true).ConfigureAwait(false) :
await ExecuteAsync((rc, k) => rc.ExecuteAsync<T>("BRPOPLPUSH", [Key, AckKey, timeout], cancellationToken), true).ConfigureAwait(false);

if (rs != null) _Status.Consumes++;

Expand Down Expand Up @@ -332,8 +332,8 @@ public async Task<TResult> ConsumeAsync<TResult>(Func<T, Task<TResult>> func, In

// 取出消息键
var msgId = timeout < 0 ?
await ExecuteAsync((rc, k) => rc.ExecuteAsync<String>("RPOPLPUSH", Key, AckKey), true) :
await ExecuteAsync((rc, k) => rc.ExecuteAsync<String>("BRPOPLPUSH", Key, AckKey, timeout), true);
await ExecuteAsync((rc, k) => rc.ExecuteAsync<String>("RPOPLPUSH", Key, AckKey), true).ConfigureAwait(false) :
await ExecuteAsync((rc, k) => rc.ExecuteAsync<String>("BRPOPLPUSH", Key, AckKey, timeout), true).ConfigureAwait(false);
if (msgId.IsNullOrEmpty()) return default;

_Status.Consumes++;
Expand All @@ -347,7 +347,7 @@ await ExecuteAsync((rc, k) => rc.ExecuteAsync<String>("RPOPLPUSH", Key, AckKey),
}

// 处理消息。如果消息已被删除,此时调用func将受到空引用
var rs = await func(messge);
var rs = await func(messge).ConfigureAwait(false);

// 确认并删除消息
Redis.Remove(msgId);
Expand Down
Loading

0 comments on commit 2b0f3b1

Please sign in to comment.