Skip to content

Commit

Permalink
Merge pull request #508 from AArnott/main_from_v2.8
Browse files Browse the repository at this point in the history
Merge v2.8 into main
  • Loading branch information
AArnott authored Jul 22, 2022
2 parents bae88b8 + b857951 commit b5d7fe1
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
4 changes: 0 additions & 4 deletions src/Nerdbank.Streams/MultiplexingStream.Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1067,10 +1067,6 @@ public override bool TryRead(out ReadResult readResult)

public override ValueTask CompleteAsync(Exception? exception = null) => this.inner.CompleteAsync(exception);

public override Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default) => this.inner.CopyToAsync(destination, cancellationToken);

public override Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default) => this.inner.CopyToAsync(destination, cancellationToken);

[Obsolete]
public override void OnWriterCompleted(Action<Exception?, object?> callback, object? state) => this.inner.OnWriterCompleted(callback, state);

Expand Down
22 changes: 22 additions & 0 deletions test/Nerdbank.Streams.Tests/MultiplexingStreamV2Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,28 @@ public async Task Backpressure_ExistingPipe()
await writeTask;
}

[Fact]
public async Task Backpressure_CopyToAsync()
{
long backpressureThreshold = this.mx1.DefaultChannelReceivingWindowSize;
(MultiplexingStream.Channel a, MultiplexingStream.Channel b) = await this.EstablishChannelsAsync("a");

byte[]? hugeChunk = new byte[backpressureThreshold * 2]; // enough to fill the remote and local windows
a.Output.Write(hugeChunk);
Task flushTask = Task.Run(async delegate
{
await a.Output.FlushAsync(this.TimeoutToken);
await a.Output.CompleteAsync();
});

// Now read from the channel and verify it unblocks the writer, using CopyToAsync specifically.
long drainedBytesCount = await this.DrainReaderTillCompletedAsync(b.Input, useCopyToAsync: true);
Assert.Equal(hugeChunk.Length, drainedBytesCount);

await flushTask.WithCancellation(this.TimeoutToken);
await CompleteChannelsAsync(a, b);
}

/// <summary>
/// Regression test for <see href="https://github.com/AArnott/Nerdbank.Streams/issues/253">#253</see>.
/// </summary>
Expand Down
25 changes: 19 additions & 6 deletions test/Nerdbank.Streams.Tests/TestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,30 @@ public async Task DrainAsync(PipeReader reader, long requiredLength)
}
}

public async Task DrainReaderTillCompletedAsync(PipeReader reader)
public async Task<long> DrainReaderTillCompletedAsync(PipeReader reader, bool useCopyToAsync = false)
{
while (true)
long bytesDrained = 0;
if (useCopyToAsync)
{
ReadResult readResult = await reader.ReadAsync(this.TimeoutToken);
reader.AdvanceTo(readResult.Buffer.End);
if (readResult.IsCompleted)
MemoryStream ms = new();
await reader.CopyToAsync(ms, this.TimeoutToken);
bytesDrained = ms.Length;
}
else
{
while (true)
{
break;
ReadResult readResult = await reader.ReadAsync(this.TimeoutToken);
bytesDrained += readResult.Buffer.Length;
reader.AdvanceTo(readResult.Buffer.End);
if (readResult.IsCompleted)
{
break;
}
}
}

return bytesDrained;
}

internal byte[] GetBuffer(int length)
Expand Down

0 comments on commit b5d7fe1

Please sign in to comment.