From 23f5da64d6dfbe2b5a77b8d620fffe8159b6c300 Mon Sep 17 00:00:00 2001 From: Phan Anh Tuan Date: Tue, 29 Aug 2023 16:42:32 +0700 Subject: [PATCH] feat: init job re assign msg_index to event --- ci/config.json.ci | 4 + config.json | 4 + src/common/constant.ts | 5 + src/services/crawl-tx/crawl_tx.service.ts | 4 +- .../reassign_msg_index_to_event.service.ts | 165 ++++++++++++++++++ 5 files changed, 180 insertions(+), 2 deletions(-) create mode 100644 src/services/job/reassign_msg_index_to_event.service.ts diff --git a/ci/config.json.ci b/ci/config.json.ci index a2dd5d87c..b5937068d 100644 --- a/ci/config.json.ci +++ b/ci/config.json.ci @@ -184,5 +184,9 @@ "key": "crawlIbcApp", "millisecondRepeatJob": 2000, "blocksPerCall": 100 + }, + "jobReassignMsgIndexToEvent": { + "millisecondCrawl": 1000, + "blocksPerCall": 100 } } diff --git a/config.json b/config.json index 731b618f4..b64430ef3 100644 --- a/config.json +++ b/config.json @@ -184,5 +184,9 @@ "key": "crawlIbcApp", "millisecondRepeatJob": 2000, "blocksPerCall": 100 + }, + "jobReassignMsgIndexToEvent": { + "millisecondCrawl": 1000, + "blocksPerCall": 100 } } diff --git a/src/common/constant.ts b/src/common/constant.ts index efd0c19be..9ce82979a 100644 --- a/src/common/constant.ts +++ b/src/common/constant.ts @@ -76,6 +76,7 @@ export const BULL_JOB_NAME = { REINDEX_CW20_CONTRACT: 'reindex:cw20-contract', REINDEX_CW20_HISTORY: 'reindex:cw20-history', CRAWL_IBC_APP: 'crawl:ibc-app', + JOB_REASSIGN_MSG_INDEX_TO_EVENT: 'job:reassign-msg-index-to-event', }; export const SERVICE = { @@ -232,6 +233,10 @@ export const SERVICE = { key: 'ReDecodeTx', path: 'v1.ReDecodeTx', }, + ReAssignMsgIndexToEvent: { + key: 'ReAssignMsgIndexToEvent', + path: 'v1.ReAssignMsgIndexToEvent', + }, }, DailyStatisticsService: { key: 'DailyStatisticsService', diff --git a/src/services/crawl-tx/crawl_tx.service.ts b/src/services/crawl-tx/crawl_tx.service.ts index 318171a1a..5f78bff18 100644 --- a/src/services/crawl-tx/crawl_tx.service.ts +++ b/src/services/crawl-tx/crawl_tx.service.ts @@ -431,7 +431,7 @@ export default class CrawlTxService extends BullableService { let flattenLog: string[] = []; let flattenEventEncoded: string[] = []; - tx?.tx_response?.logs.forEach((log: any, index: number) => { + tx?.tx_response?.logs?.forEach((log: any, index: number) => { log.events.forEach((event: any) => { event.attributes.forEach((attr: any) => { flattenLog.push(`${index}-${event.type}-${attr.key}-${attr.value}`); @@ -464,7 +464,7 @@ export default class CrawlTxService extends BullableService { } } - private setMsgIndexToEvent(tx: any) { + public setMsgIndexToEvent(tx: any) { /*------ DO NOT USE CURRENTLY MAPPING BY ORDER IN EVENT AND LOG diff --git a/src/services/job/reassign_msg_index_to_event.service.ts b/src/services/job/reassign_msg_index_to_event.service.ts new file mode 100644 index 000000000..9c76125dc --- /dev/null +++ b/src/services/job/reassign_msg_index_to_event.service.ts @@ -0,0 +1,165 @@ +/* eslint-disable no-await-in-loop */ +import { Service } from '@ourparentcenter/moleculer-decorators-extended'; +import { ServiceBroker } from 'moleculer'; +import { fromBase64, fromUtf8 } from '@cosmjs/encoding'; +import { Transaction, EventAttribute, BlockCheckpoint } from '../../models'; +import BullableService, { QueueHandler } from '../../base/bullable.service'; +import { BULL_JOB_NAME, SERVICE } from '../../common'; +import config from '../../../config.json' assert { type: 'json' }; +import knex from '../../common/utils/db_connection'; +import CrawlTxService from '../crawl-tx/crawl_tx.service'; + +@Service({ + name: SERVICE.V1.JobService.ReAssignMsgIndexToEvent.key, + version: 1, +}) +export default class JobReAssignMsgIndexToEvent extends BullableService { + public constructor(public broker: ServiceBroker) { + super(broker); + } + + crawlTxService: CrawlTxService = new ServiceBroker({ + logger: true, + }).createService(CrawlTxService) as CrawlTxService; + + @QueueHandler({ + queueName: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT, + jobName: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT, + }) + async reassignMsgIndexToEvent(_payload: { lastBlockCrawled: number }) { + const blockCheckpoint = await BlockCheckpoint.query().findOne({ + job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT, + }); + this.logger.info( + `Re assign msg index start from block ${blockCheckpoint?.height}` + ); + if (blockCheckpoint?.height === _payload.lastBlockCrawled) { + return; + } + + let lastBlock = + (blockCheckpoint?.height ?? 0) + + config.jobReassignMsgIndexToEvent.blocksPerCall; + if (lastBlock > _payload.lastBlockCrawled) { + lastBlock = _payload.lastBlockCrawled; + } + await knex.transaction(async (trx) => { + const listTx = await Transaction.query() + .withGraphFetched('events.[attributes]') + .orderBy('id', 'asc') + .where('height', '>=', blockCheckpoint?.height ?? 0) + .andWhere('height', '<', lastBlock) + .transacting(trx); + + const eventPatches: any[] = []; + const txPatches: any[] = []; + // eslint-disable-next-line no-restricted-syntax + for (const tx of listTx) { + // get data raw in tx + const rawData = tx.data; + // set msg_index to event + rawData.tx_response.events.forEach((event: any) => { + // eslint-disable-next-line no-param-reassign + delete event.msg_index; + }); + this.crawlTxService.setMsgIndexToEvent(rawData); + txPatches.push( + Transaction.query() + .patch({ data: rawData }) + .where('id', tx.id) + .transacting(trx) + ); + + const events = tx.events.sort((a: any, b: any) => a.id - b.id); + for (let index = 0; index < events.length; index += 1) { + const event = events[index]; + // events.forEach((event: Event, index: number) => { + const rawEvents = rawData.tx_response.events; + // check if event in raw is the same as event in db + if (rawEvents[index].type === event.type) { + const attributes = event.attributes.sort( + (a: any, b: any) => a.index - b.index + ); + const checkIndex = attributes.every((attr: EventAttribute) => { + const decodedKey = rawEvents[index].attributes[attr.index].key + ? fromUtf8( + fromBase64(rawEvents[index].attributes[attr.index].key) + ) + : null; + const decodedValue = rawEvents[index].attributes[attr.index].value + ? fromUtf8( + fromBase64(rawEvents[index].attributes[attr.index].value) + ) + : null; + return attr.key === decodedKey && attr.value === decodedValue; + }); + if (!checkIndex) { + throw new Error('order attribute is wrong'); + } else { + eventPatches.push( + knex.raw( + 'UPDATE EVENT SET tx_msg_index = :tx_msg_index WHERE id = :id', + { + tx_msg_index: rawEvents[index].msg_index ?? null, + id: event.id, + } + ) + ); + } + } else { + throw new Error(`order event is wrong, ${event.id}, ${index}`); + } + } + } + this.logger.info('mapping done'); + await Promise.all(eventPatches); + this.logger.info('update event done'); + await Promise.all(txPatches); + this.logger.info('update tx done'); + + await BlockCheckpoint.query() + .update( + BlockCheckpoint.fromJson({ + job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT, + height: lastBlock, + }) + ) + .where({ + job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT, + }); + }); + } + + async _start(): Promise { + const blockCheckpoint = await BlockCheckpoint.query().findOne({ + job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT, + }); + if (!blockCheckpoint) { + await BlockCheckpoint.query().insert({ + job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT, + height: config.crawlBlock.startBlock, + }); + const crawlBlockCheckpoint = await BlockCheckpoint.query().findOne({ + job_name: BULL_JOB_NAME.CRAWL_BLOCK, + }); + + this.createJob( + BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT, + BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT, + { + lastBlockCrawled: crawlBlockCheckpoint?.height ?? 0, + }, + { + removeOnComplete: true, + removeOnFail: { + count: 3, + }, + repeat: { + every: config.jobReassignMsgIndexToEvent.millisecondCrawl, + }, + } + ); + } + return super._start(); + } +}