From c1078c61f45feed1a3a85be5ad66b540faf20f92 Mon Sep 17 00:00:00 2001 From: Henry Tsai Date: Tue, 9 Jul 2024 16:39:36 -0700 Subject: [PATCH 1/6] Added plugin support --- src/dwn-server.ts | 4 +- src/index.ts | 2 +- src/pluginLoader.ts | 18 ++ src/registration/registration-manager.ts | 4 +- src/storage.ts | 119 ++++++----- src/web5-connect/web5-connect-server.ts | 4 +- tests/http-api.spec.ts | 1 - tests/plugins/data-store-sqlite.ts | 26 +++ .../scenarios/dynamic-plugin-loading.spec.ts | 194 ++++++++++++++++++ tests/test-dwn.ts | 16 +- 10 files changed, 325 insertions(+), 63 deletions(-) create mode 100644 src/pluginLoader.ts create mode 100644 tests/plugins/data-store-sqlite.ts create mode 100644 tests/scenarios/dynamic-plugin-loading.spec.ts diff --git a/src/dwn-server.ts b/src/dwn-server.ts index c4ad608..3c983dc 100644 --- a/src/dwn-server.ts +++ b/src/dwn-server.ts @@ -8,7 +8,7 @@ import type { DwnServerConfig } from './config.js'; import log from 'loglevel'; import prefix from 'loglevel-plugin-prefix'; import { config as defaultConfig } from './config.js'; -import { getDWNConfig } from './storage.js'; +import { getDwnConfig } from './storage.js'; import { HttpServerShutdownHandler } from './lib/http-server-shutdown-handler.js'; import { HttpApi } from './http-api.js'; import { RegistrationManager } from './registration/registration-manager.js'; @@ -105,7 +105,7 @@ export class DwnServer { eventStream = new EventEmitterStream(); } - const dwnConfig = getDWNConfig(this.config, { + const dwnConfig = await getDwnConfig(this.config, { didResolver: this.didResolver, tenantGate: registrationManager, eventStream, diff --git a/src/index.ts b/src/index.ts index 77b43b5..9b22c69 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,5 +2,5 @@ export { DwnServerConfig } from './config.js'; export { DwnServer, DwnServerOptions } from './dwn-server.js'; export { HttpApi } from './http-api.js'; export { jsonRpcRouter } from './json-rpc-api.js'; -export { EStoreType, BackendTypes, StoreType } from './storage.js'; +export { StoreType, BackendTypes, DwnStore } from './storage.js'; export { WsApi } from './ws-api.js'; diff --git a/src/pluginLoader.ts b/src/pluginLoader.ts new file mode 100644 index 0000000..b10f82c --- /dev/null +++ b/src/pluginLoader.ts @@ -0,0 +1,18 @@ +/** + * Dynamically loads a plugin from a file path by invoking the argument-less constructor of the default exported class. + */ +export async function loadPlugin(filePath: string): Promise { + try { + const module = await import(filePath); + + // Check if the default export is a class + if (typeof module.default === 'function') { + const instance: T = new module.default() as T; + return instance; + } else { + throw new Error(`Default export at ${filePath} is not a class.`); + } + } catch (error) { + throw new Error(`Failed to load component at ${filePath}: ${error.message}`); + } + } \ No newline at end of file diff --git a/src/registration/registration-manager.ts b/src/registration/registration-manager.ts index d7c221e..518d884 100644 --- a/src/registration/registration-manager.ts +++ b/src/registration/registration-manager.ts @@ -5,7 +5,7 @@ import type { RegistrationData, RegistrationRequest } from "./registration-types import type { ProofOfWorkChallengeModel } from "./proof-of-work-types.js"; import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js"; import type { ActiveTenantCheckResult, TenantGate } from "@tbd54566975/dwn-sdk-js"; -import { getDialectFromURI } from "../storage.js"; +import { getDialectFromUrl } from "../storage.js"; import { readFileSync } from "fs"; /** @@ -77,7 +77,7 @@ export class RegistrationManager implements TenantGate { }); // Initialize RegistrationStore. - const sqlDialect = getDialectFromURI(new URL(registrationStoreUrl)); + const sqlDialect = getDialectFromUrl(new URL(registrationStoreUrl)); const registrationStore = await RegistrationStore.create(sqlDialect); registrationManager.registrationStore = registrationStore; diff --git a/src/storage.ts b/src/storage.ts index e74b3d4..4949119 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -33,8 +33,9 @@ import pg from 'pg'; import Cursor from 'pg-cursor'; import type { DwnServerConfig } from './config.js'; +import { loadPlugin } from './pluginLoader.js'; -export enum EStoreType { +export enum StoreType { DataStore, MessageStore, EventLog, @@ -48,44 +49,44 @@ export enum BackendTypes { POSTGRES = 'postgres', } -export type StoreType = DataStore | EventLog | MessageStore | ResumableTaskStore; +export type DwnStore = DataStore | EventLog | MessageStore | ResumableTaskStore; -export function getDWNConfig( +export async function getDwnConfig( config : DwnServerConfig, options : { didResolver? : DidResolver, tenantGate? : TenantGate, eventStream? : EventStream, } -): DwnConfig { +): Promise { const { tenantGate, eventStream, didResolver } = options; - const dataStore: DataStore = getStore(config.dataStore, EStoreType.DataStore); - const eventLog: EventLog = getStore(config.eventLog, EStoreType.EventLog); - const messageStore: MessageStore = getStore(config.messageStore, EStoreType.MessageStore); - const resumableTaskStore: ResumableTaskStore = getStore(config.messageStore, EStoreType.ResumableTaskStore); + const dataStore: DataStore = await getStore(config.dataStore, StoreType.DataStore); + const eventLog: EventLog = await getStore(config.eventLog, StoreType.EventLog); + const messageStore: MessageStore = await getStore(config.messageStore, StoreType.MessageStore); + const resumableTaskStore: ResumableTaskStore = await getStore(config.messageStore, StoreType.ResumableTaskStore); return { didResolver, eventStream, eventLog, dataStore, messageStore, resumableTaskStore, tenantGate }; } function getLevelStore( storeURI: URL, - storeType: EStoreType, -): DataStore | MessageStore | EventLog | ResumableTaskStore { + storeType: StoreType, +): DwnStore { switch (storeType) { - case EStoreType.DataStore: + case StoreType.DataStore: return new DataStoreLevel({ blockstoreLocation: storeURI.host + storeURI.pathname + '/DATASTORE', }); - case EStoreType.MessageStore: + case StoreType.MessageStore: return new MessageStoreLevel({ blockstoreLocation: storeURI.host + storeURI.pathname + '/MESSAGESTORE', indexLocation: storeURI.host + storeURI.pathname + '/INDEX', }); - case EStoreType.EventLog: + case StoreType.EventLog: return new EventLogLevel({ location: storeURI.host + storeURI.pathname + '/EVENTLOG', }); - case EStoreType.ResumableTaskStore: + case StoreType.ResumableTaskStore: return new ResumableTaskStoreLevel({ location: storeURI.host + storeURI.pathname + '/RESUMABLE-TASK-STORE', }); @@ -94,42 +95,45 @@ function getLevelStore( } } -function getDBStore( - db: Dialect, - storeType: EStoreType, -): DataStore | MessageStore | EventLog | ResumableTaskStore { +function getSqlStore( + connectionUrl: URL, + storeType: StoreType, +): DwnStore { + const dialect = getDialectFromUrl(connectionUrl); + switch (storeType) { - case EStoreType.DataStore: - return new DataStoreSql(db); - case EStoreType.MessageStore: - return new MessageStoreSql(db); - case EStoreType.EventLog: - return new EventLogSql(db); - case EStoreType.ResumableTaskStore: - return new ResumableTaskStoreSql(db); + case StoreType.DataStore: + return new DataStoreSql(dialect); + case StoreType.MessageStore: + return new MessageStoreSql(dialect); + case StoreType.EventLog: + return new EventLogSql(dialect); + case StoreType.ResumableTaskStore: + return new ResumableTaskStoreSql(dialect); default: - throw new Error('Unexpected db store type'); + throw new Error(`Unsupported store type ${storeType} for SQL store.`); } } -function getStore( - storeString: string, - storeType: EStoreType.DataStore, -): DataStore; -function getStore( - storeString: string, - storeType: EStoreType.EventLog, -): EventLog; -function getStore( - storeString: string, - storeType: EStoreType.MessageStore, -): MessageStore; -function getStore( - storeString: string, - storeType: EStoreType.ResumableTaskStore, -): ResumableTaskStore; -function getStore(storeString: string, storeType: EStoreType): StoreType { - const storeURI = new URL(storeString); +/** + * Check if the given string is a file path. + */ +function isFilePath(configString: string): boolean { + const filePathPrefixes = ['/', './', '../']; + return filePathPrefixes.some(prefix => configString.startsWith(prefix)); +} + +async function getStore(storeString: string, storeType: StoreType.DataStore): Promise; +async function getStore(storeString: string, storeType: StoreType.EventLog): Promise; +async function getStore(storeString: string, storeType: StoreType.MessageStore): Promise; +async function getStore(storeString: string, storeType: StoreType.ResumableTaskStore): Promise; +async function getStore(storeConfigString: string, storeType: StoreType): Promise { + if (isFilePath(storeConfigString)) { + return await loadStoreFromFilePath(storeConfigString, storeType); + } + // else treat the `storeConfigString` as a connection string + + const storeURI = new URL(storeConfigString); switch (storeURI.protocol.slice(0, -1)) { case BackendTypes.LEVEL: @@ -138,14 +142,35 @@ function getStore(storeString: string, storeType: EStoreType): StoreType { case BackendTypes.SQLITE: case BackendTypes.MYSQL: case BackendTypes.POSTGRES: - return getDBStore(getDialectFromURI(storeURI), storeType); + return getSqlStore(storeURI, storeType); default: throw invalidStorageSchemeMessage(storeURI.protocol); } } -export function getDialectFromURI(connectionUrl: URL): Dialect { +/** + * Loads a DWN store plugin of the given type from the given file path. + */ +async function loadStoreFromFilePath( + filePath: string, + storeType: StoreType, +): Promise { + switch (storeType) { + case StoreType.DataStore: + return await loadPlugin(filePath); + case StoreType.EventLog: + return await loadPlugin(filePath); + case StoreType.MessageStore: + return await loadPlugin(filePath); + case StoreType.ResumableTaskStore: + return await loadPlugin(filePath); + default: + throw new Error(`Loading store for unsupported store type ${storeType} from path ${filePath}`); + } +} + +export function getDialectFromUrl(connectionUrl: URL): Dialect { switch (connectionUrl.protocol.slice(0, -1)) { case BackendTypes.SQLITE: const path = connectionUrl.host + connectionUrl.pathname; diff --git a/src/web5-connect/web5-connect-server.ts b/src/web5-connect/web5-connect-server.ts index bf2b1a2..007faa7 100644 --- a/src/web5-connect/web5-connect-server.ts +++ b/src/web5-connect/web5-connect-server.ts @@ -1,4 +1,4 @@ -import { getDialectFromURI } from "../storage.js"; +import { getDialectFromUrl } from "../storage.js"; import { randomUuid } from '@web5/crypto/utils'; import { SqlTtlCache } from "./sql-ttl-cache.js"; @@ -49,7 +49,7 @@ export class Web5ConnectServer { const web5ConnectServer = new Web5ConnectServer({ baseUrl }); // Initialize TTL cache. - const sqlDialect = getDialectFromURI(new URL(sqlTtlCacheUrl)); + const sqlDialect = getDialectFromUrl(new URL(sqlTtlCacheUrl)); web5ConnectServer.cache = await SqlTtlCache.create(sqlDialect); return web5ConnectServer; diff --git a/tests/http-api.spec.ts b/tests/http-api.spec.ts index 2d938f4..d4042ee 100644 --- a/tests/http-api.spec.ts +++ b/tests/http-api.spec.ts @@ -53,7 +53,6 @@ describe('http api', function () { before(async function () { clock = useFakeTimers({ shouldAdvanceTime: true }); - config.registrationStoreUrl = 'sqlite://'; config.registrationProofOfWorkEnabled = true; config.termsOfServiceFilePath = './tests/fixtures/terms-of-service.txt'; diff --git a/tests/plugins/data-store-sqlite.ts b/tests/plugins/data-store-sqlite.ts new file mode 100644 index 0000000..796d5b0 --- /dev/null +++ b/tests/plugins/data-store-sqlite.ts @@ -0,0 +1,26 @@ +import type { DataStore } from "@tbd54566975/dwn-sdk-js"; +import { DataStoreSql } from "@tbd54566975/dwn-sql-store"; +import { getDialectFromUrl } from "../../src/storage.js"; + +/** + * An example of a DataStore plugin that is used for testing. + * The points to note are: + * - The class must be a default export. + * - The constructor must not take any arguments. + */ +export default class DataStoreSqlite extends DataStoreSql implements DataStore { + constructor() { + const sqliteDialect = getDialectFromUrl(new URL('sqlite://')); + super(sqliteDialect); + + // NOTE: the following line is added purely to test the constructor invocation. + DataStoreSqlite.spyingTheConstructor(); + } + + /** + * NOTE: This method is introduced purely to indirectly test/spy invocation of the constructor. + * As I was unable to find an easy way to directly spy the constructor. + */ + public static spyingTheConstructor(): void { + } +} \ No newline at end of file diff --git a/tests/scenarios/dynamic-plugin-loading.spec.ts b/tests/scenarios/dynamic-plugin-loading.spec.ts new file mode 100644 index 0000000..8323df6 --- /dev/null +++ b/tests/scenarios/dynamic-plugin-loading.spec.ts @@ -0,0 +1,194 @@ +import type { JsonRpcSuccessResponse } from '../../src/lib/json-rpc.js'; +import type { Readable } from 'readable-stream'; + +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; +import DataStoreSqlite from '../plugins/data-store-sqlite.js'; +import fetch from 'node-fetch'; +import sinon from 'sinon'; + +import { config } from '../../src/config.js'; +import { createJsonRpcRequest } from '../../src/lib/json-rpc.js'; +import { DwnServer } from '../../src/dwn-server.js'; +import { getFileAsReadStream } from '../utils.js'; +import { v4 as uuidv4 } from 'uuid'; + +import { Cid, DwnConstant, Jws, ProtocolsConfigure, RecordsRead, RecordsWrite, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; +import { DidDht, DidKey, UniversalResolver } from '@web5/dids'; + +// node.js 18 and earlier needs globalThis.crypto polyfill +if (!globalThis.crypto) { + // @ts-ignore + globalThis.crypto = webcrypto; +} + +chai.use(chaiAsPromised); + +describe('Dynamic DWN plugin loading', function () { + let dwnServer: DwnServer; + + afterEach(async () => { + // clock.restore(); + if (dwnServer !== undefined) { + await dwnServer.stop(); + } + }); + + it('should fail dynamically loading a non-existent plugin', async () => { + const dwnServerConfigCopy = { ...config }; // not touching the original config + dwnServerConfigCopy.dataStore = './non-existent-plugin.js'; + + const invalidDwnServer = new DwnServer({ config: dwnServerConfigCopy }); + await expect(invalidDwnServer.start()).to.be.rejectedWith('Failed to load component at ./non-existent-plugin.js'); + }); + + it('should be able to dynamically load and use custom data store implementation', async () => { + // Scenario: + // 1. Configure DWN to load a custom data store plugin. + // 2. Validate that the constructor of the plugin is called. + // 3. Validate that the DWN instance is using the custom data store plugin. + + // NOTE: was not able to spy on constructor directly, so spying on a method that is called in the constructor + const customDataStoreConstructorSpy = sinon.spy(DataStoreSqlite, 'spyingTheConstructor'); + + // 1. Configure DWN to load a custom data store plugin. + const dwnServerConfigCopy = { ...config }; // not touching the original config + dwnServerConfigCopy.registrationStoreUrl = undefined; // allow all traffic + dwnServerConfigCopy.messageStore = 'sqlite://'; + dwnServerConfigCopy.dataStore = '../tests/plugins/data-store-sqlite.js'; + dwnServerConfigCopy.eventLog = 'sqlite://'; + + // 2. Validate that the constructor of the plugin is called. + // CRITICAL: We need to create a custom DID resolver that does not use a LevelDB based cache (which is the default cache used in `DWN`) + // otherwise we will receive a `Database is not open` coming from LevelDB. + // This is likely due to the fact that LevelDB is the default cache used in `DWN`, and we have tests creating default DWN instances, + // so here we have to create a DWN that does not use the same LevelDB cache to avoid hitting LevelDB locked issues. + // Long term we should investigate and unify approach of DWN instantiation taken by tests to avoid this "workaround" entirely. + const didResolver = new UniversalResolver({ + didResolvers : [DidDht, DidKey], + }); + dwnServer = new DwnServer({ config: dwnServerConfigCopy, didResolver }); + await dwnServer.start(); + expect(customDataStoreConstructorSpy.calledOnce).to.be.true; + + // 3. Validate that the DWN instance is using the custom data store plugin. + const dwnUrl = `${dwnServerConfigCopy.baseUrl}:${dwnServerConfigCopy.port}`; + await sanityTestDwnReadWrite(dwnUrl); + }); +}); + +/** + * Sanity test RecordsWrite and RecordsRead on the DWN instance. + */ +async function sanityTestDwnReadWrite(dwnUrl: string): Promise { + const alice = await TestDataGenerator.generateDidKeyPersona(); + const aliceSigner = Jws.createSigner(alice); + // await registrationManager.recordTenantRegistration({ did: alice.did, termsOfServiceHash: registrationManager.getTermsOfServiceHash()}); + + // install minimal protocol on Alice's DWN + const protocolDefinition = { + protocol: 'http://minimal.xyz', + published: false, + types: { + foo: {} + }, + structure: { + foo: {} + } + }; + + const protocolsConfig = await ProtocolsConfigure.create({ + signer: aliceSigner, + definition: protocolDefinition + }); + + const protocolConfigureRequestId = uuidv4(); + const protocolConfigureRequest = createJsonRpcRequest(protocolConfigureRequestId, 'dwn.processMessage', { + target: alice.did, + message: protocolsConfig.message, + }); + const protocolConfigureResponse = await fetch(dwnUrl, { + method: 'POST', + headers: { + 'dwn-request': JSON.stringify(protocolConfigureRequest), + } + }); + const protocolConfigureResponseBody = await protocolConfigureResponse.json() as JsonRpcSuccessResponse; + + expect(protocolConfigureResponse.status).to.equal(200); + expect(protocolConfigureResponseBody.result.reply.status.code).to.equal(202); + + // Alice writing a file larger than max data size allowed to be encoded directly in the DWN Message Store. + const filePath = './fixtures/test.jpeg'; + const { + cid: dataCid, + size: dataSize, + stream + } = await getFileAsReadStream(filePath); + expect(dataSize).to.be.greaterThan(DwnConstant.maxDataSizeAllowedToBeEncoded); + + const recordsWrite = await RecordsWrite.create({ + signer: aliceSigner, + dataFormat: 'image/jpeg', + dataCid, + dataSize + }); + + const recordsWriteRequestId = uuidv4(); + const recordsWriteRequest = createJsonRpcRequest(recordsWriteRequestId, 'dwn.processMessage', { + target: alice.did, + message: recordsWrite.message, + }); + const recordsWriteResponse = await fetch(dwnUrl, { + method: 'POST', + headers: { + 'dwn-request': JSON.stringify(recordsWriteRequest), + }, + body: stream + }); + const recordsWriteResponseBody = await recordsWriteResponse.json() as JsonRpcSuccessResponse; + + expect(recordsWriteResponse.status).to.equal(200); + expect(recordsWriteResponseBody.result.reply.status.code).to.equal(202); + + // Alice reading the file back out. + const recordsRead = await RecordsRead.create({ + signer: aliceSigner, + filter: { + recordId: recordsWrite.message.recordId, + }, + }); + + const recordsReadRequestId = uuidv4(); + const recordsReadRequest = createJsonRpcRequest(recordsReadRequestId, 'dwn.processMessage', { + target: alice.did, + message: recordsRead.message + }); + + const recordsReadResponse = await fetch(dwnUrl, { + method: 'POST', + headers: { + 'dwn-request': JSON.stringify(recordsReadRequest), + }, + }); + + expect(recordsReadResponse.status).to.equal(200); + + const { headers } = recordsReadResponse; + const contentType = headers.get('content-type'); + expect(contentType).to.not.be.undefined; + expect(contentType).to.equal('application/octet-stream'); + + const recordsReadDwnResponse = headers.get('dwn-response'); + expect(recordsReadDwnResponse).to.not.be.undefined; + + const recordsReadJsonRpcResponse = JSON.parse(recordsReadDwnResponse) as JsonRpcSuccessResponse; + expect(recordsReadJsonRpcResponse.id).to.equal(recordsReadRequestId); + expect(recordsReadJsonRpcResponse.error).to.not.exist; + expect(recordsReadJsonRpcResponse.result.reply.status.code).to.equal(200); + expect(recordsReadJsonRpcResponse.result.reply.record).to.exist; + + // can't get response as stream from supertest :( + const cid = await Cid.computeDagPbCidFromStream(recordsReadResponse.body as Readable); + expect(cid).to.equal(dataCid); +} diff --git a/tests/test-dwn.ts b/tests/test-dwn.ts index 005e00d..8ce33d7 100644 --- a/tests/test-dwn.ts +++ b/tests/test-dwn.ts @@ -1,25 +1,25 @@ import type { TenantGate } from '@tbd54566975/dwn-sdk-js'; -import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js'; + +import { getDialectFromUrl } from '../src/storage.js'; import { DataStoreSql, EventLogSql, MessageStoreSql, ResumableTaskStoreSql, } from '@tbd54566975/dwn-sql-store'; - -import { getDialectFromURI } from '../src/storage.js'; import { DidDht, DidIon, DidKey, UniversalResolver } from '@web5/dids'; +import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js'; export async function getTestDwn(options: { tenantGate?: TenantGate, withEvents?: boolean, } = {}): Promise { const { tenantGate, withEvents = false } = options; - const db = getDialectFromURI(new URL('sqlite://')); - const dataStore = new DataStoreSql(db); - const eventLog = new EventLogSql(db); - const messageStore = new MessageStoreSql(db); - const resumableTaskStore = new ResumableTaskStoreSql(db); + const dialect = getDialectFromUrl(new URL('sqlite://')); + const dataStore = new DataStoreSql(dialect); + const eventLog = new EventLogSql(dialect); + const messageStore = new MessageStoreSql(dialect); + const resumableTaskStore = new ResumableTaskStoreSql(dialect); const eventStream = withEvents ? new EventEmitterStream() : undefined; // NOTE: no resolver cache used here to avoid locking LevelDB From bcd669b992b3bd25fe3c0924c9833c1254015aa0 Mon Sep 17 00:00:00 2001 From: Henry Tsai Date: Tue, 9 Jul 2024 18:35:37 -0700 Subject: [PATCH 2/6] Exposed config for EventStream + Updated plugin test --- README.md | 1 + src/config.ts | 6 + src/dwn-server.ts | 11 +- src/{pluginLoader.ts => plugin-loader.ts} | 13 +- src/storage.ts | 39 +++-- tests/common-scenario-validator.ts | 142 ++++++++++++++++++ tests/http-api.spec.ts | 130 +--------------- tests/plugins/data-store-sqlite.ts | 2 +- tests/plugins/event-log-sqlite.ts | 26 ++++ tests/plugins/event-stream-in-memory.ts | 24 +++ tests/plugins/message-store-sqlite.ts | 26 ++++ .../scenarios/dynamic-plugin-loading.spec.ts | 141 ++--------------- tests/utils.ts | 48 ------ 13 files changed, 285 insertions(+), 324 deletions(-) rename src/{pluginLoader.ts => plugin-loader.ts} (60%) create mode 100644 tests/common-scenario-validator.ts create mode 100644 tests/plugins/event-log-sqlite.ts create mode 100644 tests/plugins/event-stream-in-memory.ts create mode 100644 tests/plugins/message-store-sqlite.ts diff --git a/README.md b/README.md index e5e680f..5490ef2 100644 --- a/README.md +++ b/README.md @@ -282,6 +282,7 @@ Configuration can be set using environment variables | `DS_MAX_RECORD_DATA_SIZE` | Maximum size for `RecordsWrite` data. use `b`, `kb`, `mb`, `gb` for value | `1gb` | | `DS_WEBSOCKET_SERVER` | Whether to enable listening over `ws:`. values: `on`,`off` | `on` | | `DWN_BASE_URL` | Base external URL of this DWN. Used to construct URL paths such as the `Request URI` for the Web5 Connect flow. | `http://localhost` | +| `DWN_EVENT_STREAM_PLUGIN_PATH` | Path to DWN Event Stream plugin to use. Default single-node implementation will be used if left empty. | unset | | `DWN_REGISTRATION_STORE_URL` | URL to use for storage of registered DIDs. Leave unset to if DWN does not require registration (ie. open for all) | unset | | `DWN_REGISTRATION_PROOF_OF_WORK_SEED` | Seed to generate the challenge nonce from, this allows all DWN instances in a cluster to generate the same challenge. | unset | | `DWN_REGISTRATION_PROOF_OF_WORK_ENABLED` | Require new users to complete a proof-of-work challenge | `false` | diff --git a/src/config.ts b/src/config.ts index f6ce95a..de3c5ba 100644 --- a/src/config.ts +++ b/src/config.ts @@ -46,6 +46,12 @@ export const config = { // whether to enable 'ws:' webSocketSupport: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? true, + + /** + * Path to DWN Event Stream plugin to use. Default single-node implementation will be used if left empty. + */ + eventStreamPluginPath: process.env.DWN_EVENT_STREAM_PLUGIN_PATH, + // where to store persistent data messageStore: process.env.DWN_STORAGE_MESSAGES || process.env.DWN_STORAGE || 'level://data', dataStore: process.env.DWN_STORAGE_DATA || process.env.DWN_STORAGE || 'level://data', diff --git a/src/dwn-server.ts b/src/dwn-server.ts index 3c983dc..316e078 100644 --- a/src/dwn-server.ts +++ b/src/dwn-server.ts @@ -11,6 +11,7 @@ import { config as defaultConfig } from './config.js'; import { getDwnConfig } from './storage.js'; import { HttpServerShutdownHandler } from './lib/http-server-shutdown-handler.js'; import { HttpApi } from './http-api.js'; +import { PluginLoader } from './plugin-loader.js'; import { RegistrationManager } from './registration/registration-manager.js'; import { WsApi } from './ws-api.js'; import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js'; @@ -100,9 +101,13 @@ export class DwnServer { let eventStream: EventStream | undefined; if (this.config.webSocketSupport) { - // setting `EventEmitterStream` as default the default `EventStream - // if an alternate implementation is needed, instantiate a `Dwn` with a custom `EventStream` and add it to server options. - eventStream = new EventEmitterStream(); + // If Even Stream plugin is not specified, use `EventEmitterStream` implementation as default. + if (this.config.eventStreamPluginPath === undefined || this.config.eventStreamPluginPath === '') { + eventStream = new EventEmitterStream(); + } else { + eventStream = await PluginLoader.loadPlugin(this.config.eventStreamPluginPath); + } + } const dwnConfig = await getDwnConfig(this.config, { diff --git a/src/pluginLoader.ts b/src/plugin-loader.ts similarity index 60% rename from src/pluginLoader.ts rename to src/plugin-loader.ts index b10f82c..fd5bc4f 100644 --- a/src/pluginLoader.ts +++ b/src/plugin-loader.ts @@ -1,10 +1,14 @@ /** - * Dynamically loads a plugin from a file path by invoking the argument-less constructor of the default exported class. + * A utility class for dynamically loading plugins from file paths. */ -export async function loadPlugin(filePath: string): Promise { +export class PluginLoader { + /** + * Dynamically loads a plugin from a file path by invoking the argument-less constructor of the default exported class. + */ + public static async loadPlugin(filePath: string): Promise { try { const module = await import(filePath); - + // Check if the default export is a class if (typeof module.default === 'function') { const instance: T = new module.default() as T; @@ -15,4 +19,5 @@ export async function loadPlugin(filePath: string): Promise { } catch (error) { throw new Error(`Failed to load component at ${filePath}: ${error.message}`); } - } \ No newline at end of file + } +} diff --git a/src/storage.ts b/src/storage.ts index 4949119..8c7b50c 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -1,11 +1,3 @@ -import * as fs from 'fs'; - -import { - DataStoreLevel, - EventLogLevel, - MessageStoreLevel, - ResumableTaskStoreLevel, -} from '@tbd54566975/dwn-sdk-js'; import type { DidResolver } from '@web5/dids'; import type { DataStore, @@ -17,6 +9,21 @@ import type { TenantGate, } from '@tbd54566975/dwn-sdk-js'; import type { Dialect } from '@tbd54566975/dwn-sql-store'; +import type { DwnServerConfig } from './config.js'; + +import * as fs from 'fs'; +import Cursor from 'pg-cursor'; +import Database from 'better-sqlite3'; +import pg from 'pg'; +import { createPool as MySQLCreatePool } from 'mysql2'; +import { PluginLoader } from './plugin-loader.js'; + +import { + DataStoreLevel, + EventLogLevel, + MessageStoreLevel, + ResumableTaskStoreLevel, +} from '@tbd54566975/dwn-sdk-js'; import { DataStoreSql, EventLogSql, @@ -27,14 +34,6 @@ import { SqliteDialect, } from '@tbd54566975/dwn-sql-store'; -import Database from 'better-sqlite3'; -import { createPool as MySQLCreatePool } from 'mysql2'; -import pg from 'pg'; -import Cursor from 'pg-cursor'; - -import type { DwnServerConfig } from './config.js'; -import { loadPlugin } from './pluginLoader.js'; - export enum StoreType { DataStore, MessageStore, @@ -158,13 +157,13 @@ async function loadStoreFromFilePath( ): Promise { switch (storeType) { case StoreType.DataStore: - return await loadPlugin(filePath); + return await PluginLoader.loadPlugin(filePath); case StoreType.EventLog: - return await loadPlugin(filePath); + return await PluginLoader.loadPlugin(filePath); case StoreType.MessageStore: - return await loadPlugin(filePath); + return await PluginLoader.loadPlugin(filePath); case StoreType.ResumableTaskStore: - return await loadPlugin(filePath); + return await PluginLoader.loadPlugin(filePath); default: throw new Error(`Loading store for unsupported store type ${storeType} from path ${filePath}`); } diff --git a/tests/common-scenario-validator.ts b/tests/common-scenario-validator.ts new file mode 100644 index 0000000..46050b4 --- /dev/null +++ b/tests/common-scenario-validator.ts @@ -0,0 +1,142 @@ +import type { JsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; +import type { Persona } from '@tbd54566975/dwn-sdk-js'; +import type { Readable } from 'readable-stream'; + +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; +import fetch from 'node-fetch'; + +import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; +import { getFileAsReadStream } from './utils.js'; +import { v4 as uuidv4 } from 'uuid'; + +import { Cid, DwnConstant, Jws, ProtocolsConfigure, RecordsRead, RecordsWrite, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; + + +// node.js 18 and earlier needs globalThis.crypto polyfill +if (!globalThis.crypto) { + // @ts-ignore + globalThis.crypto = webcrypto; +} + +chai.use(chaiAsPromised); + +/** + * Validator of common scenarios. + */ +export default class CommonScenarioValidator { + /** + * Sanity test RecordsWrite and RecordsRead on the DWN instance. + */ + public static async sanityTestDwnReadWrite(dwnUrl: string, persona?: Persona): Promise { + const alice = persona || await TestDataGenerator.generateDidKeyPersona(); + const aliceSigner = Jws.createSigner(alice); + + // install minimal protocol on Alice's DWN + const protocolDefinition = { + protocol: 'http://minimal.xyz', + published: false, + types: { + foo: {} + }, + structure: { + foo: {} + } + }; + + const protocolsConfig = await ProtocolsConfigure.create({ + signer: aliceSigner, + definition: protocolDefinition + }); + + const protocolConfigureRequestId = uuidv4(); + const protocolConfigureRequest = createJsonRpcRequest(protocolConfigureRequestId, 'dwn.processMessage', { + target: alice.did, + message: protocolsConfig.message, + }); + const protocolConfigureResponse = await fetch(dwnUrl, { + method: 'POST', + headers: { + 'dwn-request': JSON.stringify(protocolConfigureRequest), + } + }); + const protocolConfigureResponseBody = await protocolConfigureResponse.json() as JsonRpcSuccessResponse; + + expect(protocolConfigureResponse.status).to.equal(200); + expect(protocolConfigureResponseBody.result.reply.status.code).to.equal(202); + + // Alice writing a file larger than max data size allowed to be encoded directly in the DWN Message Store. + const filePath = './fixtures/test.jpeg'; + const { + cid: dataCid, + size: dataSize, + stream + } = await getFileAsReadStream(filePath); + expect(dataSize).to.be.greaterThan(DwnConstant.maxDataSizeAllowedToBeEncoded); + + const recordsWrite = await RecordsWrite.create({ + signer: aliceSigner, + dataFormat: 'image/jpeg', + dataCid, + dataSize + }); + + const recordsWriteRequestId = uuidv4(); + const recordsWriteRequest = createJsonRpcRequest(recordsWriteRequestId, 'dwn.processMessage', { + target: alice.did, + message: recordsWrite.message, + }); + const recordsWriteResponse = await fetch(dwnUrl, { + method: 'POST', + headers: { + 'dwn-request': JSON.stringify(recordsWriteRequest), + }, + body: stream + }); + const recordsWriteResponseBody = await recordsWriteResponse.json() as JsonRpcSuccessResponse; + + expect(recordsWriteResponse.status).to.equal(200); + expect(recordsWriteResponseBody.result.reply.status.code).to.equal(202); + + // Alice reading the file back out. + const recordsRead = await RecordsRead.create({ + signer: aliceSigner, + filter: { + recordId: recordsWrite.message.recordId, + }, + }); + + const recordsReadRequestId = uuidv4(); + const recordsReadRequest = createJsonRpcRequest(recordsReadRequestId, 'dwn.processMessage', { + target: alice.did, + message: recordsRead.message + }); + + const recordsReadResponse = await fetch(dwnUrl, { + method: 'POST', + headers: { + 'dwn-request': JSON.stringify(recordsReadRequest), + }, + }); + + expect(recordsReadResponse.status).to.equal(200); + + const { headers } = recordsReadResponse; + const contentType = headers.get('content-type'); + expect(contentType).to.not.be.undefined; + expect(contentType).to.equal('application/octet-stream'); + + const recordsReadDwnResponse = headers.get('dwn-response'); + expect(recordsReadDwnResponse).to.not.be.undefined; + + const recordsReadJsonRpcResponse = JSON.parse(recordsReadDwnResponse) as JsonRpcSuccessResponse; + expect(recordsReadJsonRpcResponse.id).to.equal(recordsReadRequestId); + expect(recordsReadJsonRpcResponse.error).to.not.exist; + expect(recordsReadJsonRpcResponse.result.reply.status.code).to.equal(200); + expect(recordsReadJsonRpcResponse.result.reply.record).to.exist; + + // can't get response as stream from supertest :( + const cid = await Cid.computeDagPbCidFromStream(recordsReadResponse.body as Readable); + expect(cid).to.equal(dataCid); + } +} diff --git a/tests/http-api.spec.ts b/tests/http-api.spec.ts index d4042ee..c49a3a4 100644 --- a/tests/http-api.spec.ts +++ b/tests/http-api.spec.ts @@ -1,12 +1,10 @@ // node.js 18 and earlier, needs globalThis.crypto polyfill import sinon from 'sinon'; import { - Cid, DataStream, DwnErrorCode, ProtocolsConfigure, RecordsQuery, - RecordsRead, TestDataGenerator, Time, } from '@tbd54566975/dwn-sdk-js'; @@ -35,9 +33,9 @@ import { createRecordsWriteMessage, getDwnResponse, getFileAsReadStream, - streamHttpRequest, } from './utils.js'; import { RegistrationManager } from '../src/registration/registration-manager.js'; +import CommonScenarioValidator from './common-scenario-validator.js'; if (!globalThis.crypto) { // @ts-ignore @@ -53,6 +51,7 @@ describe('http api', function () { before(async function () { clock = useFakeTimers({ shouldAdvanceTime: true }); + // TODO: Remove direct use of default config to avoid changes bleed/pollute between tests - https://github.com/TBD54566975/dwn-server/issues/144 config.registrationStoreUrl = 'sqlite://'; config.registrationProofOfWorkEnabled = true; config.termsOfServiceFilePath = './tests/fixtures/terms-of-service.txt'; @@ -198,44 +197,14 @@ describe('http api', function () { }); }); - describe('RecordsWrite', function () { - it('handles RecordsWrite with request body', async function () { - const filePath = './fixtures/test.jpeg'; - const { cid, size, stream } = await getFileAsReadStream(filePath); - - const { recordsWrite } = await createRecordsWriteMessage(alice, { - dataCid: cid, - dataSize: size, - }); - - const requestId = uuidv4(); - const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { - message: recordsWrite.toJSON(), - target: alice.did, - }); - - const resp = await streamHttpRequest( - 'http://localhost:3000', - { - method: 'POST', - headers: { - 'content-type': 'application/octet-stream', - 'dwn-request': JSON.stringify(dwnRequest), - }, - }, - stream, - ); - - expect(resp.status).to.equal(200); - - const body = JSON.parse(resp.body) as JsonRpcResponse; - expect(body.id).to.equal(requestId); - expect(body.error).to.not.exist; - - const { reply } = body.result; - expect(reply.status.code).to.equal(202); + describe('P0 Scenarios', function () { + it('should be able to read and write a protocol record', async function () { + const dwnUrl = `${config.baseUrl}:${config.port}`; + await CommonScenarioValidator.sanityTestDwnReadWrite(dwnUrl, alice) }); + }); + describe('RecordsWrite', function () { it('handles RecordsWrite overwrite that does not mutate data', async function () { // First RecordsWrite that creates the record. const { recordsWrite: initialWrite, dataStream } = @@ -331,89 +300,6 @@ describe('http api', function () { }); }); - describe('RecordsRead', function () { - it('returns message in response header and data in body', async function () { - const filePath = './fixtures/test.jpeg'; - const { - cid: expectedCid, - size, - stream, - } = await getFileAsReadStream(filePath); - - const { recordsWrite } = await createRecordsWriteMessage(alice, { - dataCid: expectedCid, - dataSize: size, - }); - - let requestId = uuidv4(); - let dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { - message: recordsWrite.toJSON(), - target: alice.did, - }); - - let response = await fetch('http://localhost:3000', { - method: 'POST', - headers: { - 'dwn-request': JSON.stringify(dwnRequest), - }, - body: stream, - }); - - expect(response.status).to.equal(200); - - const body = (await response.json()) as JsonRpcResponse; - expect(body.id).to.equal(requestId); - expect(body.error).to.not.exist; - - const { reply } = body.result; - expect(reply.status.code).to.equal(202); - - const recordsRead = await RecordsRead.create({ - signer: alice.signer, - filter: { - recordId: recordsWrite.message.recordId, - }, - }); - - requestId = uuidv4(); - dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { - target: alice.did, - message: recordsRead.toJSON(), - }); - - response = await fetch('http://localhost:3000', { - method: 'POST', - headers: { - 'dwn-request': JSON.stringify(dwnRequest), - }, - }); - - expect(response.status).to.equal(200); - - const { headers } = response; - - const contentType = headers.get('content-type'); - expect(contentType).to.not.be.undefined; - expect(contentType).to.equal('application/octet-stream'); - - const dwnResponse = headers.get('dwn-response'); - expect(dwnResponse).to.not.be.undefined; - - const jsonRpcResponse = JSON.parse(dwnResponse) as JsonRpcResponse; - - expect(jsonRpcResponse.id).to.equal(requestId); - expect(jsonRpcResponse.error).to.not.exist; - - const { reply: recordsReadReply } = jsonRpcResponse.result; - expect(recordsReadReply.status.code).to.equal(200); - expect(recordsReadReply.record).to.exist; - - // can't get response as stream from supertest :( - const cid = await Cid.computeDagPbCidFromStream(response.body as any); - expect(cid).to.equal(expectedCid); - }); - }); - describe('/:did/records/:id', function () { it('returns record data if record is published', async function () { const filePath = './fixtures/test.jpeg'; diff --git a/tests/plugins/data-store-sqlite.ts b/tests/plugins/data-store-sqlite.ts index 796d5b0..9f46c68 100644 --- a/tests/plugins/data-store-sqlite.ts +++ b/tests/plugins/data-store-sqlite.ts @@ -3,7 +3,7 @@ import { DataStoreSql } from "@tbd54566975/dwn-sql-store"; import { getDialectFromUrl } from "../../src/storage.js"; /** - * An example of a DataStore plugin that is used for testing. + * An example of a plugin that is used for testing. * The points to note are: * - The class must be a default export. * - The constructor must not take any arguments. diff --git a/tests/plugins/event-log-sqlite.ts b/tests/plugins/event-log-sqlite.ts new file mode 100644 index 0000000..c3078b1 --- /dev/null +++ b/tests/plugins/event-log-sqlite.ts @@ -0,0 +1,26 @@ +import type { EventLog } from "@tbd54566975/dwn-sdk-js"; +import { EventLogSql } from "@tbd54566975/dwn-sql-store"; +import { getDialectFromUrl } from "../../src/storage.js"; + +/** + * An example of a plugin. Used for testing. + * The points to note are: + * - The class must be a default export. + * - The constructor must not take any arguments. + */ +export default class EventLogSqlite extends EventLogSql implements EventLog { + constructor() { + const sqliteDialect = getDialectFromUrl(new URL('sqlite://')); + super(sqliteDialect); + + // NOTE: the following line is added purely to test the constructor invocation. + EventLogSqlite.spyingTheConstructor(); + } + + /** + * NOTE: This method is introduced purely to indirectly test/spy invocation of the constructor. + * As I was unable to find an easy way to directly spy the constructor. + */ + public static spyingTheConstructor(): void { + } +} \ No newline at end of file diff --git a/tests/plugins/event-stream-in-memory.ts b/tests/plugins/event-stream-in-memory.ts new file mode 100644 index 0000000..7653ed5 --- /dev/null +++ b/tests/plugins/event-stream-in-memory.ts @@ -0,0 +1,24 @@ +import type { EventStream } from "@tbd54566975/dwn-sdk-js"; +import { EventEmitterStream } from "@tbd54566975/dwn-sdk-js"; + +/** + * An example of a plugin that is used for testing. + * The points to note are: + * - The class must be a default export. + * - The constructor must not take any arguments. + */ +export default class EventStreamInMemory extends EventEmitterStream implements EventStream { + constructor() { + super(); + + // NOTE: the following line is added purely to test the constructor invocation. + EventStreamInMemory.spyingTheConstructor(); + } + + /** + * NOTE: This method is introduced purely to indirectly test/spy invocation of the constructor. + * As I was unable to find an easy way to directly spy the constructor. + */ + public static spyingTheConstructor(): void { + } +} \ No newline at end of file diff --git a/tests/plugins/message-store-sqlite.ts b/tests/plugins/message-store-sqlite.ts new file mode 100644 index 0000000..7cb65c6 --- /dev/null +++ b/tests/plugins/message-store-sqlite.ts @@ -0,0 +1,26 @@ +import type { MessageStore } from "@tbd54566975/dwn-sdk-js"; +import { MessageStoreSql } from "@tbd54566975/dwn-sql-store"; +import { getDialectFromUrl } from "../../src/storage.js"; + +/** + * An example of a plugin. Used for testing. + * The points to note are: + * - The class must be a default export. + * - The constructor must not take any arguments. + */ +export default class MessageStoreSqlite extends MessageStoreSql implements MessageStore { + constructor() { + const sqliteDialect = getDialectFromUrl(new URL('sqlite://')); + super(sqliteDialect); + + // NOTE: the following line is added purely to test the constructor invocation. + MessageStoreSqlite.spyingTheConstructor(); + } + + /** + * NOTE: This method is introduced purely to indirectly test/spy invocation of the constructor. + * As I was unable to find an easy way to directly spy the constructor. + */ + public static spyingTheConstructor(): void { + } +} \ No newline at end of file diff --git a/tests/scenarios/dynamic-plugin-loading.spec.ts b/tests/scenarios/dynamic-plugin-loading.spec.ts index 8323df6..84ac281 100644 --- a/tests/scenarios/dynamic-plugin-loading.spec.ts +++ b/tests/scenarios/dynamic-plugin-loading.spec.ts @@ -1,20 +1,15 @@ -import type { JsonRpcSuccessResponse } from '../../src/lib/json-rpc.js'; -import type { Readable } from 'readable-stream'; - import chaiAsPromised from 'chai-as-promised'; import chai, { expect } from 'chai'; import DataStoreSqlite from '../plugins/data-store-sqlite.js'; -import fetch from 'node-fetch'; +import EventLogSqlite from '../plugins/event-log-sqlite.js'; +import EventStreamInMemory from '../plugins/event-stream-in-memory.js'; import sinon from 'sinon'; import { config } from '../../src/config.js'; -import { createJsonRpcRequest } from '../../src/lib/json-rpc.js'; import { DwnServer } from '../../src/dwn-server.js'; -import { getFileAsReadStream } from '../utils.js'; -import { v4 as uuidv4 } from 'uuid'; -import { Cid, DwnConstant, Jws, ProtocolsConfigure, RecordsRead, RecordsWrite, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; import { DidDht, DidKey, UniversalResolver } from '@web5/dids'; +import CommonScenarioValidator from '../common-scenario-validator.js'; // node.js 18 and earlier needs globalThis.crypto polyfill if (!globalThis.crypto) { @@ -50,13 +45,21 @@ describe('Dynamic DWN plugin loading', function () { // NOTE: was not able to spy on constructor directly, so spying on a method that is called in the constructor const customDataStoreConstructorSpy = sinon.spy(DataStoreSqlite, 'spyingTheConstructor'); + const customEventLogConstructorSpy = sinon.spy(EventLogSqlite, 'spyingTheConstructor'); + const customEventStreamConstructorSpy = sinon.spy(EventStreamInMemory, 'spyingTheConstructor'); // 1. Configure DWN to load a custom data store plugin. const dwnServerConfigCopy = { ...config }; // not touching the original config + + // TODO: remove below after https://github.com/TBD54566975/dwn-server/issues/144 is resolved + // The default config is not reliable because other tests modify it. dwnServerConfigCopy.registrationStoreUrl = undefined; // allow all traffic + + // dwnServerConfigCopy.messageStore = '../tests/plugins/message-store-sqlite.js'; // not working dwnServerConfigCopy.messageStore = 'sqlite://'; dwnServerConfigCopy.dataStore = '../tests/plugins/data-store-sqlite.js'; - dwnServerConfigCopy.eventLog = 'sqlite://'; + dwnServerConfigCopy.eventLog = '../tests/plugins/event-log-sqlite.js'; + dwnServerConfigCopy.eventStreamPluginPath = '../tests/plugins/event-stream-in-memory.js'; // 2. Validate that the constructor of the plugin is called. // CRITICAL: We need to create a custom DID resolver that does not use a LevelDB based cache (which is the default cache used in `DWN`) @@ -70,125 +73,11 @@ describe('Dynamic DWN plugin loading', function () { dwnServer = new DwnServer({ config: dwnServerConfigCopy, didResolver }); await dwnServer.start(); expect(customDataStoreConstructorSpy.calledOnce).to.be.true; + expect(customEventLogConstructorSpy.calledOnce).to.be.true; + expect(customEventStreamConstructorSpy.calledOnce).to.be.true; // 3. Validate that the DWN instance is using the custom data store plugin. const dwnUrl = `${dwnServerConfigCopy.baseUrl}:${dwnServerConfigCopy.port}`; - await sanityTestDwnReadWrite(dwnUrl); + await CommonScenarioValidator.sanityTestDwnReadWrite(dwnUrl); }); }); - -/** - * Sanity test RecordsWrite and RecordsRead on the DWN instance. - */ -async function sanityTestDwnReadWrite(dwnUrl: string): Promise { - const alice = await TestDataGenerator.generateDidKeyPersona(); - const aliceSigner = Jws.createSigner(alice); - // await registrationManager.recordTenantRegistration({ did: alice.did, termsOfServiceHash: registrationManager.getTermsOfServiceHash()}); - - // install minimal protocol on Alice's DWN - const protocolDefinition = { - protocol: 'http://minimal.xyz', - published: false, - types: { - foo: {} - }, - structure: { - foo: {} - } - }; - - const protocolsConfig = await ProtocolsConfigure.create({ - signer: aliceSigner, - definition: protocolDefinition - }); - - const protocolConfigureRequestId = uuidv4(); - const protocolConfigureRequest = createJsonRpcRequest(protocolConfigureRequestId, 'dwn.processMessage', { - target: alice.did, - message: protocolsConfig.message, - }); - const protocolConfigureResponse = await fetch(dwnUrl, { - method: 'POST', - headers: { - 'dwn-request': JSON.stringify(protocolConfigureRequest), - } - }); - const protocolConfigureResponseBody = await protocolConfigureResponse.json() as JsonRpcSuccessResponse; - - expect(protocolConfigureResponse.status).to.equal(200); - expect(protocolConfigureResponseBody.result.reply.status.code).to.equal(202); - - // Alice writing a file larger than max data size allowed to be encoded directly in the DWN Message Store. - const filePath = './fixtures/test.jpeg'; - const { - cid: dataCid, - size: dataSize, - stream - } = await getFileAsReadStream(filePath); - expect(dataSize).to.be.greaterThan(DwnConstant.maxDataSizeAllowedToBeEncoded); - - const recordsWrite = await RecordsWrite.create({ - signer: aliceSigner, - dataFormat: 'image/jpeg', - dataCid, - dataSize - }); - - const recordsWriteRequestId = uuidv4(); - const recordsWriteRequest = createJsonRpcRequest(recordsWriteRequestId, 'dwn.processMessage', { - target: alice.did, - message: recordsWrite.message, - }); - const recordsWriteResponse = await fetch(dwnUrl, { - method: 'POST', - headers: { - 'dwn-request': JSON.stringify(recordsWriteRequest), - }, - body: stream - }); - const recordsWriteResponseBody = await recordsWriteResponse.json() as JsonRpcSuccessResponse; - - expect(recordsWriteResponse.status).to.equal(200); - expect(recordsWriteResponseBody.result.reply.status.code).to.equal(202); - - // Alice reading the file back out. - const recordsRead = await RecordsRead.create({ - signer: aliceSigner, - filter: { - recordId: recordsWrite.message.recordId, - }, - }); - - const recordsReadRequestId = uuidv4(); - const recordsReadRequest = createJsonRpcRequest(recordsReadRequestId, 'dwn.processMessage', { - target: alice.did, - message: recordsRead.message - }); - - const recordsReadResponse = await fetch(dwnUrl, { - method: 'POST', - headers: { - 'dwn-request': JSON.stringify(recordsReadRequest), - }, - }); - - expect(recordsReadResponse.status).to.equal(200); - - const { headers } = recordsReadResponse; - const contentType = headers.get('content-type'); - expect(contentType).to.not.be.undefined; - expect(contentType).to.equal('application/octet-stream'); - - const recordsReadDwnResponse = headers.get('dwn-response'); - expect(recordsReadDwnResponse).to.not.be.undefined; - - const recordsReadJsonRpcResponse = JSON.parse(recordsReadDwnResponse) as JsonRpcSuccessResponse; - expect(recordsReadJsonRpcResponse.id).to.equal(recordsReadRequestId); - expect(recordsReadJsonRpcResponse.error).to.not.exist; - expect(recordsReadJsonRpcResponse.result.reply.status.code).to.equal(200); - expect(recordsReadJsonRpcResponse.result.reply.record).to.exist; - - // can't get response as stream from supertest :( - const cid = await Cid.computeDagPbCidFromStream(recordsReadResponse.body as Readable); - expect(cid).to.equal(dataCid); -} diff --git a/tests/utils.ts b/tests/utils.ts index 105be82..33f67d4 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -2,9 +2,7 @@ import type { GenericMessage, Persona, UnionMessageReply } from '@tbd54566975/dw import type { Response } from 'node-fetch'; import { Cid, DataStream, RecordsWrite } from '@tbd54566975/dwn-sdk-js'; -import type { ReadStream } from 'node:fs'; import fs from 'node:fs'; -import http from 'node:http'; import path from 'path'; import { v4 as uuidv4 } from 'uuid'; import fetch from 'node-fetch'; @@ -107,52 +105,6 @@ export function getDwnResponse(response: Response): UnionMessageReply { return JSON.parse(response.headers.get('dwn-response') as string) as UnionMessageReply; } -type HttpResponse = { - status: number; - headers: http.IncomingHttpHeaders; - body?: any; -}; - -export function streamHttpRequest( - url: string, - opts: http.RequestOptions, - bodyStream: ReadStream, -): Promise { - return new Promise((resolve, reject) => { - const request = http.request(url, opts, (rawResponse) => { - rawResponse.setEncoding('utf8'); - - const response: HttpResponse = { - status: rawResponse.statusCode, - headers: rawResponse.headers, - }; - - let body = ''; - rawResponse.on('data', (chunk) => { - body += chunk; - }); - - rawResponse.on('end', () => { - if (body) { - response.body = body; - } - - return resolve(response); - }); - }); - - request.on('error', (e) => { - return reject(e); - }); - - bodyStream.on('end', () => { - request.end(); - }); - - bodyStream.pipe(request); - }); -} - export async function sendHttpMessage(options: { url: string, target: string, From c25f790b88a8b36fe85d8d4baf5388ad0369a3cc Mon Sep 17 00:00:00 2001 From: Henry Tsai Date: Tue, 9 Jul 2024 18:58:13 -0700 Subject: [PATCH 3/6] Exposed resumable task store in config --- src/config.ts | 1 + src/storage.ts | 2 +- tests/plugins/resumable-task-store-sqlite.ts | 26 +++++++++++++++++++ .../scenarios/dynamic-plugin-loading.spec.ts | 11 +++++--- tests/scenarios/registration.spec.ts | 1 + 5 files changed, 37 insertions(+), 4 deletions(-) create mode 100644 tests/plugins/resumable-task-store-sqlite.ts diff --git a/src/config.ts b/src/config.ts index de3c5ba..389dd3a 100644 --- a/src/config.ts +++ b/src/config.ts @@ -56,6 +56,7 @@ export const config = { messageStore: process.env.DWN_STORAGE_MESSAGES || process.env.DWN_STORAGE || 'level://data', dataStore: process.env.DWN_STORAGE_DATA || process.env.DWN_STORAGE || 'level://data', eventLog: process.env.DWN_STORAGE_EVENTS || process.env.DWN_STORAGE || 'level://data', + resumableTaskStore: process.env.DWN_STORAGE_RESUMABLE_TASKS || process.env.DWN_STORAGE || 'level://data', // tenant registration feature configuration registrationStoreUrl: process.env.DWN_REGISTRATION_STORE_URL || process.env.DWN_STORAGE, diff --git a/src/storage.ts b/src/storage.ts index 8c7b50c..087797c 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -62,7 +62,7 @@ export async function getDwnConfig( const dataStore: DataStore = await getStore(config.dataStore, StoreType.DataStore); const eventLog: EventLog = await getStore(config.eventLog, StoreType.EventLog); const messageStore: MessageStore = await getStore(config.messageStore, StoreType.MessageStore); - const resumableTaskStore: ResumableTaskStore = await getStore(config.messageStore, StoreType.ResumableTaskStore); + const resumableTaskStore: ResumableTaskStore = await getStore(config.resumableTaskStore, StoreType.ResumableTaskStore); return { didResolver, eventStream, eventLog, dataStore, messageStore, resumableTaskStore, tenantGate }; } diff --git a/tests/plugins/resumable-task-store-sqlite.ts b/tests/plugins/resumable-task-store-sqlite.ts new file mode 100644 index 0000000..474e8a7 --- /dev/null +++ b/tests/plugins/resumable-task-store-sqlite.ts @@ -0,0 +1,26 @@ +import type { ResumableTaskStore } from "@tbd54566975/dwn-sdk-js"; +import { ResumableTaskStoreSql } from "@tbd54566975/dwn-sql-store"; +import { getDialectFromUrl } from "../../src/storage.js"; + +/** + * An example of a plugin that is used for testing. + * The points to note are: + * - The class must be a default export. + * - The constructor must not take any arguments. + */ +export default class ResumableTaskStoreSqlite extends ResumableTaskStoreSql implements ResumableTaskStore { + constructor() { + const sqliteDialect = getDialectFromUrl(new URL('sqlite://')); + super(sqliteDialect); + + // NOTE: the following line is added purely to test the constructor invocation. + ResumableTaskStoreSqlite.spyingTheConstructor(); + } + + /** + * NOTE: This method is introduced purely to indirectly test/spy invocation of the constructor. + * As I was unable to find an easy way to directly spy the constructor. + */ + public static spyingTheConstructor(): void { + } +} \ No newline at end of file diff --git a/tests/scenarios/dynamic-plugin-loading.spec.ts b/tests/scenarios/dynamic-plugin-loading.spec.ts index 84ac281..6bdbb36 100644 --- a/tests/scenarios/dynamic-plugin-loading.spec.ts +++ b/tests/scenarios/dynamic-plugin-loading.spec.ts @@ -10,6 +10,8 @@ import { DwnServer } from '../../src/dwn-server.js'; import { DidDht, DidKey, UniversalResolver } from '@web5/dids'; import CommonScenarioValidator from '../common-scenario-validator.js'; +import MessageStoreSqlite from '../plugins/message-store-sqlite.js'; +import ResumableTaskStoreSqlite from '../plugins/resumable-task-store-sqlite.js'; // node.js 18 and earlier needs globalThis.crypto polyfill if (!globalThis.crypto) { @@ -23,7 +25,6 @@ describe('Dynamic DWN plugin loading', function () { let dwnServer: DwnServer; afterEach(async () => { - // clock.restore(); if (dwnServer !== undefined) { await dwnServer.stop(); } @@ -44,7 +45,9 @@ describe('Dynamic DWN plugin loading', function () { // 3. Validate that the DWN instance is using the custom data store plugin. // NOTE: was not able to spy on constructor directly, so spying on a method that is called in the constructor + const customMessageStoreConstructorSpy = sinon.spy(MessageStoreSqlite, 'spyingTheConstructor'); const customDataStoreConstructorSpy = sinon.spy(DataStoreSqlite, 'spyingTheConstructor'); + const customResumableTaskStoreConstructorSpy = sinon.spy(ResumableTaskStoreSqlite, 'spyingTheConstructor'); const customEventLogConstructorSpy = sinon.spy(EventLogSqlite, 'spyingTheConstructor'); const customEventStreamConstructorSpy = sinon.spy(EventStreamInMemory, 'spyingTheConstructor'); @@ -55,9 +58,9 @@ describe('Dynamic DWN plugin loading', function () { // The default config is not reliable because other tests modify it. dwnServerConfigCopy.registrationStoreUrl = undefined; // allow all traffic - // dwnServerConfigCopy.messageStore = '../tests/plugins/message-store-sqlite.js'; // not working - dwnServerConfigCopy.messageStore = 'sqlite://'; + dwnServerConfigCopy.messageStore = '../tests/plugins/message-store-sqlite.js'; dwnServerConfigCopy.dataStore = '../tests/plugins/data-store-sqlite.js'; + dwnServerConfigCopy.resumableTaskStore = '../tests/plugins/resumable-task-store-sqlite.js'; dwnServerConfigCopy.eventLog = '../tests/plugins/event-log-sqlite.js'; dwnServerConfigCopy.eventStreamPluginPath = '../tests/plugins/event-stream-in-memory.js'; @@ -72,7 +75,9 @@ describe('Dynamic DWN plugin loading', function () { }); dwnServer = new DwnServer({ config: dwnServerConfigCopy, didResolver }); await dwnServer.start(); + expect(customMessageStoreConstructorSpy.calledOnce).to.be.true; expect(customDataStoreConstructorSpy.calledOnce).to.be.true; + expect(customResumableTaskStoreConstructorSpy.calledOnce).to.be.true; expect(customEventLogConstructorSpy.calledOnce).to.be.true; expect(customEventStreamConstructorSpy.calledOnce).to.be.true; diff --git a/tests/scenarios/registration.spec.ts b/tests/scenarios/registration.spec.ts index 987a3c9..aa8e032 100644 --- a/tests/scenarios/registration.spec.ts +++ b/tests/scenarios/registration.spec.ts @@ -50,6 +50,7 @@ describe('Registration scenarios', function () { // and dwn-server.spec.ts already uses LevelDB. dwnServerConfig.messageStore = 'sqlite://', dwnServerConfig.dataStore = 'sqlite://', + dwnServerConfig.resumableTaskStore = 'sqlite://', dwnServerConfig.eventLog = 'sqlite://', // registration config From 63444cef49f50d466ee447c9a184196ccef867b8 Mon Sep 17 00:00:00 2001 From: Henry Tsai Date: Tue, 9 Jul 2024 19:04:49 -0700 Subject: [PATCH 4/6] minor fixes --- tests/common-scenario-validator.ts | 2 +- tests/scenarios/web5-connect.spec.ts | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/common-scenario-validator.ts b/tests/common-scenario-validator.ts index 46050b4..4d755b2 100644 --- a/tests/common-scenario-validator.ts +++ b/tests/common-scenario-validator.ts @@ -9,10 +9,10 @@ import fetch from 'node-fetch'; import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; import { getFileAsReadStream } from './utils.js'; import { v4 as uuidv4 } from 'uuid'; +import { webcrypto } from 'node:crypto'; import { Cid, DwnConstant, Jws, ProtocolsConfigure, RecordsRead, RecordsWrite, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; - // node.js 18 and earlier needs globalThis.crypto polyfill if (!globalThis.crypto) { // @ts-ignore diff --git a/tests/scenarios/web5-connect.spec.ts b/tests/scenarios/web5-connect.spec.ts index 6199048..969c5ab 100644 --- a/tests/scenarios/web5-connect.spec.ts +++ b/tests/scenarios/web5-connect.spec.ts @@ -27,6 +27,7 @@ describe('Web5 Connect scenarios', function () { // and dwn-server.spec.ts already uses LevelDB. dwnServerConfig.messageStore = 'sqlite://', dwnServerConfig.dataStore = 'sqlite://', + dwnServerConfig.resumableTaskStore = 'sqlite://', dwnServerConfig.eventLog = 'sqlite://', dwnServer = new DwnServer({ config: dwnServerConfig }); From 2a2db0302746897bfed1b77048a98d1be9ca08b2 Mon Sep 17 00:00:00 2001 From: Henry Tsai Date: Wed, 10 Jul 2024 11:37:56 -0700 Subject: [PATCH 5/6] Added documentation --- README.md | 27 +++++++++++++++++++++++---- src/plugin-loader.ts | 10 ++-------- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 5490ef2..54b7d45 100644 --- a/README.md +++ b/README.md @@ -288,15 +288,16 @@ Configuration can be set using environment variables | `DWN_REGISTRATION_PROOF_OF_WORK_ENABLED` | Require new users to complete a proof-of-work challenge | `false` | | `DWN_REGISTRATION_PROOF_OF_WORK_INITIAL_MAX_HASH` | Initial maximum allowed hash in 64 char HEX string. The more leading zeros (smaller number) the higher the difficulty. | `false` | | `DWN_STORAGE` | URL to use for storage by default. See [Storage Options](#storage-options) for details | `level://data` | -| `DWN_STORAGE_MESSAGES` | URL to use for storage of messages. | value of `DWN_STORAGE` | -| `DWN_STORAGE_DATA` | URL to use for data storage | value of `DWN_STORAGE` | -| `DWN_STORAGE_EVENTS` | URL to use for event storage | value of `DWN_STORAGE` | +| `DWN_STORAGE_MESSAGES` | Connection URL or file path to custom plugin to use for the message store. | value of `DWN_STORAGE` | +| `DWN_STORAGE_DATA` | Connection URL or file path to custom plugin to use for the data store. | value of `DWN_STORAGE` | +| `DWN_STORAGE_RESUMABLE_TASKS` | Connection URL or file path to custom plugin to use for the resumable task store. | value of `DWN_STORAGE` | +| `DWN_STORAGE_EVENTS` | Connection URL or file path to custom plugin to use for the event store. | value of `DWN_STORAGE` | | `DWN_TERMS_OF_SERVICE_FILE_PATH` | Required terms of service agreement if set. Value is path to the terms of service file. | unset | | `DWN_TTL_CACHE_URL` | URL of the TTL cache used by the DWN. Currently only supports SQL databases. | `sqlite://` | ### Storage Options -Several storage formats are supported, and may be configured with the `DWN_STORAGE_*` environment variables: +Several built storage options are supported, and may be configured with the `DWN_STORAGE_*` environment variables: | Database | Example | Notes | | ---------- | ----------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | @@ -305,6 +306,24 @@ Several storage formats are supported, and may be configured with the `DWN_STORA | MySQL | `mysql://user:pass@host/db?debug=true&timezone=-0700` | [all URL options documented here](https://github.com/mysqljs/mysql#connection-options) | | PostgreSQL | `postgres:///dwn` | any options other than the URL scheme (`postgres://`) may also be specified via [standard environment variables](https://node-postgres.com/features/connecting#environment-variables) | +### Plugins +In some scenarios, you may want to provide a custom implementation of a pluggable module for the DWN Server. The following interfaces defined in `dwn-sdk-js` package are supported: + +- `DataStore` +- `MessageStore` +- `ResumableDataStore` +- `EventLog` +- `EventStream` + +To load your custom plugin, specify the absolute path to the `.js` file of your custom implementation using the corresponding environment variable. For instance, use `DWN_STORAGE_DATA` for a custom DWN Data Store. + +Refer to the `tests/plugins/*.ts` files for examples of plugin implementations. In summary, you need to: + +- Implement the corresponding interface from the `dwn-sdk-js` package. For example, implement the `DataStore` interface for a DWN Data Store. +- Ensure that the built `.js` file that will be referenced by the DWN Server config environment variable contains a class that: + 1. Is a default export. This is how DWN Server locates the correct class for instantiation. + 1. Has a public constructor that does not take any arguments. This is how DWN Server instantiates the plugin. + ## Registration Requirements There are multiple optional registration gates, each of which can be enabled (all are disabled by default). Tenants (DIDs) must comply with whatever diff --git a/src/plugin-loader.ts b/src/plugin-loader.ts index fd5bc4f..44a3089 100644 --- a/src/plugin-loader.ts +++ b/src/plugin-loader.ts @@ -8,14 +8,8 @@ export class PluginLoader { public static async loadPlugin(filePath: string): Promise { try { const module = await import(filePath); - - // Check if the default export is a class - if (typeof module.default === 'function') { - const instance: T = new module.default() as T; - return instance; - } else { - throw new Error(`Default export at ${filePath} is not a class.`); - } + const instance: T = new module.default() as T; + return instance; } catch (error) { throw new Error(`Failed to load component at ${filePath}: ${error.message}`); } From d12c607cc962213f5b513fad4675363d76a407e7 Mon Sep 17 00:00:00 2001 From: Henry Tsai Date: Wed, 10 Jul 2024 11:42:26 -0700 Subject: [PATCH 6/6] md liint --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 54b7d45..5485a89 100644 --- a/README.md +++ b/README.md @@ -307,6 +307,7 @@ Several built storage options are supported, and may be configured with the `DWN | PostgreSQL | `postgres:///dwn` | any options other than the URL scheme (`postgres://`) may also be specified via [standard environment variables](https://node-postgres.com/features/connecting#environment-variables) | ### Plugins + In some scenarios, you may want to provide a custom implementation of a pluggable module for the DWN Server. The following interfaces defined in `dwn-sdk-js` package are supported: - `DataStore`