Prune data #36

Merged (1 commit) on Sep 25, 2023
packages/api/README.md (3 changes: 2 additions & 1 deletion)

@@ -44,7 +44,8 @@ The API provides the following endpoints:

- `POST /`: Insert a batch of signed data.
- The batch is validated for consistency and data integrity errors. If there is any issue during this step, the whole
batch is rejected. Otherwise the batch is accepted.
batch is rejected. Otherwise the batch is accepted. Also, all data that is no longer needed is removed during this
step.
- `GET /{endpoint-name}/{airnode}`: Retrieve signed data for the Airnode respecting the endpoint configuration.
- Only returns the freshest signed data available for the given Airnode, respecting the configured endpoint delay.
- `GET /`: Retrieve a list of all available Airnode addresses.
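
For illustration only (not part of this diff): a minimal TypeScript client sketch for these endpoints. The base URL and the payload type are assumptions; the URL shapes and the 201 status for accepted batches come from this PR.

```ts
// Hypothetical base URL; the real address depends on how the API is deployed.
const BASE_URL = 'http://localhost:8090';

// POST /: push a batch of signed data. The server validates the whole batch
// and rejects all of it on any consistency or integrity error.
async function pushBatch(batch: unknown[]): Promise<void> {
  const res = await fetch(BASE_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(batch),
  });
  if (res.status !== 201) throw new Error(`Batch rejected: ${await res.text()}`);
}

// GET /{endpoint-name}/{airnode}: freshest signed data for the given Airnode,
// respecting the delay configured for the named endpoint.
async function getSignedData(endpointName: string, airnode: string): Promise<unknown> {
  const res = await fetch(`${BASE_URL}/${endpointName}/${airnode}`);
  return res.json();
}
```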
packages/api/src/handlers.ts (11 changes: 9 additions & 2 deletions)

@@ -2,7 +2,7 @@ import { go, goSync } from '@api3/promise-utils';
import { isEmpty, isNil, omit, size } from 'lodash';
import { CACHE_HEADERS, COMMON_HEADERS } from './constants';
import { deriveBeaconId, recoverSignerAddress } from './evm';
import { getAll, getAllAirnodeAddresses, putAll } from './in-memory-cache';
import { getAll, getAllAirnodeAddresses, prune, putAll } from './in-memory-cache';
import { ApiResponse } from './types';
import { generateErrorResponse, getConfig, isBatchUnique } from './utils';
import { batchSignedDataSchema, evmAddressSchema } from './schema';
@@ -26,7 +26,7 @@ export const batchInsertData = async (requestBody: unknown): Promise<ApiResponse
if (isEmpty(batchSignedData)) return generateErrorResponse(400, 'No signed data to push');

// Check whether the size of batch exceeds a maximum batch size
const { maxBatchSize } = getConfig();
const { maxBatchSize, endpoints } = getConfig();
if (size(batchSignedData) > maxBatchSize)
return generateErrorResponse(400, `Maximum batch size (${maxBatchSize}) exceeded`);

@@ -64,6 +64,13 @@ export const batchInsertData = async (requestBody: unknown): Promise<ApiResponse
if (!goBatchWriteDb.success)
return generateErrorResponse(500, 'Unable to send batch of signed data to database', goBatchWriteDb.error.message);

// Prune data that is too old from the cache (no endpoint will ever return it)
const maxDelay = endpoints.reduce((acc, endpoint) => Math.max(acc, endpoint.delaySeconds), 0);
const maxIgnoreAfterTimestamp = Math.floor(Date.now() / 1000 - maxDelay);
const goPruneCache = await go(() => prune(batchSignedData, maxIgnoreAfterTimestamp));
if (!goPruneCache.success)
return generateErrorResponse(500, 'Unable to remove outdated cache data', goPruneCache.error.message);

return { statusCode: 201, headers: COMMON_HEADERS, body: JSON.stringify({ count: batchSignedData.length }) };
};

packages/api/src/in-memory-cache.test.ts (44 changes: 39 additions & 5 deletions)

@@ -1,8 +1,12 @@
import { groupBy } from 'lodash';
import { get, getAll, getAllAirnodeAddresses, ignoreTooFreshData, put, putAll } from './in-memory-cache';
import { get, getAll, getAllAirnodeAddresses, ignoreTooFreshData, prune, put, putAll } from './in-memory-cache';
import * as cacheModule from './cache';
import { createSignedData, generateRandomWallet } from '../test/utils';

afterEach(() => {
cacheModule.setCache({});
});

describe(ignoreTooFreshData.name, () => {
const createData = async () => [
await createSignedData({ timestamp: '100' }),
@@ -151,10 +155,6 @@ describe(getAllAirnodeAddresses.name, () => {
});

describe(put.name, () => {
afterEach(() => {
cacheModule.setCache({});
});

it('inserts the data in the correct position', async () => {
const airnodeWallet = generateRandomWallet();
const data = await createSignedData({ airnodeWallet: airnodeWallet, timestamp: '100' });
@@ -209,3 +209,37 @@ describe(putAll.name, () => {
expect(cache[newDataBatch[1]!.airnode]![newDataBatch[1]!.templateId]).toEqual([newDataBatch[1]]);
});
});

describe(prune.name, () => {
it('removes all data that is too old', async () => {
const airnodeWallet = generateRandomWallet();
const data = await createSignedData({ airnodeWallet: airnodeWallet, timestamp: '100' });
const insertData = [
data,
await createSignedData({ airnodeWallet: airnodeWallet, templateId: data.templateId, timestamp: '105' }),
await createSignedData({ airnodeWallet: airnodeWallet, templateId: data.templateId, timestamp: '110' }),
];
const otherAirnodeWallet = generateRandomWallet();
const otherAirnodeData = await createSignedData({ airnodeWallet: otherAirnodeWallet, timestamp: '80' });
const otherAirnodeInsertData = [
otherAirnodeData,
await createSignedData({
airnodeWallet: otherAirnodeWallet,
templateId: otherAirnodeData.templateId,
timestamp: '90',
}),
];
const batchInsertData = [...insertData, ...otherAirnodeInsertData];
// We can't mock because the implementation mutates the cache value directly.
cacheModule.setCache({
[data.airnode]: groupBy(insertData, 'templateId'),
[otherAirnodeData.airnode]: groupBy(otherAirnodeInsertData, 'templateId'),
});

await prune(batchInsertData, 105);

const cache = cacheModule.getCache();
expect(cache[data.airnode]![data.templateId]).toEqual([insertData[1], insertData[2]]);
expect(cache[otherAirnodeData.airnode]![otherAirnodeData.templateId]).toEqual([otherAirnodeInsertData[1]]);
});
});
packages/api/src/in-memory-cache.ts (22 changes: 21 additions & 1 deletion)

@@ -1,4 +1,4 @@
import { last } from 'lodash';
import { last, uniqBy } from 'lodash';
import { isIgnored } from './utils';
import { SignedData } from './schema';
import { getCache } from './cache';
@@ -54,3 +54,23 @@ export const put = async (signedData: SignedData) => {
export const putAll = async (signedDataArray: SignedData[]) => {
for (const signedData of signedDataArray) await put(signedData);
};

// The API is deliberately asynchronous to mimic a database call.
//
// Removes all signed data that no longer needs to be kept in memory (because it is too old and newer signed data
// exists for each endpoint). The function is intended to be called after each insertion of new signed data for
// performance reasons, because it only prunes the data for beacons that have just been inserted.
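// Note: `maxIgnoreAfterTimestamp` is a Unix timestamp in seconds, not milliseconds (see the review discussion below).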
export const prune = async (signedDataArray: SignedData[], maxIgnoreAfterTimestamp: number) => {
Member: Should the timestamp include the unit in the name? Or should it rather be a Date, with the function handling the conversion to a Unix timestamp internally? It's not obvious to me whether "maxIgnoreAfterTimestamp" is in seconds or milliseconds, and it requires me to dig through several other functions to figure this out.

Collaborator (Author): A timestamp should imo always refer to a Unix timestamp and thus be in seconds. We also use this implicitly for signed data currently (e.g. the input for the signed API expects the timestamp, `timestamp: z.string()`, and assumes it's in seconds).

Also, my intention is for the cache to work with timestamps and not delays. That's why I made the API handlers convert the required delay to a timestamp before calling the cache functions.

const beaconsToPrune = uniqBy(signedDataArray, 'beaconId');
const signedDataCache = getCache();

for (const beacon of beaconsToPrune) {
const { airnode, templateId } = beacon;
const allSignedDatas = signedDataCache[airnode]![templateId]!; // Assume the data has been inserted into the cache.
Member: "data" is already the plural form 😛 ("datum" is the singular)

Collaborator (Author): Yeah, but we always use "data" in the singular, and then there is the dilemma of what to call a "group" of data. After a bit of internal pain, "datas" was the best I could come up with. Calling it allSignedDataArray was second on my internal list, as far as I remember. Do you like that more?

Member: I guess it's fine as is. Not sure how I feel about the alternative.

Member: I also find "datas" useful (specifically "calldatas" in a multicall situation), but then https://github.com/api3dao/manager-multisig/pull/213#discussion_r1284344567

const signedDatas = ignoreTooFreshData(allSignedDatas, maxIgnoreAfterTimestamp);

// It is enough to keep only the freshest signed data for each beacon.
const removeCount = Math.max(0, signedDatas.length - 1);
signedDataCache[airnode]![templateId] = allSignedDatas.slice(removeCount);
}
};
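
Not part of the PR, just a worked illustration of the pruning arithmetic: the helper below mirrors prune's per-beacon step on plain numeric timestamps, assuming the cache keeps each beacon's signed data sorted by ascending timestamp (the put test above inserts in the correct position) and that `ignoreTooFreshData` keeps entries whose timestamp is at or below the cutoff.

```ts
// Mirror of prune's per-beacon step, on plain arrays (assumptions noted above).
const pruneOne = (timestamps: number[], cutoff: number): number[] => {
  // Entries old enough that every endpoint is allowed to serve them.
  const oldEnough = timestamps.filter((t) => t <= cutoff);
  // Only the freshest such entry needs to be kept.
  const removeCount = Math.max(0, oldEnough.length - 1);
  return timestamps.slice(removeCount);
};

// Matches the test above: cached [100, 105, 110] pruned with cutoff 105
// keeps [105, 110]; the 100 entry is superseded by the one at 105.
console.log(pruneOne([100, 105, 110], 105)); // [105, 110]
```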