Skip to content

Commit

Permalink
Stagger pusher (#105)
Browse files Browse the repository at this point in the history
* Remove rate limiting

* Add at maximum 10% of fetchInterval to waiting time

* Use strictObject wherever it's needed

* Log the extra time

* Fix tests
  • Loading branch information
Siegrift authored Nov 7, 2023
1 parent 94bea08 commit 0bad6fc
Show file tree
Hide file tree
Showing 15 changed files with 59 additions and 195 deletions.
37 changes: 17 additions & 20 deletions packages/api/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ export const evmAddressSchema = z.string().regex(/^0x[\dA-Fa-f]{40}$/, 'Must be

export const evmIdSchema = z.string().regex(/^0x[\dA-Fa-f]{64}$/, 'Must be a valid EVM ID');

export const endpointSchema = z
.object({
urlPath: z
.string()
.regex(/^\/[\dA-Za-z-]+$/, 'Must start with a slash and contain only alphanumeric characters and dashes'),
delaySeconds: z.number().nonnegative().int(),
})
.strict();
export const endpointSchema = z.strictObject({
urlPath: z
.string()
.regex(/^\/[\dA-Za-z-]+$/, 'Must start with a slash and contain only alphanumeric characters and dashes'),
delaySeconds: z.number().nonnegative().int(),
});

export type Endpoint = z.infer<typeof endpointSchema>;

Expand All @@ -26,21 +24,19 @@ export const endpointsSchema = z

export const allowedAirnodesSchema = z.union([z.literal('*'), z.array(evmAddressSchema).nonempty()]);

export const configSchema = z
.object({
endpoints: endpointsSchema,
maxBatchSize: z.number().nonnegative().int(),
port: z.number().nonnegative().int(),
cache: z.object({
maxAgeSeconds: z.number().nonnegative().int(),
}),
allowedAirnodes: allowedAirnodesSchema,
})
.strict();
export const configSchema = z.strictObject({
endpoints: endpointsSchema,
maxBatchSize: z.number().nonnegative().int(),
port: z.number().nonnegative().int(),
cache: z.strictObject({
maxAgeSeconds: z.number().nonnegative().int(),
}),
allowedAirnodes: allowedAirnodesSchema,
});

export type Config = z.infer<typeof configSchema>;

export const signedDataSchema = z.object({
export const signedDataSchema = z.strictObject({
airnode: evmAddressSchema,
templateId: evmIdSchema,
beaconId: evmIdSchema,
Expand All @@ -60,6 +56,7 @@ export const envBooleanSchema = z.union([z.literal('true'), z.literal('false')])
// We apply default values to make it convenient to omit certain environment variables. The default values should be
// primarily focused on users and production usage.
export const envConfigSchema = z
// Intentionally not using strictObject here because we want to allow other environment variables to be present.
.object({
LOG_COLORIZE: envBooleanSchema.default('false'),
LOG_FORMAT: z
Expand Down
1 change: 0 additions & 1 deletion packages/e2e/src/pusher/pusher.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
"nodeSettings": {
"nodeVersion": "0.1.0",
"airnodeWalletMnemonic": "diamond result history offer forest diagram crop armed stumble orchard stage glance",
"rateLimiting": { "Mock API": { "maxConcurrency": 25, "minTime": 0 } },
"stage": "local-example"
}
}
28 changes: 0 additions & 28 deletions packages/pusher/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,34 +323,6 @@ An identifier of the deployment stage. This is used to distinguish between diffe
`dev`, `staging` or `production`. The stage value can have 256 characters at maximum and can only include lowercase
alphanumeric characters and hyphens.

##### `rateLimiting`

Configuration for rate limiting OIS requests. Rate limiting can be configured for each OIS separately. For example:

```jsonc
// Defines no rate limiting.
"rateLimiting": { },
```

or

```jsonc
// Defines rate limiting for OIS with title "Nodary"
"rateLimiting": { "Nodary": { "maxConcurrency": 25, "minTime": 10 } },
```

###### `rateLimiting[<OIS_TITLE>]`

The configuration for the OIS with title `<OIS_TITLE>`.

`maxConcurrency`

Maximum number of concurrent requests to the OIS.

`minTime`

Minimum time in milliseconds between two requests to the OIS.

## Deployment

TODO: Write example how to deploy on AWS
Expand Down
1 change: 0 additions & 1 deletion packages/pusher/config/pusher.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
"nodeSettings": {
"nodeVersion": "0.1.0",
"airnodeWalletMnemonic": "${WALLET_MNEMONIC}",
"rateLimiting": { "Nodary": { "maxConcurrency": 25, "minTime": 10 } },
"stage": "local-example"
}
}
1 change: 0 additions & 1 deletion packages/pusher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
"@api3/ois": "^2.2.1",
"@api3/promise-utils": "^0.4.0",
"axios": "^1.5.1",
"bottleneck": "^2.19.5",
"dotenv": "^16.3.1",
"ethers": "^5.7.2",
"express": "^4.18.2",
Expand Down
9 changes: 1 addition & 8 deletions packages/pusher/src/api-requests/data-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ export const callApi = async (
export const makeTemplateRequests = async (signedApiUpdate: SignedApiUpdate): Promise<TemplateResponse[]> => {
const {
config: { endpoints, templates, ois: oises, apiCredentials },
apiLimiters,
} = getState();
logger.debug('Making template requests', signedApiUpdate);
const { templateIds } = signedApiUpdate;
Expand All @@ -61,13 +60,7 @@ export const makeTemplateRequests = async (signedApiUpdate: SignedApiUpdate): Pr
};
}, {});

const limiter = apiLimiters[operationTemplateId];

const goCallApi = await (limiter
? limiter.schedule({ expiration: 90_000 }, async () =>
callApi(ois, operationOisEndpoint, operationApiCallParameters, apiCredentials)
)
: callApi(ois, operationOisEndpoint, operationApiCallParameters, apiCredentials));
const goCallApi = await callApi(ois, operationOisEndpoint, operationApiCallParameters, apiCredentials);

if (!goCallApi.success) {
logger.warn(`Failed to make API call`, {
Expand Down
7 changes: 0 additions & 7 deletions packages/pusher/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
export const SIGNED_DATA_PUSH_POLLING_INTERVAL = 2500;

export const RANDOM_BACKOFF_MIN_MS = 0;
export const RANDOM_BACKOFF_MAX_MS = 2500;
// The minimum amount of time between HTTP calls to remote APIs per OIS.
export const OIS_MIN_TIME_DEFAULT_MS = 20;
// The maximum number of simultaneously-running HTTP requests to remote APIs per OIS.
export const OIS_MAX_CONCURRENCY_DEFAULT = 10;

export const NO_SIGNED_API_UPDATE_EXIT_CODE = 1;
export const NO_FETCH_EXIT_CODE = 2;

Expand Down
12 changes: 11 additions & 1 deletion packages/pusher/src/fetch-beacon-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export const initiateFetchingBeaconData = () => {

const { signedApiUpdates } = config.triggers;

// TODO: Validate using zod schema
if (isEmpty(signedApiUpdates)) {
logger.error('No signed API updates found. Stopping.');
// eslint-disable-next-line unicorn/no-process-exit
Expand All @@ -41,8 +42,17 @@ const fetchBeaconDataInLoop = async (signedApiUpdate: SignedApiUpdate) => {
logger.warn(`Could not put signed response`, { templateId, signedResponse, errorMessage: goPut.error.message });
}
});

const duration = Date.now() - startTimestamp;
// Take at most 10% of the fetch interval as extra time to avoid all API requests be done at the same time. This
// delay is taken for each interval, so if the system runs for a sufficiently long time, the requests should happen
// at random intervals.
const extraTime = Math.random() * signedApiUpdate.fetchInterval * 1000 * 0.1;
logger.debug('Adding extra time to fetch interval', {
extraTime,
fetchInterval: signedApiUpdate.fetchInterval * 1000,
});

await sleep(signedApiUpdate.fetchInterval * 1000 - duration);
await sleep(signedApiUpdate.fetchInterval * 1000 - duration + extraTime);
}
};
12 changes: 6 additions & 6 deletions packages/pusher/src/heartbeat/heartbeat.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ describe(logHeartbeat.name, () => {
nodeVersion: '0.1.0',
currentTimestamp: '1674172803',
deploymentTimestamp: '1674172800',
configHash: '0x126e768ba244efdb790d63a76821047e163dfc502ace09b2546a93075594c286',
configHash: '0xc40fb6dce9a4c5898b344f17ecc922a8ab97096ed92e5e2f8c53edb486ea7730',
signature:
'0x24467037db96b652286c30c39ee9611faff07e1c17916f5c154ea7a27dfbc32f308969bdadf586bdaee0951b84819633e126a4fc72e3aa2e98a6eda95ce640081b',
'0xc2adb0ef11cdf50fc382752a73d2314cef28f530d030801a1e301cea2da2f66961c30dd158e20e80ed05819ed8f7ef53e0ca6a73441de7df3bdff209e586e3b71c',
};
const rawConfig = JSON.parse(readFileSync(join(__dirname, '../../config/pusher.example.json'), 'utf8'));
jest.spyOn(configModule, 'loadRawConfig').mockReturnValue(rawConfig);
Expand All @@ -50,12 +50,12 @@ describe(verifyHeartbeatLog.name, () => {
const jsonLog = {
context: {
airnode: '0xbF3137b0a7574563a23a8fC8badC6537F98197CC',
configHash: '0x126e768ba244efdb790d63a76821047e163dfc502ace09b2546a93075594c286',
configHash: '0xc40fb6dce9a4c5898b344f17ecc922a8ab97096ed92e5e2f8c53edb486ea7730',
currentTimestamp: '1674172803',
deploymentTimestamp: '1674172800',
nodeVersion: '0.1.0',
signature:
'0x24467037db96b652286c30c39ee9611faff07e1c17916f5c154ea7a27dfbc32f308969bdadf586bdaee0951b84819633e126a4fc72e3aa2e98a6eda95ce640081b',
'0xc2adb0ef11cdf50fc382752a73d2314cef28f530d030801a1e301cea2da2f66961c30dd158e20e80ed05819ed8f7ef53e0ca6a73441de7df3bdff209e586e3b71c',
stage: 'test',
},
level: 'info',
Expand All @@ -81,10 +81,10 @@ describe(stringifyUnsignedHeartbeatPayload.name, () => {
nodeVersion: '0.1.0',
currentTimestamp: '1674172803',
deploymentTimestamp: '1674172800',
configHash: '0x126e768ba244efdb790d63a76821047e163dfc502ace09b2546a93075594c286',
configHash: '0xc40fb6dce9a4c5898b344f17ecc922a8ab97096ed92e5e2f8c53edb486ea7730',
})
).toBe(
'{"airnode":"0xbF3137b0a7574563a23a8fC8badC6537F98197CC","configHash":"0x126e768ba244efdb790d63a76821047e163dfc502ace09b2546a93075594c286","currentTimestamp":"1674172803","deploymentTimestamp":"1674172800","nodeVersion":"0.1.0","stage":"test"}'
'{"airnode":"0xbF3137b0a7574563a23a8fC8badC6537F98197CC","configHash":"0xc40fb6dce9a4c5898b344f17ecc922a8ab97096ed92e5e2f8c53edb486ea7730","currentTimestamp":"1674172803","deploymentTimestamp":"1674172800","nodeVersion":"0.1.0","stage":"test"}'
);
});
});
Expand Down
53 changes: 0 additions & 53 deletions packages/pusher/src/state.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
import Bottleneck from 'bottleneck';
import { ethers } from 'ethers';
import { last } from 'lodash';

import { OIS_MAX_CONCURRENCY_DEFAULT, OIS_MIN_TIME_DEFAULT_MS } from './constants';
import { deriveEndpointId, getRandomId } from './utils';
import type { Config, SignedData, TemplateId } from './validation/schema';

export type TemplateValueStorage = Record<TemplateId, DelayedSignedDataQueue>;

export interface State {
config: Config;
templateValues: TemplateValueStorage;
apiLimiters: Record<string, Bottleneck | undefined>;
// We persist the derived Airnode wallet in memory as a performance optimization.
airnodeWallet: ethers.Wallet;
// The timestamp of when the service was initialized. This can be treated as a "deployment" timestamp.
Expand All @@ -25,54 +21,6 @@ export const initializeState = (config: Config) => {
return state;
};

export const buildApiLimiters = (config: Config) => {
const { ois, nodeSettings, templates } = config;
const { rateLimiting } = nodeSettings;

if (!ois) {
return {};
}

const oisLimiters = Object.fromEntries(
ois.map((ois) => {
if (rateLimiting[ois.title]) {
const { minTime, maxConcurrency } = rateLimiting[ois.title]!;

return [
ois.title,
new Bottleneck({
id: getRandomId(),
minTime: minTime ?? OIS_MIN_TIME_DEFAULT_MS,
maxConcurrent: maxConcurrency ?? OIS_MAX_CONCURRENCY_DEFAULT,
}),
];
}

return [
ois.title,
new Bottleneck({
id: getRandomId(),
minTime: OIS_MIN_TIME_DEFAULT_MS,
maxConcurrent: OIS_MAX_CONCURRENCY_DEFAULT,
}),
];
})
);
const endpointTitles = Object.fromEntries(
ois.flatMap((ois) => ois.endpoints.map((endpoint) => [deriveEndpointId(ois.title, endpoint.name), ois.title]))
);

// Make use of the reference/pointer nature of objects
const apiLimiters = Object.fromEntries(
Object.entries(templates).map(([templateId, template]) => {
const title = endpointTitles[template.endpointId]!;
return [templateId, oisLimiters[title]];
})
);

return apiLimiters;
};

export const buildTemplateStorages = (config: Config) => {
return Object.fromEntries(
Object.keys(config.templates).map((templateId) => {
Expand All @@ -86,7 +34,6 @@ export const getInitialState = (config: Config): State => {
return {
config,
templateValues: buildTemplateStorages(config),
apiLimiters: buildApiLimiters(config),
airnodeWallet: ethers.Wallet.fromMnemonic(config.nodeSettings.airnodeWalletMnemonic),
deploymentTimestamp: Math.floor(Date.now() / 1000).toString(),
};
Expand Down
1 change: 1 addition & 0 deletions packages/pusher/src/update-signed-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export const initiateUpdatingSignedApi = () => {
}))
);

// TODO: Validate using zod schema
if (isEmpty(signedApiUpdateDelayGroups)) {
logger.error('No signed API updates found. Stopping.');
// eslint-disable-next-line unicorn/no-process-exit
Expand Down
5 changes: 0 additions & 5 deletions packages/pusher/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@ import { ethers } from 'ethers';

export const sleep = async (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

/**
* Generates a random ID used when creating Bottleneck limiters.
*/
export const getRandomId = () => ethers.utils.randomBytes(16).toString();

export const deriveEndpointId = (oisTitle: string, endpointName: string) =>
ethers.utils.keccak256(ethers.utils.defaultAbiCoder.encode(['string', 'string'], [oisTitle, endpointName]));

Expand Down
Loading

0 comments on commit 0bad6fc

Please sign in to comment.