Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for the v5 streaming protocol #2061

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 57 additions & 4 deletions src/web-socket-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import stream from 'node:stream';
import { V1Status } from './api';
import { KubeConfig } from './config';

const protocols = ['v4.channel.k8s.io', 'v3.channel.k8s.io', 'v2.channel.k8s.io', 'channel.k8s.io'];
const protocols = [
'v5.channel.k8s.io',
'v4.channel.k8s.io',
'v3.channel.k8s.io',
'v2.channel.k8s.io',
'channel.k8s.io',
];

export interface WebSocketInterface {
connect(
Expand All @@ -14,12 +20,37 @@ export interface WebSocketInterface {
): Promise<WebSocket.WebSocket>;
}

export interface StreamInterface {
stdin: stream.Readable;
stdout: stream.Writable;
stderr: stream.Writable;
}

export class WebSocketHandler implements WebSocketInterface {
public static readonly StdinStream: number = 0;
public static readonly StdoutStream: number = 1;
public static readonly StderrStream: number = 2;
public static readonly StatusStream: number = 3;
public static readonly ResizeStream: number = 4;
public static readonly CloseStream: number = 255;

public static supportsClose(protocol: string): boolean {
return protocol === 'v5.channel.k8s.io';
}

public static closeStream(streamNum: number, streams: StreamInterface): void {
switch (streamNum) {
case WebSocketHandler.StdinStream:
streams.stdin.pause();
break;
case WebSocketHandler.StdoutStream:
streams.stdout.end();
break;
case WebSocketHandler.StderrStream:
streams.stderr.end();
break;
}
}

public static handleStandardStreams(
streamNum: number,
Expand All @@ -36,6 +67,7 @@ export class WebSocketHandler implements WebSocketInterface {
stderr.write(buff);
} else if (streamNum === WebSocketHandler.StatusStream) {
// stream closing.
// Hacky, change tests to use the stream interface
if (stdout && stdout !== process.stdout) {
stdout.end();
}
Expand All @@ -59,6 +91,12 @@ export class WebSocketHandler implements WebSocketInterface {
});

stdin.on('end', () => {
if (WebSocketHandler.supportsClose(ws.protocol)) {
const buff = Buffer.alloc(2);
buff.writeUint8(this.CloseStream, 0);
buff.writeUint8(this.StdinStream, 1);
ws.send(buff);
}
ws.close();
});
// Keep the stream open
Expand Down Expand Up @@ -131,7 +169,16 @@ export class WebSocketHandler implements WebSocketInterface {
// factory is really just for test injection
public constructor(
readonly config: KubeConfig,
readonly socketFactory?: (uri: string, opts: WebSocket.ClientOptions) => WebSocket.WebSocket,
readonly socketFactory?: (
uri: string,
protocols: string[],
opts: WebSocket.ClientOptions,
) => WebSocket.WebSocket,
readonly streams: StreamInterface = {
stdin: process.stdin,
stdout: process.stdout,
stderr: process.stderr,
},
) {}

/**
Expand Down Expand Up @@ -163,7 +210,7 @@ export class WebSocketHandler implements WebSocketInterface {

return await new Promise<WebSocket.WebSocket>((resolve, reject) => {
const client = this.socketFactory
? this.socketFactory(uri, opts)
? this.socketFactory(uri, protocols, opts)
: new WebSocket(uri, protocols, opts);
let resolved = false;

Expand All @@ -181,11 +228,17 @@ export class WebSocketHandler implements WebSocketInterface {
client.onmessage = ({ data }: { data: WebSocket.Data }) => {
// TODO: support ArrayBuffer and Buffer[] data types?
if (typeof data === 'string') {
if (data.charCodeAt(0) === WebSocketHandler.CloseStream) {
WebSocketHandler.closeStream(data.charCodeAt(1), this.streams);
}
if (textHandler && !textHandler(data)) {
client.close();
}
} else if (data instanceof Buffer) {
const streamNum = data.readInt8(0);
const streamNum = data.readUint8(0);
if (streamNum === WebSocketHandler.CloseStream) {
WebSocketHandler.closeStream(data.readInt8(1), this.streams);
}
if (binaryHandler && !binaryHandler(streamNum, data.slice(1))) {
client.close();
}
Expand Down
111 changes: 106 additions & 5 deletions src/web-socket-handler_test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Readable } from 'node:stream';
import { Readable, Writable } from 'node:stream';
import { setImmediate as setImmediatePromise } from 'node:timers/promises';
import { expect } from 'chai';
import WebSocket from 'isomorphic-ws';
import { WritableStreamBuffer } from 'stream-buffers';
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';

import { V1Status } from './api';
import { KubeConfig } from './config';
Expand Down Expand Up @@ -117,7 +117,7 @@ describe('WebSocket', () => {

const handler = new WebSocketHandler(
kc,
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
uriOut = uri;
return mockWs as WebSocket.WebSocket;
},
Expand Down Expand Up @@ -163,7 +163,7 @@ describe('WebSocket', () => {

const handler = new WebSocketHandler(
kc,
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
uriOut = uri;
return mockWs as WebSocket.WebSocket;
},
Expand Down Expand Up @@ -233,7 +233,7 @@ describe('WebSocket', () => {

const handler = new WebSocketHandler(
kc,
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
uriOut = uri;
return mockWs as WebSocket.WebSocket;
},
Expand Down Expand Up @@ -314,6 +314,107 @@ describe('WebSocket', () => {
});
});

describe('V5 protocol support', () => {
it('should handle close', async () => {
const kc = new KubeConfig();
const host = 'foo.company.com';
const server = `https://${host}`;
kc.clusters = [
{
name: 'cluster',
server,
} as Cluster,
] as Cluster[];
kc.contexts = [
{
cluster: 'cluster',
user: 'user',
} as Context,
] as Context[];
kc.users = [
{
name: 'user',
} as User,
];

const mockWs = {
protocol: 'v5.channel.k8s.io',
} as WebSocket.WebSocket;
let uriOut = '';
let endCalled = false;
const handler = new WebSocketHandler(
kc,
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
uriOut = uri;
return mockWs as WebSocket.WebSocket;
},
{
stdin: process.stdin,
stderr: process.stderr,
stdout: {
end: () => {
endCalled = true;
},
} as Writable,
},
);
const path = '/some/path';

const promise = handler.connect(path, null, null);
await setImmediatePromise();

expect(uriOut).to.equal(`wss://${host}${path}`);

const event = {
target: mockWs,
type: 'open',
};
mockWs.onopen!(event);
const errEvt = {
error: {},
message: 'some message',
type: 'some type',
target: mockWs,
};
const closeBuff = Buffer.alloc(2);
closeBuff.writeUint8(255, 0);
closeBuff.writeUint8(WebSocketHandler.StdoutStream, 1);

mockWs.onmessage!({
data: closeBuff,
type: 'type',
target: mockWs,
});
await promise;
expect(endCalled).to.be.true;
});
it('should handle closing stdin < v4 protocol', () => {
const ws = {
// send is not defined, so this will throw if we try to send the close message.
close: () => {},
} as WebSocket;
const stdinStream = new ReadableStreamBuffer();
WebSocketHandler.handleStandardInput(ws, stdinStream);
stdinStream.emit('end');
});
it('should handle closing stdin v5 protocol', () => {
let sent: Buffer | null = null;
const ws = {
protocol: 'v5.channel.k8s.io',
send: (data) => {
sent = data;
},
close: () => {},
} as WebSocket;
const stdinStream = new ReadableStreamBuffer();
WebSocketHandler.handleStandardInput(ws, stdinStream);
stdinStream.emit('end');
expect(sent).to.not.be.null;
expect(sent!.readUint8(0)).to.equal(255); // CLOSE signal
expect(sent!.readUInt8(1)).to.equal(0); // Stdin stream is #0
});
});

describe('Restartable Handle Standard Input', () => {
it('should throw on negative retry', () => {
const p = new Promise<WebSocket.WebSocket>(() => {});
Expand Down
Loading