Skip to content

Commit

Permalink
Merge pull request #56 from tkiapril/feat/recover
Browse files Browse the repository at this point in the history
Defensive measures
  • Loading branch information
tkiapril authored Aug 16, 2023
2 parents ca76ebe + 34ecbcb commit 58a8b50
Show file tree
Hide file tree
Showing 11 changed files with 661 additions and 419 deletions.
24 changes: 17 additions & 7 deletions EventMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,64 @@ import { decode, encode } from "bencodex";
export type EventMessage = {
address: Uint8Array;
sigHash: Uint8Array;
abi: string;
topics: Uint8Array[];
blockTimestamp: bigint;
data: Uint8Array;
logIndex: bigint;
blockNumber: bigint;
blockHash: Uint8Array;
txHash: Uint8Array;
};

export type MarshaledEventMessage = [
EventMessage["address"],
EventMessage["sigHash"],
EventMessage["abi"],
EventMessage["topics"],
EventMessage["blockTimestamp"],
EventMessage["data"],
EventMessage["logIndex"],
EventMessage["blockNumber"],
EventMessage["blockHash"],
EventMessage["txHash"],
];

export function serializeEventMessage(msg: EventMessage): Uint8Array {
return encode(
[
msg.address,
msg.sigHash,
msg.abi,
msg.topics,
msg.blockTimestamp,
msg.data,
msg.logIndex,
msg.blockNumber,
msg.blockHash,
msg.txHash,
] satisfies MarshaledEventMessage,
);
}

export function deserializeEventMessage(data: Uint8Array): EventMessage {
export function deserializeEventMessage(msgData: Uint8Array): EventMessage {
const [
address,
sigHash,
abi,
topics,
blockTimestamp,
data,
logIndex,
blockNumber,
blockHash,
] = decode(data) as MarshaledEventMessage;
txHash,
] = decode(msgData) as MarshaledEventMessage;
return {
address,
sigHash,
abi,
topics,
blockTimestamp,
data,
logIndex,
blockNumber,
blockHash,
txHash,
} satisfies EventMessage;
}
43 changes: 18 additions & 25 deletions EventResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,30 @@ import { getAddress, toHex } from "npm:viem";

import { formatAbiItemPrototype } from "./abitype.ts";
import { decodeEventLog } from "./decodeEventLog.ts";
import type { Event, EventAbi } from "./generated/client/index.d.ts";
import { EventMessage } from "./EventMessage.ts";

export const serializeEventResponse = (event: Event & { Abi: EventAbi }) => {
export const serializeEventResponse = (evtMsg: EventMessage) => {
const { args } = decodeEventLog({
abi: [JSON.parse(event.Abi.json)],
data: toHex(event.data as unknown as Uint8Array),
topics: [toHex(event.abiHash)].concat(
event.topic1 !== null
? [toHex(event.topic1 as unknown as Uint8Array)].concat(
event.topic2 !== null
? [toHex(event.topic2 as unknown as Uint8Array)].concat(
event.topic3 !== null
? [toHex(event.topic3 as unknown as Uint8Array)]
: [],
)
: [],
)
: [],
) as [signature: `0x${string}`, ...args: `0x${string}`[]],
abi: [JSON.parse(evtMsg.abi)],
data: toHex(evtMsg.data),
topics: [...evtMsg.topics.toReversed(), evtMsg.sigHash].reduce(
(acc, x) => x != undefined ? [toHex(x), ...acc] : [],
[] as unknown as [] | [
signature: `0x${string}`,
...args: `0x${string}`[],
],
),
});

return {
timestamp: event.blockTimestamp,
blockIndex: event.blockNumber,
logIndex: event.logIndex,
blockHash: toHex(event.blockHash),
transactionHash: toHex(event.txHash),
sourceAddress: getAddress(toHex(event.sourceAddress)),
abiHash: toHex(event.abiHash),
blockIndex: evtMsg.blockNumber,
logIndex: evtMsg.logIndex,
blockHash: toHex(evtMsg.blockHash),
transactionHash: toHex(evtMsg.txHash),
sourceAddress: getAddress(toHex(evtMsg.address)),
abiHash: toHex(evtMsg.sigHash),
abiSignature: formatAbiItemPrototype(
JSON.parse(event.Abi.json),
JSON.parse(evtMsg.abi),
),
args: {
named: Object.keys(args).filter((x) =>
Expand Down
15 changes: 13 additions & 2 deletions api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,21 @@ export function api(prisma: PrismaClient) {
"bytes",
),
)),
blockTimestamp: { gte: request.after, lte: request.before },
},
include: { Abi: true },
})).map((event) => serializeEventResponse(event)));
})).map((evt) =>
serializeEventResponse({
address: evt.sourceAddress,
sigHash: evt.abiHash,
abi: evt.Abi.json,
topics: [evt.topic1, evt.topic2, evt.topic3],
data: evt.data,
logIndex: BigInt(evt.logIndex),
blockNumber: BigInt(evt.blockNumber),
blockHash: evt.blockHash,
txHash: evt.txHash,
})
));
});

router.get("/sources", (ctx) => {
Expand Down
26 changes: 26 additions & 0 deletions concurrencyUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
export type Awaitable<T> = T | PromiseLike<T>;

export function createMutex() {
const queue: bigint[] = [];
let sourcesMutex = new AbortController();
sourcesMutex.abort();
return async function doMutex<T>(fn: () => Awaitable<T>) {
const ticket = crypto.getRandomValues(new BigUint64Array(1))[0];
queue.push(ticket);
do {
await new Promise<void>((resolve) => {
sourcesMutex.signal.addEventListener("abort", () => resolve());
if (sourcesMutex.signal.aborted) resolve();
});
} while (queue[0] !== ticket);
sourcesMutex = new AbortController();
let result: T;
try {
result = await fn();
} finally {
queue.shift();
sourcesMutex.abort();
}
return result;
};
}
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
DATABASE_URL: postgres://postgres:V0uYcxk2dFixtPWfK4Nb@db/postgres
AMQP_BROKER_URL: amqp://message-queue-broker
CHAIN_DEFINITION_URL: https://esm.sh/@wagmi/[email protected]#goerli
BLOCK_FINALITY: safe
emitter:
depends_on:
message-queue-broker:
Expand All @@ -26,7 +27,6 @@ services:
DATABASE_URL: postgres://postgres:V0uYcxk2dFixtPWfK4Nb@db/postgres
AMQP_BROKER_URL: amqp://message-queue-broker
CHAIN_DEFINITION_URL: https://esm.sh/@wagmi/[email protected]#goerli
BLOCK_FINALITY: safe
api:
depends_on:
message-queue-broker:
Expand Down
Loading

0 comments on commit 58a8b50

Please sign in to comment.