Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/update msg index event merge staging #355

Merged
merged 10 commits into from
Sep 6, 2023
269 changes: 228 additions & 41 deletions src/services/crawl-tx/crawl_tx.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,58 +333,245 @@ export default class CrawlTxService extends BullableService {
this.logger.debug('result insert tx', resultInsertGraph);
}

private setMsgIndexToEvent(tx: any) {
const mapEventMsgIdx: Map<string, number[]> = new Map();
public mappingFlatEventToLog(
eventHasIndex: any,
eventEncodedFlat: any,
indexMsg: number
) {
const attributesInEvent = eventHasIndex.attributes;
// i, j are index of eventHasIndex and eventEncodedFlat
let i = 0;
let j = 0;
while (i < attributesInEvent.length && j < eventEncodedFlat.length) {
// find last element with same eventIndex
let lastIndexSameEvent = j;
while (
lastIndexSameEvent < eventEncodedFlat.length &&
eventEncodedFlat[lastIndexSameEvent].indexEvent ===
eventEncodedFlat[j].indexEvent
) {
lastIndexSameEvent += 1;
}

if (
attributesInEvent[i].key === eventEncodedFlat[j].key &&
attributesInEvent[i].value === eventEncodedFlat[j].value
) {
const isEventMapped = eventEncodedFlat
.slice(j, lastIndexSameEvent)
// eslint-disable-next-line @typescript-eslint/no-loop-func, no-inner-declarations
.every((item: any, index: number) => {
if (
attributesInEvent[i + index].key === item.key &&
attributesInEvent[i + index].value === item.value
) {
return true;
}
return false;
});
if (isEventMapped) {
// mark event encoded to be mapped
for (let k = j; k < lastIndexSameEvent; k += 1) {
// eslint-disable-next-line no-param-reassign
eventEncodedFlat[k].indexMapped = indexMsg;
}
i += lastIndexSameEvent - j;
}
}
j = lastIndexSameEvent;
}
}

// self check msg index by counting event
private selfCheckByAnotherWay(tx: any) {
// count total attribute for each message, countAttributeInEvent[i] = x mean message i has x attributes
const countAttributeInEvent: number[] = [];
tx.tx_response.logs.forEach((log: any) => {
const countAttribute = log.events.reduce(
(acc: number, curr: any) => acc + curr.attributes.length,
0
);
countAttributeInEvent.push(countAttribute);
});

let reachLastEventTypeTx = false;
let countCurrentAttribute = 0;
let currentCompareEventId = 0;
for (let i = 0; i < tx.tx_response.events.length; i += 1) {
if (tx.tx_response.events[i].type === 'tx') {
reachLastEventTypeTx = true;
}
if (reachLastEventTypeTx && tx.tx_response.events[i].type !== 'tx') {
if (
countCurrentAttribute < countAttributeInEvent[currentCompareEventId]
) {
countCurrentAttribute += tx.tx_response.events[i].attributes.length;
}

// after count, check if count is equal countAttributeInEvent[currentCompareEventId] or not
if (
countCurrentAttribute === countAttributeInEvent[currentCompareEventId]
) {
// if true, count success, then next currentCompareEventId and reset count = 0
currentCompareEventId += 1;
countCurrentAttribute = 0;
} else if (
countCurrentAttribute > countAttributeInEvent[currentCompareEventId]
) {
this.logger.warn('Count event in log is not equal event encoded');
return false;
}
}
}
return true;
}

private checkMappingEventToLog(tx: any) {
this.logger.info('checking mapping log in tx :', tx.tx_response.txhash);
let flattenLog: string[] = [];
let flattenEventEncoded: string[] = [];

// set index_msg from log to mapEventMsgIdx
tx.tx_response.logs?.forEach((log: any, index: number) => {
tx?.tx_response?.logs.forEach((log: any, index: number) => {
log.events.forEach((event: any) => {
const { type } = event;
event.attributes.forEach((attribute: any) => {
const keyInMap = `${type}_${attribute.key}_${attribute.value}`;
if (mapEventMsgIdx.has(keyInMap)) {
const listIndex = mapEventMsgIdx.get(keyInMap);
listIndex?.push(index);
event.attributes.forEach((attr: any) => {
if (attr.value === undefined) {
flattenLog.push(`${index}-${event.type}-${attr.key}-null`);
} else {
mapEventMsgIdx.set(keyInMap, [index]);
flattenLog.push(`${index}-${event.type}-${attr.key}-${attr.value}`);
}
});
});
});

// set index_msg from mapEventMsgIdx to event
tx.tx_response.events.forEach((event: any) => {
const { type } = event;
event.attributes.forEach((attribute: any) => {
const key = attribute?.key
? fromUtf8(fromBase64(attribute?.key))
: null;
const value = attribute?.value
? fromUtf8(fromBase64(attribute?.value))
: null;
const keyInMap = `${type}_${key}_${value}`;

const listIndex = mapEventMsgIdx.get(keyInMap);
// get first index with this key
const firstIndex = listIndex?.shift();

if (firstIndex != null) {
if (event.msg_index && event.msg_index !== firstIndex) {
this.logger.warn(
`something wrong: setting index ${firstIndex} to existed index ${event.msg_index}`
);
} else {
// eslint-disable-next-line no-param-reassign
event.msg_index = firstIndex;
}
}

// delete key in map if value is empty
if (listIndex?.length === 0) {
mapEventMsgIdx.delete(keyInMap);
tx?.tx_response?.events?.forEach((event: any) => {
event.attributes.forEach((attr: any) => {
if (event.msg_index !== undefined) {
const key = attr.key ? fromUtf8(fromBase64(attr.key)) : null;
const value = attr.value ? fromUtf8(fromBase64(attr.value)) : null;
flattenEventEncoded.push(
`${event.msg_index}-${event.type}-${key}-${value}`
);
}
});
});
// compare 2 array
if (flattenLog.length !== flattenEventEncoded.length) {
this.logger.warn('Length between 2 flatten array is not equal');
}
flattenLog = flattenLog.sort();
flattenEventEncoded = flattenEventEncoded.sort();
const checkResult = flattenLog.every(
(item: string, index: number) => item === flattenEventEncoded[index]
);
if (checkResult === false) {
this.logger.warn('Mapping event to log is wrong');
}
}

private setMsgIndexToEvent(tx: any) {
/*------
DO NOT USE CURRENTLY
MAPPING BY ORDER IN EVENT AND LOG
THIS CASE BASED ON ORDER NOT CHANGED BETWEEN EVENT AND LOG
--------*/
// // flatten event and decode key value
// const eventEncodedFlats: any[] = [];
// tx.tx_response.events.forEach((event: any, index: number) => {
// event.attributes.forEach((attr: any) => {
// eventEncodedFlats.push({
// ...{
// key: attr.key ? fromUtf8(fromBase64(attr.key)) : null,
// value: attr.value ? fromUtf8(fromBase64(attr.value)) : null,
// },
// indexEvent: index,
// type: event.type,
// });
// });
// });
// // loop logs (has order for each messages)
// tx.tx_response.logs.forEach((log: any, index: number) => {
// // loop each event in log to compare with event encoded
// log.events.forEach((eventInLog: any) => {
// // filter list event has type equal eventInLog.type and not be setted index msg
// const filtedEventByTypeFlat = eventEncodedFlats.filter(
// (eventEncoded: any) =>
// eventEncoded.type === eventInLog.type &&
// eventEncoded.msg_index === undefined
// );
// // mapping between log and event
// this.mappingFlatEventToLog(eventInLog, filtedEventByTypeFlat, index);
// });
// });
// // set index msg to event encoded
// eventEncodedFlats
// .filter((item: any) => item.indexMapped !== undefined)
// .forEach((item: any) => {
// if (
// tx.tx_response.events[item.indexEvent].msg_index !== undefined &&
// tx.tx_response.events[item.indexEvent].msg_index !== item.indexMapped
// ) {
// this.logger.warn(
// `something wrong: setting index ${
// item.indexMapped
// } to existed index ${
// tx.tx_response.eventstx.tx_response.events[item.indexEvent]
// .msg_index
// }`
// );
// }
// // eslint-disable-next-line no-param-reassign
// tx.tx_response.events[item.indexEvent].msg_index = item.indexMapped;
// });
// // self check msg index by counting event
// const selfCheck = this.selfCheckByAnotherWay(tx);
// if (!selfCheck) {
// this.logger.warn('selfcheck fail');
// }

/*---------
TESTING
MAPPING EVENT BY COUNT EACH LOG AND EVENT MUST BE SAME
-----------*/
// count total attribute for each message, countAttributeInEvent[i] = x mean message i has x attributes
const countAttributeInEvent: number[] = tx?.tx_response?.logs?.map(
(log: any) =>
log.events.reduce(
(acc: number, curr: any) => acc + curr.attributes.length,
0
)
);

let reachLastEventTypeTx = false;
let countCurrentAttribute = 0;
let currentCompareEventId = 0;
for (let i = 0; i < tx?.tx_response?.events?.length; i += 1) {
if (tx.tx_response.events[i].type === 'tx') {
reachLastEventTypeTx = true;
}
if (reachLastEventTypeTx && tx.tx_response.events[i].type !== 'tx') {
if (
countCurrentAttribute < countAttributeInEvent[currentCompareEventId]
) {
countCurrentAttribute += tx.tx_response.events[i].attributes.length;
// eslint-disable-next-line no-param-reassign
tx.tx_response.events[i].msg_index = currentCompareEventId;
}

// after count, check if count is equal countAttributeInEvent[currentCompareEventId] or not
if (
countCurrentAttribute === countAttributeInEvent[currentCompareEventId]
) {
// if true, count success, then next currentCompareEventId and reset count = 0
currentCompareEventId += 1;
countCurrentAttribute = 0;
} else if (
countCurrentAttribute > countAttributeInEvent[currentCompareEventId]
) {
this.logger.warn('Count event in log is not equal event encoded');
}
}
}
this.checkMappingEventToLog(tx);
}

private _findAttribute(
Expand Down
Loading