Skip to content

Commit

Permalink
Exposed config for EventStream + Updated plugin test
Browse files Browse the repository at this point in the history
  • Loading branch information
thehenrytsai committed Jul 10, 2024
1 parent c1078c6 commit bcd669b
Show file tree
Hide file tree
Showing 13 changed files with 285 additions and 324 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ Configuration can be set using environment variables
| `DS_MAX_RECORD_DATA_SIZE` | Maximum size for `RecordsWrite` data. use `b`, `kb`, `mb`, `gb` for value | `1gb` |
| `DS_WEBSOCKET_SERVER` | Whether to enable listening over `ws:`. values: `on`,`off` | `on` |
| `DWN_BASE_URL` | Base external URL of this DWN. Used to construct URL paths such as the `Request URI` for the Web5 Connect flow. | `http://localhost` |
| `DWN_EVENT_STREAM_PLUGIN_PATH` | Path to DWN Event Stream plugin to use. Default single-node implementation will be used if left empty. | unset |
| `DWN_REGISTRATION_STORE_URL` | URL to use for storage of registered DIDs. Leave unset to if DWN does not require registration (ie. open for all) | unset |
| `DWN_REGISTRATION_PROOF_OF_WORK_SEED` | Seed to generate the challenge nonce from, this allows all DWN instances in a cluster to generate the same challenge. | unset |
| `DWN_REGISTRATION_PROOF_OF_WORK_ENABLED` | Require new users to complete a proof-of-work challenge | `false` |
Expand Down
6 changes: 6 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ export const config = {

// whether to enable 'ws:'
webSocketSupport: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? true,

/**
* Path to DWN Event Stream plugin to use. Default single-node implementation will be used if left empty.
*/
eventStreamPluginPath: process.env.DWN_EVENT_STREAM_PLUGIN_PATH,

// where to store persistent data
messageStore: process.env.DWN_STORAGE_MESSAGES || process.env.DWN_STORAGE || 'level://data',
dataStore: process.env.DWN_STORAGE_DATA || process.env.DWN_STORAGE || 'level://data',
Expand Down
11 changes: 8 additions & 3 deletions src/dwn-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { config as defaultConfig } from './config.js';
import { getDwnConfig } from './storage.js';
import { HttpServerShutdownHandler } from './lib/http-server-shutdown-handler.js';
import { HttpApi } from './http-api.js';
import { PluginLoader } from './plugin-loader.js';
import { RegistrationManager } from './registration/registration-manager.js';
import { WsApi } from './ws-api.js';
import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js';
Expand Down Expand Up @@ -100,9 +101,13 @@ export class DwnServer {

let eventStream: EventStream | undefined;
if (this.config.webSocketSupport) {
// setting `EventEmitterStream` as default the default `EventStream
// if an alternate implementation is needed, instantiate a `Dwn` with a custom `EventStream` and add it to server options.
eventStream = new EventEmitterStream();
// If Even Stream plugin is not specified, use `EventEmitterStream` implementation as default.
if (this.config.eventStreamPluginPath === undefined || this.config.eventStreamPluginPath === '') {
eventStream = new EventEmitterStream();
} else {
eventStream = await PluginLoader.loadPlugin<EventStream>(this.config.eventStreamPluginPath);
}

}

const dwnConfig = await getDwnConfig(this.config, {
Expand Down
13 changes: 9 additions & 4 deletions src/pluginLoader.ts → src/plugin-loader.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
/**
* Dynamically loads a plugin from a file path by invoking the argument-less constructor of the default exported class.
* A utility class for dynamically loading plugins from file paths.
*/
export async function loadPlugin<T>(filePath: string): Promise<T> {
export class PluginLoader {
/**
* Dynamically loads a plugin from a file path by invoking the argument-less constructor of the default exported class.
*/
public static async loadPlugin<T>(filePath: string): Promise<T> {
try {
const module = await import(filePath);

// Check if the default export is a class
if (typeof module.default === 'function') {
const instance: T = new module.default() as T;
Expand All @@ -15,4 +19,5 @@ export async function loadPlugin<T>(filePath: string): Promise<T> {
} catch (error) {
throw new Error(`Failed to load component at ${filePath}: ${error.message}`);
}
}
}
}
39 changes: 19 additions & 20 deletions src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
import * as fs from 'fs';

import {
DataStoreLevel,
EventLogLevel,
MessageStoreLevel,
ResumableTaskStoreLevel,
} from '@tbd54566975/dwn-sdk-js';
import type { DidResolver } from '@web5/dids';
import type {
DataStore,
Expand All @@ -17,6 +9,21 @@ import type {
TenantGate,
} from '@tbd54566975/dwn-sdk-js';
import type { Dialect } from '@tbd54566975/dwn-sql-store';
import type { DwnServerConfig } from './config.js';

import * as fs from 'fs';
import Cursor from 'pg-cursor';
import Database from 'better-sqlite3';
import pg from 'pg';
import { createPool as MySQLCreatePool } from 'mysql2';
import { PluginLoader } from './plugin-loader.js';

import {
DataStoreLevel,
EventLogLevel,
MessageStoreLevel,
ResumableTaskStoreLevel,
} from '@tbd54566975/dwn-sdk-js';
import {
DataStoreSql,
EventLogSql,
Expand All @@ -27,14 +34,6 @@ import {
SqliteDialect,
} from '@tbd54566975/dwn-sql-store';

import Database from 'better-sqlite3';
import { createPool as MySQLCreatePool } from 'mysql2';
import pg from 'pg';
import Cursor from 'pg-cursor';

import type { DwnServerConfig } from './config.js';
import { loadPlugin } from './pluginLoader.js';

export enum StoreType {
DataStore,
MessageStore,
Expand Down Expand Up @@ -158,13 +157,13 @@ async function loadStoreFromFilePath(
): Promise<DwnStore> {
switch (storeType) {
case StoreType.DataStore:
return await loadPlugin<DataStore>(filePath);
return await PluginLoader.loadPlugin<DataStore>(filePath);
case StoreType.EventLog:
return await loadPlugin<EventLog>(filePath);
return await PluginLoader.loadPlugin<EventLog>(filePath);
case StoreType.MessageStore:
return await loadPlugin<MessageStore>(filePath);
return await PluginLoader.loadPlugin<MessageStore>(filePath);
case StoreType.ResumableTaskStore:
return await loadPlugin<ResumableTaskStore>(filePath);
return await PluginLoader.loadPlugin<ResumableTaskStore>(filePath);
default:
throw new Error(`Loading store for unsupported store type ${storeType} from path ${filePath}`);
}
Expand Down
142 changes: 142 additions & 0 deletions tests/common-scenario-validator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import type { JsonRpcSuccessResponse } from '../src/lib/json-rpc.js';
import type { Persona } from '@tbd54566975/dwn-sdk-js';
import type { Readable } from 'readable-stream';

import chaiAsPromised from 'chai-as-promised';
import chai, { expect } from 'chai';
import fetch from 'node-fetch';

import { createJsonRpcRequest } from '../src/lib/json-rpc.js';
import { getFileAsReadStream } from './utils.js';
import { v4 as uuidv4 } from 'uuid';

import { Cid, DwnConstant, Jws, ProtocolsConfigure, RecordsRead, RecordsWrite, TestDataGenerator } from '@tbd54566975/dwn-sdk-js';


// node.js 18 and earlier needs globalThis.crypto polyfill
if (!globalThis.crypto) {
// @ts-ignore
globalThis.crypto = webcrypto;
}

chai.use(chaiAsPromised);

/**
* Validator of common scenarios.
*/
export default class CommonScenarioValidator {
/**
* Sanity test RecordsWrite and RecordsRead on the DWN instance.
*/
public static async sanityTestDwnReadWrite(dwnUrl: string, persona?: Persona): Promise<void> {
const alice = persona || await TestDataGenerator.generateDidKeyPersona();
const aliceSigner = Jws.createSigner(alice);

// install minimal protocol on Alice's DWN
const protocolDefinition = {
protocol: 'http://minimal.xyz',
published: false,
types: {
foo: {}
},
structure: {
foo: {}
}
};

const protocolsConfig = await ProtocolsConfigure.create({
signer: aliceSigner,
definition: protocolDefinition
});

const protocolConfigureRequestId = uuidv4();
const protocolConfigureRequest = createJsonRpcRequest(protocolConfigureRequestId, 'dwn.processMessage', {
target: alice.did,
message: protocolsConfig.message,
});
const protocolConfigureResponse = await fetch(dwnUrl, {
method: 'POST',
headers: {
'dwn-request': JSON.stringify(protocolConfigureRequest),
}
});
const protocolConfigureResponseBody = await protocolConfigureResponse.json() as JsonRpcSuccessResponse;

expect(protocolConfigureResponse.status).to.equal(200);
expect(protocolConfigureResponseBody.result.reply.status.code).to.equal(202);

// Alice writing a file larger than max data size allowed to be encoded directly in the DWN Message Store.
const filePath = './fixtures/test.jpeg';
const {
cid: dataCid,
size: dataSize,
stream
} = await getFileAsReadStream(filePath);
expect(dataSize).to.be.greaterThan(DwnConstant.maxDataSizeAllowedToBeEncoded);

const recordsWrite = await RecordsWrite.create({
signer: aliceSigner,
dataFormat: 'image/jpeg',
dataCid,
dataSize
});

const recordsWriteRequestId = uuidv4();
const recordsWriteRequest = createJsonRpcRequest(recordsWriteRequestId, 'dwn.processMessage', {
target: alice.did,
message: recordsWrite.message,
});
const recordsWriteResponse = await fetch(dwnUrl, {
method: 'POST',
headers: {
'dwn-request': JSON.stringify(recordsWriteRequest),
},
body: stream
});
const recordsWriteResponseBody = await recordsWriteResponse.json() as JsonRpcSuccessResponse;

expect(recordsWriteResponse.status).to.equal(200);
expect(recordsWriteResponseBody.result.reply.status.code).to.equal(202);

// Alice reading the file back out.
const recordsRead = await RecordsRead.create({
signer: aliceSigner,
filter: {
recordId: recordsWrite.message.recordId,
},
});

const recordsReadRequestId = uuidv4();
const recordsReadRequest = createJsonRpcRequest(recordsReadRequestId, 'dwn.processMessage', {
target: alice.did,
message: recordsRead.message
});

const recordsReadResponse = await fetch(dwnUrl, {
method: 'POST',
headers: {
'dwn-request': JSON.stringify(recordsReadRequest),
},
});

expect(recordsReadResponse.status).to.equal(200);

const { headers } = recordsReadResponse;
const contentType = headers.get('content-type');
expect(contentType).to.not.be.undefined;
expect(contentType).to.equal('application/octet-stream');

const recordsReadDwnResponse = headers.get('dwn-response');
expect(recordsReadDwnResponse).to.not.be.undefined;

const recordsReadJsonRpcResponse = JSON.parse(recordsReadDwnResponse) as JsonRpcSuccessResponse;
expect(recordsReadJsonRpcResponse.id).to.equal(recordsReadRequestId);
expect(recordsReadJsonRpcResponse.error).to.not.exist;
expect(recordsReadJsonRpcResponse.result.reply.status.code).to.equal(200);
expect(recordsReadJsonRpcResponse.result.reply.record).to.exist;

// can't get response as stream from supertest :(
const cid = await Cid.computeDagPbCidFromStream(recordsReadResponse.body as Readable);
expect(cid).to.equal(dataCid);
}
}
Loading

0 comments on commit bcd669b

Please sign in to comment.