-
Notifications
You must be signed in to change notification settings - Fork 54
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Bringing in an extension from Aspire core to wait for log stream
- Loading branch information
1 parent
e558156
commit c1d9bae
Showing
3 changed files
with
158 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
155 changes: 155 additions & 0 deletions
155
tests/CommunityToolkit.Aspire.Testing/LoggerNotificationExtensions.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
// copied from: https://github.com/dotnet/aspire/blob/2ea981718d2addc835b033fc4a52fae63b2a4a51/tests/Aspire.Hosting.Tests/Utils/LoggerNotificationExtensions.cs | ||
using Aspire.Hosting; | ||
using Aspire.Hosting.ApplicationModel; | ||
using Microsoft.Extensions.DependencyInjection; | ||
using Microsoft.Extensions.Hosting; | ||
using Microsoft.Extensions.Logging; | ||
|
||
namespace CommunityToolkit.Aspire.Testing; | ||
|
||
public static class LoggerNotificationExtensions | ||
{ | ||
/// <summary> | ||
/// Waits for the specified text to be logged. | ||
/// </summary> | ||
/// <param name="app">The <see cref="DistributedApplication" /> instance to watch.</param> | ||
/// <param name="logText">The text to wait for.</param> | ||
/// <param name="resourceName">An optional resource name to filter the logs for.</param> | ||
/// <param name="cancellationToken">The cancellation token.</param> | ||
/// <returns></returns> | ||
public static Task WaitForTextAsync(this DistributedApplication app, string logText, string? resourceName = null, CancellationToken cancellationToken = default) | ||
{ | ||
ArgumentNullException.ThrowIfNull(app); | ||
ArgumentException.ThrowIfNullOrEmpty(logText); | ||
|
||
return WaitForTextAsync(app, (log) => log.Contains(logText), resourceName, cancellationToken); | ||
} | ||
|
||
/// <summary> | ||
/// Waits for the specified text to be logged. | ||
/// </summary> | ||
/// <param name="app">The <see cref="DistributedApplication" /> instance to watch.</param> | ||
/// <param name="logTexts">Any text to wait for.</param> | ||
/// <param name="resourceName">An optional resource name to filter the logs for.</param> | ||
/// <param name="cancellationToken">The cancellation token.</param> | ||
/// <returns></returns> | ||
public static Task WaitForTextAsync(this DistributedApplication app, IEnumerable<string> logTexts, string? resourceName = null, CancellationToken cancellationToken = default) | ||
{ | ||
ArgumentNullException.ThrowIfNull(app); | ||
ArgumentNullException.ThrowIfNull(logTexts); | ||
|
||
return app.WaitForTextAsync((log) => logTexts.Any(x => log.Contains(x)), resourceName, cancellationToken); | ||
} | ||
|
||
/// <summary> | ||
/// Waits for the specified text to be logged. | ||
/// </summary> | ||
/// <param name="app">The <see cref="DistributedApplication" /> instance to watch.</param> | ||
/// <param name="predicate">A predicate checking the text to wait for.</param> | ||
/// <param name="resourceName">An optional resource name to filter the logs for.</param> | ||
/// <param name="cancellationToken">The cancellation token.</param> | ||
/// <returns></returns> | ||
public static Task WaitForTextAsync(this DistributedApplication app, Predicate<string> predicate, string? resourceName = null, CancellationToken cancellationToken = default) | ||
{ | ||
ArgumentNullException.ThrowIfNull(app); | ||
ArgumentNullException.ThrowIfNull(predicate); | ||
|
||
var hostApplicationLifetime = app.Services.GetRequiredService<IHostApplicationLifetime>(); | ||
|
||
var watchCts = CancellationTokenSource.CreateLinkedTokenSource(hostApplicationLifetime.ApplicationStopping, cancellationToken); | ||
|
||
var tcs = new TaskCompletionSource(); | ||
|
||
_ = Task.Run(() => WatchNotifications(app, resourceName, predicate, tcs, watchCts), watchCts.Token); | ||
|
||
return tcs.Task; | ||
} | ||
|
||
/// <summary> | ||
/// Waits for all the specified texts to be logged. | ||
/// </summary> | ||
/// <param name="app">The <see cref="DistributedApplication" /> instance to watch.</param> | ||
/// <param name="logTexts">Any text to wait for.</param> | ||
/// <param name="resourceName">An optional resource name to filter the logs for.</param> | ||
/// <param name="cancellationToken">The cancellation token.</param> | ||
/// <returns></returns> | ||
public static async Task WaitForAllTextAsync(this DistributedApplication app, IEnumerable<string> logTexts, string? resourceName = null, CancellationToken cancellationToken = default) | ||
{ | ||
var table = logTexts.ToList(); | ||
try | ||
{ | ||
await app.WaitForTextAsync((log) => | ||
{ | ||
foreach (var text in table) | ||
{ | ||
if (log.Contains(text)) | ||
{ | ||
table.Remove(text); | ||
break; | ||
} | ||
} | ||
|
||
return table.Count == 0; | ||
}, resourceName, cancellationToken).ConfigureAwait(false); | ||
} | ||
catch (TaskCanceledException te) when (cancellationToken.IsCancellationRequested) | ||
{ | ||
throw new TaskCanceledException($"Task was canceled before these messages were found: '{string.Join("', '", table)}'", te); | ||
} | ||
} | ||
|
||
private static async Task WatchNotifications(DistributedApplication app, string? resourceName, Predicate<string> predicate, TaskCompletionSource tcs, CancellationTokenSource cancellationTokenSource) | ||
{ | ||
var resourceNotificationService = app.Services.GetRequiredService<ResourceNotificationService>(); | ||
var resourceLoggerService = app.Services.GetRequiredService<ResourceLoggerService>(); | ||
var logger = app.Services.GetRequiredService<ILoggerFactory>().CreateLogger(nameof(LoggerNotificationExtensions)); | ||
|
||
var loggingResourceIds = new HashSet<string>(); | ||
var logWatchTasks = new List<Task>(); | ||
|
||
try | ||
{ | ||
await foreach (var resourceEvent in resourceNotificationService.WatchAsync(cancellationTokenSource.Token).ConfigureAwait(false)) | ||
{ | ||
if (resourceName != null && !string.Equals(resourceEvent.Resource.Name, resourceName, StringComparison.OrdinalIgnoreCase)) | ||
{ | ||
continue; | ||
} | ||
|
||
var resourceId = resourceEvent.ResourceId; | ||
|
||
if (loggingResourceIds.Add(resourceId)) | ||
{ | ||
// Start watching the logs for this resource ID | ||
logWatchTasks.Add(WatchResourceLogs(tcs, resourceId, predicate, resourceLoggerService, cancellationTokenSource)); | ||
} | ||
} | ||
} | ||
catch (OperationCanceledException) | ||
{ | ||
// Expected if the application stops prematurely or the text was detected. | ||
tcs.TrySetCanceled(); | ||
} | ||
catch (Exception ex) | ||
{ | ||
logger.LogError(ex, "An error occurred while watching for resource notifications."); | ||
tcs.TrySetException(ex); | ||
} | ||
} | ||
|
||
private static async Task WatchResourceLogs(TaskCompletionSource tcs, string resourceId, Predicate<string> predicate, ResourceLoggerService resourceLoggerService, CancellationTokenSource cancellationTokenSource) | ||
{ | ||
await foreach (var logEvent in resourceLoggerService.WatchAsync(resourceId).WithCancellation(cancellationTokenSource.Token).ConfigureAwait(false)) | ||
{ | ||
foreach (var line in logEvent) | ||
{ | ||
if (predicate(line.Content)) | ||
{ | ||
tcs.SetResult(); | ||
cancellationTokenSource.Cancel(); | ||
return; | ||
} | ||
} | ||
} | ||
} | ||
} |