Skip to content

Commit

Permalink
Fix delayed queue premature data prune, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Siegrift committed Sep 29, 2023
1 parent 5c53161 commit 52b7032
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 19 deletions.
5 changes: 4 additions & 1 deletion packages/data-pusher/src/api-requests/signed-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ export const postSignedApiData = async (group: SignedApiNameUpdateDelayGroup) =>

const airnode = ethers.Wallet.fromMnemonic(airnodeWalletMnemonic).address;
const batchPayloadOrNull = templateIds.map((templateId): SignedApiPayload | null => {
const delayedSignedData = templateValues[templateId]!.get(updateDelay);
// Calculate the reference timestamp based on the current time and update delay.
const referenceTimestamp = Date.now() / 1000 - updateDelay;
const delayedSignedData = templateValues[templateId]!.get(referenceTimestamp);
templateValues[templateId]!.prune();
if (isNil(delayedSignedData)) return null;

return { airnode, templateId, beaconId: deriveBeaconId(airnode, templateId), ...delayedSignedData };
Expand Down
63 changes: 63 additions & 0 deletions packages/data-pusher/src/state.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { DelayedSignedDataQueue } from './state';
import { nodarySignedTemplateResponses } from '../test/fixtures';

describe(DelayedSignedDataQueue.name, () => {
afterEach(() => {
jest.useRealTimers();
});

it('can put signed data', () => {
const queue = new DelayedSignedDataQueue(30);
const data = nodarySignedTemplateResponses[0]![1];

queue.put(data);

expect(queue.getAll()).toEqual([data]);
});

it('can get signed data with delay', () => {
const queue = new DelayedSignedDataQueue(30);
const data3 = nodarySignedTemplateResponses[0]![1];
const timestamp = parseInt(data3.timestamp);
const data2 = { ...data3, timestamp: (timestamp - 10).toString() };
const data1 = { ...data3, timestamp: (timestamp - 20).toString() };
queue.put(data1);
queue.put(data2);
queue.put(data3);

expect(queue.get(timestamp + 1)).toEqual(data3);
expect(queue.get(timestamp)).toEqual(data2);
expect(queue.get(timestamp - 5)).toEqual(data2);
expect(queue.get(timestamp - 15)).toEqual(data1);
expect(queue.get(timestamp - 30)).toEqual(undefined);
});

it('ensures that data is inserted by increasing timestamp', () => {
const queue = new DelayedSignedDataQueue(30);
const data3 = nodarySignedTemplateResponses[0]![1];
const timestamp = parseInt(data3.timestamp);
const data2 = { ...data3, timestamp: (timestamp - 10).toString() };
const data1 = { ...data3, timestamp: (timestamp - 20).toString() };
queue.put(data3);

expect(() => queue.put(data1)).toThrow('The signed data is too old');
expect(() => queue.put(data2)).toThrow('The signed data is too old');
});

it('can prune unused data', () => {
jest.useFakeTimers().setSystemTime(new Date('2023-01-20'));

const queue = new DelayedSignedDataQueue(30);
const data3 = nodarySignedTemplateResponses[0]![1];
const timestamp = parseInt(data3.timestamp);
const data2 = { ...data3, timestamp: (timestamp - 40).toString() };
const data1 = { ...data3, timestamp: (timestamp - 50).toString() };
queue.put(data1);
queue.put(data2);
queue.put(data3);

queue.prune();

expect(queue.getAll()).toEqual([data2, data3]);
});
});
70 changes: 52 additions & 18 deletions packages/data-pusher/src/state.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Bottleneck from 'bottleneck';
import { last } from 'lodash';
import { Config, SignedData, TemplateId } from './validation/schema';
import { OIS_MAX_CONCURRENCY_DEFAULT, OIS_MIN_TIME_DEFAULT_MS } from './constants';
import { deriveEndpointId, getRandomId } from './utils';
Expand Down Expand Up @@ -65,8 +66,14 @@ export const buildApiLimiters = (config: Config) => {
return apiLimiters;
};

export const buildTemplateStorages = (config: Config) =>
Object.fromEntries(Object.keys(config.templates).map((templateId) => [templateId, new DelayedSignedDataQueue()]));
export const buildTemplateStorages = (config: Config) => {
return Object.fromEntries(
Object.keys(config.templates).map((templateId) => {
const maxUpdateDelayTime = Math.max(...Object.values(config.triggers.signedApiUpdates).map((u) => u.updateDelay));
return [templateId, new DelayedSignedDataQueue(maxUpdateDelayTime)];
})
);
};

export const getInitialState = (config: Config): State => {
return {
Expand All @@ -89,33 +96,60 @@ export const getState = () => {
*/
export class DelayedSignedDataQueue {
private storage: SignedData[] = [];
private maxUpdateDelay: number;

/**
* Creates the delayed signed data queue with the maximum update delay time. If there exists some signed data satisfying
* this delay, all other signed data with smaller timestamps are removed.
* @param maxUpdateDelay - The maximum update delay time in seconds.
*/
constructor(maxUpdateDelay: number) {
this.maxUpdateDelay = maxUpdateDelay;
}

/**
* Checks if a signed data entry is delayed enough. This means that the timestamp of the entry must be smaller than
* the reference timestamp.
*/
private isDelayedEnough(data: SignedData, referenceTimestamp: number) {
return parseInt(data.timestamp) < referenceTimestamp;
}

/**
* Adds a signed data entry to the queue. Assumes the data is not older than other entries in the queue.
* @param data - The signed data entry to add.
*/
public put(data: SignedData): void {
// Make sure the data is not older than other entries in the queue.
if (this.storage.length && parseInt(last(this.storage)!.timestamp) > parseInt(data.timestamp)) {
throw new Error('The signed data is too old');
}
this.storage.push(data);
}
/**
* Retrieves the newest signed data entry from the queue that is delayed by a specified time.
* @param updateDelay - The maximum delay (in seconds) allowed for retrieved data.
* @param referenceTimestamp - The reference timestamp in seconds. Signed data with newer or equal timestamp is
* ignored during this call.
* @returns The delayed signed data entry, or undefined if none is found.
*/
public get(updateDelay: number): SignedData | undefined {
// Calculate the reference timestamp based on the current time and update delay.
const referenceTimestamp = Date.now() / 1000 - updateDelay;
// Function to check if an element's timestamp is delayed enough.
const isDelayedEnough = (element: SignedData) => parseInt(element.timestamp) < referenceTimestamp;
// Find the index of the newest delayed data entry in the storage.
const index = this.storage.findLastIndex(isDelayedEnough);
// If a delayed entry is found, remove older entries (also include returned one) and return the delayed one.
if (index >= 0) {
const delayedData = this.storage[index];
this.storage.splice(0, index + 1);
return delayedData;
}
public get(referenceTimestamp: number): SignedData | undefined {
// Find the newest delayed data entry in the storage.
return this.storage.findLast((data) => this.isDelayedEnough(data, referenceTimestamp));
}

/**
* Removes all signed data entries from the queue that are not needed anymore. This means that there must exist a
* signed data with timestamp smaller maximum update delay. All data with smaller timestamp can be removed.
*/
public prune(): void {
const index = this.storage.findLastIndex((data) =>
this.isDelayedEnough(data, Date.now() / 1000 - this.maxUpdateDelay)
);

return;
this.storage = this.storage.slice(index);
}

public list(): SignedData[] {
public getAll(): SignedData[] {
return this.storage;
}
}

0 comments on commit 52b7032

Please sign in to comment.