Skip to content

Commit

Permalink
watcher: add supervisor to monitor watchers
Browse files Browse the repository at this point in the history
  • Loading branch information
panoel committed Apr 1, 2024
1 parent 90abed6 commit d5c6c3d
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 1 deletion.
7 changes: 6 additions & 1 deletion watcher/src/consts.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import { ChainName, CONTRACTS, Network } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { ChainName, CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { Environment } from '@wormhole-foundation/wormhole-monitor-common';
import { AxiosRequestConfig } from 'axios';

export const TIMEOUT = 0.5 * 1000;
export const HB_INTERVAL = 5 * 60 * 1000; // 5 Minutes
export type WorkerData = {
network: Environment;
chain: ChainName;
};

// Notes about RPCs
// Ethereum
Expand Down
2 changes: 2 additions & 0 deletions watcher/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
getEnvironment,
getMode,
} from '@wormhole-foundation/wormhole-monitor-common';
import { startSupervisor } from './workers/supervisor';

initDb();

Expand Down Expand Up @@ -100,3 +101,4 @@ if (mode === 'vaa') {
} else {
throw new Error(`Unknown mode: ${mode}`);
}
startSupervisor(supportedChains);
4 changes: 4 additions & 0 deletions watcher/src/watchers/Watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { TIMEOUT } from '../consts';
import { VaasByBlock } from '../databases/types';
import { getResumeBlockByChain, storeLatestBlock, storeVaasByBlock } from '../databases/utils';
import { getLogger, WormholeLogger } from '../utils/logger';
import { parentPort } from 'worker_threads';

export class Watcher {
chain: ChainName;
Expand Down Expand Up @@ -105,6 +106,9 @@ export class Watcher {
this.logger.warn(`backing off for ${expoBacko}ms`);
await sleep(expoBacko);
}
if (parentPort) {
parentPort.postMessage('heartbeat');
}
}
}
}
63 changes: 63 additions & 0 deletions watcher/src/workers/supervisor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { ChainName } from '@certusone/wormhole-sdk';
import { Worker } from 'worker_threads';
import { HB_INTERVAL, WorkerData } from '../consts';
import { getLogger } from '../utils/logger';
import { Environment, getEnvironment } from '@wormhole-foundation/wormhole-monitor-common';

interface WorkerInfo {
worker: Worker;
data: WorkerData;
lastHB: number;
}

const workers: { [key: string]: WorkerInfo } = {};
const logger = getLogger('supervisor');
const network: Environment = getEnvironment();

function spawnWorker(data: WorkerData) {
const workerName = `${data.chain}Worker`;
logger.info(`Spawning worker ${workerName} on network ${network}...`);
const worker = new Worker('./dist/src/workers/worker.js', { workerData: data });

worker.on('message', (message) => {
if (message === 'heartbeat') {
logger.debug(`Worker ${workerName} sent HB`);
workers[workerName].lastHB = Date.now();
}
});

worker.on('exit', (code) => {
logger.warn(`Worker ${workerName} exited with code ${code}`);
if (code !== 0) {
logger.error(`Restarting worker ${workerName}...`);
spawnWorker(data);
}
});

workers[workerName] = { worker, data, lastHB: Date.now() };
logger.debug('Finished spawning worker:', workerName);
}

function monitorWorkers() {
setInterval(() => {
for (const [workerName, workerInfo] of Object.entries(workers)) {
logger.debug(
`Checking worker ${workerName} with lastHB of ${new Date(workerInfo.lastHB)}...`
);
if (Date.now() - workerInfo.lastHB > HB_INTERVAL) {
logger.error(`Worker ${workerName} missed HB, restarting...`);
workerInfo.worker.terminate();
spawnWorker(workerInfo.data);
}
}
}, HB_INTERVAL);
}

export function startSupervisor(supportedChains: ChainName[]) {
supportedChains.forEach((chain) => {
const workerData: WorkerData = { network, chain };
spawnWorker(workerData);
});

monitorWorkers();
}
10 changes: 10 additions & 0 deletions watcher/src/workers/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { initDb } from '../databases/utils';
import { makeFinalizedWatcher } from '../watchers/utils';
import { workerData } from 'worker_threads';

initDb(false);
const network = workerData.network;
const chain = workerData.chain;
console.log(`Making watcher for ${network} ${chain}...`);
makeFinalizedWatcher(network, chain).watch();
console.log(`Watcher for ${network} ${chain} started.`);

0 comments on commit d5c6c3d

Please sign in to comment.