-
Notifications
You must be signed in to change notification settings - Fork 45
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
WebSocket Subscriptions via JRPC (#107)
This PR enhances `WebSocket` support via `JSON RPC 2.0`. This now supports long running subscriptions via `JSON RPC`. The new `JsonRpcSocket` abstracts this by providing two methods,`request` and `subscribe`. `request` sends a JSON RPC message over the socket, setting up a message handler to listen to the response, once the response is received it will clean up the listener, and resolve the response to the promise. If a response is not emitted within a given timeout, it will reject the promise. `subscribe` does something similar, however after receiving the response to the subscribe message, it will keep the handler open and look for any other messages that match the JSON RPC Id provided in the subscription request, and emit those to a message handler. The subscribe method returns a close method in order to clean up these listeners. Some things to note: - `RecordsWrite` are currently not supported via sockets, this is due to a poor handling of the data payload in the prior implementation. Would rather add this as a separate effort. TODO is tagged in code with the issue listed below. - `Subscribe` methods are currently only supported via sockets and hot `http`. - A `rpc.subscription.close` JSON RPC Method was added to close a subscription that is active for a connection. I went back and forth between making this a DWN Message vs some other signal. I landed on this for the time being and am open to discussion. More notes below. - As a part of `getDWNConfig`, `tenantGate` and `eventStream` were both made optional, as well as the `registrationManager` for `HttpApi`. I did this mostly out of convenience, but it also seems plausible that someone might run this without any registration manager. Open to discuss/change. - the `sendHttpMessage` method within `tests/utils.ts` will also be replaced by full-fledged client, listed in a separate PR below. - Current implementation allows anyone to connect, this will be addressed in a subsequent PR, issue listed below. ### Usage of `subscription.close` JSON RPC method. Q: Why not use a specific `Unsubscribe` DWN message such as `RecordsUnsubscribe`? A: This would be the first message of it's kind that would need to specifically target the DWN and potentially transport of the DWN which holds the subscription. Instead the DWN `RecordsSubscribe` message returns a close method which the transport can keep a reference to given a specific `JSON RPC Id`. This JSON RPC Id represents a specific request to a transport that was created. Later a user can issue a `rpc.subscription.close` JSON RPC Method to tell the server to close that subscription. Ultimately the owner of the JRPC Socket connection is in charge of closing the subscription, they can close all subscriptions by simply disconnecting. So it makes sense to give them the ability to close a specific JRPC Subscription. ## Initial Effort Subsequent Issues/PRs: - #111 - #109 - #110 ### Separate effort: - #108
- Loading branch information
1 parent
0e14b8d
commit e1396cb
Showing
28 changed files
with
1,823 additions
and
241 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import type { Dwn } from "@tbd54566975/dwn-sdk-js"; | ||
|
||
import type { IncomingMessage } from "http"; | ||
import type { WebSocket } from 'ws'; | ||
|
||
import { SocketConnection } from "./socket-connection.js"; | ||
|
||
/** | ||
* Interface for managing `WebSocket` connections as they arrive. | ||
*/ | ||
export interface ConnectionManager { | ||
/** connect handler used for the `WebSockets` `'connection'` event. */ | ||
connect(socket: WebSocket, request?: IncomingMessage): Promise<void>; | ||
/** closes all of the connections */ | ||
closeAll(): Promise<void> | ||
} | ||
|
||
/** | ||
* A Simple In Memory ConnectionManager implementation. | ||
* It uses a `Map<WebSocket, SocketConnection>` to manage connections. | ||
*/ | ||
export class InMemoryConnectionManager implements ConnectionManager { | ||
constructor(private dwn: Dwn, private connections: Map<WebSocket, SocketConnection> = new Map()) {} | ||
|
||
async connect(socket: WebSocket): Promise<void> { | ||
const connection = new SocketConnection(socket, this.dwn, () => { | ||
// this is the onClose handler to clean up any closed connections. | ||
this.connections.delete(socket); | ||
}); | ||
|
||
this.connections.set(socket, connection); | ||
} | ||
|
||
async closeAll(): Promise<void> { | ||
const closePromises = []; | ||
this.connections.forEach(connection => closePromises.push(connection.close())); | ||
await Promise.all(closePromises); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
import type { Dwn, GenericMessage, MessageEvent } from "@tbd54566975/dwn-sdk-js"; | ||
import { DwnMethodName } from "@tbd54566975/dwn-sdk-js"; | ||
|
||
import type { WebSocket } from "ws"; | ||
import log from 'loglevel'; | ||
import { v4 as uuidv4 } from 'uuid'; | ||
|
||
import type { RequestContext } from "../lib/json-rpc-router.js"; | ||
import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse, JsonRpcSubscription } from "../lib/json-rpc.js"; | ||
|
||
import { requestCounter } from "../metrics.js"; | ||
import { jsonRpcRouter } from "../json-rpc-api.js"; | ||
import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse } from "../lib/json-rpc.js"; | ||
import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js"; | ||
|
||
const HEARTBEAT_INTERVAL = 30_000; | ||
|
||
/** | ||
* SocketConnection handles a WebSocket connection to a DWN using JSON RPC. | ||
* It also manages references to the long running RPC subscriptions for the connection. | ||
*/ | ||
export class SocketConnection { | ||
private heartbeatInterval: NodeJS.Timer; | ||
private subscriptions: Map<JsonRpcId, JsonRpcSubscription> = new Map(); | ||
private isAlive: boolean; | ||
|
||
constructor( | ||
private socket: WebSocket, | ||
private dwn: Dwn, | ||
private onClose?: () => void | ||
){ | ||
socket.on('message', this.message.bind(this)); | ||
socket.on('close', this.close.bind(this)); | ||
socket.on('error', this.error.bind(this)); | ||
socket.on('pong', this.pong.bind(this)); | ||
|
||
// Sometimes connections between client <-> server can get borked in such a way that | ||
// leaves both unaware of the borkage. ping messages can be used as a means to verify | ||
// that the remote endpoint is still responsive. Server will ping each socket every 30s | ||
// if a pong hasn't received from a socket by the next ping, the server will terminate | ||
// the socket connection | ||
this.isAlive = true; | ||
this.heartbeatInterval = setInterval(() => { | ||
if (this.isAlive === false) { | ||
this.close(); | ||
} | ||
this.isAlive = false; | ||
this.socket.ping(); | ||
}, HEARTBEAT_INTERVAL); | ||
} | ||
|
||
/** | ||
* Checks to see if the incoming `JsonRpcId` is already in use for a subscription. | ||
*/ | ||
hasSubscription(id: JsonRpcId): boolean { | ||
return this.subscriptions.has(id); | ||
} | ||
|
||
/** | ||
* Adds a reference for the JSON RPC Subscription to this connection. | ||
* Used for cleanup if the connection is closed. | ||
*/ | ||
async addSubscription(subscription: JsonRpcSubscription): Promise<void> { | ||
if (this.subscriptions.has(subscription.id)) { | ||
throw new DwnServerError( | ||
DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdExists, | ||
`the subscription with id ${subscription.id} already exists` | ||
) | ||
} | ||
|
||
this.subscriptions.set(subscription.id, subscription); | ||
} | ||
|
||
/** | ||
* Closes and removes the reference for a given subscription from this connection. | ||
* | ||
* @param id the `JsonRpcId` of the JSON RPC subscription request. | ||
*/ | ||
async closeSubscription(id: JsonRpcId): Promise<void> { | ||
if (!this.subscriptions.has(id)) { | ||
throw new DwnServerError( | ||
DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdNotFound, | ||
`the subscription with id ${id} was not found` | ||
) | ||
} | ||
|
||
const connection = this.subscriptions.get(id); | ||
await connection.close(); | ||
this.subscriptions.delete(id); | ||
} | ||
|
||
/** | ||
* Closes the existing connection and cleans up any listeners or subscriptions. | ||
*/ | ||
async close(): Promise<void> { | ||
clearInterval(this.heartbeatInterval); | ||
|
||
// clean up all socket event listeners | ||
this.socket.removeAllListeners(); | ||
|
||
const closePromises = []; | ||
for (const [id, subscription] of this.subscriptions) { | ||
closePromises.push(subscription.close()); | ||
this.subscriptions.delete(id); | ||
} | ||
|
||
// close all of the associated subscriptions | ||
await Promise.all(closePromises); | ||
|
||
// close the socket. | ||
this.socket.close(); | ||
|
||
// if there was a close handler passed call it after the connection has been closed | ||
if (this.onClose !== undefined) { | ||
this.onClose(); | ||
} | ||
} | ||
|
||
/** | ||
* Pong messages are automatically sent in response to ping messages as required by | ||
* the websocket spec. So, no need to send explicit pongs. | ||
*/ | ||
private pong(): void { | ||
this.isAlive = true; | ||
} | ||
|
||
/** | ||
* Log the error and close the connection. | ||
*/ | ||
private async error(error:Error): Promise<void>{ | ||
log.error(`SocketConnection error, terminating connection`, error); | ||
this.socket.terminate(); | ||
await this.close(); | ||
} | ||
|
||
/** | ||
* Handles a `JSON RPC 2.0` encoded message. | ||
*/ | ||
private async message(dataBuffer: Buffer): Promise<void> { | ||
const requestData = dataBuffer.toString(); | ||
if (!requestData) { | ||
return this.send(createJsonRpcErrorResponse( | ||
uuidv4(), | ||
JsonRpcErrorCodes.BadRequest, | ||
'request payload required.' | ||
)) | ||
} | ||
|
||
let jsonRequest: JsonRpcRequest; | ||
try { | ||
jsonRequest = JSON.parse(requestData); | ||
} catch(error) { | ||
const errorResponse = createJsonRpcErrorResponse( | ||
uuidv4(), | ||
JsonRpcErrorCodes.BadRequest, | ||
(error as Error).message | ||
); | ||
return this.send(errorResponse); | ||
}; | ||
|
||
const requestContext = await this.buildRequestContext(jsonRequest); | ||
const { jsonRpcResponse } = await jsonRpcRouter.handle(jsonRequest, requestContext); | ||
if (jsonRpcResponse.error) { | ||
requestCounter.inc({ method: jsonRequest.method, error: 1 }); | ||
} else { | ||
requestCounter.inc({ | ||
method: jsonRequest.method, | ||
status: jsonRpcResponse?.result?.reply?.status?.code || 0, | ||
}); | ||
} | ||
this.send(jsonRpcResponse); | ||
} | ||
|
||
/** | ||
* Sends a JSON encoded Buffer through the Websocket. | ||
*/ | ||
private send(response: JsonRpcResponse | JsonRpcErrorResponse): void { | ||
this.socket.send(Buffer.from(JSON.stringify(response))); | ||
} | ||
|
||
/** | ||
* Creates a subscription handler to send messages matching the subscription requested. | ||
* | ||
* Wraps the incoming `message` in a `JSON RPC Success Response` using the original subscription`JSON RPC Id` to send through the WebSocket. | ||
*/ | ||
private createSubscriptionHandler(id: JsonRpcId): (message: MessageEvent) => void { | ||
return (event) => { | ||
const response = createJsonRpcSuccessResponse(id, { event }); | ||
this.send(response); | ||
} | ||
} | ||
|
||
/** | ||
* Builds a `RequestContext` object to use with the `JSON RPC API`. | ||
* | ||
* Adds a `subscriptionHandler` for `Subscribe` messages. | ||
*/ | ||
private async buildRequestContext(request: JsonRpcRequest): Promise<RequestContext> { | ||
const { params, method, subscription } = request; | ||
|
||
const requestContext: RequestContext = { | ||
transport : 'ws', | ||
dwn : this.dwn, | ||
socketConnection : this, | ||
} | ||
|
||
// methods that expect a long-running subscription begin with `rpc.subscribe.` | ||
if (method.startsWith('rpc.subscribe.') && subscription) { | ||
const { message } = params as { message?: GenericMessage }; | ||
if (message?.descriptor.method === DwnMethodName.Subscribe) { | ||
const handlerFunc = this.createSubscriptionHandler(subscription.id); | ||
requestContext.subscriptionRequest = { | ||
id: subscription.id, | ||
subscriptionHandler: (message): void => handlerFunc(message), | ||
} | ||
} | ||
} | ||
|
||
return requestContext; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.