Skip to content

Commit

Permalink
Implemented a simple SQL based TTL cache (#140)
Browse files Browse the repository at this point in the history
- Implemented a simple SQL based TTL cache.
- Introduced `HttpApi.create()` to support async object creation
pattern.
- Fixed bug in PostgreSQL dialect instantiation.
- 100% new code coverage.
  • Loading branch information
thehenrytsai authored Jul 1, 2024
1 parent 6471773 commit d93d543
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 39 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,12 @@ Configuration can be set using environment variables
| `DWN_REGISTRATION_PROOF_OF_WORK_SEED` | Seed to generate the challenge nonce from, this allows all DWN instances in a cluster to generate the same challenge. | unset |
| `DWN_REGISTRATION_PROOF_OF_WORK_ENABLED` | Require new users to complete a proof-of-work challenge | `false` |
| `DWN_REGISTRATION_PROOF_OF_WORK_INITIAL_MAX_HASH` | Initial maximum allowed hash in 64 char HEX string. The more leading zeros (smaller number) the higher the difficulty. | `false` |
| `DWN_TERMS_OF_SERVICE_FILE_PATH` | Required terms of service agreement if set. Value is path to the terms of service file. | unset |
| `DWN_STORAGE` | URL to use for storage by default. See [Storage Options](#storage-options) for details | `level://data` |
| `DWN_STORAGE_MESSAGES` | URL to use for storage of messages. | value of `DWN_STORAGE` |
| `DWN_STORAGE_DATA` | URL to use for data storage | value of `DWN_STORAGE` |
| `DWN_STORAGE_EVENTS` | URL to use for event storage | value of `DWN_STORAGE` |
| `DWN_TERMS_OF_SERVICE_FILE_PATH` | Required terms of service agreement if set. Value is path to the terms of service file. | unset |
| `DWN_TTL_CACHE_URL` | URL of the TTL cache used by the DWN. Currently only supports SQL databases. | `sqlite://` |

### Storage Options

Expand Down
18 changes: 16 additions & 2 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ export const config = {
*/
baseUrl: process.env.DWN_BASE_URL || 'http://localhost',

/**
* Port that server listens on.
*/
port: parseInt(process.env.DS_PORT || '3000'),

/**
* The URL of the TTL cache used by the DWN.
* NOTE: Used for session/state keeping, thus requires the cache to be commonly addressable by nodes in a cloud cluster environment.
*
* Currently only supports SQL databases, e.g.
* Postgres: 'postgres://root:dwn@localhost:5432/dwn'
* MySQL: 'mysql://root:dwn@localhost:3306/dwn'
*/
ttlCacheUrl: process.env.DWN_TTL_CACHE_URL || 'sqlite://',

/**
* Used to populate the `version` and `sdkVersion` properties returned by the `/info` endpoint.
*
Expand All @@ -28,8 +43,7 @@ export const config = {
packageJsonPath: process.env.npm_package_json || process.env.DWN_SERVER_PACKAGE_JSON || '/dwn-server/package.json',
// max size of data that can be provided with a RecordsWrite
maxRecordDataSize: bytes(process.env.MAX_RECORD_DATA_SIZE || '1gb'),
// port that server listens on
port: parseInt(process.env.DS_PORT || '3000'),

// whether to enable 'ws:'
webSocketSupport: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? true,
// where to store persistent data
Expand Down
2 changes: 1 addition & 1 deletion src/dwn-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class DwnServer {
}));
}

this.#httpApi = new HttpApi(this.config, this.dwn, registrationManager);
this.#httpApi = await HttpApi.create(this.config, this.dwn, registrationManager);

await this.#httpApi.start(this.config.port, () => {
log.info(`HttpServer listening on port ${this.config.port}`);
Expand Down
31 changes: 19 additions & 12 deletions src/http-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,38 +32,45 @@ export class HttpApi {
registrationManager: RegistrationManager;
dwn: Dwn;

constructor(config: DwnServerConfig, dwn: Dwn, registrationManager?: RegistrationManager) {
private constructor() { }

public static async create(config: DwnServerConfig, dwn: Dwn, registrationManager?: RegistrationManager): Promise<HttpApi> {
const httpApi = new HttpApi();

log.info(config);

this.#packageInfo = {
httpApi.#packageInfo = {
server: config.serverName,
};

try {
// We populate the `version` and `sdkVersion` properties from the `package.json` file.
const packageJson = JSON.parse(readFileSync(config.packageJsonPath).toString());
this.#packageInfo.version = packageJson.version;
this.#packageInfo.sdkVersion = packageJson.dependencies ? packageJson.dependencies['@tbd54566975/dwn-sdk-js'] : undefined;
httpApi.#packageInfo.version = packageJson.version;
httpApi.#packageInfo.sdkVersion = packageJson.dependencies ? packageJson.dependencies['@tbd54566975/dwn-sdk-js'] : undefined;
} catch (error: any) {
log.error('could not read `package.json` for version info', error);
}

this.#config = config;
this.#api = express();
this.#server = http.createServer(this.#api);
this.dwn = dwn;
httpApi.#config = config;
httpApi.#api = express();
httpApi.#server = http.createServer(httpApi.#api);
httpApi.dwn = dwn;

if (registrationManager !== undefined) {
this.registrationManager = registrationManager;
httpApi.registrationManager = registrationManager;
}

// create the Web5 Connect Server
this.web5ConnectServer = new Web5ConnectServer({
httpApi.web5ConnectServer = await Web5ConnectServer.create({
baseUrl: `${config.baseUrl}:${config.port}`,
sqlTtlCacheUrl: config.ttlCacheUrl,
});

this.#setupMiddleware();
this.#setupRoutes();
httpApi.#setupMiddleware();
httpApi.#setupRoutes();

return httpApi;
}

get server(): http.Server {
Expand Down
16 changes: 8 additions & 8 deletions src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,27 +143,27 @@ function getStore(storeString: string, storeType: EStoreType): StoreType {
}
}

export function getDialectFromURI(u: URL): Dialect {
switch (u.protocol.slice(0, -1)) {
export function getDialectFromURI(connectionUrl: URL): Dialect {
switch (connectionUrl.protocol.slice(0, -1)) {
case BackendTypes.SQLITE:
const path = u.host + u.pathname;
const path = connectionUrl.host + connectionUrl.pathname;
console.log('SQL-lite relative path:', path ? path : undefined); // NOTE, using ? for lose equality comparison

if (u.host && !fs.existsSync(u.host)) {
console.log('SQL-lite directory does not exist, creating:', u.host);
fs.mkdirSync(u.host, { recursive: true });
if (connectionUrl.host && !fs.existsSync(connectionUrl.host)) {
console.log('SQL-lite directory does not exist, creating:', connectionUrl.host);
fs.mkdirSync(connectionUrl.host, { recursive: true });
}

return new SqliteDialect({
database: async () => new Database(path),
});
case BackendTypes.MYSQL:
return new MysqlDialect({
pool: async () => MySQLCreatePool(u.toString()),
pool: async () => MySQLCreatePool(connectionUrl.toString()),
});
case BackendTypes.POSTGRES:
return new PostgresDialect({
pool: async () => new pg.Pool({ u }),
pool: async () => new pg.Pool({ connectionString: connectionUrl.toString() }),
cursor: Cursor,
});
}
Expand Down
123 changes: 123 additions & 0 deletions src/web5-connect/sql-ttl-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import type { Dialect } from '@tbd54566975/dwn-sql-store';
import { Kysely } from 'kysely';

/**
* The SqlTtlCache is responsible for storing and retrieving cache data with TTL (Time-to-Live).
*/
export class SqlTtlCache {
private static readonly cacheTableName = 'cacheEntries';
private static readonly cleanupIntervalInSeconds = 60;

private db: Kysely<CacheDatabase>;
private cleanupTimer: NodeJS.Timeout;

private constructor(sqlDialect: Dialect) {
this.db = new Kysely<CacheDatabase>({ dialect: sqlDialect });
}

/**
* Creates a new SqlTtlCache instance.
*/
public static async create(sqlDialect: Dialect): Promise<SqlTtlCache> {
const cacheManager = new SqlTtlCache(sqlDialect);

await cacheManager.initialize();

return cacheManager;
}

private async initialize(): Promise<void> {
await this.db.schema
.createTable(SqlTtlCache.cacheTableName)
.ifNotExists()
// 512 chars to accommodate potentially large `state` in Web5 Connect flow
.addColumn('key', 'varchar(512)', (column) => column.primaryKey())
.addColumn('value', 'text', (column) => column.notNull())
.addColumn('expiry', 'integer', (column) => column.notNull())
.execute();

// Start the cleanup timer
this.startCleanupTimer();
}

/**
* Starts a timer to periodically clean up expired cache entries.
*/
private startCleanupTimer(): void {
this.cleanupTimer = setInterval(async () => {
await this.cleanUpExpiredEntries();
}, SqlTtlCache.cleanupIntervalInSeconds * 1000);
}

/**
* Inserts a cache entry.
* @param ttl The time-to-live in seconds.
*/
public async insert(key: string, value: object, ttl: number): Promise<void> {
const expiry = Date.now() + (ttl * 1000);

const objectString = JSON.stringify(value);

await this.db
.insertInto(SqlTtlCache.cacheTableName)
.values({ key, value: objectString, expiry })
.execute();
}

/**
* Retrieves a cache entry if it is not expired and cleans up expired entries.
*/
public async get(key: string): Promise<object | undefined> {
const result = await this.db
.selectFrom(SqlTtlCache.cacheTableName)
.select('key')
.select('value')
.select('expiry')
.where('key', '=', key)
.execute();

if (result.length === 0) {
return undefined;
}

const entry = result[0];

// if the entry is expired, don't return it and delete it
if (Date.now() >= entry.expiry) {
this.delete(key); // no need to await
return undefined;
}

return JSON.parse(entry.value);
}

/**
* Deletes a cache entry.
*/
public async delete(key: string): Promise<void> {
await this.db
.deleteFrom(SqlTtlCache.cacheTableName)
.where('key', '=', key)
.execute();
}

/**
* Periodically clean up expired cache entries.
*/
public async cleanUpExpiredEntries(): Promise<void> {
await this.db
.deleteFrom(SqlTtlCache.cacheTableName)
.where('expiry', '<', Date.now())
.execute();
}
}

interface CacheEntry {
key: string;
value: string;
expiry: number;
}

interface CacheDatabase {
cacheEntries: CacheEntry;
}
35 changes: 26 additions & 9 deletions src/web5-connect/web5-connect-server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { getDialectFromURI } from "../storage.js";
import { randomUuid } from '@web5/crypto/utils';
import { SqlTtlCache } from "./sql-ttl-cache.js";

/**
* The Web5 Connect Request object.
Expand Down Expand Up @@ -29,16 +31,31 @@ export type SetWeb5ConnectRequestResult = {
* The Web5 Connect Server is responsible for handling the Web5 Connect flow.
*/
export class Web5ConnectServer {
public static readonly ttlInSeconds = 600;

private baseUrl: string;
private dataStore = new Map(); // TODO: turn this into a TTL cache (https://github.com/TBD54566975/dwn-server/issues/138)
private cache: SqlTtlCache;

/**
* Creates a new instance of the Web5 Connect Server.
* @param params.baseUrl The the base URL of the connect server including the port.
* This is given to the Identity Provider (wallet) to fetch the Web5 Connect Request object.
* @param params.sqlTtlCacheUrl The URL of the SQL database to use as the TTL cache.
*/
public constructor({ baseUrl }: {
public static async create({ baseUrl, sqlTtlCacheUrl }: {
baseUrl: string;
sqlTtlCacheUrl: string;
}): Promise<Web5ConnectServer> {
const web5ConnectServer = new Web5ConnectServer({ baseUrl });

// Initialize TTL cache.
const sqlDialect = getDialectFromURI(new URL(sqlTtlCacheUrl));
web5ConnectServer.cache = await SqlTtlCache.create(sqlDialect);

return web5ConnectServer;
}

private constructor({ baseUrl }: {
baseUrl: string;
}) {
this.baseUrl = baseUrl;
Expand All @@ -54,22 +71,22 @@ export class Web5ConnectServer {
const request_uri = `${this.baseUrl}/connect/${requestId}.jwt`;

// Store the Request Object.
this.dataStore.set(`request:${requestId}`, request);
this.cache.insert(`request:${requestId}`, request, Web5ConnectServer.ttlInSeconds);

return {
request_uri,
expires_in : 600,
expires_in : Web5ConnectServer.ttlInSeconds,
};
}

/**
* Returns the Web5 Connect Request object. The request ID can only be used once.
*/
public async getWeb5ConnectRequest(requestId: string): Promise<Web5ConnectRequest | undefined> {
const request = this.dataStore.get(`request:${requestId}`);
const request = this.cache.get(`request:${requestId}`);

// Delete the Request Object from the data store now that it has been retrieved.
this.dataStore.delete(`request:${requestId}`);
this.cache.delete(`request:${requestId}`);

return request;
}
Expand All @@ -78,17 +95,17 @@ export class Web5ConnectServer {
* Sets the Web5 Connect Response object, which is also an OIDC ID token.
*/
public async setWeb5ConnectResponse(state: string, response: Web5ConnectResponse): Promise<any> {
this.dataStore.set(`response:${state}`, response);
this.cache.insert(`response:${state}`, response, Web5ConnectServer.ttlInSeconds);
}

/**
* Gets the Web5 Connect Response object. The `state` string can only be used once.
*/
public async getWeb5ConnectResponse(state: string): Promise<Web5ConnectResponse | undefined> {
const response = this. dataStore.get(`response:${state}`);
const response = this. cache.get(`response:${state}`);

// Delete the Response object from the data store now that it has been retrieved.
this.dataStore.delete(`response:${state}`);
this.cache.delete(`response:${state}`);

return response;
}
Expand Down
2 changes: 1 addition & 1 deletion tests/connection/connection-manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('InMemoryConnectionManager', () => {
beforeEach(async () => {
dwn = await getTestDwn({ withEvents: true });
connectionManager = new InMemoryConnectionManager(dwn);
const httpApi = new HttpApi(config, dwn);
const httpApi = await HttpApi.create(config, dwn);
server = await httpApi.start(9002);
wsApi = new WsApi(server, dwn, connectionManager);
wsApi.start();
Expand Down
Loading

0 comments on commit d93d543

Please sign in to comment.