Skip to content

Commit

Permalink
Merge branch 'staging'
Browse files Browse the repository at this point in the history
  • Loading branch information
KONFeature committed Nov 20, 2024
2 parents 1ebc3d7 + fe53994 commit c0f7fcf
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 20 deletions.
1 change: 1 addition & 0 deletions infra/components/ServiceTargets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ export class ServiceTargets extends Component {
timeout: toSeconds(health.timeout),
matcher: health.successCodes,
},
deregistrationDelay: 20,
},
{ parent: this }
);
Expand Down
65 changes: 45 additions & 20 deletions packages/ponder/config/configBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ import {
productRegistryAbi,
} from "../abis/registryAbis";

type GetLogsRpcType = Extract<
PublicRpcSchema[number],
{ Method: "eth_getLogs" }
>;

type GetBlockByNumber = Extract<
PublicRpcSchema[number],
{ Method: "eth_getBlockByNumber" }
>;

/**
* Custom transport with some failsafe options specific for envio upstream
* @param initialTransport
Expand All @@ -48,34 +58,49 @@ function safeClient(initialTransport: Transport): Transport {
if (
body.method === "eth_getLogs" &&
Array.isArray(body.params) &&
body.params?.[0]?.blockHash
(body.params as GetLogsRpcType["Parameters"])?.[0]
?.blockHash
) {
const requestedBlockHash = body.params?.[0]?.blockHash;
const requestedBlockHash = (
body.params as GetLogsRpcType["Parameters"]
)?.[0]?.blockHash;
if (!requestedBlockHash) {
throw new Error("Missing blockHash parameter");
}

// Perform the request
const response = (await transport.request(body)) as Extract<
PublicRpcSchema[number],
{ Method: "eth_getLogs" }
>["ReturnType"];
const response = (await transport.request(
body
)) as GetLogsRpcType["ReturnType"];

// Filter out logs with a different blockHash
// envio can leak parent / child block logs in the response
return response.filter((log) => {
if (log.blockHash !== requestedBlockHash) {
console.log(
"Filtering out log with different blockHash",
{
requestedBlockHash,
logBlockHash: log.blockHash,
}
);
return false;
}
return true;
});
const filteredResponse = response.filter(
(log) => log.blockHash === requestedBlockHash
);
if (filteredResponse.length !== response.length) {
console.log(
`Filtered out ${
response.length - filteredResponse.length
} logs cause of mismatching blockHash, requested: ${requestedBlockHash}`
);
}
return filteredResponse;
}

// If that's an eth_getBlockByNumber request, with latest block, do some manual thread lock, to ensure this block is well synchronised across different rpcs
if (
body.method === "eth_getBlockByNumber" &&
Array.isArray(body.params) &&
(body.params as GetBlockByNumber["Parameters"])?.[0] ===
"latest"
) {
// Perform the request
const response = await transport.request(body);
// Lock the thread for 3s
await new Promise((resolve) => setTimeout(resolve, 3_000));
// Return the response
return response;
}

// Otherwise, simple request
Expand Down Expand Up @@ -150,7 +175,7 @@ export function createEnvConfig<NetworkKey extends string>({
[networkKey]: {
chainId: network.chainId,
transport: getTransport(network.chainId),
pollingInterval: 5_000,
pollingInterval: 30_000,
},
},
// contracts config
Expand Down

0 comments on commit c0f7fcf

Please sign in to comment.