From 8943f5098440e04fc40265d16c70365af403eff8 Mon Sep 17 00:00:00 2001 From: Vincent Chau <99756290+vincentwschau@users.noreply.github.com> Date: Thu, 12 Dec 2024 12:58:58 -0500 Subject: [PATCH] Improved candles query. --- .../postgres/src/stores/candle-table.ts | 86 +++++++++++++------ .../__tests__/lib/candles-generator.test.ts | 40 +++++---- .../services/ender/src/caches/candle-cache.ts | 2 - 3 files changed, 83 insertions(+), 45 deletions(-) diff --git a/indexer/packages/postgres/src/stores/candle-table.ts b/indexer/packages/postgres/src/stores/candle-table.ts index 5eae4ee242..af6192e387 100644 --- a/indexer/packages/postgres/src/stores/candle-table.ts +++ b/indexer/packages/postgres/src/stores/candle-table.ts @@ -1,7 +1,7 @@ -import _ from 'lodash'; import { PartialModelObject, QueryBuilder } from 'objection'; import { BUFFER_ENCODING_UTF_8, DEFAULT_POSTGRES_OPTIONS } from '../constants'; +import { knexReadReplica } from '../helpers/knex'; import { setupBaseQuery, verifyAllRequiredFields } from '../helpers/stores-helpers'; import Transaction from '../helpers/transaction'; import { getUuid } from '../helpers/uuid'; @@ -174,36 +174,66 @@ export async function findLatest( export async function findCandlesMap( tickers: string[], - resolutions: CandleResolution[], - options: Options = DEFAULT_POSTGRES_OPTIONS, ): Promise { + if (tickers.length === 0) { + return {}; + } + const candlesMap: CandlesMap = {}; + for (const ticker of tickers) { + candlesMap[ticker] = {}; + } - await Promise.all( - _.map( - tickers, - async (ticker: string) => { - candlesMap[ticker] = {}; - const findLatestCandles: Promise[] = resolutions.map( - (resolution: CandleResolution) => findLatest( - ticker, - resolution, - options, - ), - ); - - // Map each resolution to its respective candle - const allLatestCandles: (CandleFromDatabase | undefined)[] = await Promise.all( - findLatestCandles, - ); - _.forEach(allLatestCandles, (candle: CandleFromDatabase | undefined) => { - if (candle !== undefined) { - candlesMap[ticker][candle.resolution] = candle; - } - }); - }, - ), - ); + const minuteCandlesResult: { + rows: CandleFromDatabase[], + } = await knexReadReplica.getConnection().raw( + ` + SELECT DISTINCT ON ( + ticker, + resolution + ) candles.* FROM + candles + WHERE + "ticker" IN (${tickers.map((ticker) => { return `'${ticker}'`; }).join(',')}) AND + "startedAt" > NOW() - INTERVAL '3 hours' AND + resolution IN ('1MIN', '5MINS', '15MINS', '30MINS', '1HOUR') + ORDER BY + ticker, + resolution, + "startedAt" DESC; + `, + ) as unknown as { + rows: CandleFromDatabase[], + }; + const hourDayCandlesResult: { + rows: CandleFromDatabase[], + } = await knexReadReplica.getConnection().raw( + ` + SELECT DISTINCT ON ( + ticker, + resolution + ) candles.* FROM + candles + WHERE + "ticker" IN (${tickers.map((ticker) => { return `'${ticker}'`; }).join(',')}) AND + "startedAt" > NOW() - INTERVAL '2 days' AND + resolution IN ('4HOURS', '1DAY') + ORDER BY + ticker, + resolution, + "startedAt" DESC; + `, + ) as unknown as { + rows: CandleFromDatabase[], + }; + const latestCandles: CandleFromDatabase[] = minuteCandlesResult.rows + .concat(hourDayCandlesResult.rows); + for (const candle of latestCandles) { + if (candlesMap[candle.ticker] === undefined) { + candlesMap[candle.ticker] = {}; + } + candlesMap[candle.ticker][candle.resolution] = candle; + } return candlesMap; } diff --git a/indexer/services/ender/__tests__/lib/candles-generator.test.ts b/indexer/services/ender/__tests__/lib/candles-generator.test.ts index f20b68354c..9b85e9abf2 100644 --- a/indexer/services/ender/__tests__/lib/candles-generator.test.ts +++ b/indexer/services/ender/__tests__/lib/candles-generator.test.ts @@ -37,8 +37,14 @@ import { redis, } from '@dydxprotocol-indexer/redis'; import { ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX } from '@dydxprotocol-indexer/redis/build/src/caches/orderbook-mid-prices-cache'; +import { DateTime, Settings } from 'luxon'; describe('candleHelper', () => { + const startedAt: DateTime = helpers.calculateNormalizedCandleStartTime( + testConstants.createdDateTime, + CandleResolution.ONE_MINUTE, + ); + beforeAll(async () => { await dbHelpers.migrate(); await dbHelpers.clearData(); @@ -48,6 +54,7 @@ describe('candleHelper', () => { beforeEach(async () => { await testMocks.seedData(); await perpetualMarketRefresher.updatePerpetualMarkets(); + Settings.now = () => startedAt.plus({ minutes: 30 }).valueOf(); }); afterEach(async () => { @@ -55,6 +62,7 @@ describe('candleHelper', () => { clearCandlesMap(); jest.clearAllMocks(); await redis.deleteAllAsync(redisClient); + Settings.now = () => new Date().valueOf(); }); afterAll(async () => { @@ -87,10 +95,6 @@ describe('candleHelper', () => { orderbookMidPriceClose: undefined, orderbookMidPriceOpen: undefined, }; - const startedAt: IsoString = helpers.calculateNormalizedCandleStartTime( - testConstants.createdDateTime, - CandleResolution.ONE_MINUTE, - ).toISO(); const previousStartedAt: IsoString = helpers.calculateNormalizedCandleStartTime( testConstants.createdDateTime.minus({ minutes: 1 }), CandleResolution.ONE_MINUTE, @@ -304,8 +308,8 @@ describe('candleHelper', () => { '100', // open interest false, // block contains trades { // expected candle - id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE), - startedAt, + id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE), + startedAt: startedAt.toISO(), ticker: testConstants.defaultPerpetualMarket.ticker, resolution: CandleResolution.ONE_MINUTE, low: closePrice, @@ -343,8 +347,8 @@ describe('candleHelper', () => { true, // block contains trades { // expected candle ...defaultCandle, - id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE), - startedAt, + id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE), + startedAt: startedAt.toISO(), resolution: CandleResolution.ONE_MINUTE, startingOpenInterest: '100', orderbookMidPriceClose: '1000', @@ -356,7 +360,7 @@ describe('candleHelper', () => { [ 'updates empty candle', // description { // initial candle - startedAt, + startedAt: startedAt.toISO(), ticker: testConstants.defaultPerpetualMarket.ticker, resolution: CandleResolution.ONE_MINUTE, low: closePrice, @@ -374,8 +378,8 @@ describe('candleHelper', () => { true, // block contains trades { // expected candle ...defaultCandle, - id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE), - startedAt, + id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE), + startedAt: startedAt.toISO(), resolution: CandleResolution.ONE_MINUTE, startingOpenInterest: existingStartingOpenInterest, orderbookMidPriceClose: null, @@ -396,7 +400,7 @@ describe('candleHelper', () => { [ 'does not update candle when there are no trades and an existing candle', // description { // initial candle - startedAt, + startedAt: startedAt.toISO(), ticker: testConstants.defaultPerpetualMarket.ticker, resolution: CandleResolution.ONE_MINUTE, low: lowPrice, @@ -413,8 +417,8 @@ describe('candleHelper', () => { '100', // open interest false, // block contains trades { // expected candle - id: CandleTable.uuid(startedAt, defaultCandle.ticker, CandleResolution.ONE_MINUTE), - startedAt, + id: CandleTable.uuid(startedAt.toISO(), defaultCandle.ticker, CandleResolution.ONE_MINUTE), + startedAt: startedAt.toISO(), ticker: testConstants.defaultPerpetualMarket.ticker, resolution: CandleResolution.ONE_MINUTE, low: lowPrice, @@ -463,7 +467,7 @@ describe('candleHelper', () => { if (expectedCandle === undefined) { // Verify no candles in postgres and no kafka messages - await verifyNoCandleInPostgres(CandleResolution.ONE_MINUTE, startedAt); + await verifyNoCandleInPostgres(CandleResolution.ONE_MINUTE, startedAt.toISO()); verifyNoCandlesKafkaMessages(publisher, CandleResolution.ONE_MINUTE); } else { const expectedCandles: CandleFromDatabase[] = [expectedCandle]; @@ -485,6 +489,12 @@ describe('candleHelper', () => { const usdVolume: string = Big(existingPrice).times(baseTokenVolume).toString(); const orderbookMidPriceClose = '7500'; const orderbookMidPriceOpen = '8000'; + // Set candle start time to be far in the past to ensure all candles are new + const startTime: IsoString = helpers.calculateNormalizedCandleStartTime( + testConstants.createdDateTime.minus({ minutes: 100 }), + CandleResolution.ONE_MINUTE, + ).toUTC().toISO(); + await Promise.all( _.map(Object.values(CandleResolution), (resolution: CandleResolution) => { return CandleTable.create({ diff --git a/indexer/services/ender/src/caches/candle-cache.ts b/indexer/services/ender/src/caches/candle-cache.ts index 69577ac4be..6fff23e502 100644 --- a/indexer/services/ender/src/caches/candle-cache.ts +++ b/indexer/services/ender/src/caches/candle-cache.ts @@ -23,8 +23,6 @@ export async function startCandleCache(txId?: number): Promise { candlesMap = await CandleTable.findCandlesMap( tickers, - Object.values(CandleResolution), - { txId }, ); }