Commit

use redis

vrtnd committed Feb 7, 2025
1 parent 6d64d74 commit 8876d26
Showing 6 changed files with 116 additions and 36 deletions.
86 changes: 75 additions & 11 deletions package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -50,7 +50,7 @@
"fastify": "^4.26.2",
"graphql": "^16.0.0",
"graphql-request": "^6.1.0",
"lru-cache": "^11.0.2",
"ioredis": "^5.5.0",
"node-fetch": "^2.6.7",
"object-hash": "^3.0.0",
"postgres": "^3.2.4",
6 changes: 3 additions & 3 deletions src/handlers/runWormhole.ts
@@ -5,12 +5,12 @@ import { insertTransactionRow } from "../utils/wrappa/postgres/write";
import { getBridgeID } from "../utils/wrappa/postgres/query";
import dayjs from "dayjs";
import { insertConfigEntriesForAdapter } from "../utils/adapter";
- import { cache } from "../utils/cache";
+ import { getCache, setCache } from "../utils/cache";

const END_TS_KEY = "wormhole_end_ts";

export const handler = async () => {
- const previousEndTs = cache.get(END_TS_KEY);
+ const previousEndTs = await getCache(END_TS_KEY);
let currentEndTs = previousEndTs;
try {
await insertConfigEntriesForAdapter(adapter, "wormhole");
@@ -132,7 +132,7 @@ export const handler = async () => {
start += BATCH_SIZE;
}
currentEndTs = events[events.length - 1].block_timestamp;
- cache.set(END_TS_KEY, currentEndTs);
+ await setCache(END_TS_KEY, currentEndTs);
} catch (error) {
console.error("Error processing Wormhole events:", error);
throw error;
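Worth noting why the swap matters in this handler: the end-timestamp watermark now lives in Redis instead of an in-process LRU, so it survives restarts and is visible to every worker. Below is a minimal sketch of that pattern using the commit's getCache/setCache helpers; resumeFrom, persistProgress, the Date.now fallback, and the 24-hour TTL are illustrative assumptions, not part of the commit (the handler itself writes with the default TTL).

```ts
import { getCache, setCache } from "../utils/cache";

const END_TS_KEY = "wormhole_end_ts";

// Illustrative: resume from the last persisted watermark, defaulting to "now"
// on the first run or after the key has expired.
export const resumeFrom = async (): Promise<number> => {
  const previousEndTs = await getCache(END_TS_KEY);
  return previousEndTs ?? Math.floor(Date.now() / 1000);
};

// Illustrative: persist the new watermark (here with a 24-hour TTL so it
// outlives the gap between runs) so the next invocation continues where
// this one stopped.
export const persistProgress = async (lastBlockTimestamp: number): Promise<void> => {
  await setCache(END_TS_KEY, lastBlockTimestamp, 60 * 60 * 24);
};
```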
8 changes: 4 additions & 4 deletions src/server/index.ts
@@ -12,7 +12,7 @@ import getTransactions from "../handlers/getTransactions";
import runAdapter from "../handlers/runAdapter";
import getBridgeStatsOnDay from "../handlers/getBridgeStatsOnDay";
import cron from "./cron";
- import { generateApiCacheKey, cache, registerCacheHandler, warmCache, needsWarming } from "../utils/cache";
+ import { generateApiCacheKey, registerCacheHandler, warmCache, needsWarming, setCache, getCache } from "../utils/cache";
import { startHealthMonitoring, getHealthStatus } from "./health";

dotenv.config();
@@ -35,12 +35,12 @@ const lambdaToFastify = (handler: Function) => async (request: any, reply: any)
};

const cacheKey = generateApiCacheKey(event);
- const cachedData = cache.get(cacheKey);
+ const cachedData = await getCache(cacheKey);

registerCacheHandler(cacheKey, () => handler(event));

if (cachedData) {
- if (needsWarming(cacheKey)) {
+ if (await needsWarming(cacheKey)) {
warmCache(cacheKey);
}
return reply.code(200).send(cachedData);
@@ -55,7 +55,7 @@

const result = await Promise.race([handler(event), timeout]);
const parsedBody = JSON.parse(result.body);
- cache.set(cacheKey, parsedBody);
+ await setCache(cacheKey, parsedBody);
return reply.code(result.statusCode).send(parsedBody);
} catch (error: any) {
request.log.error(error);
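The request path now amounts to stale-while-revalidate: serve whatever Redis has, and if the key's remaining TTL has fallen inside CACHE_WARM_THRESHOLD, refresh it in the background. A condensed sketch of that flow using the same helpers; respondWithCache, fetchFresh, and the LambdaResult shape are illustrative names, and the Promise.race timeout from the real handler is omitted.

```ts
import {
  getCache,
  setCache,
  needsWarming,
  warmCache,
  registerCacheHandler,
} from "../utils/cache";

type LambdaResult = { statusCode: number; body: string };

// Condensed sketch of the flow in lambdaToFastify (timeout handling omitted).
export const respondWithCache = async (
  cacheKey: string,
  fetchFresh: () => Promise<LambdaResult>
) => {
  // Register the handler so warmCache() can re-run it later.
  registerCacheHandler(cacheKey, fetchFresh);

  const cached = await getCache(cacheKey);
  if (cached) {
    // Close to expiring? Refresh in the background and still answer
    // immediately with the slightly stale cached body.
    if (await needsWarming(cacheKey)) void warmCache(cacheKey);
    return cached;
  }

  // Miss: compute, cache with the default 600-second TTL, and return.
  const result = await fetchFresh();
  const parsedBody = JSON.parse(result.body);
  await setCache(cacheKey, parsedBody);
  return parsedBody;
};
```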
6 changes: 3 additions & 3 deletions src/utils/bridgeVolume.ts
@@ -6,7 +6,7 @@ import {
getConfigsWithDestChain,
} from "./wrappa/postgres/query";
import { importBridgeNetwork } from "../data/importBridgeNetwork";
- import { cache } from "./cache";
+ import { getCache, setCache } from "./cache";

const startTimestampToRestrictTo = 1661990400; // Sept. 01, 2022: timestamp data is backfilled to

@@ -126,7 +126,7 @@ export const getHourlyBridgeVolume = async (
) => {
let bridgeDbName = undefined as any;
const cacheKey = `hourly_bridge_volume_${bridgeNetworkId}_${chain}_${startTimestamp}_${endTimestamp}`;
- const cachedData = await cache.get(cacheKey);
+ const cachedData = await getCache(cacheKey);
if (cachedData) {
return cachedData as any[];
}
@@ -225,7 +225,7 @@
}
}
*/
- await cache.set(cacheKey, hourlyBridgeVolume);
+ await setCache(cacheKey, hourlyBridgeVolume);

return hourlyBridgeVolume;
};
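The read-compute-write sequence in getHourlyBridgeVolume is the same one the server uses, so it could be factored into a small reusable wrapper. A hedged sketch under that assumption; withCache and computeHourlyBridgeVolume are hypothetical, only getCache/setCache come from this commit.

```ts
import { getCache, setCache } from "./cache";

// Hypothetical helper: return the cached value when present; otherwise
// compute it, store it (default 600-second TTL), and return it.
export const withCache = async <T>(
  cacheKey: string,
  compute: () => Promise<T>,
  ttl?: number
): Promise<T> => {
  const cached = await getCache(cacheKey);
  if (cached) return cached as T;

  const fresh = await compute();
  await setCache(cacheKey, fresh, ttl);
  return fresh;
};

// Usage mirroring the hourly-volume key format (computeHourlyBridgeVolume is
// a stand-in for the uncached query logic):
// const volume = await withCache(
//   `hourly_bridge_volume_${bridgeNetworkId}_${chain}_${startTimestamp}_${endTimestamp}`,
//   () => computeHourlyBridgeVolume(bridgeNetworkId, chain, startTimestamp, endTimestamp)
// );
```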
44 changes: 30 additions & 14 deletions src/utils/cache.ts
@@ -1,6 +1,15 @@
- import { LRUCache } from "lru-cache";
+ import Redis from "ioredis";
import hash from "object-hash";

+ const REDIS_URL = process.env.REDIS_URL;
+
+ export const redis = new Redis(REDIS_URL!, {
+   retryStrategy: (times) => {
+     const delay = Math.min(times * 50, 2000);
+     return delay;
+   },
+ });

interface APIEvent {
pathParameters?: Record<string, any>;
queryStringParameters?: Record<string, any>;
@@ -9,12 +18,6 @@ interface APIEvent {

export const handlerRegistry = new Map<string, Function>();

- export const cache = new LRUCache<string, any>({
-   max: 1000,
-   ttl: 1000 * 60 * 60,
-   updateAgeOnGet: false,
- });

export const generateApiCacheKey = (event: APIEvent): string => {
const eventToNormalize = {
path: event.pathParameters || {},
@@ -31,12 +34,13 @@
};

export const CACHE_WARM_THRESHOLD = 1000 * 60 * 10;
+ export const DEFAULT_TTL = 600;

- export const needsWarming = (cacheKey: string): boolean => {
-   if (!cache.has(cacheKey)) return true;
-
-   const ttlRemaining = cache.getRemainingTTL(cacheKey);
-   return ttlRemaining !== undefined && ttlRemaining < CACHE_WARM_THRESHOLD;
+ export const needsWarming = async (cacheKey: string): Promise<boolean> => {
+   const ttl = await redis.ttl(cacheKey);
+   if (ttl === -2) return true;
+   if (ttl === -1) return false;
+   return ttl * 1000 < CACHE_WARM_THRESHOLD;
};

export const warmCache = async (cacheKey: string): Promise<void> => {
@@ -47,7 +51,7 @@ export const warmCache = async (cacheKey: string): Promise<void> => {
try {
const result = await handler();
const parsedBody = JSON.parse(result.body);
- cache.set(cacheKey, parsedBody);
+ await redis.set(cacheKey, JSON.stringify(parsedBody), "EX", DEFAULT_TTL);
} catch (error) {
throw error;
}
@@ -59,4 +63,16 @@ export const registerCacheHandler = (cacheKey: string, handler: Function) => {

export const getCacheKey = (...parts: (string | undefined)[]) => parts.filter(Boolean).join(":");

- export const DEFAULT_TTL = 600;
+ export const getCache = async (key: string): Promise<any> => {
+   const value = await redis.get(key);
+   if (value) console.log("Cache HIT", key);
+   return value ? JSON.parse(value) : null;
+ };
+
+ export const setCache = async (key: string, value: any, ttl: number = DEFAULT_TTL): Promise<void> => {
+   await redis.set(key, JSON.stringify(value), "EX", ttl);
+ };
+
+ export const deleteCache = async (key: string): Promise<void> => {
+   await redis.del(key);
+ };
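For completeness, a short usage example of the new helpers in isolation. The key names are made up; the TTL return values are standard Redis/ioredis semantics (-2 for a missing key, -1 for a key with no expiry), which is exactly what the rewritten needsWarming relies on.

```ts
import { getCache, setCache, deleteCache, needsWarming, redis } from "./cache";

const demo = async () => {
  // Store a value with the default 600-second expiry, then read it back.
  await setCache("demo:price", { usd: 1.01 });
  const price = await getCache("demo:price"); // -> { usd: 1.01 }

  // redis.ttl() returns the remaining lifetime in seconds,
  // -2 if the key does not exist, -1 if it has no expiry.
  const secondsLeft = await redis.ttl("demo:price");
  console.log({ price, secondsLeft });

  // needsWarming() is true for missing keys or keys within
  // CACHE_WARM_THRESHOLD (10 minutes) of expiring.
  if (await needsWarming("demo:price")) {
    console.log("close to expiring; a handler registered via registerCacheHandler() could be re-run with warmCache()");
  }

  await deleteCache("demo:price");
  await redis.quit();
};

demo().catch(console.error);
```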
