Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stagger pusher #105

Merged
merged 5 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 17 additions & 20 deletions packages/api/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ export const evmAddressSchema = z.string().regex(/^0x[\dA-Fa-f]{40}$/, 'Must be

export const evmIdSchema = z.string().regex(/^0x[\dA-Fa-f]{64}$/, 'Must be a valid EVM ID');

export const endpointSchema = z
.object({
urlPath: z
.string()
.regex(/^\/[\dA-Za-z-]+$/, 'Must start with a slash and contain only alphanumeric characters and dashes'),
delaySeconds: z.number().nonnegative().int(),
})
.strict();
export const endpointSchema = z.strictObject({
urlPath: z
.string()
.regex(/^\/[\dA-Za-z-]+$/, 'Must start with a slash and contain only alphanumeric characters and dashes'),
delaySeconds: z.number().nonnegative().int(),
});

export type Endpoint = z.infer<typeof endpointSchema>;

Expand All @@ -26,21 +24,19 @@ export const endpointsSchema = z

export const allowedAirnodesSchema = z.union([z.literal('*'), z.array(evmAddressSchema).nonempty()]);

export const configSchema = z
.object({
endpoints: endpointsSchema,
maxBatchSize: z.number().nonnegative().int(),
port: z.number().nonnegative().int(),
cache: z.object({
maxAgeSeconds: z.number().nonnegative().int(),
}),
allowedAirnodes: allowedAirnodesSchema,
})
.strict();
export const configSchema = z.strictObject({
endpoints: endpointsSchema,
maxBatchSize: z.number().nonnegative().int(),
port: z.number().nonnegative().int(),
cache: z.strictObject({
maxAgeSeconds: z.number().nonnegative().int(),
}),
allowedAirnodes: allowedAirnodesSchema,
});

export type Config = z.infer<typeof configSchema>;

export const signedDataSchema = z.object({
export const signedDataSchema = z.strictObject({
airnode: evmAddressSchema,
templateId: evmIdSchema,
beaconId: evmIdSchema,
Expand All @@ -60,6 +56,7 @@ export const envBooleanSchema = z.union([z.literal('true'), z.literal('false')])
// We apply default values to make it convenient to omit certain environment variables. The default values should be
// primarily focused on users and production usage.
export const envConfigSchema = z
// Intentionally not using strictObject here because we want to allow other environment variables to be present.
.object({
LOG_COLORIZE: envBooleanSchema.default('false'),
LOG_FORMAT: z
Expand Down
1 change: 0 additions & 1 deletion packages/e2e/src/pusher/pusher.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
"nodeSettings": {
"nodeVersion": "0.1.0",
"airnodeWalletMnemonic": "diamond result history offer forest diagram crop armed stumble orchard stage glance",
"rateLimiting": { "Mock API": { "maxConcurrency": 25, "minTime": 0 } },
"stage": "local-example"
}
}
28 changes: 0 additions & 28 deletions packages/pusher/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,34 +323,6 @@ An identifier of the deployment stage. This is used to distinguish between diffe
`dev`, `staging` or `production`. The stage value can have 256 characters at maximum and can only include lowercase
alphanumeric characters and hyphens.

##### `rateLimiting`

Configuration for rate limiting OIS requests. Rate limiting can be configured for each OIS separately. For example:

```jsonc
// Defines no rate limiting.
"rateLimiting": { },
```

or

```jsonc
// Defines rate limiting for OIS with title "Nodary"
"rateLimiting": { "Nodary": { "maxConcurrency": 25, "minTime": 10 } },
```

###### `rateLimiting[<OIS_TITLE>]`

The configuration for the OIS with title `<OIS_TITLE>`.

`maxConcurrency`

Maximum number of concurrent requests to the OIS.

`minTime`

Minimum time in milliseconds between two requests to the OIS.

## Deployment

TODO: Write example how to deploy on AWS
Expand Down
1 change: 0 additions & 1 deletion packages/pusher/config/pusher.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
"nodeSettings": {
"nodeVersion": "0.1.0",
"airnodeWalletMnemonic": "${WALLET_MNEMONIC}",
"rateLimiting": { "Nodary": { "maxConcurrency": 25, "minTime": 10 } },
"stage": "local-example"
}
}
1 change: 0 additions & 1 deletion packages/pusher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
"@api3/ois": "^2.2.1",
"@api3/promise-utils": "^0.4.0",
"axios": "^1.5.1",
"bottleneck": "^2.19.5",
"dotenv": "^16.3.1",
"ethers": "^5.7.2",
"express": "^4.18.2",
Expand Down
9 changes: 1 addition & 8 deletions packages/pusher/src/api-requests/data-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ export const callApi = async (
export const makeTemplateRequests = async (signedApiUpdate: SignedApiUpdate): Promise<TemplateResponse[]> => {
const {
config: { endpoints, templates, ois: oises, apiCredentials },
apiLimiters,
} = getState();
logger.debug('Making template requests', signedApiUpdate);
const { templateIds } = signedApiUpdate;
Expand All @@ -61,13 +60,7 @@ export const makeTemplateRequests = async (signedApiUpdate: SignedApiUpdate): Pr
};
}, {});

const limiter = apiLimiters[operationTemplateId];

const goCallApi = await (limiter
? limiter.schedule({ expiration: 90_000 }, async () =>
callApi(ois, operationOisEndpoint, operationApiCallParameters, apiCredentials)
)
: callApi(ois, operationOisEndpoint, operationApiCallParameters, apiCredentials));
const goCallApi = await callApi(ois, operationOisEndpoint, operationApiCallParameters, apiCredentials);

if (!goCallApi.success) {
logger.warn(`Failed to make API call`, {
Expand Down
7 changes: 0 additions & 7 deletions packages/pusher/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
export const SIGNED_DATA_PUSH_POLLING_INTERVAL = 2500;

export const RANDOM_BACKOFF_MIN_MS = 0;
export const RANDOM_BACKOFF_MAX_MS = 2500;
// The minimum amount of time between HTTP calls to remote APIs per OIS.
export const OIS_MIN_TIME_DEFAULT_MS = 20;
// The maximum number of simultaneously-running HTTP requests to remote APIs per OIS.
export const OIS_MAX_CONCURRENCY_DEFAULT = 10;

export const NO_SIGNED_API_UPDATE_EXIT_CODE = 1;
export const NO_FETCH_EXIT_CODE = 2;

Expand Down
12 changes: 11 additions & 1 deletion packages/pusher/src/fetch-beacon-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export const initiateFetchingBeaconData = () => {

const { signedApiUpdates } = config.triggers;

// TODO: Validate using zod schema
if (isEmpty(signedApiUpdates)) {
logger.error('No signed API updates found. Stopping.');
// eslint-disable-next-line unicorn/no-process-exit
Expand All @@ -41,8 +42,17 @@ const fetchBeaconDataInLoop = async (signedApiUpdate: SignedApiUpdate) => {
logger.warn(`Could not put signed response`, { templateId, signedResponse, errorMessage: goPut.error.message });
}
});

const duration = Date.now() - startTimestamp;
// Take at most 10% of the fetch interval as extra time to avoid all API requests be done at the same time. This
// delay is taken for each interval, so if the system runs for a sufficiently long time, the requests should happen
// at random intervals.
const extraTime = Math.random() * signedApiUpdate.fetchInterval * 1000 * 0.1;
logger.debug('Adding extra time to fetch interval', {
extraTime,
fetchInterval: signedApiUpdate.fetchInterval * 1000,
});

await sleep(signedApiUpdate.fetchInterval * 1000 - duration);
await sleep(signedApiUpdate.fetchInterval * 1000 - duration + extraTime);
}
};
12 changes: 6 additions & 6 deletions packages/pusher/src/heartbeat/heartbeat.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ describe(logHeartbeat.name, () => {
nodeVersion: '0.1.0',
currentTimestamp: '1674172803',
deploymentTimestamp: '1674172800',
configHash: '0x126e768ba244efdb790d63a76821047e163dfc502ace09b2546a93075594c286',
configHash: '0xc40fb6dce9a4c5898b344f17ecc922a8ab97096ed92e5e2f8c53edb486ea7730',
signature:
'0x24467037db96b652286c30c39ee9611faff07e1c17916f5c154ea7a27dfbc32f308969bdadf586bdaee0951b84819633e126a4fc72e3aa2e98a6eda95ce640081b',
'0xc2adb0ef11cdf50fc382752a73d2314cef28f530d030801a1e301cea2da2f66961c30dd158e20e80ed05819ed8f7ef53e0ca6a73441de7df3bdff209e586e3b71c',
};
const rawConfig = JSON.parse(readFileSync(join(__dirname, '../../config/pusher.example.json'), 'utf8'));
jest.spyOn(configModule, 'loadRawConfig').mockReturnValue(rawConfig);
Expand All @@ -50,12 +50,12 @@ describe(verifyHeartbeatLog.name, () => {
const jsonLog = {
context: {
airnode: '0xbF3137b0a7574563a23a8fC8badC6537F98197CC',
configHash: '0x126e768ba244efdb790d63a76821047e163dfc502ace09b2546a93075594c286',
configHash: '0xc40fb6dce9a4c5898b344f17ecc922a8ab97096ed92e5e2f8c53edb486ea7730',
currentTimestamp: '1674172803',
deploymentTimestamp: '1674172800',
nodeVersion: '0.1.0',
signature:
'0x24467037db96b652286c30c39ee9611faff07e1c17916f5c154ea7a27dfbc32f308969bdadf586bdaee0951b84819633e126a4fc72e3aa2e98a6eda95ce640081b',
'0xc2adb0ef11cdf50fc382752a73d2314cef28f530d030801a1e301cea2da2f66961c30dd158e20e80ed05819ed8f7ef53e0ca6a73441de7df3bdff209e586e3b71c',
stage: 'test',
},
level: 'info',
Expand All @@ -81,10 +81,10 @@ describe(stringifyUnsignedHeartbeatPayload.name, () => {
nodeVersion: '0.1.0',
currentTimestamp: '1674172803',
deploymentTimestamp: '1674172800',
configHash: '0x126e768ba244efdb790d63a76821047e163dfc502ace09b2546a93075594c286',
configHash: '0xc40fb6dce9a4c5898b344f17ecc922a8ab97096ed92e5e2f8c53edb486ea7730',
})
).toBe(
'{"airnode":"0xbF3137b0a7574563a23a8fC8badC6537F98197CC","configHash":"0x126e768ba244efdb790d63a76821047e163dfc502ace09b2546a93075594c286","currentTimestamp":"1674172803","deploymentTimestamp":"1674172800","nodeVersion":"0.1.0","stage":"test"}'
'{"airnode":"0xbF3137b0a7574563a23a8fC8badC6537F98197CC","configHash":"0xc40fb6dce9a4c5898b344f17ecc922a8ab97096ed92e5e2f8c53edb486ea7730","currentTimestamp":"1674172803","deploymentTimestamp":"1674172800","nodeVersion":"0.1.0","stage":"test"}'
);
});
});
Expand Down
53 changes: 0 additions & 53 deletions packages/pusher/src/state.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
import Bottleneck from 'bottleneck';
import { ethers } from 'ethers';
import { last } from 'lodash';

import { OIS_MAX_CONCURRENCY_DEFAULT, OIS_MIN_TIME_DEFAULT_MS } from './constants';
import { deriveEndpointId, getRandomId } from './utils';
import type { Config, SignedData, TemplateId } from './validation/schema';

export type TemplateValueStorage = Record<TemplateId, DelayedSignedDataQueue>;

export interface State {
config: Config;
templateValues: TemplateValueStorage;
apiLimiters: Record<string, Bottleneck | undefined>;
// We persist the derived Airnode wallet in memory as a performance optimization.
airnodeWallet: ethers.Wallet;
// The timestamp of when the service was initialized. This can be treated as a "deployment" timestamp.
Expand All @@ -25,54 +21,6 @@ export const initializeState = (config: Config) => {
return state;
};

export const buildApiLimiters = (config: Config) => {
const { ois, nodeSettings, templates } = config;
const { rateLimiting } = nodeSettings;

if (!ois) {
return {};
}

const oisLimiters = Object.fromEntries(
ois.map((ois) => {
if (rateLimiting[ois.title]) {
const { minTime, maxConcurrency } = rateLimiting[ois.title]!;

return [
ois.title,
new Bottleneck({
id: getRandomId(),
minTime: minTime ?? OIS_MIN_TIME_DEFAULT_MS,
maxConcurrent: maxConcurrency ?? OIS_MAX_CONCURRENCY_DEFAULT,
}),
];
}

return [
ois.title,
new Bottleneck({
id: getRandomId(),
minTime: OIS_MIN_TIME_DEFAULT_MS,
maxConcurrent: OIS_MAX_CONCURRENCY_DEFAULT,
}),
];
})
);
const endpointTitles = Object.fromEntries(
ois.flatMap((ois) => ois.endpoints.map((endpoint) => [deriveEndpointId(ois.title, endpoint.name), ois.title]))
);

// Make use of the reference/pointer nature of objects
const apiLimiters = Object.fromEntries(
Object.entries(templates).map(([templateId, template]) => {
const title = endpointTitles[template.endpointId]!;
return [templateId, oisLimiters[title]];
})
);

return apiLimiters;
};

export const buildTemplateStorages = (config: Config) => {
return Object.fromEntries(
Object.keys(config.templates).map((templateId) => {
Expand All @@ -86,7 +34,6 @@ export const getInitialState = (config: Config): State => {
return {
config,
templateValues: buildTemplateStorages(config),
apiLimiters: buildApiLimiters(config),
airnodeWallet: ethers.Wallet.fromMnemonic(config.nodeSettings.airnodeWalletMnemonic),
deploymentTimestamp: Math.floor(Date.now() / 1000).toString(),
};
Expand Down
1 change: 1 addition & 0 deletions packages/pusher/src/update-signed-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export const initiateUpdatingSignedApi = () => {
}))
);

// TODO: Validate using zod schema
if (isEmpty(signedApiUpdateDelayGroups)) {
logger.error('No signed API updates found. Stopping.');
// eslint-disable-next-line unicorn/no-process-exit
Expand Down
5 changes: 0 additions & 5 deletions packages/pusher/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@ import { ethers } from 'ethers';

export const sleep = async (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

/**
* Generates a random ID used when creating Bottleneck limiters.
*/
export const getRandomId = () => ethers.utils.randomBytes(16).toString();

export const deriveEndpointId = (oisTitle: string, endpointName: string) =>
ethers.utils.keccak256(ethers.utils.defaultAbiCoder.encode(['string', 'string'], [oisTitle, endpointName]));

Expand Down
Loading