Skip to content

Commit

Permalink
Merge pull request #417 from AArnott/fixChannelDisposalOnMxStreamTear…
Browse files Browse the repository at this point in the history
…down

Fix channel disposal on mx stream teardown
  • Loading branch information
AArnott authored Nov 18, 2021
2 parents bca2d43 + 7d1e55f commit aa28991
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/nerdbank-streams/src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ export class ChannelClass extends Channel {
}
}

public dispose() {
public async dispose() {
if (!this.isDisposed) {
super.dispose();

Expand All @@ -256,7 +256,7 @@ export class ChannelClass extends Channel {
this._duplex.push(null);

this._completion.resolve();
this._multiplexingStream.onChannelDisposed(this);
await this._multiplexingStream.onChannelDisposed(this);
}
}

Expand Down
15 changes: 12 additions & 3 deletions src/nerdbank-streams/src/MultiplexingStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ export abstract class MultiplexingStream implements IDisposableObservable {

/**
* Offers a new, named channel to the remote party so they may accept it with
* [acceptChannelAsync](#acceptChannelAsync).
* {@link acceptChannelAsync}.
* @param name A name for the channel, which must be accepted on the remote end to complete creation.
* It need not be unique, and may be empty but must not be null.
* Any characters are allowed, and max length is determined by the maximum frame payload (based on UTF-8 encoding).
Expand Down Expand Up @@ -561,9 +561,18 @@ export class MultiplexingStreamClass extends MultiplexingStream {
}
}

public onChannelDisposed(channel: ChannelClass) {
public async onChannelDisposed(channel: ChannelClass) {
if (!this._completionSource.isCompleted) {
this.sendFrame(ControlCode.ChannelTerminated, channel.qualifiedId);
try {
await this.sendFrame(ControlCode.ChannelTerminated, channel.qualifiedId);
} catch (err) {
// Swallow exceptions thrown about channel disposal if the whole stream has been taken down.
if (this.isDisposed) {
return;
}

throw err;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/nerdbank-streams/src/MultiplexingStreamOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ChannelOptions } from "./ChannelOptions";
export interface MultiplexingStreamOptions {
/**
* The protocol version to be used.
* @description 1 is the original version. 2 is a protocol breaking change and adds backpressure support.
* @description 1 is the original version. 2 is a protocol breaking change and adds backpressure support. 3 is a protocol breaking change, eliminates the handshake packet and adds seeded channels support.
*/
protocolMajorVersion?: number;

Expand Down
18 changes: 9 additions & 9 deletions src/nerdbank-streams/src/Utilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import { Deferred } from "./Deferred";
import { IDisposableObservable } from "./IDisposableObservable";

export async function writeAsync(stream: NodeJS.WritableStream, chunk: any) {
const deferred = new Deferred<void>();
stream.write(chunk, (err: Error | null | undefined) => {
if (err) {
deferred.reject(err);
} else {
deferred.resolve();
}
return new Promise<void>((resolve, reject) => {
stream.write(chunk, (err: Error | null | undefined) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
return deferred.promise;
}

export function writeSubstream(stream: NodeJS.WritableStream): NodeJS.WritableStream {
Expand All @@ -25,7 +25,7 @@ export function writeSubstream(stream: NodeJS.WritableStream): NodeJS.WritableSt
await writeAsync(stream, chunk);
callback();
} catch (err) {
callback(err);
callback(err as Error);
}
},
final(callback: (error?: Error | null) => void) {
Expand Down

0 comments on commit aa28991

Please sign in to comment.