Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update msg_index by order in event and log #327

Merged
merged 19 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ci/config.json.ci
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@
"jobRedecodeTx": {
"limitRecordGet": 100
},
"jobReassignMsgIndexToEvent": {
"millisecondCrawl": 1000,
"blocksPerCall": 100
},
"crawlIbcTao": {
"key": "crawlIbcTao",
"millisecondRepeatJob": 2000,
Expand Down
4 changes: 4 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@
"jobRedecodeTx": {
"limitRecordGet": 100
},
"jobReassignMsgIndexToEvent": {
"millisecondCrawl": 1000,
"blocksPerCall": 100
},
"crawlIbcTao": {
"key": "crawlIbcTao",
"millisecondRepeatJob": 2000,
Expand Down
5 changes: 5 additions & 0 deletions src/common/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export const BULL_JOB_NAME = {
REINDEX_CW721_HISTORY: 'reindex:cw721-history',
HANDLE_MIGRATE_CONTRACT: 'handle:migrate-contract',
JOB_REDECODE_TX: 'job:redecode-tx',
JOB_REASSIGN_MSG_INDEX_TO_EVENT: 'job:reassign-msg-index-to-event',
CRAWL_IBC_TAO: 'crawl:ibc-tao',
CRAWL_GENESIS_IBC_TAO: 'crawl:genesis-ibc-tao',
CRAWL_IBC_APP: 'crawl:ibc-app',
Expand Down Expand Up @@ -232,6 +233,10 @@ export const SERVICE = {
key: 'ReDecodeTx',
path: 'v1.ReDecodeTx',
},
ReAssignMsgIndexToEvent: {
key: 'ReAssignMsgIndexToEvent',
path: 'v1.ReAssignMsgIndexToEvent',
},
},
CrawlIBCTaoService: {
key: 'CrawlIBCTaoService',
Expand Down
3 changes: 1 addition & 2 deletions src/models/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class Event extends BaseModel {

tx_id!: number;

tx_msg_index: number | undefined;
tx_msg_index?: number | undefined;

type!: string;

Expand All @@ -32,7 +32,6 @@ export class Event extends BaseModel {
required: ['type'],
properties: {
tx_id: { type: 'number' },
tx_msg_index: { type: 'number' },
type: { type: 'string' },
block_height: { type: 'number' },
source: { type: 'string', enum: Object.values(this.SOURCE) },
Expand Down
275 changes: 234 additions & 41 deletions src/services/crawl-tx/crawl_tx.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,58 +333,251 @@ export default class CrawlTxService extends BullableService {
this.logger.debug('result insert tx', resultInsertGraph);
}

private setMsgIndexToEvent(tx: any) {
const mapEventMsgIdx: Map<string, number[]> = new Map();
public mappingFlatEventToLog(
eventHasIndex: any,
eventEncodedFlat: any,
indexMsg: number
) {
const attributesInEvent = eventHasIndex.attributes;
// i, j are index of eventHasIndex and eventEncodedFlat
let i = 0;
let j = 0;
while (i < attributesInEvent.length && j < eventEncodedFlat.length) {
// find last element with same eventIndex
let lastIndexSameEvent = j;
while (
lastIndexSameEvent < eventEncodedFlat.length &&
eventEncodedFlat[lastIndexSameEvent].indexEvent ===
eventEncodedFlat[j].indexEvent
) {
lastIndexSameEvent += 1;
}

if (
attributesInEvent[i].key === eventEncodedFlat[j].key &&
attributesInEvent[i].value === eventEncodedFlat[j].value
) {
const isEventMapped = eventEncodedFlat
.slice(j, lastIndexSameEvent)
// eslint-disable-next-line @typescript-eslint/no-loop-func, no-inner-declarations
.every((item: any, index: number) => {
if (
attributesInEvent[i + index].key === item.key &&
attributesInEvent[i + index].value === item.value
) {
return true;
}
return false;
});
if (isEventMapped) {
// mark event encoded to be mapped
for (let k = j; k < lastIndexSameEvent; k += 1) {
// eslint-disable-next-line no-param-reassign
eventEncodedFlat[k].indexMapped = indexMsg;
}
i += lastIndexSameEvent - j;
}
}
j = lastIndexSameEvent;
}
}

// self check msg index by counting event
private selfCheckByAnotherWay(tx: any) {
// count total attribute for each message, countAttributeInEvent[i] = x mean message i has x attributes
const countAttributeInEvent: number[] = [];
tx.tx_response.logs.forEach((log: any) => {
const countAttribute = log.events.reduce(
(acc: number, curr: any) => acc + curr.attributes.length,
0
);
countAttributeInEvent.push(countAttribute);
});

let reachLastEventTypeTx = false;
let countCurrentAttribute = 0;
let currentCompareEventId = 0;
for (let i = 0; i < tx.tx_response.events.length; i += 1) {
if (tx.tx_response.events[i].type === 'tx') {
reachLastEventTypeTx = true;
}
if (reachLastEventTypeTx && tx.tx_response.events[i].type !== 'tx') {
if (
countCurrentAttribute < countAttributeInEvent[currentCompareEventId]
) {
countCurrentAttribute += tx.tx_response.events[i].attributes.length;
}

// after count, check if count is equal countAttributeInEvent[currentCompareEventId] or not
if (
countCurrentAttribute === countAttributeInEvent[currentCompareEventId]
) {
// if true, count success, then next currentCompareEventId and reset count = 0
currentCompareEventId += 1;
countCurrentAttribute = 0;
} else if (
countCurrentAttribute > countAttributeInEvent[currentCompareEventId]
) {
this.logger.warn('Count event in log is not equal event encoded');
return false;
}
}
}
return true;
}

private checkMappingEventToLog(tx: any) {
this.logger.info('checking mapping log in tx :', tx.tx_response.txhash);
let flattenLog: string[] = [];
let flattenEventEncoded: string[] = [];

// set index_msg from log to mapEventMsgIdx
tx.tx_response.logs?.forEach((log: any, index: number) => {
tx?.tx_response?.logs?.forEach((log: any, index: number) => {
log.events.forEach((event: any) => {
const { type } = event;
event.attributes.forEach((attribute: any) => {
const keyInMap = `${type}_${attribute.key}_${attribute.value}`;
if (mapEventMsgIdx.has(keyInMap)) {
const listIndex = mapEventMsgIdx.get(keyInMap);
listIndex?.push(index);
event.attributes.forEach((attr: any) => {
if (attr.value === undefined) {
flattenLog.push(`${index}-${event.type}-${attr.key}-null`);
} else {
mapEventMsgIdx.set(keyInMap, [index]);
flattenLog.push(`${index}-${event.type}-${attr.key}-${attr.value}`);
}
});
});
});

// set index_msg from mapEventMsgIdx to event
tx.tx_response.events.forEach((event: any) => {
const { type } = event;
event.attributes.forEach((attribute: any) => {
const key = attribute?.key
? fromUtf8(fromBase64(attribute?.key))
: null;
const value = attribute?.value
? fromUtf8(fromBase64(attribute?.value))
: null;
const keyInMap = `${type}_${key}_${value}`;

const listIndex = mapEventMsgIdx.get(keyInMap);
// get first index with this key
const firstIndex = listIndex?.shift();

if (firstIndex != null) {
if (event.msg_index && event.msg_index !== firstIndex) {
this.logger.warn(
`something wrong: setting index ${firstIndex} to existed index ${event.msg_index}`
);
} else {
// eslint-disable-next-line no-param-reassign
event.msg_index = firstIndex;
}
}

// delete key in map if value is empty
if (listIndex?.length === 0) {
mapEventMsgIdx.delete(keyInMap);
tx?.tx_response?.events?.forEach((event: any) => {
event.attributes.forEach((attr: any) => {
if (event.msg_index !== undefined) {
const key = attr.key ? fromUtf8(fromBase64(attr.key)) : null;
const value = attr.value ? fromUtf8(fromBase64(attr.value)) : null;
flattenEventEncoded.push(
`${event.msg_index}-${event.type}-${key}-${value}`
);
}
});
});
// compare 2 array
if (flattenLog.length !== flattenEventEncoded.length) {
this.logger.warn('Length between 2 flatten array is not equal');
}
flattenLog = flattenLog.sort();
flattenEventEncoded = flattenEventEncoded.sort();
const checkResult = flattenLog.every(
(item: string, index: number) => item === flattenEventEncoded[index]
);
if (checkResult === false) {
this.logger.warn('Mapping event to log is wrong');
}
}

public setMsgIndexToEvent(tx: any) {
/*------
DO NOT USE CURRENTLY
MAPPING BY ORDER IN EVENT AND LOG
THIS CASE BASED ON ORDER NOT CHANGED BETWEEN EVENT AND LOG
--------*/
// // flatten event and decode key value
// const eventEncodedFlats: any[] = [];
// tx.tx_response.events.forEach((event: any, index: number) => {
// event.attributes.forEach((attr: any) => {
// eventEncodedFlats.push({
// ...{
// key: attr.key ? fromUtf8(fromBase64(attr.key)) : null,
// value: attr.value ? fromUtf8(fromBase64(attr.value)) : null,
// },
// indexEvent: index,
// type: event.type,
// });
// });
// });
// // loop logs (has order for each messages)
// tx.tx_response.logs.forEach((log: any, index: number) => {
// // loop each event in log to compare with event encoded
// log.events.forEach((eventInLog: any) => {
// // filter list event has type equal eventInLog.type and not be setted index msg
// const filtedEventByTypeFlat = eventEncodedFlats.filter(
// (eventEncoded: any) =>
// eventEncoded.type === eventInLog.type &&
// eventEncoded.msg_index === undefined
// );
// // mapping between log and event
// this.mappingFlatEventToLog(eventInLog, filtedEventByTypeFlat, index);
// });
// });
// // set index msg to event encoded
// eventEncodedFlats
// .filter((item: any) => item.indexMapped !== undefined)
// .forEach((item: any) => {
// if (
// tx.tx_response.events[item.indexEvent].msg_index !== undefined &&
// tx.tx_response.events[item.indexEvent].msg_index !== item.indexMapped
// ) {
// this.logger.warn(
// `something wrong: setting index ${
// item.indexMapped
// } to existed index ${
// tx.tx_response.eventstx.tx_response.events[item.indexEvent]
// .msg_index
// }`
// );
// }
// // eslint-disable-next-line no-param-reassign
// tx.tx_response.events[item.indexEvent].msg_index = item.indexMapped;
// });
// // self check msg index by counting event
// const selfCheck = this.selfCheckByAnotherWay(tx);
// if (!selfCheck) {
// this.logger.warn('selfcheck fail');
// }

/*---------
TESTING
MAPPING EVENT BY COUNT EACH LOG AND EVENT MUST BE SAME
-----------*/

// if this is failed tx, then no need to set index msg
if (!tx.tx_response.logs) {
this.logger.info('Failed tx, no need to set index msg');
return;
}
// count total attribute for each message, countAttributeInEvent[i] = x mean message i has x attributes
const countAttributeInEvent: number[] = tx?.tx_response?.logs?.map(
(log: any) =>
log.events.reduce(
(acc: number, curr: any) => acc + curr.attributes.length,
0
)
);

let reachLastEventTypeTx = false;
let countCurrentAttribute = 0;
let currentCompareEventId = 0;
for (let i = 0; i < tx?.tx_response?.events?.length; i += 1) {
if (tx.tx_response.events[i].type === 'tx') {
reachLastEventTypeTx = true;
}
if (reachLastEventTypeTx && tx.tx_response.events[i].type !== 'tx') {
if (
countCurrentAttribute < countAttributeInEvent[currentCompareEventId]
) {
countCurrentAttribute += tx.tx_response.events[i].attributes.length;
// eslint-disable-next-line no-param-reassign
tx.tx_response.events[i].msg_index = currentCompareEventId;
}

// after count, check if count is equal countAttributeInEvent[currentCompareEventId] or not
if (
countCurrentAttribute === countAttributeInEvent[currentCompareEventId]
) {
// if true, count success, then next currentCompareEventId and reset count = 0
currentCompareEventId += 1;
countCurrentAttribute = 0;
} else if (
countCurrentAttribute > countAttributeInEvent[currentCompareEventId]
) {
this.logger.warn('Count event in log is not equal event encoded');
}
}
}
this.checkMappingEventToLog(tx);
}

private _findAttribute(
Expand Down
Loading
Loading