Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented a simple SQL based TTL cache #140

Merged
merged 4 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,12 @@ Configuration can be set using environment variables
| `DWN_REGISTRATION_PROOF_OF_WORK_SEED` | Seed to generate the challenge nonce from, this allows all DWN instances in a cluster to generate the same challenge. | unset |
| `DWN_REGISTRATION_PROOF_OF_WORK_ENABLED` | Require new users to complete a proof-of-work challenge | `false` |
| `DWN_REGISTRATION_PROOF_OF_WORK_INITIAL_MAX_HASH` | Initial maximum allowed hash in 64 char HEX string. The more leading zeros (smaller number) the higher the difficulty. | `false` |
| `DWN_TERMS_OF_SERVICE_FILE_PATH` | Required terms of service agreement if set. Value is path to the terms of service file. | unset |
| `DWN_STORAGE` | URL to use for storage by default. See [Storage Options](#storage-options) for details | `level://data` |
| `DWN_STORAGE_MESSAGES` | URL to use for storage of messages. | value of `DWN_STORAGE` |
| `DWN_STORAGE_DATA` | URL to use for data storage | value of `DWN_STORAGE` |
| `DWN_STORAGE_EVENTS` | URL to use for event storage | value of `DWN_STORAGE` |
| `DWN_TERMS_OF_SERVICE_FILE_PATH` | Required terms of service agreement if set. Value is path to the terms of service file. | unset |
| `DWN_TTL_CACHE_URL` | URL of the TTL cache used by the DWN. Currently only supports SQL databases. | `sqlite://` |

### Storage Options

Expand Down
18 changes: 16 additions & 2 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ export const config = {
*/
baseUrl: process.env.DWN_BASE_URL || 'http://localhost',

/**
* Port that server listens on.
*/
port: parseInt(process.env.DS_PORT || '3000'),

/**
* The URL of the TTL cache used by the DWN.
* NOTE: Used for session/state keeping, thus requires the cache to be commonly addressable by nodes in a cloud cluster environment.
*
* Currently only supports SQL databases, e.g.
* Postgres: 'postgres://root:dwn@localhost:5432/dwn'
* MySQL: 'mysql://root:dwn@localhost:3306/dwn'
*/
ttlCacheUrl: process.env.DWN_TTL_CACHE_URL || 'sqlite://',

/**
* Used to populate the `version` and `sdkVersion` properties returned by the `/info` endpoint.
*
Expand All @@ -28,8 +43,7 @@ export const config = {
packageJsonPath: process.env.npm_package_json || process.env.DWN_SERVER_PACKAGE_JSON || '/dwn-server/package.json',
// max size of data that can be provided with a RecordsWrite
maxRecordDataSize: bytes(process.env.MAX_RECORD_DATA_SIZE || '1gb'),
// port that server listens on
port: parseInt(process.env.DS_PORT || '3000'),

// whether to enable 'ws:'
webSocketSupport: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? true,
// where to store persistent data
Expand Down
2 changes: 1 addition & 1 deletion src/dwn-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class DwnServer {
}));
}

this.#httpApi = new HttpApi(this.config, this.dwn, registrationManager);
this.#httpApi = await HttpApi.create(this.config, this.dwn, registrationManager);

await this.#httpApi.start(this.config.port, () => {
log.info(`HttpServer listening on port ${this.config.port}`);
Expand Down
31 changes: 19 additions & 12 deletions src/http-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,38 +32,45 @@ export class HttpApi {
registrationManager: RegistrationManager;
dwn: Dwn;

constructor(config: DwnServerConfig, dwn: Dwn, registrationManager?: RegistrationManager) {
private constructor() { }

public static async create(config: DwnServerConfig, dwn: Dwn, registrationManager?: RegistrationManager): Promise<HttpApi> {
const httpApi = new HttpApi();

log.info(config);

this.#packageInfo = {
httpApi.#packageInfo = {
server: config.serverName,
};

try {
// We populate the `version` and `sdkVersion` properties from the `package.json` file.
const packageJson = JSON.parse(readFileSync(config.packageJsonPath).toString());
this.#packageInfo.version = packageJson.version;
this.#packageInfo.sdkVersion = packageJson.dependencies ? packageJson.dependencies['@tbd54566975/dwn-sdk-js'] : undefined;
httpApi.#packageInfo.version = packageJson.version;
httpApi.#packageInfo.sdkVersion = packageJson.dependencies ? packageJson.dependencies['@tbd54566975/dwn-sdk-js'] : undefined;
} catch (error: any) {
log.error('could not read `package.json` for version info', error);
}

this.#config = config;
this.#api = express();
this.#server = http.createServer(this.#api);
this.dwn = dwn;
httpApi.#config = config;
httpApi.#api = express();
httpApi.#server = http.createServer(httpApi.#api);
httpApi.dwn = dwn;

if (registrationManager !== undefined) {
this.registrationManager = registrationManager;
httpApi.registrationManager = registrationManager;
}

// create the Web5 Connect Server
this.web5ConnectServer = new Web5ConnectServer({
httpApi.web5ConnectServer = await Web5ConnectServer.create({
baseUrl: `${config.baseUrl}:${config.port}`,
sqlTtlCacheUrl: config.ttlCacheUrl,
});

this.#setupMiddleware();
this.#setupRoutes();
httpApi.#setupMiddleware();
httpApi.#setupRoutes();

return httpApi;
}

get server(): http.Server {
Expand Down
16 changes: 8 additions & 8 deletions src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,27 +143,27 @@ function getStore(storeString: string, storeType: EStoreType): StoreType {
}
}

export function getDialectFromURI(u: URL): Dialect {
switch (u.protocol.slice(0, -1)) {
export function getDialectFromURI(connectionUrl: URL): Dialect {
switch (connectionUrl.protocol.slice(0, -1)) {
case BackendTypes.SQLITE:
const path = u.host + u.pathname;
const path = connectionUrl.host + connectionUrl.pathname;
console.log('SQL-lite relative path:', path ? path : undefined); // NOTE, using ? for lose equality comparison

if (u.host && !fs.existsSync(u.host)) {
console.log('SQL-lite directory does not exist, creating:', u.host);
fs.mkdirSync(u.host, { recursive: true });
if (connectionUrl.host && !fs.existsSync(connectionUrl.host)) {
console.log('SQL-lite directory does not exist, creating:', connectionUrl.host);
fs.mkdirSync(connectionUrl.host, { recursive: true });
}

return new SqliteDialect({
database: async () => new Database(path),
});
case BackendTypes.MYSQL:
return new MysqlDialect({
pool: async () => MySQLCreatePool(u.toString()),
pool: async () => MySQLCreatePool(connectionUrl.toString()),
});
case BackendTypes.POSTGRES:
return new PostgresDialect({
pool: async () => new pg.Pool({ u }),
pool: async () => new pg.Pool({ connectionString: connectionUrl.toString() }),
cursor: Cursor,
});
}
Expand Down
126 changes: 126 additions & 0 deletions src/web5-connect/sql-ttl-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import type { Dialect } from '@tbd54566975/dwn-sql-store';
import { Kysely } from 'kysely';

/**
* The SqlTtlCache is responsible for storing and retrieving cache data with TTL (Time-to-Live).
*/
export class SqlTtlCache {
private static readonly cacheTableName = 'cacheEntries';
private static readonly cleanupIntervalInSeconds = 60;

private db: Kysely<CacheDatabase>;
private cleanupTimer: NodeJS.Timeout;

private constructor(sqlDialect: Dialect) {
this.db = new Kysely<CacheDatabase>({ dialect: sqlDialect });
}

/**
* Creates a new SqlTtlCache instance.
*/
public static async create(sqlDialect: Dialect): Promise<SqlTtlCache> {
const cacheManager = new SqlTtlCache(sqlDialect);

await cacheManager.initialize();

return cacheManager;
}

private async initialize(): Promise<void> {
await this.db.schema
.createTable(SqlTtlCache.cacheTableName)
.ifNotExists()
// 512 chars to accommodate potentially large `state` in Web5 Connect flow
.addColumn('key', 'varchar(512)', (column) => column.primaryKey())
.addColumn('value', 'text', (column) => column.notNull())
.addColumn('expiry', 'integer', (column) => column.notNull())
.execute();

// Start the cleanup timer
this.startCleanupTimer();
}

/**
* Starts a timer to periodically clean up expired cache entries.
*/
private startCleanupTimer(): void {
this.cleanupTimer = setInterval(async () => {
await this.cleanUpExpiredEntries();
}, SqlTtlCache.cleanupIntervalInSeconds * 1000);
}

/**
* Inserts a cache entry.
* @param ttl The time-to-live in seconds.
*/
public async insert(key: string, value: object, ttl: number): Promise<void> {
const expiry = Date.now() + (ttl * 1000);

const objectString = JSON.stringify(value);

await this.db
.insertInto(SqlTtlCache.cacheTableName)
.values({ key, value: objectString, expiry })
.execute();
}

/**
* Retrieves a cache entry if it is not expired and cleans up expired entries.
*/
public async get(key: string): Promise<object | undefined> {
// clean up expired entries but no need to await for it to finish
this.cleanUpExpiredEntries();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sanity: What is the purpose of calling this here since it's running on an interval + the particular record's expiry is checked/deleted below?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, keeping it simple. Removed.


const result = await this.db
.selectFrom(SqlTtlCache.cacheTableName)
.select('key')
.select('value')
.select('expiry')
.where('key', '=', key)
.execute();

if (result.length === 0) {
return undefined;
}

const entry = result[0];

// if the entry is expired, don't return it and delete it
if (Date.now() >= entry.expiry) {
this.delete(key); // no need to await
return undefined;
}

return JSON.parse(entry.value);
}

/**
* Deletes a cache entry.
*/
public async delete(key: string): Promise<void> {
await this.db
.deleteFrom(SqlTtlCache.cacheTableName)
.where('key', '=', key)
.execute();
}

/**
* Periodically clean up expired cache entries.
*/
public async cleanUpExpiredEntries(): Promise<void> {
await this.db
.deleteFrom(SqlTtlCache.cacheTableName)
.where('expiry', '<', Date.now())
.execute();
}
}

interface CacheEntry {
key: string;
value: string;
expiry: number;
}

interface CacheDatabase {
cacheEntries: CacheEntry;
}
34 changes: 25 additions & 9 deletions src/web5-connect/web5-connect-server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { getDialectFromURI } from "../storage.js";
import { randomUuid } from '@web5/crypto/utils';
import { SqlTtlCache } from "./sql-ttl-cache.js";

/**
* The Web5 Connect Request object.
Expand Down Expand Up @@ -29,16 +31,30 @@ export type SetWeb5ConnectRequestResult = {
* The Web5 Connect Server is responsible for handling the Web5 Connect flow.
*/
export class Web5ConnectServer {
public static readonly ttlInSeconds = 600;

private baseUrl: string;
private dataStore = new Map(); // TODO: turn this into a TTL cache (https://github.com/TBD54566975/dwn-server/issues/138)
private cache: SqlTtlCache;

/**
* Creates a new instance of the Web5 Connect Server.
* @param params.baseUrl The the base URL of the connect server including the port.
LiranCohen marked this conversation as resolved.
Show resolved Hide resolved
* This is given to the Identity Provider (wallet) to fetch the Web5 Connect Request object.
*/
public constructor({ baseUrl }: {
public static async create({ baseUrl, sqlTtlCacheUrl }: {
baseUrl: string;
sqlTtlCacheUrl: string;
}): Promise<Web5ConnectServer> {
const web5ConnectServer = new Web5ConnectServer({ baseUrl });

// Initialize TTL cache.
const sqlDialect = getDialectFromURI(new URL(sqlTtlCacheUrl));
web5ConnectServer.cache = await SqlTtlCache.create(sqlDialect);

return web5ConnectServer;
}

private constructor({ baseUrl }: {
baseUrl: string;
}) {
this.baseUrl = baseUrl;
Expand All @@ -54,22 +70,22 @@ export class Web5ConnectServer {
const request_uri = `${this.baseUrl}/connect/${requestId}.jwt`;

// Store the Request Object.
this.dataStore.set(`request:${requestId}`, request);
this.cache.insert(`request:${requestId}`, request, Web5ConnectServer.ttlInSeconds);

return {
request_uri,
expires_in : 600,
expires_in : Web5ConnectServer.ttlInSeconds,
};
}

/**
* Returns the Web5 Connect Request object. The request ID can only be used once.
*/
public async getWeb5ConnectRequest(requestId: string): Promise<Web5ConnectRequest | undefined> {
const request = this.dataStore.get(`request:${requestId}`);
const request = this.cache.get(`request:${requestId}`);

// Delete the Request Object from the data store now that it has been retrieved.
this.dataStore.delete(`request:${requestId}`);
this.cache.delete(`request:${requestId}`);

return request;
}
Expand All @@ -78,17 +94,17 @@ export class Web5ConnectServer {
* Sets the Web5 Connect Response object, which is also an OIDC ID token.
*/
public async setWeb5ConnectResponse(state: string, response: Web5ConnectResponse): Promise<any> {
this.dataStore.set(`response:${state}`, response);
this.cache.insert(`response:${state}`, response, Web5ConnectServer.ttlInSeconds);
}

/**
* Gets the Web5 Connect Response object. The `state` string can only be used once.
*/
public async getWeb5ConnectResponse(state: string): Promise<Web5ConnectResponse | undefined> {
const response = this. dataStore.get(`response:${state}`);
const response = this. cache.get(`response:${state}`);

// Delete the Response object from the data store now that it has been retrieved.
this.dataStore.delete(`response:${state}`);
this.cache.delete(`response:${state}`);

return response;
}
Expand Down
2 changes: 1 addition & 1 deletion tests/connection/connection-manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('InMemoryConnectionManager', () => {
beforeEach(async () => {
dwn = await getTestDwn({ withEvents: true });
connectionManager = new InMemoryConnectionManager(dwn);
const httpApi = new HttpApi(config, dwn);
const httpApi = await HttpApi.create(config, dwn);
server = await httpApi.start(9002);
wsApi = new WsApi(server, dwn, connectionManager);
wsApi.start();
Expand Down
Loading
Loading