Skip to content

Commit

Permalink
Merge pull request #185 from AArnott/fix184
Browse files Browse the repository at this point in the history
Close open channels from mxstream.dispose()
  • Loading branch information
AArnott authored May 9, 2020
2 parents 25319e0 + 50a8197 commit 5ff298f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 1 deletion.
8 changes: 8 additions & 0 deletions src/nerdbank-streams/src/MultiplexingStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,14 @@ export abstract class MultiplexingStream implements IDisposableObservable {
this.disposalTokenSource.cancel();
this._completionSource.resolve();
this.formatter.end();
for (const channelId in this.openChannels) {
const channel = this.openChannels[channelId];

// Acceptance gets rejected when a channel is disposed.
// Avoid a node.js crash or test failure for unobserved channels (e.g. offers for channels from the other party that no one cared to receive on this side).
caught(channel.acceptance);
channel.dispose();
}
}

public on(event: "channelOffered", listener: (args: IChannelOfferEventArgs) => void) {
Expand Down
6 changes: 5 additions & 1 deletion src/nerdbank-streams/src/MultiplexingStreamFormatters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,11 @@ export class MultiplexingStreamV2Formatter extends MultiplexingStreamFormatter {
}

async readFrameAsync(cancellationToken: CancellationToken): Promise<{ header: FrameHeader; payload: Buffer; } | null> {
const msgpackObject = <any[]>await this.readMessagePackAsync(cancellationToken);
const msgpackObject = <any[] | null>await this.readMessagePackAsync(cancellationToken);
if (msgpackObject === null) {
return null;
}

const header = new FrameHeader(msgpackObject[0], msgpackObject.length > 1 ? msgpackObject[1] : undefined);
return {
header: header,
Expand Down
18 changes: 18 additions & 0 deletions src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,24 @@ import { Channel } from "../Channel";
await channels[1].completion;
});

it("channels complete when mxstream is disposed", async () => {
const channels = await Promise.all([
mx1.offerChannelAsync("test"),
mx2.acceptChannelAsync("test"),
]);
mx1.dispose();

// Verify that both mxstream's complete when one does.
await mx1.completion;
await mx2.completion;

// Verify that the disposed mxstream completes its own channels.
await channels[0].completion;

// Verify that the mxstream that closes because its counterpart closed also completes its own channels.
await channels[1].completion;
});

it("offered channels must have names", async () => {
await expectThrow(mx1.offerChannelAsync(null!));
await expectThrow(mx1.offerChannelAsync(undefined!));
Expand Down

0 comments on commit 5ff298f

Please sign in to comment.