From 0ada41d0808ef5b8b7332adb2c7c75b9430162de Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Emanuel=20Tesa=C5=99?=
Date: Mon, 25 Sep 2023 07:37:43 +0200
Subject: [PATCH] Prune data

---
 packages/api/README.md                   |  3 +-
 packages/api/src/handlers.ts             | 11 ++++--
 packages/api/src/in-memory-cache.test.ts | 44 +++++++++++++++++++++---
 packages/api/src/in-memory-cache.ts      | 22 +++++++++++-
 4 files changed, 71 insertions(+), 9 deletions(-)

diff --git a/packages/api/README.md b/packages/api/README.md
index c9fe5d9d..33a5dc55 100644
--- a/packages/api/README.md
+++ b/packages/api/README.md
@@ -44,7 +44,8 @@ The API provides the following endpoints:

 - `POST /`: Insert a batch of signed data.
   - The batch is validated for consistency and data integrity errors. If there is any issue during this step, the whole
-    batch is rejected. Otherwise the batch is accepted.
+    batch is rejected. Otherwise, the batch is accepted. All data that is no longer needed is also removed during this
+    step.
 - `GET /{endpoint-name}/{airnode}`: Retrieve signed data for the Airnode respecting the endpoint configuration.
   - Only returns the freshest signed data available for the given Airnode, respecting the configured endpoint delay.
 - `GET /`: Retrieve list of all available Airnode address.
diff --git a/packages/api/src/handlers.ts b/packages/api/src/handlers.ts
index f3bf662f..9746dec1 100644
--- a/packages/api/src/handlers.ts
+++ b/packages/api/src/handlers.ts
@@ -2,7 +2,7 @@ import { go, goSync } from '@api3/promise-utils';
 import { isEmpty, isNil, omit, size } from 'lodash';
 import { CACHE_HEADERS, COMMON_HEADERS } from './constants';
 import { deriveBeaconId, recoverSignerAddress } from './evm';
-import { getAll, getAllAirnodeAddresses, putAll } from './in-memory-cache';
+import { getAll, getAllAirnodeAddresses, prune, putAll } from './in-memory-cache';
 import { ApiResponse } from './types';
 import { generateErrorResponse, getConfig, isBatchUnique } from './utils';
 import { batchSignedDataSchema, evmAddressSchema } from './schema';
@@ -26,7 +26,7 @@ export const batchInsertData = async (requestBody: unknown): Promise<ApiResponse> => {
   if (size(batchSignedData) > maxBatchSize)
     return generateErrorResponse(400, `Maximum batch size (${maxBatchSize}) exceeded`);
@@ -64,6 +64,13 @@ export const batchInsertData = async (requestBody: unknown): Promise<ApiResponse> => {
+  const maxDelay = getConfig().endpoints.reduce((acc, endpoint) => Math.max(acc, endpoint.delaySeconds), 0);
+  const maxIgnoreAfterTimestamp = Math.floor(Date.now() / 1000 - maxDelay);
+  const goPruneCache = await go(() => prune(batchSignedData, maxIgnoreAfterTimestamp));
+  if (!goPruneCache.success)
+    return generateErrorResponse(500, 'Unable to remove outdated cache data', goPruneCache.error.message);
+
   return { statusCode: 201, headers: COMMON_HEADERS, body: JSON.stringify({ count: batchSignedData.length }) };
 };
diff --git a/packages/api/src/in-memory-cache.test.ts b/packages/api/src/in-memory-cache.test.ts
index b08a6ad0..4cf7171f 100644
--- a/packages/api/src/in-memory-cache.test.ts
+++ b/packages/api/src/in-memory-cache.test.ts
@@ -1,8 +1,12 @@
 import { groupBy } from 'lodash';
-import { get, getAll, getAllAirnodeAddresses, ignoreTooFreshData, put, putAll } from './in-memory-cache';
+import { get, getAll, getAllAirnodeAddresses, ignoreTooFreshData, prune, put, putAll } from './in-memory-cache';
 import * as cacheModule from './cache';
 import { createSignedData, generateRandomWallet } from '../test/utils';

+afterEach(() => {
+  cacheModule.setCache({});
+});
+
 describe(ignoreTooFreshData.name, () => {
   const createData = async () => [
     await createSignedData({ timestamp: '100' }),
@@ -151,10 +155,6 @@ describe(getAllAirnodeAddresses.name, () => {
 });

 describe(put.name, () => {
-  afterEach(() => {
-    cacheModule.setCache({});
-  });
-
   it('inserts the data in the correct position', async () => {
     const airnodeWallet = generateRandomWallet();
     const data = await createSignedData({ airnodeWallet: airnodeWallet, timestamp: '100' });
@@ -209,3 +209,37 @@ describe(putAll.name, () => {
     expect(cache[newDataBatch[1]!.airnode]![newDataBatch[1]!.templateId]).toEqual([newDataBatch[1]]);
   });
 });
+
+describe(prune.name, () => {
+  it('removes all data that is too old', async () => {
+    const airnodeWallet = generateRandomWallet();
+    const data = await createSignedData({ airnodeWallet: airnodeWallet, timestamp: '100' });
+    const insertData = [
+      data,
+      await createSignedData({ airnodeWallet: airnodeWallet, templateId: data.templateId, timestamp: '105' }),
+      await createSignedData({ airnodeWallet: airnodeWallet, templateId: data.templateId, timestamp: '110' }),
+    ];
+    const otherAirnodeWallet = generateRandomWallet();
+    const otherAirnodeData = await createSignedData({ airnodeWallet: otherAirnodeWallet, timestamp: '80' });
+    const otherAirnodeInsertData = [
+      otherAirnodeData,
+      await createSignedData({
+        airnodeWallet: otherAirnodeWallet,
+        templateId: otherAirnodeData.templateId,
+        timestamp: '90',
+      }),
+    ];
+    const batchInsertData = [...insertData, ...otherAirnodeInsertData];
+    // We can't mock because the implementation mutates the cache value directly.
+    cacheModule.setCache({
+      [data.airnode]: groupBy(insertData, 'templateId'),
+      [otherAirnodeData.airnode]: groupBy(otherAirnodeInsertData, 'templateId'),
+    });
+
+    await prune(batchInsertData, 105);
+
+    const cache = cacheModule.getCache();
+    expect(cache[data.airnode]![data.templateId]).toEqual([insertData[1], insertData[2]]);
+    expect(cache[otherAirnodeData.airnode]![otherAirnodeData.templateId]).toEqual([otherAirnodeInsertData[1]]);
+  });
+});
diff --git a/packages/api/src/in-memory-cache.ts b/packages/api/src/in-memory-cache.ts
index 086e5741..5a6a6337 100644
--- a/packages/api/src/in-memory-cache.ts
+++ b/packages/api/src/in-memory-cache.ts
@@ -1,4 +1,4 @@
-import { last } from 'lodash';
+import { last, uniqBy } from 'lodash';
 import { isIgnored } from './utils';
 import { SignedData } from './schema';
 import { getCache } from './cache';
@@ -54,3 +54,23 @@ export const put = async (signedData: SignedData) => {
 export const putAll = async (signedDataArray: SignedData[]) => {
   for (const signedData of signedDataArray) await put(signedData);
 };
+
+// The API is deliberately asynchronous to mimic a database call.
+//
+// Removes all signed data that no longer needs to be kept in memory (because it is too old and newer signed data
+// exists for each endpoint). For performance reasons, the function is intended to be called after each insertion of
+// new signed data, because it only prunes the data for the beacons that have just been inserted.
+export const prune = async (signedDataArray: SignedData[], maxIgnoreAfterTimestamp: number) => {
+  const beaconsToPrune = uniqBy(signedDataArray, 'beaconId');
+  const signedDataCache = getCache();
+
+  for (const beacon of beaconsToPrune) {
+    const { airnode, templateId } = beacon;
+    const allSignedDatas = signedDataCache[airnode]![templateId]!; // Assumes the data has been inserted into the cache.
+    const signedDatas = ignoreTooFreshData(allSignedDatas, maxIgnoreAfterTimestamp);
+
+    // It is enough to keep only the freshest signed data for each beacon.
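+    // This relies on the per-beacon cache being ordered by timestamp (put inserts each entry at the correct
+    // position), so the prunable entries returned by ignoreTooFreshData form a prefix of allSignedDatas.
+    //
+    // Worked example (mirrors the prune test): with cached timestamps [100, 105, 110] and maxIgnoreAfterTimestamp of
+    // 105, the prunable prefix is [100, 105]; only 100 is removed and [105, 110] remain.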
+    const removeCount = Math.max(0, signedDatas.length - 1);
+    signedDataCache[airnode]![templateId] = allSignedDatas.slice(removeCount);
+  }
+};
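
For readers going through the patch in isolation, here is a minimal sketch of the threshold arithmetic that the handlers.ts hunk feeds into prune. The Endpoint shape and the example delays are illustrative assumptions; the patch itself only shows that the threshold is the current Unix time minus the largest configured delaySeconds.

// Sketch only: mirrors the pruning threshold computed in batchInsertData, not a drop-in replacement.
type Endpoint = { urlPath: string; delaySeconds: number }; // assumed config shape

const computeMaxIgnoreAfterTimestamp = (endpoints: Endpoint[], nowMs: number): number => {
  // The most delayed endpoint determines how far back a GET may still need to reach.
  const maxDelay = endpoints.reduce((acc, endpoint) => Math.max(acc, endpoint.delaySeconds), 0);
  // Entries at or before this Unix timestamp are old enough to be served by every endpoint,
  // so prune only needs to keep the freshest such entry per beacon (plus everything newer).
  return Math.floor(nowMs / 1000 - maxDelay);
};

// Example: delays of 0 s and 30 s at Unix time 1_695_600_030 give a threshold of 1_695_600_000.
computeMaxIgnoreAfterTimestamp(
  [
    { urlPath: '/real-time', delaySeconds: 0 },
    { urlPath: '/delayed', delaySeconds: 30 },
  ],
  1_695_600_030_000
); // => 1_695_600_000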