Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/create job re assign msg index to event merge staging #357

3 changes: 3 additions & 0 deletions ci/config.json.ci
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@
"crawlIbcTao": {
"key": "crawlIbcTao",
"millisecondRepeatJob": 2000,
},
"jobReassignMsgIndexToEvent": {
"millisecondCrawl": 1000,
"blocksPerCall": 100
}
}
3 changes: 3 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@
"crawlIbcTao": {
"key": "crawlIbcTao",
"millisecondRepeatJob": 2000,
},
"jobReassignMsgIndexToEvent": {
"millisecondCrawl": 1000,
"blocksPerCall": 100
}
}
5 changes: 5 additions & 0 deletions src/common/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export const BULL_JOB_NAME = {
REINDEX_CW20_HISTORY: 'reindex:cw20-history',
CRAWL_IBC_TAO: 'crawl:ibc-tao',
CRAWL_GENESIS_IBC_TAO: 'crawl:genesis-ibc-tao',
JOB_REASSIGN_MSG_INDEX_TO_EVENT: 'job:reassign-msg-index-to-event',
};

export const SERVICE = {
Expand Down Expand Up @@ -231,6 +232,10 @@ export const SERVICE = {
key: 'ReDecodeTx',
path: 'v1.ReDecodeTx',
},
ReAssignMsgIndexToEvent: {
key: 'ReAssignMsgIndexToEvent',
path: 'v1.ReAssignMsgIndexToEvent',
},
},
DailyStatisticsService: {
key: 'DailyStatisticsService',
Expand Down
3 changes: 1 addition & 2 deletions src/models/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class Event extends BaseModel {

tx_id!: number;

tx_msg_index: number | undefined;
tx_msg_index?: number | undefined;

type!: string;

Expand All @@ -30,7 +30,6 @@ export class Event extends BaseModel {
required: ['type'],
properties: {
tx_id: { type: 'number' },
tx_msg_index: { type: 'number' },
type: { type: 'string' },
block_height: { type: 'number' },
source: { type: 'string', enum: Object.values(this.SOURCE) },
Expand Down
10 changes: 8 additions & 2 deletions src/services/crawl-tx/crawl_tx.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ export default class CrawlTxService extends BullableService {
let flattenLog: string[] = [];
let flattenEventEncoded: string[] = [];

tx?.tx_response?.logs.forEach((log: any, index: number) => {
tx?.tx_response?.logs?.forEach((log: any, index: number) => {
log.events.forEach((event: any) => {
event.attributes.forEach((attr: any) => {
if (attr.value === undefined) {
Expand Down Expand Up @@ -468,7 +468,7 @@ export default class CrawlTxService extends BullableService {
}
}

private setMsgIndexToEvent(tx: any) {
public setMsgIndexToEvent(tx: any) {
/*------
DO NOT USE CURRENTLY
MAPPING BY ORDER IN EVENT AND LOG
Expand Down Expand Up @@ -532,6 +532,12 @@ export default class CrawlTxService extends BullableService {
TESTING
MAPPING EVENT BY COUNT EACH LOG AND EVENT MUST BE SAME
-----------*/

// if this is failed tx, then no need to set index msg
if (!tx.tx_response.logs) {
this.logger.info('Failed tx, no need to set index msg');
return;
}
// count total attribute for each message, countAttributeInEvent[i] = x mean message i has x attributes
const countAttributeInEvent: number[] = tx?.tx_response?.logs?.map(
(log: any) =>
Expand Down
195 changes: 195 additions & 0 deletions src/services/job/reassign_msg_index_to_event.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/* eslint-disable no-await-in-loop */
import { Service } from '@ourparentcenter/moleculer-decorators-extended';
import { ServiceBroker } from 'moleculer';
import { fromBase64, fromUtf8 } from '@cosmjs/encoding';
import { Knex } from 'knex';
import {
Transaction,
EventAttribute,
BlockCheckpoint,
Event,
} from '../../models';
import BullableService, { QueueHandler } from '../../base/bullable.service';
import { BULL_JOB_NAME, SERVICE } from '../../common';
import config from '../../../config.json' assert { type: 'json' };
import knex from '../../common/utils/db_connection';
import CrawlTxService from '../crawl-tx/crawl_tx.service';

@Service({
name: SERVICE.V1.JobService.ReAssignMsgIndexToEvent.key,
version: 1,
})
export default class JobReAssignMsgIndexToEvent extends BullableService {
public constructor(public broker: ServiceBroker) {
super(broker);
}

crawlTxService: CrawlTxService = new ServiceBroker({
logger: true,
}).createService(CrawlTxService) as CrawlTxService;

@QueueHandler({
queueName: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
jobName: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
})
async reassignMsgIndexToEvent(_payload: { lastBlockCrawled: number }) {
const blockCheckpoint = await BlockCheckpoint.query().findOne({
job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
});
this.logger.info(
`Re assign msg index start from block ${blockCheckpoint?.height}`
);
if (blockCheckpoint?.height === _payload.lastBlockCrawled) {
return;
}

let lastBlock =
(blockCheckpoint?.height ?? 0) +
config.jobReassignMsgIndexToEvent.blocksPerCall;
if (lastBlock > _payload.lastBlockCrawled) {
lastBlock = _payload.lastBlockCrawled;
}
await knex.transaction(async (trx) => {
const listTx = await Transaction.query()
.withGraphFetched('events.[attributes]')
.modifyGraph('events', (builder) => {
builder.orderBy('id', 'asc');
})
.modifyGraph('events.[attributes]', (builder) => {
builder.orderBy('index', 'asc');
})
.orderBy('id', 'asc')
.where('height', '>=', blockCheckpoint?.height ?? 0)
.andWhere('height', '<', lastBlock)
.transacting(trx);

const eventPatches: any[] = [];
const txPatches: any[] = [];
this.logger.info(`Re assign msg index ${listTx.length} txs`);
listTx.forEach((tx: any) => {
const { eventPatchInTx, txPatchInTx } = this.generateListUpdateMsgIndex(
trx,
tx
);
eventPatches.push(...eventPatchInTx);
txPatches.push(...txPatchInTx);
});
this.logger.info('mapping done');
await Promise.all(eventPatches);
this.logger.info('update event done');
await Promise.all(txPatches);
this.logger.info('update tx done');

await BlockCheckpoint.query()
.update(
BlockCheckpoint.fromJson({
job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
height: lastBlock,
})
)
.where({
job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
});
});
}

generateListUpdateMsgIndex(
trx: Knex.Transaction,
tx: any
): { eventPatchInTx: any[]; txPatchInTx: any } {
const eventPatchInTx: any[] = [];
const txPatchInTx: any[] = [];

// get data raw in tx
const rawData = tx.data;
// set msg_index to event
rawData.tx_response.events.forEach((event: any) => {
// eslint-disable-next-line no-param-reassign
delete event.msg_index;
});
this.crawlTxService.setMsgIndexToEvent(rawData);
txPatchInTx.push(
Transaction.query()
.patch({ data: rawData })
.where('id', tx.id)
.transacting(trx)
);
const mapUpdateEvent = new Map();
tx.events.forEach((event: any, index: number) => {
const rawEvents = rawData.tx_response.events;
// check if event in raw is the same as event in db
if (rawEvents[index].type === event.type) {
const checkIndex = event.attributes.every((attr: EventAttribute) => {
const decodedKey = rawEvents[index].attributes[attr.index].key
? fromUtf8(fromBase64(rawEvents[index].attributes[attr.index].key))
: '';
const decodedValue = rawEvents[index].attributes[attr.index].value
? fromUtf8(
fromBase64(rawEvents[index].attributes[attr.index].value)
)
: '';
return attr.key === decodedKey && attr.value === decodedValue;
});
if (!checkIndex) {
throw new Error('order attribute is wrong');
} else {
const msgIndex = rawEvents[index].msg_index ?? null;
if (mapUpdateEvent.has(msgIndex)) {
mapUpdateEvent.get(msgIndex).push(event.id);
} else {
mapUpdateEvent.set(msgIndex, [event.id]);
}
}
} else {
throw new Error(`order event is wrong, ${event.id}, ${index}`);
}
});
mapUpdateEvent.forEach((value: number[], key: null | number) => {
eventPatchInTx.push(
Event.query()
.patch({
tx_msg_index: key,
})
.whereIn('id', value)
.transacting(trx)
);
});
return {
eventPatchInTx,
txPatchInTx,
};
}

async _start(): Promise<void> {
const blockCheckpoint = await BlockCheckpoint.query().findOne({
job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
});
if (!blockCheckpoint) {
await BlockCheckpoint.query().insert({
job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
height: config.crawlBlock.startBlock,
});
const crawlBlockCheckpoint = await BlockCheckpoint.query().findOne({
job_name: BULL_JOB_NAME.CRAWL_BLOCK,
});

this.createJob(
BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
{
lastBlockCrawled: crawlBlockCheckpoint?.height ?? 0,
},
{
removeOnComplete: true,
removeOnFail: {
count: 3,
},
repeat: {
every: config.jobReassignMsgIndexToEvent.millisecondCrawl,
},
}
);
}
return super._start();
}
}
Loading
Loading