Skip to content

Commit

Permalink
Merge pull request #170 from MaceWindu/issue/object-disposed-exception
Browse files Browse the repository at this point in the history
Fix WebSocket connection failure handling + multiple async code fixes
  • Loading branch information
Sicos1977 authored Oct 22, 2024
2 parents 5b1fe9e + 6310334 commit aeb4f90
Show file tree
Hide file tree
Showing 16 changed files with 283 additions and 439 deletions.
143 changes: 48 additions & 95 deletions ChromiumHtmlToPdfLib/Browser.cs

Large diffs are not rendered by default.

74 changes: 41 additions & 33 deletions ChromiumHtmlToPdfLib/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class Connection : IDisposable, IAsyncDisposable
/// <summary>
/// The url of the websocket
/// </summary>
private readonly string _url;
private readonly Uri _url;

/// <summary>
/// The current message id
Expand All @@ -88,14 +88,14 @@ public class Connection : IDisposable, IAsyncDisposable
private readonly ClientWebSocket _webSocket;

/// <summary>
/// Websocket open timeout in milliseconds
/// Websocket operation timeout in milliseconds
/// </summary>
private readonly int _timeout;

/// <summary>
/// Task to await for <see cref="ReceiveLoop"/> completion.
/// </summary>
private readonly Task<Task> _receiveTask;
private Task<Task>? _receiveTask;

/// <summary>
/// Keeps track is we already disposed our resources
Expand All @@ -110,15 +110,26 @@ public class Connection : IDisposable, IAsyncDisposable
/// <param name="url">The url</param>
/// <param name="timeout">Websocket open timeout in milliseconds</param>
/// <param name="logger"><see cref="Logger"/></param>
internal Connection(string url, int timeout, Logger? logger)
private Connection(string url, int timeout, Logger? logger)
{
_url = url;
_url = new Uri(url);
_timeout = timeout;
_logger = logger;
_logger?.Info("Creating new websocket connection to url '{url}'", url);
_webSocket = new ClientWebSocket();
_receiveLoopCts = new CancellationTokenSource();
OpenWebSocketAsync().GetAwaiter().GetResult();
}

internal static async Task<Connection> Create(string url, int timeout, Logger? logger, CancellationToken cancellationToken)
{
var connection = new Connection(url, timeout, logger);
await connection.StartAsync(cancellationToken).ConfigureAwait(false);
return connection;
}

private async Task StartAsync(CancellationToken cancellationToken)
{
await OpenWebSocketAsync(cancellationToken).ConfigureAwait(false);
_receiveTask = Task.Factory.StartNew(ReceiveLoop, new ReceiveLoopState(_logger, _webSocket, OnMessageReceived, _receiveLoopCts.Token), _receiveLoopCts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
#endregion
Expand Down Expand Up @@ -186,21 +197,16 @@ private static async Task ReceiveLoop(object? stateData)
#endregion

#region OpenWebSocket
private async Task OpenWebSocketAsync()
private async Task OpenWebSocketAsync(CancellationToken cancellationToken)
{
if (_webSocket.State is WebSocketState.Open or WebSocketState.Connecting) return;

_logger?.Info("Opening websocket connection with a timeout of {timeout} milliseconds", _timeout);

try
{
await _webSocket.ConnectAsync(new Uri(_url), new CancellationTokenSource(_timeout).Token).ConfigureAwait(false);
_logger?.Info("Websocket opened");
}
catch (Exception exception)
{
WebSocketOnError(_logger, new ErrorEventArgs(exception));
}
using var timeoutCts = new CancellationTokenSource(_timeout);
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);
await _webSocket.ConnectAsync(_url, linkedCts.Token).ConfigureAwait(false);
_logger?.Info("Websocket opened");
}
#endregion

Expand All @@ -210,9 +216,9 @@ private static void WebSocketOnMessageReceived(Logger? logger, Action<string> on
var response = e.Message;

var error = CheckForError(response);

if (!string.IsNullOrEmpty(error))
logger?.Error("{error}", error);
logger?.Error("Chrome returned error: {error}", error);

onMessageReceived(response);
}
Expand All @@ -224,7 +230,7 @@ private void OnMessageReceived(string response)

private static void WebSocketOnError(Logger? logger, ErrorEventArgs e)
{
logger?.Error(e.Exception, "{exception}", ExceptionHelpers.GetInnerException(e.Exception));
logger?.Error(e.Exception, "WebSocket operation failed with {exception}", ExceptionHelpers.GetInnerException(e.Exception));
}

private void WebSocketOnClosed(EventArgs e)
Expand All @@ -240,30 +246,28 @@ private void WebSocketOnClosed(EventArgs e)
/// <param name="message">The message to send</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/></param>
/// <returns>Response given by <see cref="_webSocket" /></returns>
internal async Task<string> SendForResponseAsync(Message message, CancellationToken cancellationToken = default)
internal async Task<string> SendForResponseAsync(Message message, CancellationToken cancellationToken)
{
_messageId += 1;
message.Id = _messageId;

await OpenWebSocketAsync().ConfigureAwait(false);

var tcs = new TaskCompletionSource<string>();

var receivedHandler = new EventHandler<string>((_, data) =>
{
var messageBase = MessageBase.FromJson(data);
if (messageBase.Id == message.Id)
if (messageBase.Id == message.Id)
tcs.SetResult(data);
});

MessageReceived += receivedHandler;

try
{
if (cancellationToken == default)
cancellationToken = new CancellationTokenSource(_timeout).Token;
using var timeoutCts = new CancellationTokenSource(_timeout);
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);

await _webSocket.SendAsync(MessageToBytes(message), WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
await _webSocket.SendAsync(MessageToBytes(message), WebSocketMessageType.Text, true, linkedCts.Token).ConfigureAwait(false);

tcs.Task.Wait(cancellationToken);
return cancellationToken.IsCancellationRequested ? string.Empty : tcs.Task.Result;
Expand All @@ -286,17 +290,18 @@ internal async Task<string> SendForResponseAsync(Message message, CancellationTo
/// Sends a message to the <see cref="_webSocket" /> and awaits no response
/// </summary>
/// <param name="message">The message to send</param>
/// <param name="cancellationToken">The message to send</param>
/// <returns></returns>
internal async Task SendAsync(Message message)
internal async Task SendAsync(Message message, CancellationToken cancellationToken)
{
_messageId += 1;
message.Id = _messageId;

await OpenWebSocketAsync().ConfigureAwait(false);

try
{
await _webSocket.SendAsync(MessageToBytes(message), WebSocketMessageType.Text, true, new CancellationTokenSource(_timeout).Token).ConfigureAwait(false);
using var timeoutCts = new CancellationTokenSource(_timeout);
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);
await _webSocket.SendAsync(MessageToBytes(message), WebSocketMessageType.Text, true, linkedCts.Token).ConfigureAwait(false);
}
catch (Exception exception)
{
Expand Down Expand Up @@ -342,7 +347,10 @@ public async Task InternalDisposeAsync()

_receiveLoopCts.Cancel();

await (await _receiveTask.ConfigureAwait(false)).ConfigureAwait(false);
if (_receiveTask != null)
await (await _receiveTask.ConfigureAwait(false)).ConfigureAwait(false);

_receiveLoopCts.Dispose();

_logger?.Info("Disposing websocket connection to url '{url}'", _url);

Expand All @@ -352,8 +360,8 @@ public async Task InternalDisposeAsync()

try
{
var timeoutToken = new CancellationTokenSource(5000).Token;
await _webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Done", timeoutToken).ConfigureAwait(false);
using var timeoutCts = new CancellationTokenSource(5000);
await _webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Done", timeoutCts.Token).ConfigureAwait(false);
}
catch (Exception exception)
{
Expand Down
Loading

0 comments on commit aeb4f90

Please sign in to comment.