Skip to content

Commit

Permalink
Improve query to find candles map. (#2650)
Browse files Browse the repository at this point in the history
  • Loading branch information
vincentwschau authored Dec 12, 2024
1 parent 3d59131 commit 196dc84
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 46 deletions.
86 changes: 58 additions & 28 deletions indexer/packages/postgres/src/stores/candle-table.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import _ from 'lodash';
import { PartialModelObject, QueryBuilder } from 'objection';

import { BUFFER_ENCODING_UTF_8, DEFAULT_POSTGRES_OPTIONS } from '../constants';
import { knexReadReplica } from '../helpers/knex';
import { setupBaseQuery, verifyAllRequiredFields } from '../helpers/stores-helpers';
import Transaction from '../helpers/transaction';
import { getUuid } from '../helpers/uuid';
Expand Down Expand Up @@ -174,36 +174,66 @@ export async function findLatest(

export async function findCandlesMap(
tickers: string[],
resolutions: CandleResolution[],
options: Options = DEFAULT_POSTGRES_OPTIONS,
): Promise<CandlesMap> {
if (tickers.length === 0) {
return {};
}

const candlesMap: CandlesMap = {};
for (const ticker of tickers) {
candlesMap[ticker] = {};
}

await Promise.all(
_.map(
tickers,
async (ticker: string) => {
candlesMap[ticker] = {};
const findLatestCandles: Promise<CandleFromDatabase | undefined>[] = resolutions.map(
(resolution: CandleResolution) => findLatest(
ticker,
resolution,
options,
),
);

// Map each resolution to its respective candle
const allLatestCandles: (CandleFromDatabase | undefined)[] = await Promise.all(
findLatestCandles,
);
_.forEach(allLatestCandles, (candle: CandleFromDatabase | undefined) => {
if (candle !== undefined) {
candlesMap[ticker][candle.resolution] = candle;
}
});
},
),
);
const minuteCandlesResult: {
rows: CandleFromDatabase[],
} = await knexReadReplica.getConnection().raw(
`
SELECT DISTINCT ON (
ticker,
resolution
) candles.* FROM
candles
WHERE
"ticker" IN (${tickers.map((ticker) => { return `'${ticker}'`; }).join(',')}) AND
"startedAt" > NOW() - INTERVAL '3 hours' AND
resolution IN ('1MIN', '5MINS', '15MINS', '30MINS', '1HOUR')
ORDER BY
ticker,
resolution,
"startedAt" DESC;
`,
) as unknown as {
rows: CandleFromDatabase[],
};
const hourDayCandlesResult: {
rows: CandleFromDatabase[],
} = await knexReadReplica.getConnection().raw(
`
SELECT DISTINCT ON (
ticker,
resolution
) candles.* FROM
candles
WHERE
"ticker" IN (${tickers.map((ticker) => { return `'${ticker}'`; }).join(',')}) AND
"startedAt" > NOW() - INTERVAL '2 days' AND
resolution IN ('4HOURS', '1DAY')
ORDER BY
ticker,
resolution,
"startedAt" DESC;
`,
) as unknown as {
rows: CandleFromDatabase[],
};
const latestCandles: CandleFromDatabase[] = minuteCandlesResult.rows
.concat(hourDayCandlesResult.rows);
for (const candle of latestCandles) {
if (candlesMap[candle.ticker] === undefined) {
candlesMap[candle.ticker] = {};
}
candlesMap[candle.ticker][candle.resolution] = candle;
}

return candlesMap;
}
36 changes: 20 additions & 16 deletions indexer/services/ender/__tests__/lib/candles-generator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@ import {
redis,
} from '@dydxprotocol-indexer/redis';
import { ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX } from '@dydxprotocol-indexer/redis/build/src/caches/orderbook-mid-prices-cache';
import { DateTime, Settings } from 'luxon';

describe('candleHelper', () => {
const startedAt: DateTime = helpers.calculateNormalizedCandleStartTime(
testConstants.createdDateTime,
CandleResolution.ONE_MINUTE,
);

beforeAll(async () => {
await dbHelpers.migrate();
await dbHelpers.clearData();
Expand All @@ -48,13 +54,15 @@ describe('candleHelper', () => {
beforeEach(async () => {
await testMocks.seedData();
await perpetualMarketRefresher.updatePerpetualMarkets();
Settings.now = () => startedAt.plus({ minutes: 30 }).valueOf();
});

afterEach(async () => {
await dbHelpers.clearData();
clearCandlesMap();
jest.clearAllMocks();
await redis.deleteAllAsync(redisClient);
Settings.now = () => new Date().valueOf();
});

afterAll(async () => {
Expand Down Expand Up @@ -87,10 +95,6 @@ describe('candleHelper', () => {
orderbookMidPriceClose: undefined,
orderbookMidPriceOpen: undefined,
};
const startedAt: IsoString = helpers.calculateNormalizedCandleStartTime(
testConstants.createdDateTime,
CandleResolution.ONE_MINUTE,
).toISO();
const previousStartedAt: IsoString = helpers.calculateNormalizedCandleStartTime(
testConstants.createdDateTime.minus({ minutes: 1 }),
CandleResolution.ONE_MINUTE,
Expand Down Expand Up @@ -304,8 +308,8 @@ describe('candleHelper', () => {
'100', // open interest
false, // block contains trades
{ // expected candle
id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE),
startedAt,
id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE),
startedAt: startedAt.toISO(),
ticker: testConstants.defaultPerpetualMarket.ticker,
resolution: CandleResolution.ONE_MINUTE,
low: closePrice,
Expand Down Expand Up @@ -343,8 +347,8 @@ describe('candleHelper', () => {
true, // block contains trades
{ // expected candle
...defaultCandle,
id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE),
startedAt,
id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE),
startedAt: startedAt.toISO(),
resolution: CandleResolution.ONE_MINUTE,
startingOpenInterest: '100',
orderbookMidPriceClose: '1000',
Expand All @@ -356,7 +360,7 @@ describe('candleHelper', () => {
[
'updates empty candle', // description
{ // initial candle
startedAt,
startedAt: startedAt.toISO(),
ticker: testConstants.defaultPerpetualMarket.ticker,
resolution: CandleResolution.ONE_MINUTE,
low: closePrice,
Expand All @@ -374,8 +378,8 @@ describe('candleHelper', () => {
true, // block contains trades
{ // expected candle
...defaultCandle,
id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE),
startedAt,
id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE),
startedAt: startedAt.toISO(),
resolution: CandleResolution.ONE_MINUTE,
startingOpenInterest: existingStartingOpenInterest,
orderbookMidPriceClose: null,
Expand All @@ -396,7 +400,7 @@ describe('candleHelper', () => {
[
'does not update candle when there are no trades and an existing candle', // description
{ // initial candle
startedAt,
startedAt: startedAt.toISO(),
ticker: testConstants.defaultPerpetualMarket.ticker,
resolution: CandleResolution.ONE_MINUTE,
low: lowPrice,
Expand All @@ -413,8 +417,8 @@ describe('candleHelper', () => {
'100', // open interest
false, // block contains trades
{ // expected candle
id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE),
startedAt,
id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE),
startedAt: startedAt.toISO(),
ticker: testConstants.defaultPerpetualMarket.ticker,
resolution: CandleResolution.ONE_MINUTE,
low: lowPrice,
Expand Down Expand Up @@ -463,7 +467,7 @@ describe('candleHelper', () => {

if (expectedCandle === undefined) {
// Verify no candles in postgres and no kafka messages
await verifyNoCandleInPostgres(CandleResolution.ONE_MINUTE, startedAt);
await verifyNoCandleInPostgres(CandleResolution.ONE_MINUTE, startedAt.toISO());
verifyNoCandlesKafkaMessages(publisher, CandleResolution.ONE_MINUTE);
} else {
const expectedCandles: CandleFromDatabase[] = [expectedCandle];
Expand All @@ -489,7 +493,7 @@ describe('candleHelper', () => {
const startTime: IsoString = helpers.calculateNormalizedCandleStartTime(
testConstants.createdDateTime.minus({ minutes: 100 }),
CandleResolution.ONE_MINUTE,
).toISO();
).toUTC().toISO();

await Promise.all(
_.map(Object.values(CandleResolution), (resolution: CandleResolution) => {
Expand Down
5 changes: 5 additions & 0 deletions indexer/services/ender/src/caches/block-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ function isNextBlock(blockHeight: string): boolean {
* All caches must be initialized in a Transaction to ensure consistency
*/
export async function initializeAllCaches(): Promise<void> {
const start: number = Date.now();
const txId: number = await Transaction.start();
await Transaction.setIsolationLevel(txId, IsolationLevel.READ_COMMITTED);

Expand All @@ -120,6 +121,10 @@ export async function initializeAllCaches(): Promise<void> {
]);

await Transaction.rollback(txId);
stats.timing(
`${config.SERVICE_NAME}.initialize_caches`,
Date.now() - start,
);
}

export function resetBlockCache(): void {
Expand Down
2 changes: 0 additions & 2 deletions indexer/services/ender/src/caches/candle-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ export async function startCandleCache(txId?: number): Promise<void> {

candlesMap = await CandleTable.findCandlesMap(
tickers,
Object.values(CandleResolution),
{ txId },
);
}

Expand Down

0 comments on commit 196dc84

Please sign in to comment.