Skip to content

Commit

Permalink
Added plugin support (decentralized-identity#145)
Browse files Browse the repository at this point in the history
- Added plugin support for `MessageStore`, `DataStore`,
`ResumableDataStore`, `EventLog` and `EventStream`.
- Exposed config for `EventStream`.
- Exposed config for `ResumableTaskStore`.
- Refactored/consolidated repeated test code. 
- Various renames.
  • Loading branch information
thehenrytsai authored and Bnonni committed Jul 26, 2024
1 parent 6e45f03 commit 92549e5
Show file tree
Hide file tree
Showing 20 changed files with 526 additions and 255 deletions.
29 changes: 25 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,20 +282,22 @@ Configuration can be set using environment variables
| `DS_MAX_RECORD_DATA_SIZE` | Maximum size for `RecordsWrite` data. use `b`, `kb`, `mb`, `gb` for value | `1gb` |
| `DS_WEBSOCKET_SERVER` | Whether to enable listening over `ws:`. values: `on`,`off` | `on` |
| `DWN_BASE_URL` | Base external URL of this DWN. Used to construct URL paths such as the `Request URI` for the Web5 Connect flow. | `http://localhost` |
| `DWN_EVENT_STREAM_PLUGIN_PATH` | Path to DWN Event Stream plugin to use. Default single-node implementation will be used if left empty. | unset |
| `DWN_REGISTRATION_STORE_URL` | URL to use for storage of registered DIDs. Leave unset to if DWN does not require registration (ie. open for all) | unset |
| `DWN_REGISTRATION_PROOF_OF_WORK_SEED` | Seed to generate the challenge nonce from, this allows all DWN instances in a cluster to generate the same challenge. | unset |
| `DWN_REGISTRATION_PROOF_OF_WORK_ENABLED` | Require new users to complete a proof-of-work challenge | `false` |
| `DWN_REGISTRATION_PROOF_OF_WORK_INITIAL_MAX_HASH` | Initial maximum allowed hash in 64 char HEX string. The more leading zeros (smaller number) the higher the difficulty. | `false` |
| `DWN_STORAGE` | URL to use for storage by default. See [Storage Options](#storage-options) for details | `level://data` |
| `DWN_STORAGE_MESSAGES` | URL to use for storage of messages. | value of `DWN_STORAGE` |
| `DWN_STORAGE_DATA` | URL to use for data storage | value of `DWN_STORAGE` |
| `DWN_STORAGE_EVENTS` | URL to use for event storage | value of `DWN_STORAGE` |
| `DWN_STORAGE_MESSAGES` | Connection URL or file path to custom plugin to use for the message store. | value of `DWN_STORAGE` |
| `DWN_STORAGE_DATA` | Connection URL or file path to custom plugin to use for the data store. | value of `DWN_STORAGE` |
| `DWN_STORAGE_RESUMABLE_TASKS` | Connection URL or file path to custom plugin to use for the resumable task store. | value of `DWN_STORAGE` |
| `DWN_STORAGE_EVENTS` | Connection URL or file path to custom plugin to use for the event store. | value of `DWN_STORAGE` |
| `DWN_TERMS_OF_SERVICE_FILE_PATH` | Required terms of service agreement if set. Value is path to the terms of service file. | unset |
| `DWN_TTL_CACHE_URL` | URL of the TTL cache used by the DWN. Currently only supports SQL databases. | `sqlite://` |

### Storage Options

Several storage formats are supported, and may be configured with the `DWN_STORAGE_*` environment variables:
Several built storage options are supported, and may be configured with the `DWN_STORAGE_*` environment variables:

| Database | Example | Notes |
| ---------- | ----------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
Expand All @@ -304,6 +306,25 @@ Several storage formats are supported, and may be configured with the `DWN_STORA
| MySQL | `mysql://user:pass@host/db?debug=true&timezone=-0700` | [all URL options documented here](https://github.com/mysqljs/mysql#connection-options) |
| PostgreSQL | `postgres:///dwn` | any options other than the URL scheme (`postgres://`) may also be specified via [standard environment variables](https://node-postgres.com/features/connecting#environment-variables) |

### Plugins

In some scenarios, you may want to provide a custom implementation of a pluggable module for the DWN Server. The following interfaces defined in `dwn-sdk-js` package are supported:

- `DataStore`
- `MessageStore`
- `ResumableDataStore`
- `EventLog`
- `EventStream`

To load your custom plugin, specify the absolute path to the `.js` file of your custom implementation using the corresponding environment variable. For instance, use `DWN_STORAGE_DATA` for a custom DWN Data Store.

Refer to the `tests/plugins/*.ts` files for examples of plugin implementations. In summary, you need to:

- Implement the corresponding interface from the `dwn-sdk-js` package. For example, implement the `DataStore` interface for a DWN Data Store.
- Ensure that the built `.js` file that will be referenced by the DWN Server config environment variable contains a class that:
1. Is a default export. This is how DWN Server locates the correct class for instantiation.
1. Has a public constructor that does not take any arguments. This is how DWN Server instantiates the plugin.

## Registration Requirements

There are multiple optional registration gates, each of which can be enabled (all are disabled by default). Tenants (DIDs) must comply with whatever
Expand Down
7 changes: 7 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,17 @@ export const config = {

// whether to enable 'ws:'
webSocketSupport: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? true,

/**
* Path to DWN Event Stream plugin to use. Default single-node implementation will be used if left empty.
*/
eventStreamPluginPath: process.env.DWN_EVENT_STREAM_PLUGIN_PATH,

// where to store persistent data
messageStore: process.env.DWN_STORAGE_MESSAGES || process.env.DWN_STORAGE || 'level://data',
dataStore: process.env.DWN_STORAGE_DATA || process.env.DWN_STORAGE || 'level://data',
eventLog: process.env.DWN_STORAGE_EVENTS || process.env.DWN_STORAGE || 'level://data',
resumableTaskStore: process.env.DWN_STORAGE_RESUMABLE_TASKS || process.env.DWN_STORAGE || 'level://data',

// tenant registration feature configuration
registrationStoreUrl: process.env.DWN_REGISTRATION_STORE_URL || process.env.DWN_STORAGE,
Expand Down
15 changes: 10 additions & 5 deletions src/dwn-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import type { DwnServerConfig } from './config.js';
import log from 'loglevel';
import prefix from 'loglevel-plugin-prefix';
import { config as defaultConfig } from './config.js';
import { getDWNConfig } from './storage.js';
import { getDwnConfig } from './storage.js';
import { HttpServerShutdownHandler } from './lib/http-server-shutdown-handler.js';
import { HttpApi } from './http-api.js';
import { PluginLoader } from './plugin-loader.js';
import { RegistrationManager } from './registration/registration-manager.js';
import { WsApi } from './ws-api.js';
import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js';
Expand Down Expand Up @@ -100,12 +101,16 @@ export class DwnServer {

let eventStream: EventStream | undefined;
if (this.config.webSocketSupport) {
// setting `EventEmitterStream` as default the default `EventStream
// if an alternate implementation is needed, instantiate a `Dwn` with a custom `EventStream` and add it to server options.
eventStream = new EventEmitterStream();
// If Even Stream plugin is not specified, use `EventEmitterStream` implementation as default.
if (this.config.eventStreamPluginPath === undefined || this.config.eventStreamPluginPath === '') {
eventStream = new EventEmitterStream();
} else {
eventStream = await PluginLoader.loadPlugin<EventStream>(this.config.eventStreamPluginPath);
}

}

const dwnConfig = getDWNConfig(this.config, {
const dwnConfig = await getDwnConfig(this.config, {
didResolver: this.didResolver,
tenantGate: registrationManager,
eventStream,
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ export { DwnServerConfig } from './config.js';
export { DwnServer, DwnServerOptions } from './dwn-server.js';
export { HttpApi } from './http-api.js';
export { jsonRpcRouter } from './json-rpc-api.js';
export { EStoreType, BackendTypes, StoreType } from './storage.js';
export { StoreType, BackendTypes, DwnStore } from './storage.js';
export { WsApi } from './ws-api.js';
17 changes: 17 additions & 0 deletions src/plugin-loader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* A utility class for dynamically loading plugins from file paths.
*/
export class PluginLoader {
/**
* Dynamically loads a plugin from a file path by invoking the argument-less constructor of the default exported class.
*/
public static async loadPlugin<T>(filePath: string): Promise<T> {
try {
const module = await import(filePath);
const instance: T = new module.default() as T;
return instance;
} catch (error) {
throw new Error(`Failed to load component at ${filePath}: ${error.message}`);
}
}
}
4 changes: 2 additions & 2 deletions src/registration/registration-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { RegistrationData, RegistrationRequest } from "./registration-types
import type { ProofOfWorkChallengeModel } from "./proof-of-work-types.js";
import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js";
import type { ActiveTenantCheckResult, TenantGate } from "@tbd54566975/dwn-sdk-js";
import { getDialectFromURI } from "../storage.js";
import { getDialectFromUrl } from "../storage.js";
import { readFileSync } from "fs";

/**
Expand Down Expand Up @@ -77,7 +77,7 @@ export class RegistrationManager implements TenantGate {
});

// Initialize RegistrationStore.
const sqlDialect = getDialectFromURI(new URL(registrationStoreUrl));
const sqlDialect = getDialectFromUrl(new URL(registrationStoreUrl));
const registrationStore = await RegistrationStore.create(sqlDialect);
registrationManager.registrationStore = registrationStore;

Expand Down
148 changes: 86 additions & 62 deletions src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
import * as fs from 'fs';

import {
DataStoreLevel,
EventLogLevel,
MessageStoreLevel,
ResumableTaskStoreLevel,
} from '@tbd54566975/dwn-sdk-js';
import type { DidResolver } from '@web5/dids';
import type {
DataStore,
Expand All @@ -17,6 +9,21 @@ import type {
TenantGate,
} from '@tbd54566975/dwn-sdk-js';
import type { Dialect } from '@tbd54566975/dwn-sql-store';
import type { DwnServerConfig } from './config.js';

import * as fs from 'fs';
import Cursor from 'pg-cursor';
import Database from 'better-sqlite3';
import pg from 'pg';
import { createPool as MySQLCreatePool } from 'mysql2';
import { PluginLoader } from './plugin-loader.js';

import {
DataStoreLevel,
EventLogLevel,
MessageStoreLevel,
ResumableTaskStoreLevel,
} from '@tbd54566975/dwn-sdk-js';
import {
DataStoreSql,
EventLogSql,
Expand All @@ -27,14 +34,7 @@ import {
SqliteDialect,
} from '@tbd54566975/dwn-sql-store';

import Database from 'better-sqlite3';
import { createPool as MySQLCreatePool } from 'mysql2';
import pg from 'pg';
import Cursor from 'pg-cursor';

import type { DwnServerConfig } from './config.js';

export enum EStoreType {
export enum StoreType {
DataStore,
MessageStore,
EventLog,
Expand All @@ -48,44 +48,44 @@ export enum BackendTypes {
POSTGRES = 'postgres',
}

export type StoreType = DataStore | EventLog | MessageStore | ResumableTaskStore;
export type DwnStore = DataStore | EventLog | MessageStore | ResumableTaskStore;

export function getDWNConfig(
export async function getDwnConfig(
config : DwnServerConfig,
options : {
didResolver? : DidResolver,
tenantGate? : TenantGate,
eventStream? : EventStream,
}
): DwnConfig {
): Promise<DwnConfig> {
const { tenantGate, eventStream, didResolver } = options;
const dataStore: DataStore = getStore(config.dataStore, EStoreType.DataStore);
const eventLog: EventLog = getStore(config.eventLog, EStoreType.EventLog);
const messageStore: MessageStore = getStore(config.messageStore, EStoreType.MessageStore);
const resumableTaskStore: ResumableTaskStore = getStore(config.messageStore, EStoreType.ResumableTaskStore);
const dataStore: DataStore = await getStore(config.dataStore, StoreType.DataStore);
const eventLog: EventLog = await getStore(config.eventLog, StoreType.EventLog);
const messageStore: MessageStore = await getStore(config.messageStore, StoreType.MessageStore);
const resumableTaskStore: ResumableTaskStore = await getStore(config.resumableTaskStore, StoreType.ResumableTaskStore);

return { didResolver, eventStream, eventLog, dataStore, messageStore, resumableTaskStore, tenantGate };
}

function getLevelStore(
storeURI: URL,
storeType: EStoreType,
): DataStore | MessageStore | EventLog | ResumableTaskStore {
storeType: StoreType,
): DwnStore {
switch (storeType) {
case EStoreType.DataStore:
case StoreType.DataStore:
return new DataStoreLevel({
blockstoreLocation: storeURI.host + storeURI.pathname + '/DATASTORE',
});
case EStoreType.MessageStore:
case StoreType.MessageStore:
return new MessageStoreLevel({
blockstoreLocation: storeURI.host + storeURI.pathname + '/MESSAGESTORE',
indexLocation: storeURI.host + storeURI.pathname + '/INDEX',
});
case EStoreType.EventLog:
case StoreType.EventLog:
return new EventLogLevel({
location: storeURI.host + storeURI.pathname + '/EVENTLOG',
});
case EStoreType.ResumableTaskStore:
case StoreType.ResumableTaskStore:
return new ResumableTaskStoreLevel({
location: storeURI.host + storeURI.pathname + '/RESUMABLE-TASK-STORE',
});
Expand All @@ -94,42 +94,45 @@ function getLevelStore(
}
}

function getDBStore(
db: Dialect,
storeType: EStoreType,
): DataStore | MessageStore | EventLog | ResumableTaskStore {
function getSqlStore(
connectionUrl: URL,
storeType: StoreType,
): DwnStore {
const dialect = getDialectFromUrl(connectionUrl);

switch (storeType) {
case EStoreType.DataStore:
return new DataStoreSql(db);
case EStoreType.MessageStore:
return new MessageStoreSql(db);
case EStoreType.EventLog:
return new EventLogSql(db);
case EStoreType.ResumableTaskStore:
return new ResumableTaskStoreSql(db);
case StoreType.DataStore:
return new DataStoreSql(dialect);
case StoreType.MessageStore:
return new MessageStoreSql(dialect);
case StoreType.EventLog:
return new EventLogSql(dialect);
case StoreType.ResumableTaskStore:
return new ResumableTaskStoreSql(dialect);
default:
throw new Error('Unexpected db store type');
throw new Error(`Unsupported store type ${storeType} for SQL store.`);
}
}

function getStore(
storeString: string,
storeType: EStoreType.DataStore,
): DataStore;
function getStore(
storeString: string,
storeType: EStoreType.EventLog,
): EventLog;
function getStore(
storeString: string,
storeType: EStoreType.MessageStore,
): MessageStore;
function getStore(
storeString: string,
storeType: EStoreType.ResumableTaskStore,
): ResumableTaskStore;
function getStore(storeString: string, storeType: EStoreType): StoreType {
const storeURI = new URL(storeString);
/**
* Check if the given string is a file path.
*/
function isFilePath(configString: string): boolean {
const filePathPrefixes = ['/', './', '../'];
return filePathPrefixes.some(prefix => configString.startsWith(prefix));
}

async function getStore(storeString: string, storeType: StoreType.DataStore): Promise<DataStore>;
async function getStore(storeString: string, storeType: StoreType.EventLog): Promise<EventLog>;
async function getStore(storeString: string, storeType: StoreType.MessageStore): Promise<MessageStore>;
async function getStore(storeString: string, storeType: StoreType.ResumableTaskStore): Promise<ResumableTaskStore>;
async function getStore(storeConfigString: string, storeType: StoreType): Promise<DwnStore> {
if (isFilePath(storeConfigString)) {
return await loadStoreFromFilePath(storeConfigString, storeType);
}
// else treat the `storeConfigString` as a connection string

const storeURI = new URL(storeConfigString);

switch (storeURI.protocol.slice(0, -1)) {
case BackendTypes.LEVEL:
Expand All @@ -138,14 +141,35 @@ function getStore(storeString: string, storeType: EStoreType): StoreType {
case BackendTypes.SQLITE:
case BackendTypes.MYSQL:
case BackendTypes.POSTGRES:
return getDBStore(getDialectFromURI(storeURI), storeType);
return getSqlStore(storeURI, storeType);

default:
throw invalidStorageSchemeMessage(storeURI.protocol);
}
}

export function getDialectFromURI(connectionUrl: URL): Dialect {
/**
* Loads a DWN store plugin of the given type from the given file path.
*/
async function loadStoreFromFilePath(
filePath: string,
storeType: StoreType,
): Promise<DwnStore> {
switch (storeType) {
case StoreType.DataStore:
return await PluginLoader.loadPlugin<DataStore>(filePath);
case StoreType.EventLog:
return await PluginLoader.loadPlugin<EventLog>(filePath);
case StoreType.MessageStore:
return await PluginLoader.loadPlugin<MessageStore>(filePath);
case StoreType.ResumableTaskStore:
return await PluginLoader.loadPlugin<ResumableTaskStore>(filePath);
default:
throw new Error(`Loading store for unsupported store type ${storeType} from path ${filePath}`);
}
}

export function getDialectFromUrl(connectionUrl: URL): Dialect {
switch (connectionUrl.protocol.slice(0, -1)) {
case BackendTypes.SQLITE:
const path = connectionUrl.host + connectionUrl.pathname;
Expand Down
4 changes: 2 additions & 2 deletions src/web5-connect/web5-connect-server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getDialectFromURI } from "../storage.js";
import { getDialectFromUrl } from "../storage.js";
import { randomUuid } from '@web5/crypto/utils';
import { SqlTtlCache } from "./sql-ttl-cache.js";

Expand Down Expand Up @@ -49,7 +49,7 @@ export class Web5ConnectServer {
const web5ConnectServer = new Web5ConnectServer({ baseUrl });

// Initialize TTL cache.
const sqlDialect = getDialectFromURI(new URL(sqlTtlCacheUrl));
const sqlDialect = getDialectFromUrl(new URL(sqlTtlCacheUrl));
web5ConnectServer.cache = await SqlTtlCache.create(sqlDialect);

return web5ConnectServer;
Expand Down
Loading

0 comments on commit 92549e5

Please sign in to comment.