Skip to content

Commit

Permalink
Merge pull request #499 from AArnott/addMoreTracing
Browse files Browse the repository at this point in the history
Fix MultiplexingStream failure when one Channel fails to pass data to reader
  • Loading branch information
AArnott authored Jul 2, 2022
2 parents c462ce7 + 6da2056 commit 0c14546
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 44 deletions.
26 changes: 26 additions & 0 deletions src/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,32 @@ await Assert.ThrowsAsync<NotSupportedException>(() => Task.WhenAll(
MultiplexingStream.CreateAsync(pair.Item2, options, this.TimeoutToken)));
}

/// <summary>
/// Verifies that faulting a <see cref="PipeReader"/> that receives channel data will not adversely impact other channels on the <see cref="MultiplexingStream"/>.
/// </summary>
[Fact]
public async Task FaultingChannelReader()
{
Task<MultiplexingStream.Channel> baselineOffer = this.mx1.OfferChannelAsync("baseline", cancellationToken: this.TimeoutToken);
Task<MultiplexingStream.Channel> sketchyOffer = this.mx1.OfferChannelAsync("sketchy", cancellationToken: this.TimeoutToken);

MultiplexingStream.Channel mx2Baseline = await this.mx2.AcceptChannelAsync("baseline", this.TimeoutToken);
MultiplexingStream.Channel mx2Sketchy = await this.mx2.AcceptChannelAsync("sketchy", this.TimeoutToken);

MultiplexingStream.Channel mx1Baseline = await baselineOffer;
MultiplexingStream.Channel mx1Sketchy = await sketchyOffer;

// Now fault one reader
await mx2Sketchy.Input.CompleteAsync(new InvalidOperationException("Sketchy reader fail."));

// Transmit data on this channel from the other side to force the mxstream to notice that we faulted a reader.
await mx1Sketchy.Output.WriteAsync(new byte[3], this.TimeoutToken);

// Verify communication over the good channel.
await mx1Baseline.Output.WriteAsync(new byte[3], this.TimeoutToken);
await this.ReadAtLeastAsync(mx2Baseline.Input, 3);
}

protected static Task CompleteChannelsAsync(params MultiplexingStream.Channel[] channels)
{
foreach (var channel in channels)
Expand Down
88 changes: 55 additions & 33 deletions src/Nerdbank.Streams/MultiplexingStream.Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -398,51 +398,73 @@ internal async Task OnChannelTerminatedAsync()
}
}

internal async ValueTask OnContentAsync(FrameHeader header, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
/// <summary>
/// Receives content from the <see cref="MultiplexingStream"/> that is bound for this channel.
/// </summary>
/// <param name="payload">The content for this channel.</param>
/// <param name="cancellationToken">A token that is canceled if the overall <see cref="MultiplexingStream"/> gets disposed of.</param>
/// <returns>
/// A task that completes when content has been accepted.
/// All multiplexing stream reads are held up till this completes, so this should only pause in exceptional circumstances.
/// Faulting the returned <see cref="ValueTask"/> will fault the whole multiplexing stream.
/// </returns>
internal async ValueTask OnContentAsync(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
{
PipeWriter writer = this.GetReceivedMessagePipeWriter();
foreach (var segment in payload)
try
{
try
PipeWriter writer = this.GetReceivedMessagePipeWriter();
foreach (var segment in payload)
{
var memory = writer.GetMemory(segment.Length);
segment.CopyTo(memory);
writer.Advance(segment.Length);
try
{
var memory = writer.GetMemory(segment.Length);
segment.CopyTo(memory);
writer.Advance(segment.Length);
}
catch (InvalidOperationException)
{
// Someone completed the writer.
return;
}
}
catch (InvalidOperationException)

if (!payload.IsEmpty && this.MultiplexingStream.TraceSource.Switch.ShouldTrace(TraceEventType.Verbose))
{
// Someone completed the writer.
return;
this.MultiplexingStream.TraceSource.TraceData(TraceEventType.Verbose, (int)TraceEventId.FrameReceivedPayload, payload);
}
}

if (!payload.IsEmpty && this.MultiplexingStream.TraceSource.Switch.ShouldTrace(TraceEventType.Verbose))
{
this.MultiplexingStream.TraceSource.TraceData(TraceEventType.Verbose, (int)TraceEventId.FrameReceivedPayload, payload);
}
ValueTask<FlushResult> flushResult = writer.FlushAsync(cancellationToken);
if (this.BackpressureSupportEnabled)
{
if (!flushResult.IsCompleted)
{
// The incoming data has overrun the size of the write buffer inside the PipeWriter.
// This should never happen if we created the Pipe because we specify the Pause threshold to exceed the window size.
// If it happens, it should be because someone specified an ExistingPipe with an inappropriately sized buffer in its PipeWriter.
Assumes.True(this.existingPipeGiven == true); // Make sure this isn't an internal error
this.Fault(new InvalidOperationException(Strings.ExistingPipeOutputHasPauseThresholdSetTooLow));
}
}
else
{
await flushResult.ConfigureAwait(false);
}

ValueTask<FlushResult> flushResult = writer.FlushAsync(cancellationToken);
if (this.BackpressureSupportEnabled)
{
if (!flushResult.IsCompleted)
if (flushResult.IsCanceled)
{
// The incoming data has overrun the size of the write buffer inside the PipeWriter.
// This should never happen if we created the Pipe because we specify the Pause threshold to exceed the window size.
// If it happens, it should be because someone specified an ExistingPipe with an inappropriately sized buffer in its PipeWriter.
Assumes.True(this.existingPipeGiven == true); // Make sure this isn't an internal error
this.Fault(new InvalidOperationException(Strings.ExistingPipeOutputHasPauseThresholdSetTooLow));
// This happens when the channel is disposed (while or before flushing).
Assumes.True(this.IsDisposed);
await writer.CompleteAsync().ConfigureAwait(false);
}
}
else
catch (ObjectDisposedException) when (this.IsDisposed)
{
await flushResult.ConfigureAwait(false);
// Just eat these.
}

if (flushResult.IsCanceled)
catch (Exception ex)
{
// This happens when the channel is disposed (while or before flushing).
Assumes.True(this.IsDisposed);
await writer.CompleteAsync().ConfigureAwait(false);
// Contain the damage for any other failure so we don't fault the entire multiplexing stream.
this.Fault(ex);
}
}

Expand Down Expand Up @@ -771,9 +793,9 @@ private async Task ProcessOutboundTransmissionsAsync()

if (isCompleted)
{
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Verbose))
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
{
this.TraceSource.TraceEvent(TraceEventType.Verbose, 0, "Transmission terminated because the writer completed.");
this.TraceSource.TraceEvent(TraceEventType.Information, 0, "Transmission terminated because the writer completed.");
}

break;
Expand Down
50 changes: 39 additions & 11 deletions src/Nerdbank.Streams/MultiplexingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ private enum TraceEventId
OfferChannelCanceled,
FrameSent,
FrameReceived,
FrameNotReceived,
FrameSentPayload,
FrameReceivedPayload,

Expand Down Expand Up @@ -787,6 +788,11 @@ private async Task ReadStreamAsync()
var frame = await this.formatter.ReadFrameAsync(this.DisposalToken).ConfigureAwait(false);
if (!frame.HasValue)
{
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
{
this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.FrameNotReceived, "Clean end of stream.");
}

break;
}

Expand Down Expand Up @@ -825,9 +831,37 @@ private async Task ReadStreamAsync()
catch (EndOfStreamException)
{
// When we unexpectedly hit an end of stream, just close up shop.
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Error))
{
this.TraceSource.TraceEvent(TraceEventType.Error, (int)TraceEventId.FatalError, "End of stream in the middle of a frame.");
}
}
catch (Exception ex)
{
if (ex is OperationCanceledException && this.DisposalToken.IsCancellationRequested)
{
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
{
this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEventId.ChannelDisposed, $"{nameof(MultiplexingStream)}.{nameof(this.ReadStreamAsync)} shutting down due to cancellation and disposal.");
}
}
else
{
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Error))
{
this.TraceSource.TraceEvent(TraceEventType.Error, (int)TraceEventId.FatalError, $"Exception thrown in {nameof(MultiplexingStream)}.{nameof(this.ReadStreamAsync)} leading to stream shutdown: {{0}}", ex);
}
}

throw;
}
finally
{
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
{
this.TraceSource.TraceEvent(TraceEventType.Information, 0, $"{nameof(MultiplexingStream)}.{nameof(this.ReadStreamAsync)} is shutting down all channels before exiting.");
}

lock (this.syncObject)
{
foreach (var entry in this.openChannels)
Expand Down Expand Up @@ -896,20 +930,14 @@ private async ValueTask OnContentAsync(FrameHeader header, ReadOnlySequence<byte
channel = this.openChannels[channelId];
}

try
if (channelId.Source == ChannelSource.Local && !channel.IsAccepted)
{
if (channelId.Source == ChannelSource.Local && !channel.IsAccepted)
{
throw new MultiplexingProtocolException($"Remote party sent content for channel {channelId} before accepting it.");
}

if (!payload.IsEmpty)
{
await channel.OnContentAsync(header, payload, cancellationToken).ConfigureAwait(false);
}
throw new MultiplexingProtocolException($"Remote party sent content for channel {channelId} before accepting it.");
}
catch (ObjectDisposedException) when (channel.IsDisposed)

if (!payload.IsEmpty)
{
await channel.OnContentAsync(payload, cancellationToken).ConfigureAwait(false);
}
}

Expand Down

0 comments on commit 0c14546

Please sign in to comment.