Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:aura-nw/horoscope-v2 into fix/ap…
Browse files Browse the repository at this point in the history
…prove-cw721-activity-dev
  • Loading branch information
phamphong9981 committed Sep 5, 2023
2 parents cbcab6b + 512866f commit ef4a4ba
Show file tree
Hide file tree
Showing 7 changed files with 1,107 additions and 4 deletions.
4 changes: 4 additions & 0 deletions ci/config.json.ci
Original file line number Diff line number Diff line change
Expand Up @@ -185,5 +185,9 @@
"key": "crawlIbcApp",
"millisecondRepeatJob": 2000,
"blocksPerCall": 100
},
"jobReassignMsgIndexToEvent": {
"millisecondCrawl": 1000,
"blocksPerCall": 100
}
}
4 changes: 4 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -185,5 +185,9 @@
"key": "crawlIbcApp",
"millisecondRepeatJob": 2000,
"blocksPerCall": 100
},
"jobReassignMsgIndexToEvent": {
"millisecondCrawl": 1000,
"blocksPerCall": 100
}
}
5 changes: 5 additions & 0 deletions src/common/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export const BULL_JOB_NAME = {
CRAWL_IBC_APP: 'crawl:ibc-app',
UPDATE_CW721_ACTIVITY_OWNER: 'update:cw721-activity-owner',
UPDATE_CW721_ACTIVITY_OWNER_BY_TOKEN: 'update:cw721-activity-owner-by-token',
JOB_REASSIGN_MSG_INDEX_TO_EVENT: 'job:reassign-msg-index-to-event',
};

export const SERVICE = {
Expand Down Expand Up @@ -234,6 +235,10 @@ export const SERVICE = {
key: 'ReDecodeTx',
path: 'v1.ReDecodeTx',
},
ReAssignMsgIndexToEvent: {
key: 'ReAssignMsgIndexToEvent',
path: 'v1.ReAssignMsgIndexToEvent',
},
},
CrawlIBCTaoService: {
key: 'CrawlIBCTaoService',
Expand Down
3 changes: 1 addition & 2 deletions src/models/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class Event extends BaseModel {

tx_id!: number;

tx_msg_index: number | undefined;
tx_msg_index?: number | undefined;

type!: string;

Expand All @@ -32,7 +32,6 @@ export class Event extends BaseModel {
required: ['type'],
properties: {
tx_id: { type: 'number' },
tx_msg_index: { type: 'number' },
type: { type: 'string' },
block_height: { type: 'number' },
source: { type: 'string', enum: Object.values(this.SOURCE) },
Expand Down
4 changes: 2 additions & 2 deletions src/services/crawl-tx/crawl_tx.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ export default class CrawlTxService extends BullableService {
let flattenLog: string[] = [];
let flattenEventEncoded: string[] = [];

tx?.tx_response?.logs.forEach((log: any, index: number) => {
tx?.tx_response?.logs?.forEach((log: any, index: number) => {
log.events.forEach((event: any) => {
event.attributes.forEach((attr: any) => {
flattenLog.push(`${index}-${event.type}-${attr.key}-${attr.value}`);
Expand Down Expand Up @@ -464,7 +464,7 @@ export default class CrawlTxService extends BullableService {
}
}

private setMsgIndexToEvent(tx: any) {
public setMsgIndexToEvent(tx: any) {
/*------
DO NOT USE CURRENTLY
MAPPING BY ORDER IN EVENT AND LOG
Expand Down
195 changes: 195 additions & 0 deletions src/services/job/reassign_msg_index_to_event.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/* eslint-disable no-await-in-loop */
import { Service } from '@ourparentcenter/moleculer-decorators-extended';
import { ServiceBroker } from 'moleculer';
import { fromBase64, fromUtf8 } from '@cosmjs/encoding';
import { Knex } from 'knex';
import {
Transaction,
EventAttribute,
BlockCheckpoint,
Event,
} from '../../models';
import BullableService, { QueueHandler } from '../../base/bullable.service';
import { BULL_JOB_NAME, SERVICE } from '../../common';
import config from '../../../config.json' assert { type: 'json' };
import knex from '../../common/utils/db_connection';
import CrawlTxService from '../crawl-tx/crawl_tx.service';

@Service({
name: SERVICE.V1.JobService.ReAssignMsgIndexToEvent.key,
version: 1,
})
export default class JobReAssignMsgIndexToEvent extends BullableService {
public constructor(public broker: ServiceBroker) {
super(broker);
}

crawlTxService: CrawlTxService = new ServiceBroker({
logger: true,
}).createService(CrawlTxService) as CrawlTxService;

@QueueHandler({
queueName: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
jobName: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
})
async reassignMsgIndexToEvent(_payload: { lastBlockCrawled: number }) {
const blockCheckpoint = await BlockCheckpoint.query().findOne({
job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
});
this.logger.info(
`Re assign msg index start from block ${blockCheckpoint?.height}`
);
if (blockCheckpoint?.height === _payload.lastBlockCrawled) {
return;
}

let lastBlock =
(blockCheckpoint?.height ?? 0) +
config.jobReassignMsgIndexToEvent.blocksPerCall;
if (lastBlock > _payload.lastBlockCrawled) {
lastBlock = _payload.lastBlockCrawled;
}
await knex.transaction(async (trx) => {
const listTx = await Transaction.query()
.withGraphFetched('events.[attributes]')
.modifyGraph('events', (builder) => {
builder.orderBy('id', 'asc');
})
.modifyGraph('events.[attributes]', (builder) => {
builder.orderBy('index', 'asc');
})
.orderBy('id', 'asc')
.where('height', '>=', blockCheckpoint?.height ?? 0)
.andWhere('height', '<', lastBlock)
.transacting(trx);

const eventPatches: any[] = [];
const txPatches: any[] = [];
this.logger.info(`Re assign msg index ${listTx.length} txs`);
listTx.forEach((tx: any) => {
const { eventPatchInTx, txPatchInTx } = this.generateListUpdateMsgIndex(
trx,
tx
);
eventPatches.push(...eventPatchInTx);
txPatches.push(...txPatchInTx);
});
this.logger.info('mapping done');
await Promise.all(eventPatches);
this.logger.info('update event done');
await Promise.all(txPatches);
this.logger.info('update tx done');

await BlockCheckpoint.query()
.update(
BlockCheckpoint.fromJson({
job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
height: lastBlock,
})
)
.where({
job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
});
});
}

generateListUpdateMsgIndex(
trx: Knex.Transaction,
tx: any
): { eventPatchInTx: any[]; txPatchInTx: any } {
const eventPatchInTx: any[] = [];
const txPatchInTx: any[] = [];

// get data raw in tx
const rawData = tx.data;
// set msg_index to event
rawData.tx_response.events.forEach((event: any) => {
// eslint-disable-next-line no-param-reassign
delete event.msg_index;
});
this.crawlTxService.setMsgIndexToEvent(rawData);
txPatchInTx.push(
Transaction.query()
.patch({ data: rawData })
.where('id', tx.id)
.transacting(trx)
);
const mapUpdateEvent = new Map();
tx.events.forEach((event: any, index: number) => {
const rawEvents = rawData.tx_response.events;
// check if event in raw is the same as event in db
if (rawEvents[index].type === event.type) {
const checkIndex = event.attributes.every((attr: EventAttribute) => {
const decodedKey = rawEvents[index].attributes[attr.index].key
? fromUtf8(fromBase64(rawEvents[index].attributes[attr.index].key))
: null;
const decodedValue = rawEvents[index].attributes[attr.index].value
? fromUtf8(
fromBase64(rawEvents[index].attributes[attr.index].value)
)
: null;
return attr.key === decodedKey && attr.value === decodedValue;
});
if (!checkIndex) {
throw new Error('order attribute is wrong');
} else {
const msgIndex = rawEvents[index].msg_index ?? null;
if (mapUpdateEvent.has(msgIndex)) {
mapUpdateEvent.get(msgIndex).push(event.id);
} else {
mapUpdateEvent.set(msgIndex, [event.id]);
}
}
} else {
throw new Error(`order event is wrong, ${event.id}, ${index}`);
}
});
mapUpdateEvent.forEach((value: number[], key: null | number) => {
eventPatchInTx.push(
Event.query()
.patch({
tx_msg_index: key,
})
.whereIn('id', value)
.transacting(trx)
);
});
return {
eventPatchInTx,
txPatchInTx,
};
}

async _start(): Promise<void> {
const blockCheckpoint = await BlockCheckpoint.query().findOne({
job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
});
if (!blockCheckpoint) {
await BlockCheckpoint.query().insert({
job_name: BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
height: config.crawlBlock.startBlock,
});
const crawlBlockCheckpoint = await BlockCheckpoint.query().findOne({
job_name: BULL_JOB_NAME.CRAWL_BLOCK,
});

this.createJob(
BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
BULL_JOB_NAME.JOB_REASSIGN_MSG_INDEX_TO_EVENT,
{
lastBlockCrawled: crawlBlockCheckpoint?.height ?? 0,
},
{
removeOnComplete: true,
removeOnFail: {
count: 3,
},
repeat: {
every: config.jobReassignMsgIndexToEvent.millisecondCrawl,
},
}
);
}
return super._start();
}
}
Loading

0 comments on commit ef4a4ba

Please sign in to comment.