diff --git a/package-lock.json b/package-lock.json index 6885e9eb..40c24c6f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/river", - "version": "0.203.1", + "version": "0.203.2", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@replit/river", - "version": "0.203.1", + "version": "0.203.2", "license": "MIT", "dependencies": { "@msgpack/msgpack": "^3.0.0-beta2", diff --git a/package.json b/package.json index 9a4d2e91..d259095e 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@replit/river", "description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", - "version": "0.203.1", + "version": "0.203.2", "type": "module", "exports": { ".": { diff --git a/transport/connection.ts b/transport/connection.ts index 7e05da0d..28f389f6 100644 --- a/transport/connection.ts +++ b/transport/connection.ts @@ -47,6 +47,26 @@ export abstract class Connection { return [...this._errorListeners]; } + onData(msg: Uint8Array) { + for (const cb of this.dataListeners) { + cb(msg); + } + } + + onError(err: Error) { + for (const cb of this.errorListeners) { + cb(err); + } + } + + onClose() { + for (const cb of this.closeListeners) { + cb(); + } + + this.telemetry?.span.end(); + } + /** * Handle adding a callback for when a message is received. * @param msg The message that was received. diff --git a/transport/impls/ws/connection.ts b/transport/impls/ws/connection.ts index f7a1cbe8..c436c3ac 100644 --- a/transport/impls/ws/connection.ts +++ b/transport/impls/ws/connection.ts @@ -40,20 +40,14 @@ export class WebSocketConnection extends Connection { `websocket closed with code and reason: ${code} - ${reason}`, ); - for (const cb of this.errorListeners) { - cb(err); - } + this.onError(err); } - for (const cb of this.closeListeners) { - cb(); - } + this.onClose(); }; this.ws.onmessage = (msg) => { - for (const cb of this.dataListeners) { - cb(msg.data as Uint8Array); - } + this.onData(msg.data as Uint8Array); }; }