Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix delayed queue premature data prune, add tests #58

Merged
merged 1 commit into from
Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/data-pusher/src/api-requests/signed-api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe(postSignedApiData.name, () => {
// Assumes the template responses are for unique template IDs (which is true in the test fixtures).
state.templateValues = Object.fromEntries(
nodarySignedTemplateResponses.map(([templateId, signedData]) => {
const dataQueue = new stateModule.DelayedSignedDataQueue();
const dataQueue = new stateModule.DelayedSignedDataQueue(0);
dataQueue.put(signedData);
return [templateId, dataQueue];
})
Expand All @@ -29,7 +29,7 @@ describe(postSignedApiData.name, () => {
// Assumes the template responses are for unique template IDs (which is true in the test fixtures).
state.templateValues = Object.fromEntries(
nodarySignedTemplateResponses.map(([templateId, signedData]) => {
const dataQueue = new stateModule.DelayedSignedDataQueue();
const dataQueue = new stateModule.DelayedSignedDataQueue(0);
dataQueue.put(signedData);
return [templateId, dataQueue];
})
Expand Down Expand Up @@ -61,7 +61,7 @@ describe(postSignedApiData.name, () => {
// Assumes the template responses are for unique template IDs (which is true in the test fixtures).
state.templateValues = Object.fromEntries(
nodarySignedTemplateResponses.map(([templateId, signedData]) => {
const dataQueue = new stateModule.DelayedSignedDataQueue();
const dataQueue = new stateModule.DelayedSignedDataQueue(0);
dataQueue.put(signedData);
return [templateId, dataQueue];
})
Expand Down
5 changes: 4 additions & 1 deletion packages/data-pusher/src/api-requests/signed-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ export const postSignedApiData = async (group: SignedApiNameUpdateDelayGroup) =>

const airnode = ethers.Wallet.fromMnemonic(airnodeWalletMnemonic).address;
const batchPayloadOrNull = templateIds.map((templateId): SignedApiPayload | null => {
const delayedSignedData = templateValues[templateId]!.get(updateDelay);
// Calculate the reference timestamp based on the current time and update delay.
const referenceTimestamp = Date.now() / 1000 - updateDelay;
const delayedSignedData = templateValues[templateId]!.get(referenceTimestamp);
templateValues[templateId]!.prune();
if (isNil(delayedSignedData)) return null;

return { airnode, templateId, beaconId: deriveBeaconId(airnode, templateId), ...delayedSignedData };
Expand Down
63 changes: 63 additions & 0 deletions packages/data-pusher/src/state.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { DelayedSignedDataQueue } from './state';
import { nodarySignedTemplateResponses } from '../test/fixtures';

describe(DelayedSignedDataQueue.name, () => {
afterEach(() => {
jest.useRealTimers();
});

it('can put signed data', () => {
const queue = new DelayedSignedDataQueue(30);
const data = nodarySignedTemplateResponses[0]![1];

queue.put(data);

expect(queue.getAll()).toEqual([data]);
});

it('can get signed data with delay', () => {
const queue = new DelayedSignedDataQueue(30);
const data3 = nodarySignedTemplateResponses[0]![1];
const timestamp = parseInt(data3.timestamp);
const data2 = { ...data3, timestamp: (timestamp - 10).toString() };
const data1 = { ...data3, timestamp: (timestamp - 20).toString() };
queue.put(data1);
queue.put(data2);
queue.put(data3);

expect(queue.get(timestamp + 1)).toEqual(data3);
expect(queue.get(timestamp)).toEqual(data2);
expect(queue.get(timestamp - 5)).toEqual(data2);
expect(queue.get(timestamp - 15)).toEqual(data1);
expect(queue.get(timestamp - 30)).toEqual(undefined);
});

it('ensures that data is inserted by increasing timestamp', () => {
const queue = new DelayedSignedDataQueue(30);
const data3 = nodarySignedTemplateResponses[0]![1];
const timestamp = parseInt(data3.timestamp);
const data2 = { ...data3, timestamp: (timestamp - 10).toString() };
const data1 = { ...data3, timestamp: (timestamp - 20).toString() };
queue.put(data3);

expect(() => queue.put(data1)).toThrow('The signed data is too old');
expect(() => queue.put(data2)).toThrow('The signed data is too old');
});

it('can prune unused data', () => {
jest.useFakeTimers().setSystemTime(new Date('2023-01-20'));

const queue = new DelayedSignedDataQueue(30);
const data3 = nodarySignedTemplateResponses[0]![1];
const timestamp = parseInt(data3.timestamp);
const data2 = { ...data3, timestamp: (timestamp - 40).toString() };
const data1 = { ...data3, timestamp: (timestamp - 50).toString() };
queue.put(data1);
queue.put(data2);
queue.put(data3);

queue.prune();

expect(queue.getAll()).toEqual([data2, data3]);
});
});
70 changes: 52 additions & 18 deletions packages/data-pusher/src/state.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Bottleneck from 'bottleneck';
import { last } from 'lodash';
import { Config, SignedData, TemplateId } from './validation/schema';
import { OIS_MAX_CONCURRENCY_DEFAULT, OIS_MIN_TIME_DEFAULT_MS } from './constants';
import { deriveEndpointId, getRandomId } from './utils';
Expand Down Expand Up @@ -65,8 +66,14 @@ export const buildApiLimiters = (config: Config) => {
return apiLimiters;
};

export const buildTemplateStorages = (config: Config) =>
Object.fromEntries(Object.keys(config.templates).map((templateId) => [templateId, new DelayedSignedDataQueue()]));
export const buildTemplateStorages = (config: Config) => {
return Object.fromEntries(
Object.keys(config.templates).map((templateId) => {
const maxUpdateDelayTime = Math.max(...Object.values(config.triggers.signedApiUpdates).map((u) => u.updateDelay));
return [templateId, new DelayedSignedDataQueue(maxUpdateDelayTime)];
})
);
};

export const getInitialState = (config: Config): State => {
return {
Expand All @@ -89,33 +96,60 @@ export const getState = () => {
*/
export class DelayedSignedDataQueue {
private storage: SignedData[] = [];
private maxUpdateDelay: number;

/**
* Creates the delayed signed data queue with the maximum update delay time. If there exists some signed data satisfying
* this delay, all other signed data with smaller timestamps are removed.
* @param maxUpdateDelay - The maximum update delay time in seconds.
*/
constructor(maxUpdateDelay: number) {
this.maxUpdateDelay = maxUpdateDelay;
}

/**
* Checks if a signed data entry is delayed enough. This means that the timestamp of the entry must be smaller than
* the reference timestamp.
*/
private isDelayedEnough(data: SignedData, referenceTimestamp: number) {
return parseInt(data.timestamp) < referenceTimestamp;
}

/**
* Adds a signed data entry to the queue. Assumes the data is not older than other entries in the queue.
* @param data - The signed data entry to add.
*/
public put(data: SignedData): void {
// Make sure the data is not older than other entries in the queue.
if (this.storage.length && parseInt(last(this.storage)!.timestamp) > parseInt(data.timestamp)) {
throw new Error('The signed data is too old');
}
this.storage.push(data);
}
/**
* Retrieves the newest signed data entry from the queue that is delayed by a specified time.
* @param updateDelay - The maximum delay (in seconds) allowed for retrieved data.
* @param referenceTimestamp - The reference timestamp in seconds. Signed data with newer or equal timestamp is
* ignored during this call.
* @returns The delayed signed data entry, or undefined if none is found.
*/
public get(updateDelay: number): SignedData | undefined {
// Calculate the reference timestamp based on the current time and update delay.
const referenceTimestamp = Date.now() / 1000 - updateDelay;
// Function to check if an element's timestamp is delayed enough.
const isDelayedEnough = (element: SignedData) => parseInt(element.timestamp) < referenceTimestamp;
// Find the index of the newest delayed data entry in the storage.
const index = this.storage.findLastIndex(isDelayedEnough);
// If a delayed entry is found, remove older entries (also include returned one) and return the delayed one.
if (index >= 0) {
const delayedData = this.storage[index];
this.storage.splice(0, index + 1);
return delayedData;
}
public get(referenceTimestamp: number): SignedData | undefined {
// Find the newest delayed data entry in the storage.
return this.storage.findLast((data) => this.isDelayedEnough(data, referenceTimestamp));
}

/**
* Removes all signed data entries from the queue that are not needed anymore. This means that there must exist a
* signed data with timestamp smaller maximum update delay. All data with smaller timestamp can be removed.
*/
public prune(): void {
const index = this.storage.findLastIndex((data) =>
this.isDelayedEnough(data, Date.now() / 1000 - this.maxUpdateDelay)
);

return;
this.storage = this.storage.slice(index);
}

public list(): SignedData[] {
public getAll(): SignedData[] {
return this.storage;
}
}