Skip to content

Commit

Permalink
Add heartbeat log
Browse files Browse the repository at this point in the history
  • Loading branch information
Siegrift committed Oct 29, 2023
1 parent 525fabe commit 1e44788
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 8 deletions.
72 changes: 72 additions & 0 deletions packages/pusher/src/heartbeat/heartbeat.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import * as promiseUtilsModule from '@api3/promise-utils';

import { config, parseHeartbeatLog } from '../../test/fixtures';
import { logger } from '../logger';
import * as stateModule from '../state';
import { loadRawConfig } from '../validation/config';

import { initiateHeartbeat, logHeartbeat, createHash } from '.';

// eslint-disable-next-line jest/no-hooks
beforeEach(() => {
jest.useFakeTimers().setSystemTime(new Date('2023-01-20'));
});

afterEach(() => {
jest.useRealTimers();
});

describe(logHeartbeat.name, () => {
const expectedLogMessage = [
'0xbF3137b0a7574563a23a8fC8badC6537F98197CC',
'test',
'0.1.0',
'1674172803',
'1674172800',
'0x6d4306f70c5fe9d8608b4e0c1d72e06a366e6f60b8461a6a9a0833a7401f5778',
'0x6e6379a42b89bdc78286efd6f6ad94142765930c4843784f49ba955ea95c1cb64c432b41aecc1b220dc90d616332bbf7bdc7242a16eb0e31306dac96a6d6ee821b',
].join(' - ');

it('sends the correct heartbeat log', async () => {
const state = stateModule.getInitialState(config);
jest.spyOn(stateModule, 'getState').mockReturnValue(state);
jest.spyOn(logger, 'info');
jest.advanceTimersByTime(1000 * 3); // Advance time by 3 seconds to ensure the timestamp of the log is different from deployment timestamp.

await logHeartbeat();

expect(logger.info).toHaveBeenCalledWith(expectedLogMessage);
});

it('the heartbeat log can be parsed', () => {
const rawConfig = loadRawConfig();
const expectedHeartbeatPayload = {
airnodeAddress: '0xbF3137b0a7574563a23a8fC8badC6537F98197CC',
stage: 'test',
nodeVersion: '0.1.0',
heartbeatTimestamp: '1674172803',
deploymentTimestamp: '1674172800',
configHash: '0x6d4306f70c5fe9d8608b4e0c1d72e06a366e6f60b8461a6a9a0833a7401f5778',
signature:
'0x6e6379a42b89bdc78286efd6f6ad94142765930c4843784f49ba955ea95c1cb64c432b41aecc1b220dc90d616332bbf7bdc7242a16eb0e31306dac96a6d6ee821b',
};

const heartbeatPayload = parseHeartbeatLog(expectedLogMessage);

expect(heartbeatPayload).toStrictEqual(expectedHeartbeatPayload);
expect(heartbeatPayload.configHash).toBe(createHash(JSON.stringify(rawConfig)));
});
});

test('sends heartbeat payload every minute', async () => {
// We would ideally want to assert that the logHeartbeat function is called, but spying on functions that are called
// from the same module is annoying. See: https://jestjs.io/docs/mock-functions#mocking-partials.
//
// Instead we spyOn the "go" which is a third party module that wraps the logHeartbeat call.
jest.spyOn(promiseUtilsModule, 'go');

initiateHeartbeat();

await jest.advanceTimersByTimeAsync(1000 * 60 * 8);
expect(promiseUtilsModule.go).toHaveBeenCalledTimes(8);
});
54 changes: 54 additions & 0 deletions packages/pusher/src/heartbeat/heartbeat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { go } from '@api3/promise-utils';
import { ethers } from 'ethers';

import { logger } from '../logger';
import { getState } from '../state';
import { loadRawConfig } from '../validation/config';

export const initiateHeartbeat = () => {
logger.debug('Initiating heartbeat loop');
setInterval(async () => {
const goLogHeartbeat = await go(logHeartbeat);
if (!goLogHeartbeat.success) logger.error('Failed to log heartbeat', goLogHeartbeat.error);
}, 1000 * 60); // Frequency is hardcoded to 1 minute.
};

export const signHeartbeat = async (airnodeWallet: ethers.Wallet, heartbeatPayload: unknown[]) => {
logger.debug('Signing heartbeat payload');
const signaturePayload = ethers.utils.arrayify(createHash(JSON.stringify(heartbeatPayload)));
return airnodeWallet.signMessage(signaturePayload);
};

export const createHash = (value: string) => ethers.utils.keccak256(ethers.utils.toUtf8Bytes(value));

export const logHeartbeat = async () => {
logger.debug('Creating heartbeat log');

const rawConfig = loadRawConfig(); // We want to log the raw config, not the one with interpolated secrets.
const rawConfigHash = createHash(JSON.stringify(rawConfig));
const {
airnodeWallet,
deploymentTimestamp,
config: {
nodeSettings: { stage, nodeVersion },
},
} = getState();

logger.debug('Creating heartbeat payload');
const currentTimestamp = Math.floor(Date.now() / 1000);
const heartbeatPayload = [
airnodeWallet.address,
stage,
nodeVersion,
currentTimestamp.toString(),
deploymentTimestamp.toString(),
rawConfigHash,
];
const heartbeatSignature = await signHeartbeat(airnodeWallet, heartbeatPayload);
const heartbeatLog = [...heartbeatPayload, heartbeatSignature].join(' - ');

// We ensure in config validation that INFO level logs are not disabled. The logs are sent to API3 for validation
// (that the data provider deployed deployed the correct configuration) and monitoring purposes (whether the instance
// is running).
logger.info(heartbeatLog);
};
1 change: 1 addition & 0 deletions packages/pusher/src/heartbeat/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './heartbeat';
2 changes: 2 additions & 0 deletions packages/pusher/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { initiateFetchingBeaconData } from './fetch-beacon-data';
import { initiateHeartbeat } from './heartbeat';
import { initializeState } from './state';
import { initiateUpdatingSignedApi } from './update-signed-api';
import { loadConfig } from './validation/config';
Expand All @@ -9,6 +10,7 @@ const main = async () => {

initiateFetchingBeaconData();
initiateUpdatingSignedApi();
initiateHeartbeat();
};

void main();
3 changes: 3 additions & 0 deletions packages/pusher/src/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export interface State {
apiLimiters: Record<string, Bottleneck | undefined>;
// We persist the derived Airnode wallet in memory as a performance optimization.
airnodeWallet: ethers.Wallet;
// The timestamp of when the service was initialized. This can be treated as a "deployment" timestamp.
deploymentTimestamp: number;
}

let state: State;
Expand Down Expand Up @@ -86,6 +88,7 @@ export const getInitialState = (config: Config): State => {
templateValues: buildTemplateStorages(config),
apiLimiters: buildApiLimiters(config),
airnodeWallet: ethers.Wallet.fromMnemonic(config.nodeSettings.airnodeWalletMnemonic),
deploymentTimestamp: Math.floor(Date.now() / 1000),
};
};

Expand Down
18 changes: 10 additions & 8 deletions packages/pusher/src/validation/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ import dotenv from 'dotenv';
import { configSchema } from './schema';
import { interpolateSecrets, parseSecrets } from './utils';

export const loadConfig = async () => {
// When pusher is built the "/dist" file contains "src" folder and "package.json" and the config is expected to be
// located next to the "/dist" folder. When run in development, the config is expected to be located next to the "src"
// folder (one less import level). We resolve the config by CWD as a workaround. Since the pusher is dockerized, this
// is hidden from the user.
const configPath = join(cwd(), './config');
const rawSecrets = dotenv.parse(readFileSync(join(configPath, 'secrets.env'), 'utf8'));
// When pusher is built the "/dist" file contains "src" folder and "package.json" and the config is expected to be
// located next to the "/dist" folder. When run in development, the config is expected to be located next to the "src"
// folder (one less import level). We resolve the config by CWD as a workaround. Since the pusher is dockerized, this
// is hidden from the user.
const getConfigPath = () => join(cwd(), './config');

export const loadRawConfig = () => JSON.parse(fs.readFileSync(join(getConfigPath(), 'pusher.json'), 'utf8'));

export const loadConfig = async () => {
const goLoadConfig = await go(async () => {
const rawConfig = JSON.parse(fs.readFileSync(join(configPath, 'pusher.json'), 'utf8'));
const rawSecrets = dotenv.parse(readFileSync(join(getConfigPath(), 'secrets.env'), 'utf8'));
const rawConfig = loadRawConfig();
const secrets = parseSecrets(rawSecrets);
return configSchema.parseAsync(interpolateSecrets(rawConfig, secrets));
});
Expand Down
25 changes: 25 additions & 0 deletions packages/pusher/test/fixtures.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { AxiosResponse } from 'axios';
import { ethers } from 'ethers';

import packageJson from '../package.json';
import type { SignedResponse, TemplateResponse } from '../src/sign-template-data';
Expand Down Expand Up @@ -102,6 +103,7 @@ export const config: Config = {
nodeVersion: packageJson.version,
airnodeWalletMnemonic: 'diamond result history offer forest diagram crop armed stumble orchard stage glance',
rateLimiting: { Nodary: { maxConcurrency: 25, minTime: 10 } },
stage: 'test',
},
};

Expand Down Expand Up @@ -183,3 +185,26 @@ export const signedApiResponse: Partial<AxiosResponse> = {
},
data: { count: 3 },
};

export const parseHeartbeatLog = (logMessage: string) => {
const [airnodeAddress, stage, nodeVersion, heartbeatTimestamp, deploymentTimestamp, configHash, signature] =
logMessage.split(' - ');

// Verify that the signature is valid.
const heartbeatPayload = [airnodeAddress, stage, nodeVersion, heartbeatTimestamp, deploymentTimestamp, configHash];
const signaturePayload = ethers.utils.arrayify(
ethers.utils.keccak256(ethers.utils.toUtf8Bytes(JSON.stringify(heartbeatPayload)))
);
const recoveredAddress = ethers.utils.verifyMessage(signaturePayload, signature!);
if (recoveredAddress !== airnodeAddress) throw new Error('Invalid signature');

return {
airnodeAddress,
stage,
nodeVersion,
deploymentTimestamp,
heartbeatTimestamp,
configHash,
signature,
};
};

0 comments on commit 1e44788

Please sign in to comment.