From d93d54303e63d7361063abe4fb7163af8fb9df1a Mon Sep 17 00:00:00 2001 From: Henry Tsai Date: Mon, 1 Jul 2024 10:27:37 -0700 Subject: [PATCH] Implemented a simple SQL based TTL cache (#140) - Implemented a simple SQL based TTL cache. - Introduced `HttpApi.create()` to support async object creation pattern. - Fixed bug in PostgreSQL dialect instantiation. - 100% new code coverage. --- README.md | 3 +- src/config.ts | 18 ++- src/dwn-server.ts | 2 +- src/http-api.ts | 31 +++-- src/storage.ts | 16 +-- src/web5-connect/sql-ttl-cache.ts | 123 ++++++++++++++++++++ src/web5-connect/web5-connect-server.ts | 35 ++++-- tests/connection/connection-manager.spec.ts | 2 +- tests/http-api.spec.ts | 8 +- tests/scenarios/web5-connect.spec.ts | 35 ++++++ tests/ws-api.spec.ts | 2 +- 11 files changed, 236 insertions(+), 39 deletions(-) create mode 100644 src/web5-connect/sql-ttl-cache.ts diff --git a/README.md b/README.md index e8a377e..e5e680f 100644 --- a/README.md +++ b/README.md @@ -286,11 +286,12 @@ Configuration can be set using environment variables | `DWN_REGISTRATION_PROOF_OF_WORK_SEED` | Seed to generate the challenge nonce from, this allows all DWN instances in a cluster to generate the same challenge. | unset | | `DWN_REGISTRATION_PROOF_OF_WORK_ENABLED` | Require new users to complete a proof-of-work challenge | `false` | | `DWN_REGISTRATION_PROOF_OF_WORK_INITIAL_MAX_HASH` | Initial maximum allowed hash in 64 char HEX string. The more leading zeros (smaller number) the higher the difficulty. | `false` | -| `DWN_TERMS_OF_SERVICE_FILE_PATH` | Required terms of service agreement if set. Value is path to the terms of service file. | unset | | `DWN_STORAGE` | URL to use for storage by default. See [Storage Options](#storage-options) for details | `level://data` | | `DWN_STORAGE_MESSAGES` | URL to use for storage of messages. 
| value of `DWN_STORAGE` | | `DWN_STORAGE_DATA` | URL to use for data storage | value of `DWN_STORAGE` | | `DWN_STORAGE_EVENTS` | URL to use for event storage | value of `DWN_STORAGE` | +| `DWN_TERMS_OF_SERVICE_FILE_PATH` | Required terms of service agreement if set. Value is path to the terms of service file. | unset | +| `DWN_TTL_CACHE_URL` | URL of the TTL cache used by the DWN. Currently only supports SQL databases. | `sqlite://` | ### Storage Options diff --git a/src/config.ts b/src/config.ts index 320c5e1..f6ce95a 100644 --- a/src/config.ts +++ b/src/config.ts @@ -17,6 +17,21 @@ export const config = { */ baseUrl: process.env.DWN_BASE_URL || 'http://localhost', + /** + * Port that server listens on. + */ + port: parseInt(process.env.DS_PORT || '3000'), + + /** + * The URL of the TTL cache used by the DWN. + * NOTE: Used for session/state keeping, thus requires the cache to be commonly addressable by nodes in a cloud cluster environment. + * + * Currently only supports SQL databases, e.g. + * Postgres: 'postgres://root:dwn@localhost:5432/dwn' + * MySQL: 'mysql://root:dwn@localhost:3306/dwn' + */ + ttlCacheUrl: process.env.DWN_TTL_CACHE_URL || 'sqlite://', + /** * Used to populate the `version` and `sdkVersion` properties returned by the `/info` endpoint. * @@ -28,8 +43,7 @@ export const config = { packageJsonPath: process.env.npm_package_json || process.env.DWN_SERVER_PACKAGE_JSON || '/dwn-server/package.json', // max size of data that can be provided with a RecordsWrite maxRecordDataSize: bytes(process.env.MAX_RECORD_DATA_SIZE || '1gb'), - // port that server listens on - port: parseInt(process.env.DS_PORT || '3000'), + // whether to enable 'ws:' webSocketSupport: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? 
true, // where to store persistent data diff --git a/src/dwn-server.ts b/src/dwn-server.ts index cd606ce..eb822a4 100644 --- a/src/dwn-server.ts +++ b/src/dwn-server.ts @@ -74,7 +74,7 @@ export class DwnServer { })); } - this.#httpApi = new HttpApi(this.config, this.dwn, registrationManager); + this.#httpApi = await HttpApi.create(this.config, this.dwn, registrationManager); await this.#httpApi.start(this.config.port, () => { log.info(`HttpServer listening on port ${this.config.port}`); diff --git a/src/http-api.ts b/src/http-api.ts index 9a4a533..0c6baec 100644 --- a/src/http-api.ts +++ b/src/http-api.ts @@ -32,38 +32,45 @@ export class HttpApi { registrationManager: RegistrationManager; dwn: Dwn; - constructor(config: DwnServerConfig, dwn: Dwn, registrationManager?: RegistrationManager) { + private constructor() { } + + public static async create(config: DwnServerConfig, dwn: Dwn, registrationManager?: RegistrationManager): Promise { + const httpApi = new HttpApi(); + log.info(config); - this.#packageInfo = { + httpApi.#packageInfo = { server: config.serverName, }; try { // We populate the `version` and `sdkVersion` properties from the `package.json` file. const packageJson = JSON.parse(readFileSync(config.packageJsonPath).toString()); - this.#packageInfo.version = packageJson.version; - this.#packageInfo.sdkVersion = packageJson.dependencies ? packageJson.dependencies['@tbd54566975/dwn-sdk-js'] : undefined; + httpApi.#packageInfo.version = packageJson.version; + httpApi.#packageInfo.sdkVersion = packageJson.dependencies ? 
packageJson.dependencies['@tbd54566975/dwn-sdk-js'] : undefined; } catch (error: any) { log.error('could not read `package.json` for version info', error); } - this.#config = config; - this.#api = express(); - this.#server = http.createServer(this.#api); - this.dwn = dwn; + httpApi.#config = config; + httpApi.#api = express(); + httpApi.#server = http.createServer(httpApi.#api); + httpApi.dwn = dwn; if (registrationManager !== undefined) { - this.registrationManager = registrationManager; + httpApi.registrationManager = registrationManager; } // create the Web5 Connect Server - this.web5ConnectServer = new Web5ConnectServer({ + httpApi.web5ConnectServer = await Web5ConnectServer.create({ baseUrl: `${config.baseUrl}:${config.port}`, + sqlTtlCacheUrl: config.ttlCacheUrl, }); - this.#setupMiddleware(); - this.#setupRoutes(); + httpApi.#setupMiddleware(); + httpApi.#setupRoutes(); + + return httpApi; } get server(): http.Server { diff --git a/src/storage.ts b/src/storage.ts index b5dfba8..189f00f 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -143,15 +143,15 @@ function getStore(storeString: string, storeType: EStoreType): StoreType { } } -export function getDialectFromURI(u: URL): Dialect { - switch (u.protocol.slice(0, -1)) { +export function getDialectFromURI(connectionUrl: URL): Dialect { + switch (connectionUrl.protocol.slice(0, -1)) { case BackendTypes.SQLITE: - const path = u.host + u.pathname; + const path = connectionUrl.host + connectionUrl.pathname; console.log('SQL-lite relative path:', path ? path : undefined); // NOTE, using ? 
for lose equality comparison - if (u.host && !fs.existsSync(u.host)) { - console.log('SQL-lite directory does not exist, creating:', u.host); - fs.mkdirSync(u.host, { recursive: true }); + if (connectionUrl.host && !fs.existsSync(connectionUrl.host)) { + console.log('SQL-lite directory does not exist, creating:', connectionUrl.host); + fs.mkdirSync(connectionUrl.host, { recursive: true }); } return new SqliteDialect({ @@ -159,11 +159,11 @@ export function getDialectFromURI(u: URL): Dialect { }); case BackendTypes.MYSQL: return new MysqlDialect({ - pool: async () => MySQLCreatePool(u.toString()), + pool: async () => MySQLCreatePool(connectionUrl.toString()), }); case BackendTypes.POSTGRES: return new PostgresDialect({ - pool: async () => new pg.Pool({ u }), + pool: async () => new pg.Pool({ connectionString: connectionUrl.toString() }), cursor: Cursor, }); } diff --git a/src/web5-connect/sql-ttl-cache.ts b/src/web5-connect/sql-ttl-cache.ts new file mode 100644 index 0000000..4a0bef1 --- /dev/null +++ b/src/web5-connect/sql-ttl-cache.ts @@ -0,0 +1,123 @@ +import type { Dialect } from '@tbd54566975/dwn-sql-store'; +import { Kysely } from 'kysely'; + +/** + * The SqlTtlCache is responsible for storing and retrieving cache data with TTL (Time-to-Live). + */ +export class SqlTtlCache { + private static readonly cacheTableName = 'cacheEntries'; + private static readonly cleanupIntervalInSeconds = 60; + + private db: Kysely; + private cleanupTimer: NodeJS.Timeout; + + private constructor(sqlDialect: Dialect) { + this.db = new Kysely({ dialect: sqlDialect }); + } + + /** + * Creates a new SqlTtlCache instance. 
+ */ + public static async create(sqlDialect: Dialect): Promise { + const cacheManager = new SqlTtlCache(sqlDialect); + + await cacheManager.initialize(); + + return cacheManager; + } + + private async initialize(): Promise { + await this.db.schema + .createTable(SqlTtlCache.cacheTableName) + .ifNotExists() + // 512 chars to accommodate potentially large `state` in Web5 Connect flow + .addColumn('key', 'varchar(512)', (column) => column.primaryKey()) + .addColumn('value', 'text', (column) => column.notNull()) + .addColumn('expiry', 'bigint', (column) => column.notNull()) // epoch milliseconds; overflows a 32-bit `integer` column on PostgreSQL/MySQL + .execute(); + + // Start the cleanup timer + this.startCleanupTimer(); + } + + /** + * Starts a timer to periodically clean up expired cache entries. + */ + private startCleanupTimer(): void { + this.cleanupTimer = setInterval(async () => { + await this.cleanUpExpiredEntries().catch((error) => console.error('SqlTtlCache: cleanup of expired entries failed', error)); // a failed sweep must not become an unhandled rejection; the next tick retries + }, SqlTtlCache.cleanupIntervalInSeconds * 1000); + } + + /** + * Inserts a cache entry. + * @param ttl The time-to-live in seconds. + */ + public async insert(key: string, value: object, ttl: number): Promise { + const expiry = Date.now() + (ttl * 1000); + + const objectString = JSON.stringify(value); + + await this.db + .insertInto(SqlTtlCache.cacheTableName) + .values({ key, value: objectString, expiry }) + .execute(); + } + + /** + * Retrieves a cache entry if it is not expired and cleans up expired entries. + */ + public async get(key: string): Promise { + const result = await this.db + .selectFrom(SqlTtlCache.cacheTableName) + .select('key') + .select('value') + .select('expiry') + .where('key', '=', key) + .execute(); + + if (result.length === 0) { + return undefined; + } + + const entry = result[0]; + + // if the entry is expired, don't return it and delete it + if (Date.now() >= Number(entry.expiry)) { // some SQL drivers return 64-bit integer columns as strings + this.delete(key).catch((error) => console.error('SqlTtlCache: failed to delete expired entry', error)); // best-effort, not awaited; the periodic cleanup removes the row if this fails + return undefined; + } + + return JSON.parse(entry.value); + } + + /** + * Deletes a cache entry.
+ */ + public async delete(key: string): Promise { + await this.db + .deleteFrom(SqlTtlCache.cacheTableName) + .where('key', '=', key) + .execute(); + } + + /** + * Periodically clean up expired cache entries. + */ + public async cleanUpExpiredEntries(): Promise { + await this.db + .deleteFrom(SqlTtlCache.cacheTableName) + .where('expiry', '<', Date.now()) + .execute(); + } +} + +interface CacheEntry { + key: string; + value: string; + expiry: number; +} + +interface CacheDatabase { + cacheEntries: CacheEntry; +} diff --git a/src/web5-connect/web5-connect-server.ts b/src/web5-connect/web5-connect-server.ts index a0b7405..df158d0 100644 --- a/src/web5-connect/web5-connect-server.ts +++ b/src/web5-connect/web5-connect-server.ts @@ -1,4 +1,6 @@ +import { getDialectFromURI } from "../storage.js"; import { randomUuid } from '@web5/crypto/utils'; +import { SqlTtlCache } from "./sql-ttl-cache.js"; /** * The Web5 Connect Request object. @@ -29,16 +31,31 @@ export type SetWeb5ConnectRequestResult = { * The Web5 Connect Server is responsible for handling the Web5 Connect flow. */ export class Web5ConnectServer { + public static readonly ttlInSeconds = 600; private baseUrl: string; - private dataStore = new Map(); // TODO: turn this into a TTL cache (https://github.com/TBD54566975/dwn-server/issues/138) + private cache: SqlTtlCache; /** * Creates a new instance of the Web5 Connect Server. * @param params.baseUrl The the base URL of the connect server including the port. * This is given to the Identity Provider (wallet) to fetch the Web5 Connect Request object. + * @param params.sqlTtlCacheUrl The URL of the SQL database to use as the TTL cache. */ - public constructor({ baseUrl }: { + public static async create({ baseUrl, sqlTtlCacheUrl }: { + baseUrl: string; + sqlTtlCacheUrl: string; + }): Promise { + const web5ConnectServer = new Web5ConnectServer({ baseUrl }); + + // Initialize TTL cache. 
+ const sqlDialect = getDialectFromURI(new URL(sqlTtlCacheUrl)); + web5ConnectServer.cache = await SqlTtlCache.create(sqlDialect); + + return web5ConnectServer; + } + + private constructor({ baseUrl }: { baseUrl: string; }) { this.baseUrl = baseUrl; @@ -54,11 +71,11 @@ export class Web5ConnectServer { const request_uri = `${this.baseUrl}/connect/${requestId}.jwt`; // Store the Request Object. - this.dataStore.set(`request:${requestId}`, request); + await this.cache.insert(`request:${requestId}`, request, Web5ConnectServer.ttlInSeconds); // awaited so the request is persisted (and insert errors surface) before the URI is handed out return { request_uri, - expires_in : 600, + expires_in : Web5ConnectServer.ttlInSeconds, }; } @@ -66,10 +83,10 @@ export class Web5ConnectServer { * Returns the Web5 Connect Request object. The request ID can only be used once. */ public async getWeb5ConnectRequest(requestId: string): Promise { - const request = this.dataStore.get(`request:${requestId}`); + const request = await this.cache.get(`request:${requestId}`); // Delete the Request Object from the data store now that it has been retrieved. - this.dataStore.delete(`request:${requestId}`); + await this.cache.delete(`request:${requestId}`); // awaited after the read above so the delete cannot race ahead of it return request; } @@ -78,17 +95,17 @@ export class Web5ConnectServer { * Sets the Web5 Connect Response object, which is also an OIDC ID token. */ public async setWeb5ConnectResponse(state: string, response: Web5ConnectResponse): Promise { - this.dataStore.set(`response:${state}`, response); + await this.cache.insert(`response:${state}`, response, Web5ConnectServer.ttlInSeconds); // awaited so the response is persisted before the caller is told it was stored } /** * Gets the Web5 Connect Response object. The `state` string can only be used once. */ public async getWeb5ConnectResponse(state: string): Promise { - const response = this. dataStore.get(`response:${state}`); + const response = await this.cache.get(`response:${state}`); // Delete the Response object from the data store now that it has been retrieved.
- this.dataStore.delete(`response:${state}`); + await this.cache.delete(`response:${state}`); return response; } diff --git a/tests/connection/connection-manager.spec.ts b/tests/connection/connection-manager.spec.ts index ba6850b..37f246c 100644 --- a/tests/connection/connection-manager.spec.ts +++ b/tests/connection/connection-manager.spec.ts @@ -23,7 +23,7 @@ describe('InMemoryConnectionManager', () => { beforeEach(async () => { dwn = await getTestDwn({ withEvents: true }); connectionManager = new InMemoryConnectionManager(dwn); - const httpApi = new HttpApi(config, dwn); + const httpApi = await HttpApi.create(config, dwn); server = await httpApi.start(9002); wsApi = new WsApi(server, dwn, connectionManager); wsApi.start(); diff --git a/tests/http-api.spec.ts b/tests/http-api.spec.ts index 164ec06..6e882ee 100644 --- a/tests/http-api.spec.ts +++ b/tests/http-api.spec.ts @@ -69,7 +69,7 @@ describe('http api', function () { dwn = await getTestDwn({ tenantGate: registrationManager }); - httpApi = new HttpApi(config, dwn, registrationManager); + httpApi = await HttpApi.create(config, dwn, registrationManager); }); @@ -1064,7 +1064,7 @@ describe('http api', function () { server.closeAllConnections(); config.webSocketSupport = false; - httpApi = new HttpApi(config, dwn, registrationManager); + httpApi = await HttpApi.create(config, dwn, registrationManager); server = await httpApi.start(3000); resp = await fetch(`http://localhost:3000/info`); @@ -1088,7 +1088,7 @@ describe('http api', function () { // set the config to an invalid file path const packageJsonConfig = config.packageJsonPath; config.packageJsonPath = '/some/invalid/file.json'; - httpApi = new HttpApi(config, dwn, registrationManager); + httpApi = await HttpApi.create(config, dwn, registrationManager); server = await httpApi.start(3000); const resp = await fetch(`http://localhost:3000/info`); @@ -1117,7 +1117,7 @@ describe('http api', function () { // set a custom name for the `serverName` const serverName = 
config.serverName; config.serverName = '@web5/dwn-server-2' - httpApi = new HttpApi(config, dwn, registrationManager); + httpApi = await HttpApi.create(config, dwn, registrationManager); server = await httpApi.start(3000); const resp = await fetch(`http://localhost:3000/info`); diff --git a/tests/scenarios/web5-connect.spec.ts b/tests/scenarios/web5-connect.spec.ts index 54da59b..d1685f2 100644 --- a/tests/scenarios/web5-connect.spec.ts +++ b/tests/scenarios/web5-connect.spec.ts @@ -1,7 +1,10 @@ import fetch from 'node-fetch'; +import sinon from 'sinon'; import { config } from '../../src/config.js'; import { DwnServer } from '../../src/dwn-server.js'; import { expect } from 'chai'; +import { useFakeTimers } from 'sinon'; +import { Web5ConnectServer } from '../../src/web5-connect/web5-connect-server.js'; import { webcrypto } from 'node:crypto'; // node.js 18 and earlier needs globalThis.crypto polyfill @@ -13,6 +16,7 @@ if (!globalThis.crypto) { describe('Web5 Connect scenarios', function () { const web5ConnectBaseUrl = 'http://localhost:3000'; + let clock: sinon.SinonFakeTimers; let dwnServer: DwnServer; const dwnServerConfig = { ...config } // not touching the original config @@ -33,10 +37,15 @@ describe('Web5 Connect scenarios', function () { }); beforeEach(function () { + sinon.restore(); // wipe all previous stubs/spies/mocks/fakes/clock + + // IMPORTANT: MUST be called AFTER `sinon.restore()` because `sinon.restore()` resets fake timers + clock = useFakeTimers({ shouldAdvanceTime: true }); dwnServer.start(); }); afterEach(function () { + clock.restore(); dwnServer.stop(() => {}); }); @@ -113,4 +122,30 @@ describe('Web5 Connect scenarios', function () { }); expect(getWeb5ConnectResponseResult2.status).to.equal(404); }); + + it('should clean up objects that are expired', async () => { + // Scenario: + // 1. App sends the Web5 Connect Request object to the Web5 Connect server. + // 2. Time passes and the Web5 Connect Request object is expired. + // 3. 
Should receive 404 when fetching Web5 Connect Request. + + // 1. App sends the Web5 Connect Request object to the Web5 Connect server. + const requestBody = { request: { dummyProperty: 'dummyValue' } }; + const postWeb5ConnectRequestResult = await fetch(`${web5ConnectBaseUrl}/connect/par`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(requestBody), + }); + expect(postWeb5ConnectRequestResult.status).to.equal(201); + + // 2. Time passes and the Web5 Connect Request object is expired. + await clock.tickAsync(Web5ConnectServer.ttlInSeconds * 1000); + + // 3. Should receive 404 when fetching the expired Web5 Connect Request. + const requestUrl = (await postWeb5ConnectRequestResult.json() as any).request_uri; + const getWeb5ConnectRequestResult = await fetch(requestUrl, { + method: 'GET', + }); + expect(getWeb5ConnectRequestResult.status).to.equal(404); + }); }); diff --git a/tests/ws-api.spec.ts b/tests/ws-api.spec.ts index 34a2150..a5a9ef4 100644 --- a/tests/ws-api.spec.ts +++ b/tests/ws-api.spec.ts @@ -40,7 +40,7 @@ describe('websocket api', function () { beforeEach(async function () { dwn = await getTestDwn({ withEvents: true }); - httpApi = new HttpApi(config, dwn); + httpApi = await HttpApi.create(config, dwn); server = await httpApi.start(9002); wsApi = new WsApi(server, dwn); wsApi.start();