From bcd669b992b3bd25fe3c0924c9833c1254015aa0 Mon Sep 17 00:00:00 2001 From: Henry Tsai Date: Tue, 9 Jul 2024 18:35:37 -0700 Subject: [PATCH] Exposed config for EventStream + Updated plugin test --- README.md | 1 + src/config.ts | 6 + src/dwn-server.ts | 11 +- src/{pluginLoader.ts => plugin-loader.ts} | 13 +- src/storage.ts | 39 +++-- tests/common-scenario-validator.ts | 142 ++++++++++++++++++ tests/http-api.spec.ts | 130 +--------------- tests/plugins/data-store-sqlite.ts | 2 +- tests/plugins/event-log-sqlite.ts | 26 ++++ tests/plugins/event-stream-in-memory.ts | 24 +++ tests/plugins/message-store-sqlite.ts | 26 ++++ .../scenarios/dynamic-plugin-loading.spec.ts | 141 ++--------------- tests/utils.ts | 48 ------ 13 files changed, 285 insertions(+), 324 deletions(-) rename src/{pluginLoader.ts => plugin-loader.ts} (60%) create mode 100644 tests/common-scenario-validator.ts create mode 100644 tests/plugins/event-log-sqlite.ts create mode 100644 tests/plugins/event-stream-in-memory.ts create mode 100644 tests/plugins/message-store-sqlite.ts diff --git a/README.md b/README.md index e5e680f..5490ef2 100644 --- a/README.md +++ b/README.md @@ -282,6 +282,7 @@ Configuration can be set using environment variables | `DS_MAX_RECORD_DATA_SIZE` | Maximum size for `RecordsWrite` data. use `b`, `kb`, `mb`, `gb` for value | `1gb` | | `DS_WEBSOCKET_SERVER` | Whether to enable listening over `ws:`. values: `on`,`off` | `on` | | `DWN_BASE_URL` | Base external URL of this DWN. Used to construct URL paths such as the `Request URI` for the Web5 Connect flow. | `http://localhost` | +| `DWN_EVENT_STREAM_PLUGIN_PATH` | Path to DWN Event Stream plugin to use. Default single-node implementation will be used if left empty. | unset | | `DWN_REGISTRATION_STORE_URL` | URL to use for storage of registered DIDs. Leave unset to if DWN does not require registration (ie. open for all) | unset | | `DWN_REGISTRATION_PROOF_OF_WORK_SEED` | Seed to generate the challenge nonce from, this allows all DWN instances in a cluster to generate the same challenge. | unset | | `DWN_REGISTRATION_PROOF_OF_WORK_ENABLED` | Require new users to complete a proof-of-work challenge | `false` | diff --git a/src/config.ts b/src/config.ts index f6ce95a..de3c5ba 100644 --- a/src/config.ts +++ b/src/config.ts @@ -46,6 +46,12 @@ export const config = { // whether to enable 'ws:' webSocketSupport: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? true, + + /** + * Path to DWN Event Stream plugin to use. Default single-node implementation will be used if left empty. + */ + eventStreamPluginPath: process.env.DWN_EVENT_STREAM_PLUGIN_PATH, + // where to store persistent data messageStore: process.env.DWN_STORAGE_MESSAGES || process.env.DWN_STORAGE || 'level://data', dataStore: process.env.DWN_STORAGE_DATA || process.env.DWN_STORAGE || 'level://data', diff --git a/src/dwn-server.ts b/src/dwn-server.ts index 3c983dc..316e078 100644 --- a/src/dwn-server.ts +++ b/src/dwn-server.ts @@ -11,6 +11,7 @@ import { config as defaultConfig } from './config.js'; import { getDwnConfig } from './storage.js'; import { HttpServerShutdownHandler } from './lib/http-server-shutdown-handler.js'; import { HttpApi } from './http-api.js'; +import { PluginLoader } from './plugin-loader.js'; import { RegistrationManager } from './registration/registration-manager.js'; import { WsApi } from './ws-api.js'; import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js'; @@ -100,9 +101,13 @@ export class DwnServer { let eventStream: EventStream | undefined; if (this.config.webSocketSupport) { - // setting `EventEmitterStream` as default the default `EventStream - // if an alternate implementation is needed, instantiate a `Dwn` with a custom `EventStream` and add it to server options. - eventStream = new EventEmitterStream(); + // If Even Stream plugin is not specified, use `EventEmitterStream` implementation as default. + if (this.config.eventStreamPluginPath === undefined || this.config.eventStreamPluginPath === '') { + eventStream = new EventEmitterStream(); + } else { + eventStream = await PluginLoader.loadPlugin(this.config.eventStreamPluginPath); + } + } const dwnConfig = await getDwnConfig(this.config, { diff --git a/src/pluginLoader.ts b/src/plugin-loader.ts similarity index 60% rename from src/pluginLoader.ts rename to src/plugin-loader.ts index b10f82c..fd5bc4f 100644 --- a/src/pluginLoader.ts +++ b/src/plugin-loader.ts @@ -1,10 +1,14 @@ /** - * Dynamically loads a plugin from a file path by invoking the argument-less constructor of the default exported class. + * A utility class for dynamically loading plugins from file paths. */ -export async function loadPlugin(filePath: string): Promise { +export class PluginLoader { + /** + * Dynamically loads a plugin from a file path by invoking the argument-less constructor of the default exported class. + */ + public static async loadPlugin(filePath: string): Promise { try { const module = await import(filePath); - + // Check if the default export is a class if (typeof module.default === 'function') { const instance: T = new module.default() as T; @@ -15,4 +19,5 @@ export async function loadPlugin(filePath: string): Promise { } catch (error) { throw new Error(`Failed to load component at ${filePath}: ${error.message}`); } - } \ No newline at end of file + } +} diff --git a/src/storage.ts b/src/storage.ts index 4949119..8c7b50c 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -1,11 +1,3 @@ -import * as fs from 'fs'; - -import { - DataStoreLevel, - EventLogLevel, - MessageStoreLevel, - ResumableTaskStoreLevel, -} from '@tbd54566975/dwn-sdk-js'; import type { DidResolver } from '@web5/dids'; import type { DataStore, @@ -17,6 +9,21 @@ import type { TenantGate, } from '@tbd54566975/dwn-sdk-js'; import type { Dialect } from '@tbd54566975/dwn-sql-store'; +import type { DwnServerConfig } from './config.js'; + +import * as fs from 'fs'; +import Cursor from 'pg-cursor'; +import Database from 'better-sqlite3'; +import pg from 'pg'; +import { createPool as MySQLCreatePool } from 'mysql2'; +import { PluginLoader } from './plugin-loader.js'; + +import { + DataStoreLevel, + EventLogLevel, + MessageStoreLevel, + ResumableTaskStoreLevel, +} from '@tbd54566975/dwn-sdk-js'; import { DataStoreSql, EventLogSql, @@ -27,14 +34,6 @@ import { SqliteDialect, } from '@tbd54566975/dwn-sql-store'; -import Database from 'better-sqlite3'; -import { createPool as MySQLCreatePool } from 'mysql2'; -import pg from 'pg'; -import Cursor from 'pg-cursor'; - -import type { DwnServerConfig } from './config.js'; -import { loadPlugin } from './pluginLoader.js'; - export enum StoreType { DataStore, MessageStore, @@ -158,13 +157,13 @@ async function loadStoreFromFilePath( ): Promise { switch (storeType) { case StoreType.DataStore: - return await loadPlugin(filePath); + return await PluginLoader.loadPlugin(filePath); case StoreType.EventLog: - return await loadPlugin(filePath); + return await PluginLoader.loadPlugin(filePath); case StoreType.MessageStore: - return await loadPlugin(filePath); + return await PluginLoader.loadPlugin(filePath); case StoreType.ResumableTaskStore: - return await loadPlugin(filePath); + return await PluginLoader.loadPlugin(filePath); default: throw new Error(`Loading store for unsupported store type ${storeType} from path ${filePath}`); } diff --git a/tests/common-scenario-validator.ts b/tests/common-scenario-validator.ts new file mode 100644 index 0000000..46050b4 --- /dev/null +++ b/tests/common-scenario-validator.ts @@ -0,0 +1,142 @@ +import type { JsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; +import type { Persona } from '@tbd54566975/dwn-sdk-js'; +import type { Readable } from 'readable-stream'; + +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; +import fetch from 'node-fetch'; + +import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; +import { getFileAsReadStream } from './utils.js'; +import { v4 as uuidv4 } from 'uuid'; + +import { Cid, DwnConstant, Jws, ProtocolsConfigure, RecordsRead, RecordsWrite, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; + + +// node.js 18 and earlier needs globalThis.crypto polyfill +if (!globalThis.crypto) { + // @ts-ignore + globalThis.crypto = webcrypto; +} + +chai.use(chaiAsPromised); + +/** + * Validator of common scenarios. + */ +export default class CommonScenarioValidator { + /** + * Sanity test RecordsWrite and RecordsRead on the DWN instance. + */ + public static async sanityTestDwnReadWrite(dwnUrl: string, persona?: Persona): Promise { + const alice = persona || await TestDataGenerator.generateDidKeyPersona(); + const aliceSigner = Jws.createSigner(alice); + + // install minimal protocol on Alice's DWN + const protocolDefinition = { + protocol: 'http://minimal.xyz', + published: false, + types: { + foo: {} + }, + structure: { + foo: {} + } + }; + + const protocolsConfig = await ProtocolsConfigure.create({ + signer: aliceSigner, + definition: protocolDefinition + }); + + const protocolConfigureRequestId = uuidv4(); + const protocolConfigureRequest = createJsonRpcRequest(protocolConfigureRequestId, 'dwn.processMessage', { + target: alice.did, + message: protocolsConfig.message, + }); + const protocolConfigureResponse = await fetch(dwnUrl, { + method: 'POST', + headers: { + 'dwn-request': JSON.stringify(protocolConfigureRequest), + } + }); + const protocolConfigureResponseBody = await protocolConfigureResponse.json() as JsonRpcSuccessResponse; + + expect(protocolConfigureResponse.status).to.equal(200); + expect(protocolConfigureResponseBody.result.reply.status.code).to.equal(202); + + // Alice writing a file larger than max data size allowed to be encoded directly in the DWN Message Store. + const filePath = './fixtures/test.jpeg'; + const { + cid: dataCid, + size: dataSize, + stream + } = await getFileAsReadStream(filePath); + expect(dataSize).to.be.greaterThan(DwnConstant.maxDataSizeAllowedToBeEncoded); + + const recordsWrite = await RecordsWrite.create({ + signer: aliceSigner, + dataFormat: 'image/jpeg', + dataCid, + dataSize + }); + + const recordsWriteRequestId = uuidv4(); + const recordsWriteRequest = createJsonRpcRequest(recordsWriteRequestId, 'dwn.processMessage', { + target: alice.did, + message: recordsWrite.message, + }); + const recordsWriteResponse = await fetch(dwnUrl, { + method: 'POST', + headers: { + 'dwn-request': JSON.stringify(recordsWriteRequest), + }, + body: stream + }); + const recordsWriteResponseBody = await recordsWriteResponse.json() as JsonRpcSuccessResponse; + + expect(recordsWriteResponse.status).to.equal(200); + expect(recordsWriteResponseBody.result.reply.status.code).to.equal(202); + + // Alice reading the file back out. + const recordsRead = await RecordsRead.create({ + signer: aliceSigner, + filter: { + recordId: recordsWrite.message.recordId, + }, + }); + + const recordsReadRequestId = uuidv4(); + const recordsReadRequest = createJsonRpcRequest(recordsReadRequestId, 'dwn.processMessage', { + target: alice.did, + message: recordsRead.message + }); + + const recordsReadResponse = await fetch(dwnUrl, { + method: 'POST', + headers: { + 'dwn-request': JSON.stringify(recordsReadRequest), + }, + }); + + expect(recordsReadResponse.status).to.equal(200); + + const { headers } = recordsReadResponse; + const contentType = headers.get('content-type'); + expect(contentType).to.not.be.undefined; + expect(contentType).to.equal('application/octet-stream'); + + const recordsReadDwnResponse = headers.get('dwn-response'); + expect(recordsReadDwnResponse).to.not.be.undefined; + + const recordsReadJsonRpcResponse = JSON.parse(recordsReadDwnResponse) as JsonRpcSuccessResponse; + expect(recordsReadJsonRpcResponse.id).to.equal(recordsReadRequestId); + expect(recordsReadJsonRpcResponse.error).to.not.exist; + expect(recordsReadJsonRpcResponse.result.reply.status.code).to.equal(200); + expect(recordsReadJsonRpcResponse.result.reply.record).to.exist; + + // can't get response as stream from supertest :( + const cid = await Cid.computeDagPbCidFromStream(recordsReadResponse.body as Readable); + expect(cid).to.equal(dataCid); + } +} diff --git a/tests/http-api.spec.ts b/tests/http-api.spec.ts index d4042ee..c49a3a4 100644 --- a/tests/http-api.spec.ts +++ b/tests/http-api.spec.ts @@ -1,12 +1,10 @@ // node.js 18 and earlier, needs globalThis.crypto polyfill import sinon from 'sinon'; import { - Cid, DataStream, DwnErrorCode, ProtocolsConfigure, RecordsQuery, - RecordsRead, TestDataGenerator, Time, } from '@tbd54566975/dwn-sdk-js'; @@ -35,9 +33,9 @@ import { createRecordsWriteMessage, getDwnResponse, getFileAsReadStream, - streamHttpRequest, } from './utils.js'; import { RegistrationManager } from '../src/registration/registration-manager.js'; +import CommonScenarioValidator from './common-scenario-validator.js'; if (!globalThis.crypto) { // @ts-ignore @@ -53,6 +51,7 @@ describe('http api', function () { before(async function () { clock = useFakeTimers({ shouldAdvanceTime: true }); + // TODO: Remove direct use of default config to avoid changes bleed/pollute between tests - https://github.com/TBD54566975/dwn-server/issues/144 config.registrationStoreUrl = 'sqlite://'; config.registrationProofOfWorkEnabled = true; config.termsOfServiceFilePath = './tests/fixtures/terms-of-service.txt'; @@ -198,44 +197,14 @@ describe('http api', function () { }); }); - describe('RecordsWrite', function () { - it('handles RecordsWrite with request body', async function () { - const filePath = './fixtures/test.jpeg'; - const { cid, size, stream } = await getFileAsReadStream(filePath); - - const { recordsWrite } = await createRecordsWriteMessage(alice, { - dataCid: cid, - dataSize: size, - }); - - const requestId = uuidv4(); - const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { - message: recordsWrite.toJSON(), - target: alice.did, - }); - - const resp = await streamHttpRequest( - 'http://localhost:3000', - { - method: 'POST', - headers: { - 'content-type': 'application/octet-stream', - 'dwn-request': JSON.stringify(dwnRequest), - }, - }, - stream, - ); - - expect(resp.status).to.equal(200); - - const body = JSON.parse(resp.body) as JsonRpcResponse; - expect(body.id).to.equal(requestId); - expect(body.error).to.not.exist; - - const { reply } = body.result; - expect(reply.status.code).to.equal(202); + describe('P0 Scenarios', function () { + it('should be able to read and write a protocol record', async function () { + const dwnUrl = `${config.baseUrl}:${config.port}`; + await CommonScenarioValidator.sanityTestDwnReadWrite(dwnUrl, alice) }); + }); + describe('RecordsWrite', function () { it('handles RecordsWrite overwrite that does not mutate data', async function () { // First RecordsWrite that creates the record. const { recordsWrite: initialWrite, dataStream } = @@ -331,89 +300,6 @@ describe('http api', function () { }); }); - describe('RecordsRead', function () { - it('returns message in response header and data in body', async function () { - const filePath = './fixtures/test.jpeg'; - const { - cid: expectedCid, - size, - stream, - } = await getFileAsReadStream(filePath); - - const { recordsWrite } = await createRecordsWriteMessage(alice, { - dataCid: expectedCid, - dataSize: size, - }); - - let requestId = uuidv4(); - let dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { - message: recordsWrite.toJSON(), - target: alice.did, - }); - - let response = await fetch('http://localhost:3000', { - method: 'POST', - headers: { - 'dwn-request': JSON.stringify(dwnRequest), - }, - body: stream, - }); - - expect(response.status).to.equal(200); - - const body = (await response.json()) as JsonRpcResponse; - expect(body.id).to.equal(requestId); - expect(body.error).to.not.exist; - - const { reply } = body.result; - expect(reply.status.code).to.equal(202); - - const recordsRead = await RecordsRead.create({ - signer: alice.signer, - filter: { - recordId: recordsWrite.message.recordId, - }, - }); - - requestId = uuidv4(); - dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { - target: alice.did, - message: recordsRead.toJSON(), - }); - - response = await fetch('http://localhost:3000', { - method: 'POST', - headers: { - 'dwn-request': JSON.stringify(dwnRequest), - }, - }); - - expect(response.status).to.equal(200); - - const { headers } = response; - - const contentType = headers.get('content-type'); - expect(contentType).to.not.be.undefined; - expect(contentType).to.equal('application/octet-stream'); - - const dwnResponse = headers.get('dwn-response'); - expect(dwnResponse).to.not.be.undefined; - - const jsonRpcResponse = JSON.parse(dwnResponse) as JsonRpcResponse; - - expect(jsonRpcResponse.id).to.equal(requestId); - expect(jsonRpcResponse.error).to.not.exist; - - const { reply: recordsReadReply } = jsonRpcResponse.result; - expect(recordsReadReply.status.code).to.equal(200); - expect(recordsReadReply.record).to.exist; - - // can't get response as stream from supertest :( - const cid = await Cid.computeDagPbCidFromStream(response.body as any); - expect(cid).to.equal(expectedCid); - }); - }); - describe('/:did/records/:id', function () { it('returns record data if record is published', async function () { const filePath = './fixtures/test.jpeg'; diff --git a/tests/plugins/data-store-sqlite.ts b/tests/plugins/data-store-sqlite.ts index 796d5b0..9f46c68 100644 --- a/tests/plugins/data-store-sqlite.ts +++ b/tests/plugins/data-store-sqlite.ts @@ -3,7 +3,7 @@ import { DataStoreSql } from "@tbd54566975/dwn-sql-store"; import { getDialectFromUrl } from "../../src/storage.js"; /** - * An example of a DataStore plugin that is used for testing. + * An example of a plugin that is used for testing. * The points to note are: * - The class must be a default export. * - The constructor must not take any arguments. diff --git a/tests/plugins/event-log-sqlite.ts b/tests/plugins/event-log-sqlite.ts new file mode 100644 index 0000000..c3078b1 --- /dev/null +++ b/tests/plugins/event-log-sqlite.ts @@ -0,0 +1,26 @@ +import type { EventLog } from "@tbd54566975/dwn-sdk-js"; +import { EventLogSql } from "@tbd54566975/dwn-sql-store"; +import { getDialectFromUrl } from "../../src/storage.js"; + +/** + * An example of a plugin. Used for testing. + * The points to note are: + * - The class must be a default export. + * - The constructor must not take any arguments. + */ +export default class EventLogSqlite extends EventLogSql implements EventLog { + constructor() { + const sqliteDialect = getDialectFromUrl(new URL('sqlite://')); + super(sqliteDialect); + + // NOTE: the following line is added purely to test the constructor invocation. + EventLogSqlite.spyingTheConstructor(); + } + + /** + * NOTE: This method is introduced purely to indirectly test/spy invocation of the constructor. + * As I was unable to find an easy way to directly spy the constructor. + */ + public static spyingTheConstructor(): void { + } +} \ No newline at end of file diff --git a/tests/plugins/event-stream-in-memory.ts b/tests/plugins/event-stream-in-memory.ts new file mode 100644 index 0000000..7653ed5 --- /dev/null +++ b/tests/plugins/event-stream-in-memory.ts @@ -0,0 +1,24 @@ +import type { EventStream } from "@tbd54566975/dwn-sdk-js"; +import { EventEmitterStream } from "@tbd54566975/dwn-sdk-js"; + +/** + * An example of a plugin that is used for testing. + * The points to note are: + * - The class must be a default export. + * - The constructor must not take any arguments. + */ +export default class EventStreamInMemory extends EventEmitterStream implements EventStream { + constructor() { + super(); + + // NOTE: the following line is added purely to test the constructor invocation. + EventStreamInMemory.spyingTheConstructor(); + } + + /** + * NOTE: This method is introduced purely to indirectly test/spy invocation of the constructor. + * As I was unable to find an easy way to directly spy the constructor. + */ + public static spyingTheConstructor(): void { + } +} \ No newline at end of file diff --git a/tests/plugins/message-store-sqlite.ts b/tests/plugins/message-store-sqlite.ts new file mode 100644 index 0000000..7cb65c6 --- /dev/null +++ b/tests/plugins/message-store-sqlite.ts @@ -0,0 +1,26 @@ +import type { MessageStore } from "@tbd54566975/dwn-sdk-js"; +import { MessageStoreSql } from "@tbd54566975/dwn-sql-store"; +import { getDialectFromUrl } from "../../src/storage.js"; + +/** + * An example of a plugin. Used for testing. + * The points to note are: + * - The class must be a default export. + * - The constructor must not take any arguments. + */ +export default class MessageStoreSqlite extends MessageStoreSql implements MessageStore { + constructor() { + const sqliteDialect = getDialectFromUrl(new URL('sqlite://')); + super(sqliteDialect); + + // NOTE: the following line is added purely to test the constructor invocation. + MessageStoreSqlite.spyingTheConstructor(); + } + + /** + * NOTE: This method is introduced purely to indirectly test/spy invocation of the constructor. + * As I was unable to find an easy way to directly spy the constructor. + */ + public static spyingTheConstructor(): void { + } +} \ No newline at end of file diff --git a/tests/scenarios/dynamic-plugin-loading.spec.ts b/tests/scenarios/dynamic-plugin-loading.spec.ts index 8323df6..84ac281 100644 --- a/tests/scenarios/dynamic-plugin-loading.spec.ts +++ b/tests/scenarios/dynamic-plugin-loading.spec.ts @@ -1,20 +1,15 @@ -import type { JsonRpcSuccessResponse } from '../../src/lib/json-rpc.js'; -import type { Readable } from 'readable-stream'; - import chaiAsPromised from 'chai-as-promised'; import chai, { expect } from 'chai'; import DataStoreSqlite from '../plugins/data-store-sqlite.js'; -import fetch from 'node-fetch'; +import EventLogSqlite from '../plugins/event-log-sqlite.js'; +import EventStreamInMemory from '../plugins/event-stream-in-memory.js'; import sinon from 'sinon'; import { config } from '../../src/config.js'; -import { createJsonRpcRequest } from '../../src/lib/json-rpc.js'; import { DwnServer } from '../../src/dwn-server.js'; -import { getFileAsReadStream } from '../utils.js'; -import { v4 as uuidv4 } from 'uuid'; -import { Cid, DwnConstant, Jws, ProtocolsConfigure, RecordsRead, RecordsWrite, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; import { DidDht, DidKey, UniversalResolver } from '@web5/dids'; +import CommonScenarioValidator from '../common-scenario-validator.js'; // node.js 18 and earlier needs globalThis.crypto polyfill if (!globalThis.crypto) { @@ -50,13 +45,21 @@ describe('Dynamic DWN plugin loading', function () { // NOTE: was not able to spy on constructor directly, so spying on a method that is called in the constructor const customDataStoreConstructorSpy = sinon.spy(DataStoreSqlite, 'spyingTheConstructor'); + const customEventLogConstructorSpy = sinon.spy(EventLogSqlite, 'spyingTheConstructor'); + const customEventStreamConstructorSpy = sinon.spy(EventStreamInMemory, 'spyingTheConstructor'); // 1. Configure DWN to load a custom data store plugin. const dwnServerConfigCopy = { ...config }; // not touching the original config + + // TODO: remove below after https://github.com/TBD54566975/dwn-server/issues/144 is resolved + // The default config is not reliable because other tests modify it. dwnServerConfigCopy.registrationStoreUrl = undefined; // allow all traffic + + // dwnServerConfigCopy.messageStore = '../tests/plugins/message-store-sqlite.js'; // not working dwnServerConfigCopy.messageStore = 'sqlite://'; dwnServerConfigCopy.dataStore = '../tests/plugins/data-store-sqlite.js'; - dwnServerConfigCopy.eventLog = 'sqlite://'; + dwnServerConfigCopy.eventLog = '../tests/plugins/event-log-sqlite.js'; + dwnServerConfigCopy.eventStreamPluginPath = '../tests/plugins/event-stream-in-memory.js'; // 2. Validate that the constructor of the plugin is called. // CRITICAL: We need to create a custom DID resolver that does not use a LevelDB based cache (which is the default cache used in `DWN`) @@ -70,125 +73,11 @@ describe('Dynamic DWN plugin loading', function () { dwnServer = new DwnServer({ config: dwnServerConfigCopy, didResolver }); await dwnServer.start(); expect(customDataStoreConstructorSpy.calledOnce).to.be.true; + expect(customEventLogConstructorSpy.calledOnce).to.be.true; + expect(customEventStreamConstructorSpy.calledOnce).to.be.true; // 3. Validate that the DWN instance is using the custom data store plugin. const dwnUrl = `${dwnServerConfigCopy.baseUrl}:${dwnServerConfigCopy.port}`; - await sanityTestDwnReadWrite(dwnUrl); + await CommonScenarioValidator.sanityTestDwnReadWrite(dwnUrl); }); }); - -/** - * Sanity test RecordsWrite and RecordsRead on the DWN instance. - */ -async function sanityTestDwnReadWrite(dwnUrl: string): Promise { - const alice = await TestDataGenerator.generateDidKeyPersona(); - const aliceSigner = Jws.createSigner(alice); - // await registrationManager.recordTenantRegistration({ did: alice.did, termsOfServiceHash: registrationManager.getTermsOfServiceHash()}); - - // install minimal protocol on Alice's DWN - const protocolDefinition = { - protocol: 'http://minimal.xyz', - published: false, - types: { - foo: {} - }, - structure: { - foo: {} - } - }; - - const protocolsConfig = await ProtocolsConfigure.create({ - signer: aliceSigner, - definition: protocolDefinition - }); - - const protocolConfigureRequestId = uuidv4(); - const protocolConfigureRequest = createJsonRpcRequest(protocolConfigureRequestId, 'dwn.processMessage', { - target: alice.did, - message: protocolsConfig.message, - }); - const protocolConfigureResponse = await fetch(dwnUrl, { - method: 'POST', - headers: { - 'dwn-request': JSON.stringify(protocolConfigureRequest), - } - }); - const protocolConfigureResponseBody = await protocolConfigureResponse.json() as JsonRpcSuccessResponse; - - expect(protocolConfigureResponse.status).to.equal(200); - expect(protocolConfigureResponseBody.result.reply.status.code).to.equal(202); - - // Alice writing a file larger than max data size allowed to be encoded directly in the DWN Message Store. - const filePath = './fixtures/test.jpeg'; - const { - cid: dataCid, - size: dataSize, - stream - } = await getFileAsReadStream(filePath); - expect(dataSize).to.be.greaterThan(DwnConstant.maxDataSizeAllowedToBeEncoded); - - const recordsWrite = await RecordsWrite.create({ - signer: aliceSigner, - dataFormat: 'image/jpeg', - dataCid, - dataSize - }); - - const recordsWriteRequestId = uuidv4(); - const recordsWriteRequest = createJsonRpcRequest(recordsWriteRequestId, 'dwn.processMessage', { - target: alice.did, - message: recordsWrite.message, - }); - const recordsWriteResponse = await fetch(dwnUrl, { - method: 'POST', - headers: { - 'dwn-request': JSON.stringify(recordsWriteRequest), - }, - body: stream - }); - const recordsWriteResponseBody = await recordsWriteResponse.json() as JsonRpcSuccessResponse; - - expect(recordsWriteResponse.status).to.equal(200); - expect(recordsWriteResponseBody.result.reply.status.code).to.equal(202); - - // Alice reading the file back out. - const recordsRead = await RecordsRead.create({ - signer: aliceSigner, - filter: { - recordId: recordsWrite.message.recordId, - }, - }); - - const recordsReadRequestId = uuidv4(); - const recordsReadRequest = createJsonRpcRequest(recordsReadRequestId, 'dwn.processMessage', { - target: alice.did, - message: recordsRead.message - }); - - const recordsReadResponse = await fetch(dwnUrl, { - method: 'POST', - headers: { - 'dwn-request': JSON.stringify(recordsReadRequest), - }, - }); - - expect(recordsReadResponse.status).to.equal(200); - - const { headers } = recordsReadResponse; - const contentType = headers.get('content-type'); - expect(contentType).to.not.be.undefined; - expect(contentType).to.equal('application/octet-stream'); - - const recordsReadDwnResponse = headers.get('dwn-response'); - expect(recordsReadDwnResponse).to.not.be.undefined; - - const recordsReadJsonRpcResponse = JSON.parse(recordsReadDwnResponse) as JsonRpcSuccessResponse; - expect(recordsReadJsonRpcResponse.id).to.equal(recordsReadRequestId); - expect(recordsReadJsonRpcResponse.error).to.not.exist; - expect(recordsReadJsonRpcResponse.result.reply.status.code).to.equal(200); - expect(recordsReadJsonRpcResponse.result.reply.record).to.exist; - - // can't get response as stream from supertest :( - const cid = await Cid.computeDagPbCidFromStream(recordsReadResponse.body as Readable); - expect(cid).to.equal(dataCid); -} diff --git a/tests/utils.ts b/tests/utils.ts index 105be82..33f67d4 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -2,9 +2,7 @@ import type { GenericMessage, Persona, UnionMessageReply } from '@tbd54566975/dw import type { Response } from 'node-fetch'; import { Cid, DataStream, RecordsWrite } from '@tbd54566975/dwn-sdk-js'; -import type { ReadStream } from 'node:fs'; import fs from 'node:fs'; -import http from 'node:http'; import path from 'path'; import { v4 as uuidv4 } from 'uuid'; import fetch from 'node-fetch'; @@ -107,52 +105,6 @@ export function getDwnResponse(response: Response): UnionMessageReply { return JSON.parse(response.headers.get('dwn-response') as string) as UnionMessageReply; } -type HttpResponse = { - status: number; - headers: http.IncomingHttpHeaders; - body?: any; -}; - -export function streamHttpRequest( - url: string, - opts: http.RequestOptions, - bodyStream: ReadStream, -): Promise { - return new Promise((resolve, reject) => { - const request = http.request(url, opts, (rawResponse) => { - rawResponse.setEncoding('utf8'); - - const response: HttpResponse = { - status: rawResponse.statusCode, - headers: rawResponse.headers, - }; - - let body = ''; - rawResponse.on('data', (chunk) => { - body += chunk; - }); - - rawResponse.on('end', () => { - if (body) { - response.body = body; - } - - return resolve(response); - }); - }); - - request.on('error', (e) => { - return reject(e); - }); - - bodyStream.on('end', () => { - request.end(); - }); - - bodyStream.pipe(request); - }); -} - export async function sendHttpMessage(options: { url: string, target: string,