generated from TBD54566975/tbd-project-template
-
Notifications
You must be signed in to change notification settings - Fork 57
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
@web5/agent
DWN + Web5 RPC Clients (#433)
- organize rpc clients into `agent/prototyping/clients` until they have a permanent home - copy `JsonRpcSocket` class from `@web5/dwn-server`, using `isomorphic-ws` to allow for isomorphic socket client. - create `WebSocketDwnRpcClient` and `WebSocketWeb5RpcClient` to support web5 requests over sockets.
- Loading branch information
1 parent
393d483
commit ac1e6f1
Showing
15 changed files
with
1,467 additions
and
134 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
--- | ||
"@web5/agent": patch | ||
"@web5/identity-agent": patch | ||
"@web5/proxy-agent": patch | ||
"@web5/user-agent": patch | ||
--- | ||
|
||
Extend and Test RPC DWN/Web5 Clients to support `http` and `ws` | ||
- move `HttpDwnRpcClient` to `/prototyping` folder | ||
- move `JSON RPC` related files to `/prototyping` folder | ||
- create `WebSocketDwnRpcClient` in `/prototyping` folder | ||
- create `WebSocketWeb5RpcClient` wrapper in `rpc-client` | ||
- does not support `sendDidRequest` via sockets | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
import type { MessageEvent, RecordsReadReply, UnionMessageReply } from '@tbd54566975/dwn-sdk-js'; | ||
|
||
export interface SerializableDwnMessage { | ||
toJSON(): string; | ||
} | ||
|
||
export type DwnEventSubscriptionHandler = (event: MessageEvent) => void; | ||
|
||
/** | ||
* Interface for communicating with {@link https://github.com/TBD54566975/dwn-server | DWN Servers} | ||
* via JSON-RPC, supporting operations like sending DWN requests. | ||
*/ | ||
export interface DwnRpc { | ||
/** | ||
* Lists the transport protocols supported by the DWN RPC client, such as HTTP or HTTPS. | ||
* @returns An array of strings representing the supported transport protocols. | ||
*/ | ||
get transportProtocols(): string[] | ||
|
||
/** | ||
* Sends a request to a DWN Server using the specified DWN RPC request parameters. | ||
* | ||
* @param request - The DWN RPC request containing the URL, target DID, message, and optional data. | ||
* @returns A promise that resolves to the response from the DWN server. | ||
*/ | ||
sendDwnRequest(request: DwnRpcRequest): Promise<DwnRpcResponse> | ||
} | ||
|
||
|
||
/** | ||
* Represents a JSON RPC request to a DWN server, including the URL, target DID, the message to be | ||
* processed, and optional data. | ||
*/ | ||
export type DwnRpcRequest = { | ||
/** Optional data to be sent with the request. */ | ||
data?: any; | ||
|
||
/** The URL of the DWN server to which the request is sent. */ | ||
dwnUrl: string; | ||
|
||
/** The message to be processed by the DWN server, which can be a serializable DWN message. */ | ||
message: SerializableDwnMessage | any; | ||
|
||
/** The DID of the target to which the message is addressed. */ | ||
targetDid: string; | ||
|
||
/** Optional subscription handler for DWN events. */ | ||
subscriptionHandler?: DwnEventSubscriptionHandler; | ||
} | ||
|
||
/** | ||
* Represents the JSON RPC response from a DWN server to a request, combining the results of various | ||
* DWN operations. | ||
*/ | ||
export type DwnRpcResponse = UnionMessageReply & RecordsReadReply; |
68 changes: 68 additions & 0 deletions
68
packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import type { JsonRpcResponse } from './json-rpc.js'; | ||
import type { DwnRpc, DwnRpcRequest, DwnRpcResponse } from './dwn-rpc-types.js'; | ||
|
||
import { createJsonRpcRequest, parseJson } from './json-rpc.js'; | ||
import { utils as cryptoUtils } from '@web5/crypto'; | ||
|
||
/** | ||
* HTTP client that can be used to communicate with Dwn Servers | ||
*/ | ||
export class HttpDwnRpcClient implements DwnRpc { | ||
get transportProtocols() { return ['http:', 'https:']; } | ||
|
||
async sendDwnRequest(request: DwnRpcRequest): Promise<DwnRpcResponse> { | ||
const requestId = cryptoUtils.randomUuid(); | ||
const jsonRpcRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { | ||
target : request.targetDid, | ||
message : request.message | ||
}); | ||
|
||
const fetchOpts = { | ||
method : 'POST', | ||
headers : { | ||
'dwn-request': JSON.stringify(jsonRpcRequest) | ||
} | ||
}; | ||
|
||
if (request.data) { | ||
// @ts-expect-error TODO: REMOVE | ||
fetchOpts.headers['content-type'] = 'application/octet-stream'; | ||
// @ts-expect-error TODO: REMOVE | ||
fetchOpts['body'] = request.data; | ||
} | ||
|
||
const resp = await fetch(request.dwnUrl, fetchOpts); | ||
let dwnRpcResponse: JsonRpcResponse; | ||
|
||
// check to see if response is in header first. if it is, that means the response is a ReadableStream | ||
let dataStream; | ||
const { headers } = resp; | ||
if (headers.has('dwn-response')) { | ||
// @ts-expect-error TODO: REMOVE | ||
const jsonRpcResponse = parseJson(headers.get('dwn-response')) as JsonRpcResponse; | ||
|
||
if (jsonRpcResponse == null) { | ||
throw new Error(`failed to parse json rpc response. dwn url: ${request.dwnUrl}`); | ||
} | ||
|
||
dataStream = resp.body; | ||
dwnRpcResponse = jsonRpcResponse; | ||
} else { | ||
// TODO: wonder if i need to try/catch this? | ||
const responseBody = await resp.text(); | ||
dwnRpcResponse = JSON.parse(responseBody); | ||
} | ||
|
||
if (dwnRpcResponse.error) { | ||
const { code, message } = dwnRpcResponse.error; | ||
throw new Error(`(${code}) - ${message}`); | ||
} | ||
|
||
const { reply } = dwnRpcResponse.result; | ||
if (dataStream) { | ||
reply['record']['data'] = dataStream; | ||
} | ||
|
||
return reply as DwnRpcResponse; | ||
} | ||
} |
169 changes: 169 additions & 0 deletions
169
packages/agent/src/prototyping/clients/json-rpc-socket.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
import { utils as cryptoUtils } from '@web5/crypto'; | ||
import IsomorphicWebSocket from 'isomorphic-ws'; | ||
import { JsonRpcId, JsonRpcRequest, JsonRpcResponse, createJsonRpcSubscriptionRequest, parseJson } from './json-rpc.js'; | ||
|
||
// These were arbitrarily chosen, but can be modified via connect options | ||
const CONNECT_TIMEOUT = 3_000; | ||
const RESPONSE_TIMEOUT = 30_000; | ||
|
||
export interface JsonRpcSocketOptions { | ||
/** socket connection timeout in milliseconds */ | ||
connectTimeout?: number; | ||
/** response timeout for rpc requests in milliseconds */ | ||
responseTimeout?: number; | ||
/** optional connection close handler */ | ||
onclose?: () => void; | ||
/** optional socket error handler */ | ||
onerror?: (error?: any) => void; | ||
} | ||
|
||
/** | ||
* JSON RPC Socket Client for WebSocket request/response and long-running subscriptions. | ||
* | ||
* NOTE: This is temporarily copied over from https://github.com/TBD54566975/dwn-server/blob/main/src/json-rpc-socket.ts | ||
* This was done in order to avoid taking a dependency on the `dwn-server`, until a future time when there will be a `clients` package. | ||
*/ | ||
export class JsonRpcSocket { | ||
private messageHandlers: Map<JsonRpcId, (event: { data: any }) => void> = new Map(); | ||
|
||
private constructor(private socket: IsomorphicWebSocket, private responseTimeout: number) {} | ||
|
||
static async connect(url: string, options: JsonRpcSocketOptions = {}): Promise<JsonRpcSocket> { | ||
const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT, onclose, onerror } = options; | ||
|
||
const socket = new IsomorphicWebSocket(url); | ||
|
||
if (!onclose) { | ||
socket.onclose = ():void => { | ||
console.info(`JSON RPC Socket close ${url}`); | ||
}; | ||
} else { | ||
socket.onclose = onclose; | ||
} | ||
|
||
if (!onerror) { | ||
socket.onerror = (error?: any):void => { | ||
console.error(`JSON RPC Socket error ${url}`, error); | ||
}; | ||
} else { | ||
socket.onerror = onerror; | ||
} | ||
|
||
return new Promise<JsonRpcSocket>((resolve, reject) => { | ||
socket.addEventListener('open', () => { | ||
const jsonRpcSocket = new JsonRpcSocket(socket, responseTimeout); | ||
|
||
socket.addEventListener('message', (event: { data: any }) => { | ||
const jsonRpcResponse = parseJson(event.data) as JsonRpcResponse; | ||
const handler = jsonRpcSocket.messageHandlers.get(jsonRpcResponse.id); | ||
if (handler) { | ||
handler(event); | ||
} | ||
}); | ||
|
||
resolve(jsonRpcSocket); | ||
}); | ||
|
||
socket.addEventListener('error', (error: any) => { | ||
reject(error); | ||
}); | ||
|
||
setTimeout(() => reject, connectTimeout); | ||
}); | ||
} | ||
|
||
close(): void { | ||
this.socket.close(); | ||
} | ||
|
||
/** | ||
* Sends a JSON-RPC request through the socket and waits for a single response. | ||
*/ | ||
async request(request: JsonRpcRequest): Promise<JsonRpcResponse> { | ||
return new Promise((resolve, reject) => { | ||
request.id ??= cryptoUtils.randomUuid(); | ||
|
||
const handleResponse = (event: { data: any }):void => { | ||
const jsonRpsResponse = parseJson(event.data) as JsonRpcResponse; | ||
if (jsonRpsResponse.id === request.id) { | ||
// if the incoming response id matches the request id, we will remove the listener and resolve the response | ||
this.messageHandlers.delete(request.id); | ||
return resolve(jsonRpsResponse); | ||
} | ||
}; | ||
|
||
// add the listener to the map of message handlers | ||
this.messageHandlers.set(request.id, handleResponse); | ||
this.send(request); | ||
|
||
// reject this promise if we don't receive any response back within the timeout period | ||
setTimeout(() => { | ||
this.messageHandlers.delete(request.id!); | ||
reject(new Error('request timed out')); | ||
}, this.responseTimeout); | ||
}); | ||
} | ||
|
||
/** | ||
* Sends a JSON-RPC request through the socket and keeps a listener open to read associated responses as they arrive. | ||
* Returns a close method to clean up the listener. | ||
*/ | ||
async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{ | ||
response: JsonRpcResponse; | ||
close?: () => Promise<void>; | ||
}> { | ||
|
||
if (!request.method.startsWith('rpc.subscribe.')) { | ||
throw new Error('subscribe rpc requests must include the `rpc.subscribe` prefix'); | ||
} | ||
|
||
if (!request.subscription) { | ||
throw new Error('subscribe rpc requests must include subscribe options'); | ||
} | ||
|
||
const subscriptionId = request.subscription.id; | ||
const socketEventListener = (event: { data: any }):void => { | ||
const jsonRpcResponse = parseJson(event.data.toString()) as JsonRpcResponse; | ||
if (jsonRpcResponse.id === subscriptionId) { | ||
if (jsonRpcResponse.error !== undefined) { | ||
// remove the event listener upon receipt of a JSON RPC Error. | ||
this.messageHandlers.delete(subscriptionId); | ||
this.closeSubscription(subscriptionId); | ||
} | ||
listener(jsonRpcResponse); | ||
} | ||
}; | ||
|
||
this.messageHandlers.set(subscriptionId, socketEventListener); | ||
|
||
const response = await this.request(request); | ||
if (response.error) { | ||
this.messageHandlers.delete(subscriptionId); | ||
return { response }; | ||
} | ||
|
||
// clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription | ||
const close = async (): Promise<void> => { | ||
this.messageHandlers.delete(subscriptionId); | ||
await this.closeSubscription(subscriptionId); | ||
}; | ||
|
||
return { | ||
response, | ||
close | ||
}; | ||
} | ||
|
||
private closeSubscription(id: JsonRpcId): Promise<JsonRpcResponse> { | ||
const requestId = cryptoUtils.randomUuid(); | ||
const request = createJsonRpcSubscriptionRequest(requestId, 'close', id, {}); | ||
return this.request(request); | ||
} | ||
|
||
/** | ||
* Sends a JSON-RPC request through the socket. You must subscribe to a message listener separately to capture the response. | ||
*/ | ||
send(request: JsonRpcRequest):void { | ||
this.socket.send(JSON.stringify(request)); | ||
} | ||
} |
Oops, something went wrong.