From c1036d6971c3c280bf9516cc074642f16c6a7e36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emanuel=20Tesa=C5=99?= Date: Fri, 29 Sep 2023 14:14:22 +0200 Subject: [PATCH] Fix delayed queue premature data prune, add tests --- .../src/api-requests/signed-api.test.ts | 6 +- .../src/api-requests/signed-api.ts | 5 +- packages/data-pusher/src/state.test.ts | 63 +++++++++++++++++ packages/data-pusher/src/state.ts | 70 ++++++++++++++----- 4 files changed, 122 insertions(+), 22 deletions(-) create mode 100644 packages/data-pusher/src/state.test.ts diff --git a/packages/data-pusher/src/api-requests/signed-api.test.ts b/packages/data-pusher/src/api-requests/signed-api.test.ts index 0d2ce268..daed6bd3 100644 --- a/packages/data-pusher/src/api-requests/signed-api.test.ts +++ b/packages/data-pusher/src/api-requests/signed-api.test.ts @@ -11,7 +11,7 @@ describe(postSignedApiData.name, () => { // Assumes the template responses are for unique template IDs (which is true in the test fixtures). state.templateValues = Object.fromEntries( nodarySignedTemplateResponses.map(([templateId, signedData]) => { - const dataQueue = new stateModule.DelayedSignedDataQueue(); + const dataQueue = new stateModule.DelayedSignedDataQueue(0); dataQueue.put(signedData); return [templateId, dataQueue]; }) @@ -29,7 +29,7 @@ describe(postSignedApiData.name, () => { // Assumes the template responses are for unique template IDs (which is true in the test fixtures). state.templateValues = Object.fromEntries( nodarySignedTemplateResponses.map(([templateId, signedData]) => { - const dataQueue = new stateModule.DelayedSignedDataQueue(); + const dataQueue = new stateModule.DelayedSignedDataQueue(0); dataQueue.put(signedData); return [templateId, dataQueue]; }) @@ -61,7 +61,7 @@ describe(postSignedApiData.name, () => { // Assumes the template responses are for unique template IDs (which is true in the test fixtures). state.templateValues = Object.fromEntries( nodarySignedTemplateResponses.map(([templateId, signedData]) => { - const dataQueue = new stateModule.DelayedSignedDataQueue(); + const dataQueue = new stateModule.DelayedSignedDataQueue(0); dataQueue.put(signedData); return [templateId, dataQueue]; }) diff --git a/packages/data-pusher/src/api-requests/signed-api.ts b/packages/data-pusher/src/api-requests/signed-api.ts index c910926f..cbd03521 100644 --- a/packages/data-pusher/src/api-requests/signed-api.ts +++ b/packages/data-pusher/src/api-requests/signed-api.ts @@ -21,7 +21,10 @@ export const postSignedApiData = async (group: SignedApiNameUpdateDelayGroup) => const airnode = ethers.Wallet.fromMnemonic(airnodeWalletMnemonic).address; const batchPayloadOrNull = templateIds.map((templateId): SignedApiPayload | null => { - const delayedSignedData = templateValues[templateId]!.get(updateDelay); + // Calculate the reference timestamp based on the current time and update delay. + const referenceTimestamp = Date.now() / 1000 - updateDelay; + const delayedSignedData = templateValues[templateId]!.get(referenceTimestamp); + templateValues[templateId]!.prune(); if (isNil(delayedSignedData)) return null; return { airnode, templateId, beaconId: deriveBeaconId(airnode, templateId), ...delayedSignedData }; diff --git a/packages/data-pusher/src/state.test.ts b/packages/data-pusher/src/state.test.ts new file mode 100644 index 00000000..b513a320 --- /dev/null +++ b/packages/data-pusher/src/state.test.ts @@ -0,0 +1,63 @@ +import { DelayedSignedDataQueue } from './state'; +import { nodarySignedTemplateResponses } from '../test/fixtures'; + +describe(DelayedSignedDataQueue.name, () => { + afterEach(() => { + jest.useRealTimers(); + }); + + it('can put signed data', () => { + const queue = new DelayedSignedDataQueue(30); + const data = nodarySignedTemplateResponses[0]![1]; + + queue.put(data); + + expect(queue.getAll()).toEqual([data]); + }); + + it('can get signed data with delay', () => { + const queue = new DelayedSignedDataQueue(30); + const data3 = nodarySignedTemplateResponses[0]![1]; + const timestamp = parseInt(data3.timestamp); + const data2 = { ...data3, timestamp: (timestamp - 10).toString() }; + const data1 = { ...data3, timestamp: (timestamp - 20).toString() }; + queue.put(data1); + queue.put(data2); + queue.put(data3); + + expect(queue.get(timestamp + 1)).toEqual(data3); + expect(queue.get(timestamp)).toEqual(data2); + expect(queue.get(timestamp - 5)).toEqual(data2); + expect(queue.get(timestamp - 15)).toEqual(data1); + expect(queue.get(timestamp - 30)).toEqual(undefined); + }); + + it('ensures that data is inserted by increasing timestamp', () => { + const queue = new DelayedSignedDataQueue(30); + const data3 = nodarySignedTemplateResponses[0]![1]; + const timestamp = parseInt(data3.timestamp); + const data2 = { ...data3, timestamp: (timestamp - 10).toString() }; + const data1 = { ...data3, timestamp: (timestamp - 20).toString() }; + queue.put(data3); + + expect(() => queue.put(data1)).toThrow('The signed data is too old'); + expect(() => queue.put(data2)).toThrow('The signed data is too old'); + }); + + it('can prune unused data', () => { + jest.useFakeTimers().setSystemTime(new Date('2023-01-20')); + + const queue = new DelayedSignedDataQueue(30); + const data3 = nodarySignedTemplateResponses[0]![1]; + const timestamp = parseInt(data3.timestamp); + const data2 = { ...data3, timestamp: (timestamp - 40).toString() }; + const data1 = { ...data3, timestamp: (timestamp - 50).toString() }; + queue.put(data1); + queue.put(data2); + queue.put(data3); + + queue.prune(); + + expect(queue.getAll()).toEqual([data2, data3]); + }); +}); diff --git a/packages/data-pusher/src/state.ts b/packages/data-pusher/src/state.ts index 7b3863eb..b71877d1 100644 --- a/packages/data-pusher/src/state.ts +++ b/packages/data-pusher/src/state.ts @@ -1,4 +1,5 @@ import Bottleneck from 'bottleneck'; +import { last } from 'lodash'; import { Config, SignedData, TemplateId } from './validation/schema'; import { OIS_MAX_CONCURRENCY_DEFAULT, OIS_MIN_TIME_DEFAULT_MS } from './constants'; import { deriveEndpointId, getRandomId } from './utils'; @@ -65,8 +66,14 @@ export const buildApiLimiters = (config: Config) => { return apiLimiters; }; -export const buildTemplateStorages = (config: Config) => - Object.fromEntries(Object.keys(config.templates).map((templateId) => [templateId, new DelayedSignedDataQueue()])); +export const buildTemplateStorages = (config: Config) => { + return Object.fromEntries( + Object.keys(config.templates).map((templateId) => { + const maxUpdateDelayTime = Math.max(...Object.values(config.triggers.signedApiUpdates).map((u) => u.updateDelay)); + return [templateId, new DelayedSignedDataQueue(maxUpdateDelayTime)]; + }) + ); +}; export const getInitialState = (config: Config): State => { return { @@ -89,33 +96,60 @@ export const getState = () => { */ export class DelayedSignedDataQueue { private storage: SignedData[] = []; + private maxUpdateDelay: number; + + /** + * Creates the delayed signed data queue with the maximum update delay time. If there exists some signed data satisfying + * this delay, all other signed data with smaller timestamps are removed. + * @param maxUpdateDelay - The maximum update delay time in seconds. + */ + constructor(maxUpdateDelay: number) { + this.maxUpdateDelay = maxUpdateDelay; + } + /** + * Checks if a signed data entry is delayed enough. This means that the timestamp of the entry must be smaller than + * the reference timestamp. + */ + private isDelayedEnough(data: SignedData, referenceTimestamp: number) { + return parseInt(data.timestamp) < referenceTimestamp; + } + + /** + * Adds a signed data entry to the queue. Assumes the data is not older than other entries in the queue. + * @param data - The signed data entry to add. + */ public put(data: SignedData): void { + // Make sure the data is not older than other entries in the queue. + if (this.storage.length && parseInt(last(this.storage)!.timestamp) > parseInt(data.timestamp)) { + throw new Error('The signed data is too old'); + } this.storage.push(data); } /** * Retrieves the newest signed data entry from the queue that is delayed by a specified time. - * @param updateDelay - The maximum delay (in seconds) allowed for retrieved data. + * @param referenceTimestamp - The reference timestamp in seconds. Signed data with newer or equal timestamp is + * ignored during this call. * @returns The delayed signed data entry, or undefined if none is found. */ - public get(updateDelay: number): SignedData | undefined { - // Calculate the reference timestamp based on the current time and update delay. - const referenceTimestamp = Date.now() / 1000 - updateDelay; - // Function to check if an element's timestamp is delayed enough. - const isDelayedEnough = (element: SignedData) => parseInt(element.timestamp) < referenceTimestamp; - // Find the index of the newest delayed data entry in the storage. - const index = this.storage.findLastIndex(isDelayedEnough); - // If a delayed entry is found, remove older entries (also include returned one) and return the delayed one. - if (index >= 0) { - const delayedData = this.storage[index]; - this.storage.splice(0, index + 1); - return delayedData; - } + public get(referenceTimestamp: number): SignedData | undefined { + // Find the newest delayed data entry in the storage. + return this.storage.findLast((data) => this.isDelayedEnough(data, referenceTimestamp)); + } + + /** + * Removes all signed data entries from the queue that are not needed anymore. This means that there must exist a + * signed data with timestamp smaller maximum update delay. All data with smaller timestamp can be removed. + */ + public prune(): void { + const index = this.storage.findLastIndex((data) => + this.isDelayedEnough(data, Date.now() / 1000 - this.maxUpdateDelay) + ); - return; + this.storage = this.storage.slice(index); } - public list(): SignedData[] { + public getAll(): SignedData[] { return this.storage; } }