diff --git a/src/dwn-server.ts b/src/dwn-server.ts index c4ad608..3c983dc 100644 --- a/src/dwn-server.ts +++ b/src/dwn-server.ts @@ -8,7 +8,7 @@ import type { DwnServerConfig } from './config.js'; import log from 'loglevel'; import prefix from 'loglevel-plugin-prefix'; import { config as defaultConfig } from './config.js'; -import { getDWNConfig } from './storage.js'; +import { getDwnConfig } from './storage.js'; import { HttpServerShutdownHandler } from './lib/http-server-shutdown-handler.js'; import { HttpApi } from './http-api.js'; import { RegistrationManager } from './registration/registration-manager.js'; @@ -105,7 +105,7 @@ export class DwnServer { eventStream = new EventEmitterStream(); } - const dwnConfig = getDWNConfig(this.config, { + const dwnConfig = await getDwnConfig(this.config, { didResolver: this.didResolver, tenantGate: registrationManager, eventStream, diff --git a/src/index.ts b/src/index.ts index 77b43b5..9b22c69 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,5 +2,5 @@ export { DwnServerConfig } from './config.js'; export { DwnServer, DwnServerOptions } from './dwn-server.js'; export { HttpApi } from './http-api.js'; export { jsonRpcRouter } from './json-rpc-api.js'; -export { EStoreType, BackendTypes, StoreType } from './storage.js'; +export { StoreType, BackendTypes, DwnStore } from './storage.js'; export { WsApi } from './ws-api.js'; diff --git a/src/pluginLoader.ts b/src/pluginLoader.ts new file mode 100644 index 0000000..b10f82c --- /dev/null +++ b/src/pluginLoader.ts @@ -0,0 +1,18 @@ +/** + * Dynamically loads a plugin from a file path by invoking the argument-less constructor of the default exported class. + */ +export async function loadPlugin(filePath: string): Promise { + try { + const module = await import(filePath); + + // Check if the default export is a class + if (typeof module.default === 'function') { + const instance: T = new module.default() as T; + return instance; + } else { + throw new Error(`Default export at ${filePath} is not a class.`); + } + } catch (error) { + throw new Error(`Failed to load component at ${filePath}: ${error.message}`); + } + } \ No newline at end of file diff --git a/src/registration/registration-manager.ts b/src/registration/registration-manager.ts index d7c221e..518d884 100644 --- a/src/registration/registration-manager.ts +++ b/src/registration/registration-manager.ts @@ -5,7 +5,7 @@ import type { RegistrationData, RegistrationRequest } from "./registration-types import type { ProofOfWorkChallengeModel } from "./proof-of-work-types.js"; import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js"; import type { ActiveTenantCheckResult, TenantGate } from "@tbd54566975/dwn-sdk-js"; -import { getDialectFromURI } from "../storage.js"; +import { getDialectFromUrl } from "../storage.js"; import { readFileSync } from "fs"; /** @@ -77,7 +77,7 @@ export class RegistrationManager implements TenantGate { }); // Initialize RegistrationStore. - const sqlDialect = getDialectFromURI(new URL(registrationStoreUrl)); + const sqlDialect = getDialectFromUrl(new URL(registrationStoreUrl)); const registrationStore = await RegistrationStore.create(sqlDialect); registrationManager.registrationStore = registrationStore; diff --git a/src/storage.ts b/src/storage.ts index e74b3d4..4949119 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -33,8 +33,9 @@ import pg from 'pg'; import Cursor from 'pg-cursor'; import type { DwnServerConfig } from './config.js'; +import { loadPlugin } from './pluginLoader.js'; -export enum EStoreType { +export enum StoreType { DataStore, MessageStore, EventLog, @@ -48,44 +49,44 @@ export enum BackendTypes { POSTGRES = 'postgres', } -export type StoreType = DataStore | EventLog | MessageStore | ResumableTaskStore; +export type DwnStore = DataStore | EventLog | MessageStore | ResumableTaskStore; -export function getDWNConfig( +export async function getDwnConfig( config : DwnServerConfig, options : { didResolver? : DidResolver, tenantGate? : TenantGate, eventStream? : EventStream, } -): DwnConfig { +): Promise { const { tenantGate, eventStream, didResolver } = options; - const dataStore: DataStore = getStore(config.dataStore, EStoreType.DataStore); - const eventLog: EventLog = getStore(config.eventLog, EStoreType.EventLog); - const messageStore: MessageStore = getStore(config.messageStore, EStoreType.MessageStore); - const resumableTaskStore: ResumableTaskStore = getStore(config.messageStore, EStoreType.ResumableTaskStore); + const dataStore: DataStore = await getStore(config.dataStore, StoreType.DataStore); + const eventLog: EventLog = await getStore(config.eventLog, StoreType.EventLog); + const messageStore: MessageStore = await getStore(config.messageStore, StoreType.MessageStore); + const resumableTaskStore: ResumableTaskStore = await getStore(config.messageStore, StoreType.ResumableTaskStore); return { didResolver, eventStream, eventLog, dataStore, messageStore, resumableTaskStore, tenantGate }; } function getLevelStore( storeURI: URL, - storeType: EStoreType, -): DataStore | MessageStore | EventLog | ResumableTaskStore { + storeType: StoreType, +): DwnStore { switch (storeType) { - case EStoreType.DataStore: + case StoreType.DataStore: return new DataStoreLevel({ blockstoreLocation: storeURI.host + storeURI.pathname + '/DATASTORE', }); - case EStoreType.MessageStore: + case StoreType.MessageStore: return new MessageStoreLevel({ blockstoreLocation: storeURI.host + storeURI.pathname + '/MESSAGESTORE', indexLocation: storeURI.host + storeURI.pathname + '/INDEX', }); - case EStoreType.EventLog: + case StoreType.EventLog: return new EventLogLevel({ location: storeURI.host + storeURI.pathname + '/EVENTLOG', }); - case EStoreType.ResumableTaskStore: + case StoreType.ResumableTaskStore: return new ResumableTaskStoreLevel({ location: storeURI.host + storeURI.pathname + '/RESUMABLE-TASK-STORE', }); @@ -94,42 +95,45 @@ function getLevelStore( } } -function getDBStore( - db: Dialect, - storeType: EStoreType, -): DataStore | MessageStore | EventLog | ResumableTaskStore { +function getSqlStore( + connectionUrl: URL, + storeType: StoreType, +): DwnStore { + const dialect = getDialectFromUrl(connectionUrl); + switch (storeType) { - case EStoreType.DataStore: - return new DataStoreSql(db); - case EStoreType.MessageStore: - return new MessageStoreSql(db); - case EStoreType.EventLog: - return new EventLogSql(db); - case EStoreType.ResumableTaskStore: - return new ResumableTaskStoreSql(db); + case StoreType.DataStore: + return new DataStoreSql(dialect); + case StoreType.MessageStore: + return new MessageStoreSql(dialect); + case StoreType.EventLog: + return new EventLogSql(dialect); + case StoreType.ResumableTaskStore: + return new ResumableTaskStoreSql(dialect); default: - throw new Error('Unexpected db store type'); + throw new Error(`Unsupported store type ${storeType} for SQL store.`); } } -function getStore( - storeString: string, - storeType: EStoreType.DataStore, -): DataStore; -function getStore( - storeString: string, - storeType: EStoreType.EventLog, -): EventLog; -function getStore( - storeString: string, - storeType: EStoreType.MessageStore, -): MessageStore; -function getStore( - storeString: string, - storeType: EStoreType.ResumableTaskStore, -): ResumableTaskStore; -function getStore(storeString: string, storeType: EStoreType): StoreType { - const storeURI = new URL(storeString); +/** + * Check if the given string is a file path. + */ +function isFilePath(configString: string): boolean { + const filePathPrefixes = ['/', './', '../']; + return filePathPrefixes.some(prefix => configString.startsWith(prefix)); +} + +async function getStore(storeString: string, storeType: StoreType.DataStore): Promise; +async function getStore(storeString: string, storeType: StoreType.EventLog): Promise; +async function getStore(storeString: string, storeType: StoreType.MessageStore): Promise; +async function getStore(storeString: string, storeType: StoreType.ResumableTaskStore): Promise; +async function getStore(storeConfigString: string, storeType: StoreType): Promise { + if (isFilePath(storeConfigString)) { + return await loadStoreFromFilePath(storeConfigString, storeType); + } + // else treat the `storeConfigString` as a connection string + + const storeURI = new URL(storeConfigString); switch (storeURI.protocol.slice(0, -1)) { case BackendTypes.LEVEL: @@ -138,14 +142,35 @@ function getStore(storeString: string, storeType: EStoreType): StoreType { case BackendTypes.SQLITE: case BackendTypes.MYSQL: case BackendTypes.POSTGRES: - return getDBStore(getDialectFromURI(storeURI), storeType); + return getSqlStore(storeURI, storeType); default: throw invalidStorageSchemeMessage(storeURI.protocol); } } -export function getDialectFromURI(connectionUrl: URL): Dialect { +/** + * Loads a DWN store plugin of the given type from the given file path. + */ +async function loadStoreFromFilePath( + filePath: string, + storeType: StoreType, +): Promise { + switch (storeType) { + case StoreType.DataStore: + return await loadPlugin(filePath); + case StoreType.EventLog: + return await loadPlugin(filePath); + case StoreType.MessageStore: + return await loadPlugin(filePath); + case StoreType.ResumableTaskStore: + return await loadPlugin(filePath); + default: + throw new Error(`Loading store for unsupported store type ${storeType} from path ${filePath}`); + } +} + +export function getDialectFromUrl(connectionUrl: URL): Dialect { switch (connectionUrl.protocol.slice(0, -1)) { case BackendTypes.SQLITE: const path = connectionUrl.host + connectionUrl.pathname; diff --git a/src/web5-connect/web5-connect-server.ts b/src/web5-connect/web5-connect-server.ts index bf2b1a2..007faa7 100644 --- a/src/web5-connect/web5-connect-server.ts +++ b/src/web5-connect/web5-connect-server.ts @@ -1,4 +1,4 @@ -import { getDialectFromURI } from "../storage.js"; +import { getDialectFromUrl } from "../storage.js"; import { randomUuid } from '@web5/crypto/utils'; import { SqlTtlCache } from "./sql-ttl-cache.js"; @@ -49,7 +49,7 @@ export class Web5ConnectServer { const web5ConnectServer = new Web5ConnectServer({ baseUrl }); // Initialize TTL cache. - const sqlDialect = getDialectFromURI(new URL(sqlTtlCacheUrl)); + const sqlDialect = getDialectFromUrl(new URL(sqlTtlCacheUrl)); web5ConnectServer.cache = await SqlTtlCache.create(sqlDialect); return web5ConnectServer; diff --git a/tests/http-api.spec.ts b/tests/http-api.spec.ts index 2d938f4..d4042ee 100644 --- a/tests/http-api.spec.ts +++ b/tests/http-api.spec.ts @@ -53,7 +53,6 @@ describe('http api', function () { before(async function () { clock = useFakeTimers({ shouldAdvanceTime: true }); - config.registrationStoreUrl = 'sqlite://'; config.registrationProofOfWorkEnabled = true; config.termsOfServiceFilePath = './tests/fixtures/terms-of-service.txt'; diff --git a/tests/plugins/data-store-sqlite.ts b/tests/plugins/data-store-sqlite.ts new file mode 100644 index 0000000..796d5b0 --- /dev/null +++ b/tests/plugins/data-store-sqlite.ts @@ -0,0 +1,26 @@ +import type { DataStore } from "@tbd54566975/dwn-sdk-js"; +import { DataStoreSql } from "@tbd54566975/dwn-sql-store"; +import { getDialectFromUrl } from "../../src/storage.js"; + +/** + * An example of a DataStore plugin that is used for testing. + * The points to note are: + * - The class must be a default export. + * - The constructor must not take any arguments. + */ +export default class DataStoreSqlite extends DataStoreSql implements DataStore { + constructor() { + const sqliteDialect = getDialectFromUrl(new URL('sqlite://')); + super(sqliteDialect); + + // NOTE: the following line is added purely to test the constructor invocation. + DataStoreSqlite.spyingTheConstructor(); + } + + /** + * NOTE: This method is introduced purely to indirectly test/spy invocation of the constructor. + * As I was unable to find an easy way to directly spy the constructor. + */ + public static spyingTheConstructor(): void { + } +} \ No newline at end of file diff --git a/tests/scenarios/dynamic-plugin-loading.spec.ts b/tests/scenarios/dynamic-plugin-loading.spec.ts new file mode 100644 index 0000000..8323df6 --- /dev/null +++ b/tests/scenarios/dynamic-plugin-loading.spec.ts @@ -0,0 +1,194 @@ +import type { JsonRpcSuccessResponse } from '../../src/lib/json-rpc.js'; +import type { Readable } from 'readable-stream'; + +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; +import DataStoreSqlite from '../plugins/data-store-sqlite.js'; +import fetch from 'node-fetch'; +import sinon from 'sinon'; + +import { config } from '../../src/config.js'; +import { createJsonRpcRequest } from '../../src/lib/json-rpc.js'; +import { DwnServer } from '../../src/dwn-server.js'; +import { getFileAsReadStream } from '../utils.js'; +import { v4 as uuidv4 } from 'uuid'; + +import { Cid, DwnConstant, Jws, ProtocolsConfigure, RecordsRead, RecordsWrite, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; +import { DidDht, DidKey, UniversalResolver } from '@web5/dids'; + +// node.js 18 and earlier needs globalThis.crypto polyfill +if (!globalThis.crypto) { + // @ts-ignore + globalThis.crypto = webcrypto; +} + +chai.use(chaiAsPromised); + +describe('Dynamic DWN plugin loading', function () { + let dwnServer: DwnServer; + + afterEach(async () => { + // clock.restore(); + if (dwnServer !== undefined) { + await dwnServer.stop(); + } + }); + + it('should fail dynamically loading a non-existent plugin', async () => { + const dwnServerConfigCopy = { ...config }; // not touching the original config + dwnServerConfigCopy.dataStore = './non-existent-plugin.js'; + + const invalidDwnServer = new DwnServer({ config: dwnServerConfigCopy }); + await expect(invalidDwnServer.start()).to.be.rejectedWith('Failed to load component at ./non-existent-plugin.js'); + }); + + it('should be able to dynamically load and use custom data store implementation', async () => { + // Scenario: + // 1. Configure DWN to load a custom data store plugin. + // 2. Validate that the constructor of the plugin is called. + // 3. Validate that the DWN instance is using the custom data store plugin. + + // NOTE: was not able to spy on constructor directly, so spying on a method that is called in the constructor + const customDataStoreConstructorSpy = sinon.spy(DataStoreSqlite, 'spyingTheConstructor'); + + // 1. Configure DWN to load a custom data store plugin. + const dwnServerConfigCopy = { ...config }; // not touching the original config + dwnServerConfigCopy.registrationStoreUrl = undefined; // allow all traffic + dwnServerConfigCopy.messageStore = 'sqlite://'; + dwnServerConfigCopy.dataStore = '../tests/plugins/data-store-sqlite.js'; + dwnServerConfigCopy.eventLog = 'sqlite://'; + + // 2. Validate that the constructor of the plugin is called. + // CRITICAL: We need to create a custom DID resolver that does not use a LevelDB based cache (which is the default cache used in `DWN`) + // otherwise we will receive a `Database is not open` coming from LevelDB. + // This is likely due to the fact that LevelDB is the default cache used in `DWN`, and we have tests creating default DWN instances, + // so here we have to create a DWN that does not use the same LevelDB cache to avoid hitting LevelDB locked issues. + // Long term we should investigate and unify approach of DWN instantiation taken by tests to avoid this "workaround" entirely. + const didResolver = new UniversalResolver({ + didResolvers : [DidDht, DidKey], + }); + dwnServer = new DwnServer({ config: dwnServerConfigCopy, didResolver }); + await dwnServer.start(); + expect(customDataStoreConstructorSpy.calledOnce).to.be.true; + + // 3. Validate that the DWN instance is using the custom data store plugin. + const dwnUrl = `${dwnServerConfigCopy.baseUrl}:${dwnServerConfigCopy.port}`; + await sanityTestDwnReadWrite(dwnUrl); + }); +}); + +/** + * Sanity test RecordsWrite and RecordsRead on the DWN instance. + */ +async function sanityTestDwnReadWrite(dwnUrl: string): Promise { + const alice = await TestDataGenerator.generateDidKeyPersona(); + const aliceSigner = Jws.createSigner(alice); + // await registrationManager.recordTenantRegistration({ did: alice.did, termsOfServiceHash: registrationManager.getTermsOfServiceHash()}); + + // install minimal protocol on Alice's DWN + const protocolDefinition = { + protocol: 'http://minimal.xyz', + published: false, + types: { + foo: {} + }, + structure: { + foo: {} + } + }; + + const protocolsConfig = await ProtocolsConfigure.create({ + signer: aliceSigner, + definition: protocolDefinition + }); + + const protocolConfigureRequestId = uuidv4(); + const protocolConfigureRequest = createJsonRpcRequest(protocolConfigureRequestId, 'dwn.processMessage', { + target: alice.did, + message: protocolsConfig.message, + }); + const protocolConfigureResponse = await fetch(dwnUrl, { + method: 'POST', + headers: { + 'dwn-request': JSON.stringify(protocolConfigureRequest), + } + }); + const protocolConfigureResponseBody = await protocolConfigureResponse.json() as JsonRpcSuccessResponse; + + expect(protocolConfigureResponse.status).to.equal(200); + expect(protocolConfigureResponseBody.result.reply.status.code).to.equal(202); + + // Alice writing a file larger than max data size allowed to be encoded directly in the DWN Message Store. + const filePath = './fixtures/test.jpeg'; + const { + cid: dataCid, + size: dataSize, + stream + } = await getFileAsReadStream(filePath); + expect(dataSize).to.be.greaterThan(DwnConstant.maxDataSizeAllowedToBeEncoded); + + const recordsWrite = await RecordsWrite.create({ + signer: aliceSigner, + dataFormat: 'image/jpeg', + dataCid, + dataSize + }); + + const recordsWriteRequestId = uuidv4(); + const recordsWriteRequest = createJsonRpcRequest(recordsWriteRequestId, 'dwn.processMessage', { + target: alice.did, + message: recordsWrite.message, + }); + const recordsWriteResponse = await fetch(dwnUrl, { + method: 'POST', + headers: { + 'dwn-request': JSON.stringify(recordsWriteRequest), + }, + body: stream + }); + const recordsWriteResponseBody = await recordsWriteResponse.json() as JsonRpcSuccessResponse; + + expect(recordsWriteResponse.status).to.equal(200); + expect(recordsWriteResponseBody.result.reply.status.code).to.equal(202); + + // Alice reading the file back out. + const recordsRead = await RecordsRead.create({ + signer: aliceSigner, + filter: { + recordId: recordsWrite.message.recordId, + }, + }); + + const recordsReadRequestId = uuidv4(); + const recordsReadRequest = createJsonRpcRequest(recordsReadRequestId, 'dwn.processMessage', { + target: alice.did, + message: recordsRead.message + }); + + const recordsReadResponse = await fetch(dwnUrl, { + method: 'POST', + headers: { + 'dwn-request': JSON.stringify(recordsReadRequest), + }, + }); + + expect(recordsReadResponse.status).to.equal(200); + + const { headers } = recordsReadResponse; + const contentType = headers.get('content-type'); + expect(contentType).to.not.be.undefined; + expect(contentType).to.equal('application/octet-stream'); + + const recordsReadDwnResponse = headers.get('dwn-response'); + expect(recordsReadDwnResponse).to.not.be.undefined; + + const recordsReadJsonRpcResponse = JSON.parse(recordsReadDwnResponse) as JsonRpcSuccessResponse; + expect(recordsReadJsonRpcResponse.id).to.equal(recordsReadRequestId); + expect(recordsReadJsonRpcResponse.error).to.not.exist; + expect(recordsReadJsonRpcResponse.result.reply.status.code).to.equal(200); + expect(recordsReadJsonRpcResponse.result.reply.record).to.exist; + + // can't get response as stream from supertest :( + const cid = await Cid.computeDagPbCidFromStream(recordsReadResponse.body as Readable); + expect(cid).to.equal(dataCid); +} diff --git a/tests/test-dwn.ts b/tests/test-dwn.ts index 005e00d..8ce33d7 100644 --- a/tests/test-dwn.ts +++ b/tests/test-dwn.ts @@ -1,25 +1,25 @@ import type { TenantGate } from '@tbd54566975/dwn-sdk-js'; -import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js'; + +import { getDialectFromUrl } from '../src/storage.js'; import { DataStoreSql, EventLogSql, MessageStoreSql, ResumableTaskStoreSql, } from '@tbd54566975/dwn-sql-store'; - -import { getDialectFromURI } from '../src/storage.js'; import { DidDht, DidIon, DidKey, UniversalResolver } from '@web5/dids'; +import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js'; export async function getTestDwn(options: { tenantGate?: TenantGate, withEvents?: boolean, } = {}): Promise { const { tenantGate, withEvents = false } = options; - const db = getDialectFromURI(new URL('sqlite://')); - const dataStore = new DataStoreSql(db); - const eventLog = new EventLogSql(db); - const messageStore = new MessageStoreSql(db); - const resumableTaskStore = new ResumableTaskStoreSql(db); + const dialect = getDialectFromUrl(new URL('sqlite://')); + const dataStore = new DataStoreSql(dialect); + const eventLog = new EventLogSql(dialect); + const messageStore = new MessageStoreSql(dialect); + const resumableTaskStore = new ResumableTaskStoreSql(dialect); const eventStream = withEvents ? new EventEmitterStream() : undefined; // NOTE: no resolver cache used here to avoid locking LevelDB