Skip to content

Commit

Permalink
Skip persisting signed data for data feed with existing timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
Siegrift committed Oct 19, 2023
1 parent 534639f commit a70aaef
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 15 deletions.
2 changes: 1 addition & 1 deletion packages/api/src/cache.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { SignedData } from './schema';

type SignedDataCache = Record<
string, // Airnode ID.
string, // Airnode address.
Record<
string, // Template ID.
SignedData[] // Signed data is ordered by timestamp (oldest first).
Expand Down
39 changes: 38 additions & 1 deletion packages/api/src/handlers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { createSignedData, generateRandomWallet } from '../test/utils';
import * as cacheModule from './cache';
import * as configModule from './config';
import { batchInsertData, getData, listAirnodeAddresses } from './handlers';
import { logger } from './logger';

// eslint-disable-next-line jest/no-hooks
beforeEach(() => {
Expand Down Expand Up @@ -44,13 +45,49 @@ describe(batchInsertData.name, () => {
expect(cacheModule.getCache()).toStrictEqual({});
});

it('skips signed data if there exists one with the same timestamp', async () => {
const airnodeWallet = generateRandomWallet();
const storedSignedData = await createSignedData({ airnodeWallet });
cacheModule.setCache({
[storedSignedData.airnode]: {
[storedSignedData.templateId]: [storedSignedData],
},
});
const batchData = [
await createSignedData({
airnodeWallet,
templateId: storedSignedData.templateId,
timestamp: storedSignedData.timestamp,
}),
await createSignedData(),
];
jest.spyOn(logger, 'debug');

const result = await batchInsertData(batchData);

expect(result).toStrictEqual({
body: JSON.stringify({ count: 1, skipped: 1 }),
headers: {
'access-control-allow-methods': '*',
'access-control-allow-origin': '*',
'content-type': 'application/json',
},
statusCode: 201,
});
expect(logger.debug).toHaveBeenCalledWith(
'Skipping signed data because signed data with the same timestamp already exists',
expect.any(Object)
);
expect(cacheModule.getCache()[storedSignedData.airnode]![storedSignedData.templateId]!).toHaveLength(1);
});

it('inserts the batch if data is valid', async () => {
const batchData = [await createSignedData(), await createSignedData()];

const result = await batchInsertData(batchData);

expect(result).toStrictEqual({
body: JSON.stringify({ count: 2 }),
body: JSON.stringify({ count: 2, skipped: 0 }),
headers: {
'access-control-allow-methods': '*',
'access-control-allow-origin': '*',
Expand Down
29 changes: 24 additions & 5 deletions packages/api/src/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import { isEmpty, isNil, omit, size } from 'lodash';
import { getConfig } from './config';
import { CACHE_HEADERS, COMMON_HEADERS } from './constants';
import { deriveBeaconId, recoverSignerAddress } from './evm';
import { getAll, getAllAirnodeAddresses, prune, putAll } from './in-memory-cache';
import { batchSignedDataSchema, evmAddressSchema } from './schema';
import { get, getAll, getAllAirnodeAddresses, prune, putAll } from './in-memory-cache';
import { logger } from './logger';
import { type SignedData, batchSignedDataSchema, evmAddressSchema } from './schema';
import type { ApiResponse } from './types';
import { generateErrorResponse, isBatchUnique } from './utils';

Expand Down Expand Up @@ -67,21 +68,39 @@ export const batchInsertData = async (requestBody: unknown): Promise<ApiResponse
const firstError = signedDataValidationResults.find(Boolean);
if (firstError) return firstError;

const newSignedData: SignedData[] = [];
// Because pushers do not keep track of the last timestamp they pushed, they may push the same data twice, which
// is acceptable, but we only want to store one data for each timestamp.
for (const signedData of batchSignedData) {
const requestTimestamp = Number.parseInt(signedData.timestamp, 10);
const goReadDb = await go(async () => get(signedData.airnode, signedData.templateId, requestTimestamp));
if (goReadDb.data && requestTimestamp === Number.parseInt(goReadDb.data.timestamp, 10)) {
logger.debug('Skipping signed data because signed data with the same timestamp already exists', { signedData });
continue;
}

newSignedData.push(signedData);
}

// Write batch of validated data to the database
const goBatchWriteDb = await go(async () => putAll(batchSignedData));
const goBatchWriteDb = await go(async () => putAll(newSignedData));
if (!goBatchWriteDb.success) {
return generateErrorResponse(500, 'Unable to send batch of signed data to database', goBatchWriteDb.error.message);
}

// Prune the cache with the data that is too old (no endpoint will ever return it)
const maxDelay = endpoints.reduce((acc, endpoint) => Math.max(acc, endpoint.delaySeconds), 0);
const maxIgnoreAfterTimestamp = Math.floor(Date.now() / 1000 - maxDelay);
const goPruneCache = await go(async () => prune(batchSignedData, maxIgnoreAfterTimestamp));
const goPruneCache = await go(async () => prune(newSignedData, maxIgnoreAfterTimestamp));
if (!goPruneCache.success) {
return generateErrorResponse(500, 'Unable to remove outdated cache data', goPruneCache.error.message);
}

return { statusCode: 201, headers: COMMON_HEADERS, body: JSON.stringify({ count: batchSignedData.length }) };
return {
statusCode: 201,
headers: COMMON_HEADERS,
body: JSON.stringify({ count: newSignedData.length, skipped: batchSignedData.length - newSignedData.length }),
};
};

// Returns the most fresh signed data for each templateId for the given airnode address. The API can be delayed, which
Expand Down
16 changes: 8 additions & 8 deletions packages/api/src/in-memory-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,26 @@ export const ignoreTooFreshData = (signedDatas: SignedData[], ignoreAfterTimesta

// The API is deliberately asynchronous to mimic a database call.
// eslint-disable-next-line @typescript-eslint/require-await
export const get = async (airnodeId: string, templateId: string, ignoreAfterTimestamp: number) => {
logger.debug('Getting signed data', { airnodeId, templateId, ignoreAfterTimestamp });
export const get = async (airnodeAddress: string, templateId: string, ignoreAfterTimestamp: number) => {
logger.debug('Getting signed data', { airnodeAddress, templateId, ignoreAfterTimestamp });

const signedDataCache = getCache();
if (!signedDataCache[airnodeId]) return null;
const signedDatas = signedDataCache[airnodeId]![templateId];
if (!signedDataCache[airnodeAddress]) return null;
const signedDatas = signedDataCache[airnodeAddress]![templateId];
if (!signedDatas) return null;

return last(ignoreTooFreshData(signedDatas, ignoreAfterTimestamp)) ?? null;
};

// The API is deliberately asynchronous to mimic a database call.
export const getAll = async (airnodeId: string, ignoreAfterTimestamp: number) => {
logger.debug('Getting all signed data', { airnodeId, ignoreAfterTimestamp });
export const getAll = async (airnodeAddress: string, ignoreAfterTimestamp: number) => {
logger.debug('Getting all signed data', { airnodeAddress, ignoreAfterTimestamp });

const signedDataCache = getCache();
const signedDataByTemplateId = signedDataCache[airnodeId] ?? {};
const signedDataByTemplateId = signedDataCache[airnodeAddress] ?? {};
const freshestSignedData: SignedData[] = [];
for (const templateId of Object.keys(signedDataByTemplateId)) {
const freshest = await get(airnodeId, templateId, ignoreAfterTimestamp);
const freshest = await get(airnodeAddress, templateId, ignoreAfterTimestamp);
if (freshest) freshestSignedData.push(freshest);
}

Expand Down

0 comments on commit a70aaef

Please sign in to comment.