Skip to content

Commit

Permalink
fix: stdio transport not sending newline, add test
Browse files Browse the repository at this point in the history
  • Loading branch information
jackyzha0 committed Sep 25, 2023
1 parent 2f1a550 commit 46b2cdc
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 12 deletions.
2 changes: 1 addition & 1 deletion __tests__/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Type } from '@sinclair/typebox';
import { ServiceBuilder, serializeService } from '../router/builder';
import { reply } from '../transport/message';
import { afterAll, beforeAll, describe, expect, test } from 'vitest';
import { createWebSocketServer, createWsTransports, onServerReady } from '../transport/ws.util';
import { createWebSocketServer, createWsTransports, onServerReady } from '../transport/util';
import { createServer } from '../router/server';
import { createClient } from '../router/client';
import { asClientRpc, asClientStream } from '../router/server.util';
Expand Down
4 changes: 2 additions & 2 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export type {
TransportMessageAck,
} from './transport/message';

export { StdioTransport } from './transport/stdio';
export { StreamTransport } from './transport/stream';
export { WebSocketTransport } from './transport/ws';
export {
createWebSocketServer,
Expand All @@ -44,4 +44,4 @@ export {
waitForMessage,
waitForSocketReady,
createWebSocketClient,
} from './transport/ws.util';
} from './transport/util';
2 changes: 1 addition & 1 deletion router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { Pushable } from 'it-pushable';
import { Server } from './server';
import { OpaqueTransportMessage, msg } from '../transport/message';
import { Static } from '@sinclair/typebox';
import { waitForMessage } from '../transport/ws.util';
import { waitForMessage } from '../transport/util';

type ServiceClient<Router extends Service> = {
[ProcName in keyof Router['procedures']]: ProcType<Router, ProcName> extends 'rpc'
Expand Down
30 changes: 30 additions & 0 deletions transport/stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { describe, test, expect } from 'vitest';
import stream from 'node:stream';
import { StreamTransport } from './stream';
import { waitForMessage } from './util';

describe('sending and receiving across node streams works', () => {
test('basic send/receive', async () => {
const clientToServer = new stream.PassThrough();
const serverToClient = new stream.PassThrough();
const serverTransport = new StreamTransport('SERVER', clientToServer, serverToClient);
const clientTransport = new StreamTransport('client', serverToClient, clientToServer);

const msg = {
msg: 'cool',
test: 123,
};

const p = waitForMessage(serverTransport)
clientTransport.send({
id: '1',
from: 'client',
to: 'SERVER',
serviceName: 'test',
procedureName: 'test',
payload: msg,
});

await expect(p).resolves.toStrictEqual(msg);
});
});
15 changes: 9 additions & 6 deletions transport/stdio.ts → transport/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ import { OpaqueTransportMessage, TransportClientId } from './message';
import { Transport } from './types';
import readline from 'readline';

export class StdioTransport extends Transport {
constructor(clientId: TransportClientId) {
export class StreamTransport extends Transport {
input: NodeJS.ReadableStream
output: NodeJS.WritableStream

constructor(clientId: TransportClientId, input: NodeJS.ReadableStream = process.stdin, output: NodeJS.WritableStream = process.stdout) {
super(NaiveJsonCodec, clientId);
const { stdin, stdout } = process;
this.input = input
this.output = output
const rl = readline.createInterface({
input: stdin,
output: stdout,
input: this.input,
});

rl.on('line', (msg) => this.onMessage(msg));
}

send(msg: OpaqueTransportMessage): string {
const id = msg.id;
process.stdout.write(this.codec.toStringBuf(msg));
this.output.write(this.codec.toStringBuf(msg) + "\n");
return id;
}

Expand Down
1 change: 1 addition & 0 deletions transport/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,4 @@ export abstract class Transport {
abstract send(msg: OpaqueTransportMessage | TransportMessageAck): MessageId;
abstract close(): Promise<void>;
}

2 changes: 1 addition & 1 deletion transport/ws.util.ts → transport/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import http from 'http';
import WebSocket from 'isomorphic-ws';
import { WebSocketServer } from 'ws';
import { Transport } from './types';
import { OpaqueTransportMessage } from './message';
import { WebSocketTransport } from './ws';
import { OpaqueTransportMessage } from './message';

export async function createWebSocketServer(server: http.Server) {
return new WebSocketServer({ server });
Expand Down
2 changes: 1 addition & 1 deletion transport/ws.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
createWebSocketServer,
onServerReady,
waitForMessage,
} from './ws.util';
} from './util';

const port = 3000;
describe('sending and receiving across websockets works', () => {
Expand Down

0 comments on commit 46b2cdc

Please sign in to comment.