Skip to content

Commit

Permalink
async ws constructor, bump to 0.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jackyzha0 committed Oct 13, 2023
1 parent 9aa64fa commit 6a9f5e1
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 42 deletions.
2 changes: 1 addition & 1 deletion __tests__/typescript-stress.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class MockTransport extends Transport {
super(NaiveJsonCodec, clientId);
}

send(msg: OpaqueTransportMessage): MessageId {
async send(msg: OpaqueTransportMessage): Promise<MessageId> {
const id = msg.id;
return id;
}
Expand Down
3 changes: 1 addition & 2 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,5 @@ export {
onServerReady,
createWsTransports,
waitForMessage,
waitForSocketReady,
createWebSocketClient,
createLocalWebSocketClient,
} from './transport/util';
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@replit/river",
"sideEffects": false,
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
"version": "0.1.10",
"version": "0.2.0",
"type": "module",
"main": "index.js",
"types": "index.d.ts",
Expand Down
4 changes: 2 additions & 2 deletions router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
// this gets cleaned up on i.end() which is called by closeHandler
(async () => {
for await (const rawIn of i) {
transport.send(
await transport.send(
msg(
transport.clientId,
'SERVER',
Expand Down Expand Up @@ -103,7 +103,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
return [i, o, closeHandler];
} else {
// rpc case
const id = transport.send(
const id = await transport.send(
msg(
transport.clientId,
'SERVER',
Expand Down
4 changes: 2 additions & 2 deletions router/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export async function createServer<Services extends Record<string, AnyService>>(
// sending outgoing messages back to client
(async () => {
for await (const response of outgoing) {
transport.send(response);
await transport.send(response);
}
})(),
]),
Expand Down Expand Up @@ -101,7 +101,7 @@ export async function createServer<Services extends Record<string, AnyService>>(
getContext(service),
inputMessage,
);
transport.send(response);
await transport.send(response);
return;
} else if (
procedure.type === 'stream' &&
Expand Down
2 changes: 2 additions & 0 deletions transport/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export type MessageId = string;
export type OpaqueTransportMessage = TransportMessage<unknown>;
export type TransportClientId = 'SERVER' | string;
export const TransportAckSchema = Type.Object({
id: Type.String(),
from: Type.String(),
ack: Type.String(),
});
Expand Down Expand Up @@ -63,6 +64,7 @@ export function payloadToTransportMessage<Payload extends object>(

export function ack(msg: OpaqueTransportMessage): TransportMessageAck {
return {
id: nanoid(),
from: msg.to,
ack: msg.id,
};
Expand Down
2 changes: 1 addition & 1 deletion transport/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe('sending and receiving across node streams works', () => {
};

const p = waitForMessage(serverTransport);
clientTransport.send({
await clientTransport.send({
id: '1',
from: 'client',
to: 'SERVER',
Expand Down
2 changes: 1 addition & 1 deletion transport/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class StreamTransport extends Transport {
rl.on('line', (msg) => this.onMessage(msg));
}

send(msg: OpaqueTransportMessage): string {
async send(msg: OpaqueTransportMessage): Promise<string> {
const id = msg.id;
this.output.write(this.codec.toStringBuf(msg) + '\n');
return id;
Expand Down
4 changes: 3 additions & 1 deletion transport/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ export abstract class Transport {
this.handlers.delete(handler);
}

abstract send(msg: OpaqueTransportMessage | TransportMessageAck): MessageId;
abstract send(
msg: OpaqueTransportMessage | TransportMessageAck,
): Promise<MessageId>;
abstract close(): Promise<void>;
}
22 changes: 7 additions & 15 deletions transport/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,25 @@ export async function onServerReady(
});
}

export async function createLocalWebSocketClient(port: number) {
return new WebSocket(`ws://localhost:${port}`);
}

export async function createWsTransports(
port: number,
wss: WebSocketServer,
): Promise<[Transport, Transport]> {
return new Promise((resolve) => {
const clientSockPromise = createWebSocketClient(port);
const clientSockPromise = createLocalWebSocketClient(port);
wss.on('connection', async (serverSock) => {
resolve([
new WebSocketTransport(await clientSockPromise, 'client'),
new WebSocketTransport(serverSock, 'SERVER'),
new WebSocketTransport(() => clientSockPromise, 'client'),
new WebSocketTransport(() => Promise.resolve(serverSock), 'SERVER'),
]);
});
});
}

export async function waitForSocketReady(socket: WebSocket) {
return new Promise<void>((resolve) => {
socket.addEventListener('open', () => resolve());
});
}

export async function createWebSocketClient(port: number) {
const client = new WebSocket(`ws://localhost:${port}`);
await waitForSocketReady(client);
return client;
}

export async function waitForMessage(
t: Transport,
filter?: (msg: OpaqueTransportMessage) => boolean,
Expand Down
16 changes: 10 additions & 6 deletions transport/ws.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { WebSocketServer } from 'ws';
import { WebSocketTransport } from './ws';
import { describe, test, expect, beforeAll, afterAll } from 'vitest';
import {
createWebSocketClient,
createLocalWebSocketClient,
createWebSocketServer,
onServerReady,
waitForMessage,
Expand All @@ -28,18 +28,22 @@ describe('sending and receiving across websockets works', () => {
test('basic send/receive', async () => {
let serverTransport: WebSocketTransport | undefined;
wss.on('connection', (conn) => {
serverTransport = new WebSocketTransport(conn, 'SERVER');
serverTransport = new WebSocketTransport(
() => Promise.resolve(conn),
'SERVER',
);
});

const clientSoc = await createWebSocketClient(port);
const clientTransport = new WebSocketTransport(clientSoc, 'client');
const clientTransport = new WebSocketTransport(
() => createLocalWebSocketClient(port),
'client',
);

const msg = {
msg: 'cool',
test: 123,
};

clientTransport.send({
await clientTransport.send({
id: '1',
from: 'client',
to: 'SERVER',
Expand Down
55 changes: 47 additions & 8 deletions transport/ws.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type WebSocket from 'isomorphic-ws';
import WebSocket from 'isomorphic-ws';
import { Transport } from './types';
import { NaiveJsonCodec } from '../codec/json';
import {
Expand All @@ -13,21 +13,60 @@ import {
// - how do we handle forceful client disconnects? (i.e. broken connection, offline)
// - how do we handle forceful service disconnects (i.e. a crash)?
export class WebSocketTransport extends Transport {
ws: WebSocket;
wsGetter: () => Promise<WebSocket>;
ws?: WebSocket;
destroyed: boolean;

constructor(ws: WebSocket, clientId: TransportClientId) {
constructor(wsGetter: () => Promise<WebSocket>, clientId: TransportClientId) {
super(NaiveJsonCodec, clientId);
this.ws = ws;
ws.onmessage = (msg) => this.onMessage(msg.data.toString());
this.destroyed = false;
this.wsGetter = wsGetter;
this.waitForSocketReady();
}

send(msg: OpaqueTransportMessage): MessageId {
// postcondition: ws is concretely a WebSocket
private async waitForSocketReady(): Promise<WebSocket> {
return new Promise<WebSocket>((resolve, reject) => {
if (this.destroyed) {
reject(new Error('ws is destroyed'));
return;
}

if (this.ws) {
// constructed ws but not open
if (this.ws.readyState === this.ws.OPEN) {
return resolve(this.ws);
}

// resolve on open
this.ws.onopen = (evt) => {
return resolve(evt.target);
};

// reject if borked
this.ws.onerror = (err) => reject(err);
} else {
// not constructed
this.wsGetter().then((ws) => {
this.ws = ws;
return resolve(this.waitForSocketReady());
});
}
}).then((ws) => {
ws.onmessage = (msg) => this.onMessage(msg.data.toString());
return ws;
});
}

async send(msg: OpaqueTransportMessage): Promise<MessageId> {
const id = msg.id;
this.ws.send(this.codec.toStringBuf(msg));
const ws = await this.waitForSocketReady();
ws.send(this.codec.toStringBuf(msg));
return id;
}

async close() {
return this.ws.close();
this.destroyed = true;
return this.ws?.close();
}
}

0 comments on commit 6a9f5e1

Please sign in to comment.