Skip to content

Commit

Permalink
logging
Browse files Browse the repository at this point in the history
  • Loading branch information
jackyzha0 committed Oct 19, 2023
1 parent 10a3771 commit f8708a4
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 23 deletions.
10 changes: 0 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,3 @@ It's like tRPC but...
- with full-duplex streaming
- with support for service multiplexing
- over WebSockets

## Levels of abstraction

- Router
- Service
- Procedure

## TODO

- support broadcast
4 changes: 2 additions & 2 deletions __tests__/integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import http from 'http';
import { Type } from '@sinclair/typebox';
import { ServiceBuilder, serializeService } from '../router/builder';
import { TransportMessage, reply } from '../transport/message';
import { reply } from '../transport/message';
import { afterAll, describe, expect, test } from 'vitest';
import {
createWebSocketServer,
Expand All @@ -11,7 +11,7 @@ import {
import { createServer } from '../router/server';
import { createClient } from '../router/client';
import { asClientRpc, asClientStream } from '../router/server.util';
import { nanoid } from 'nanoid';
import { bindLogger, log, setLevel } from '../logging';

export const EchoRequest = Type.Object({
msg: Type.String(),
Expand Down
7 changes: 7 additions & 0 deletions codec/codec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ describe('naive json codec', () => {
NaiveJsonCodec.fromStringBuf(NaiveJsonCodec.toStringBuf(msg)),
).toStrictEqual(msg);
});

test('invalid json returns null', () => {
expect(NaiveJsonCodec.fromStringBuf('')).toBeNull();
expect(NaiveJsonCodec.fromStringBuf('[')).toBeNull();
expect(NaiveJsonCodec.fromStringBuf('[{}')).toBeNull();
expect(NaiveJsonCodec.fromStringBuf('{"a":1}[]')).toBeNull();
});
});
8 changes: 7 additions & 1 deletion codec/json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,11 @@ import { Codec } from './types';

export const NaiveJsonCodec: Codec = {
toStringBuf: JSON.stringify,
fromStringBuf: JSON.parse,
fromStringBuf: (s: string) => {
try {
return JSON.parse(s);
} catch {
return null;
}
},
};
2 changes: 1 addition & 1 deletion codec/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export interface Codec {
toStringBuf(obj: object): string;
fromStringBuf(buf: string): object;
fromStringBuf(buf: string): object | null;
}
43 changes: 43 additions & 0 deletions logging/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
const LoggingLevels = {
info: 0,
warn: 1,
error: 2,
} as const;

type LoggingLevel = keyof typeof LoggingLevels;
export type Logger = {
minLevel: LoggingLevel;
} & {
[key in LoggingLevel]: (msg: string) => void;
};

export let log: Logger | undefined;
const defaultLoggingLevel: LoggingLevel = 'warn';

export function bindLogger(write: (msg: string) => void, color?: boolean) {
const info = color ? '\u001b[37minfo\u001b[0m' : 'info';
const warn = color ? '\u001b[33mwarn\u001b[0m' : 'warn';
const error = color ? '\u001b[31merr\u001b[0m' : 'err';

log = {
info: (msg) =>
log &&
LoggingLevels[log.minLevel] <= 0 &&
write(`[river:${info}] ${msg}`),
warn: (msg) =>
log &&
LoggingLevels[log.minLevel] <= 1 &&
write(`[river:${warn}] ${msg}`),
error: (msg) =>
log &&
LoggingLevels[log.minLevel] <= 2 &&
write(`[river:${error}] ${msg}`),
minLevel: log?.minLevel ?? defaultLoggingLevel,
};
}

export function setLevel(level: LoggingLevel) {
if (log) {
log.minLevel = level;
}
}
14 changes: 11 additions & 3 deletions router/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { pushable } from 'it-pushable';
import type { Pushable } from 'it-pushable';
import { OpaqueTransportMessage, TransportMessage } from '../transport/message';
import { ServiceContext, ServiceContextWithState } from './context';
import { log } from '../logging';

export interface Server<Services> {
services: Services;
Expand Down Expand Up @@ -33,7 +34,9 @@ export async function createServer<Services extends Record<string, AnyService>>(
const context = contextMap.get(service);

if (!context) {
throw new Error(`No context found for ${service.name}`);
const err = `No context found for ${service.name}`;
log?.error(err);
throw new Error(err);
}

return context;
Expand Down Expand Up @@ -75,7 +78,6 @@ export async function createServer<Services extends Record<string, AnyService>>(
}

const handler = async (msg: OpaqueTransportMessage) => {
// TODO: log msgs received
if (msg.to !== 'SERVER') {
return;
}
Expand Down Expand Up @@ -121,10 +123,16 @@ export async function createServer<Services extends Record<string, AnyService>>(
streams.incoming.push(inputMessage);
return;
} else {
// TODO: log invalid payload
log?.error(
`${transport.clientId} -- procedure ${msg.serviceName}.${msg.procedureName} received invalid payload: ${inputMessage.payload}`,
);
}
}
}

log?.warn(
`${transport.clientId} -- couldn't find a matching procedure for ${msg.serviceName}.${msg.procedureName}`,
);
};

transport.addMessageListener(handler);
Expand Down
17 changes: 13 additions & 4 deletions transport/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
TransportMessageAck,
ack,
} from './message';
import { log } from '../logging';

export abstract class Transport {
codec: Codec;
Expand All @@ -24,15 +25,21 @@ export abstract class Transport {
}

onMessage(msg: string) {
// TODO: try catch from string buf
const parsedMsg = this.codec.fromStringBuf(msg.toString());
const parsedMsg = this.codec.fromStringBuf(msg);
if (parsedMsg === null) {
log?.warn(`${this.clientId} -- received malformed msg: ${msg}`);
return;
}

if (Value.Check(TransportAckSchema, parsedMsg)) {
// process ack
log?.info(`${this.clientId} -- received ack: ${msg}`);
if (this.sendBuffer.has(parsedMsg.ack)) {
this.sendBuffer.delete(parsedMsg.ack);
}
} else if (Value.Check(OpaqueTransportMessageSchema, parsedMsg)) {
log?.info(`${this.clientId} -- received msg: ${msg}`);

// ignore if not for us
if (parsedMsg.to !== this.clientId && parsedMsg.to !== 'broadcast') {
return;
Expand All @@ -43,9 +50,11 @@ export abstract class Transport {
handler(parsedMsg);
}

this.send(ack(parsedMsg));
const ackMsg = ack(parsedMsg);
ackMsg.from = this.clientId;
this.send(ackMsg);
} else {
// TODO: warn on malformed
log?.warn(`${this.clientId} -- received invalid transport msg: ${msg}`);
}
}

Expand Down
23 changes: 21 additions & 2 deletions transport/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
OpaqueTransportMessage,
TransportClientId,
} from './message';
import { log } from '../logging';

interface Options {
retryIntervalMs: number;
Expand Down Expand Up @@ -41,6 +42,7 @@ export class WebSocketTransport extends Transport {
private async tryConnect() {
// wait until it's ready or we get an error
this.reconnectPromise ??= new Promise<WebSocketResult>(async (resolve) => {
log?.info(`${this.clientId} -- establishing a new websocket`);
const ws = await this.wsGetter();
if (ws.readyState === ws.OPEN) {
return resolve({ ws });
Expand Down Expand Up @@ -69,6 +71,8 @@ export class WebSocketTransport extends Transport {

// only send if we resolved a valid websocket
if ('ws' in res && res.ws.readyState === res.ws.OPEN) {
log?.info(`${this.clientId} -- websocket ok`);

this.ws = res.ws;
this.ws.onmessage = (msg) => this.onMessage(msg.data.toString());
this.ws.onclose = () => {
Expand All @@ -80,9 +84,12 @@ export class WebSocketTransport extends Transport {
for (const id of this.sendQueue) {
const msg = this.sendBuffer.get(id);
if (!msg) {
throw new Error('tried to resend a message we received an ack for');
const err = 'tried to resend a message we received an ack for';
log?.error(err);
throw new Error(err);
}

log?.info(`${this.clientId} -- sending ${JSON.stringify(msg)}`);
this.ws.send(this.codec.toStringBuf(msg));
}

Expand All @@ -91,20 +98,31 @@ export class WebSocketTransport extends Transport {
}

// otherwise try and reconnect again
log?.warn(
`${this.clientId} -- websocket failed, trying again in ${this.options.retryIntervalMs}ms`,
);
this.reconnectPromise = undefined;
setTimeout(() => this.tryConnect(), this.options.retryIntervalMs);
}

send(msg: OpaqueTransportMessage): MessageId {
const id = msg.id;
if (this.destroyed) {
throw new Error('ws is destroyed, cant send');
const err = 'ws is destroyed, cant send';
log?.error(err);
throw new Error(err);
}

this.sendBuffer.set(id, msg);
if (this.ws && this.ws.readyState === this.ws.OPEN) {
log?.info(`${this.clientId} -- sending ${JSON.stringify(msg)}`);
this.ws.send(this.codec.toStringBuf(msg));
} else {
log?.info(
`${this.clientId} -- transport not ready, queuing ${JSON.stringify(
msg,
)}`,
);
this.sendQueue.push(id);
this.tryConnect().catch();
}
Expand All @@ -113,6 +131,7 @@ export class WebSocketTransport extends Transport {
}

async close() {
log?.info('manually closed ws');
this.destroyed = true;
return this.ws?.close();
}
Expand Down

0 comments on commit f8708a4

Please sign in to comment.