Skip to content

Commit

Permalink
Support topics filtering in getLogs ETH RPC API (#537)
Browse files Browse the repository at this point in the history
* Store event topics in separate columns in db

* Store event data in a separate column in db

* Support topics filter in eth_getLogs RPC API

* Make RPC server path configurable

* Sort logs result by log index
  • Loading branch information
prathamesh0 authored Sep 18, 2024
1 parent d413d72 commit a585500
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 35 deletions.
38 changes: 38 additions & 0 deletions packages/codegen/src/data/entities/Event.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,44 @@ columns:
columnOptions:
- option: length
value: 256
- name: topic0
pgType: varchar
tsType: string
columnType: Column
columnOptions:
- option: length
value: 66
- name: topic1
pgType: varchar
tsType: string | null
columnType: Column
columnOptions:
- option: length
value: 66
- option: nullable
value: true
- name: topic2
pgType: varchar
tsType: string | null
columnType: Column
columnOptions:
- option: length
value: 66
- option: nullable
value: true
- name: topic3
pgType: varchar
tsType: string | null
columnType: Column
columnOptions:
- option: length
value: 66
- option: nullable
value: true
- name: data
pgType: varchar
tsType: string
columnType: Column
- name: eventInfo
pgType: text
tsType: string
Expand Down
16 changes: 9 additions & 7 deletions packages/codegen/src/templates/config-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,7 @@
# Flag to specify whether RPC endpoint supports block hash as block tag parameter
rpcSupportsBlockHashParam = true

# Enable ETH JSON RPC server at /rpc
enableEthRPCServer = true

# Max number of logs that can be returned in a single getLogs request (default: 10000)
ethGetLogsResultLimit = 10000

# Server GQL config
# GQL server config
[server.gql]
path = "/graphql"

Expand All @@ -55,6 +49,14 @@
timeTravelMaxAge = 86400 # 1 day
{{/if}}

# ETH RPC server config
[server.ethRPC]
enabled = true
path = "/rpc"

# Max number of logs that can be returned in a single getLogs request (default: 10000)
getLogsResultLimit = 10000

[metrics]
host = "127.0.0.1"
port = 9000
Expand Down
19 changes: 14 additions & 5 deletions packages/util/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,18 @@ export interface GQLConfig {
logDir?: string;
}

// ETH RPC server config
export interface EthRPCConfig {
// Enable ETH JSON RPC server
enabled: boolean;

// Path to expose the RPC server at
path?: string;

// Max number of logs that can be returned in a single getLogs request
getLogsResultLimit?: number;
}

export interface ServerConfig {
host: string;
port: number;
Expand All @@ -253,11 +265,8 @@ export interface ServerConfig {
// https://ethereum.org/en/developers/docs/apis/json-rpc/#default-block
rpcSupportsBlockHashParam: boolean;

// Enable ETH JSON RPC server at /rpc
enableEthRPCServer: boolean;

// Max number of logs that can be returned in a single getLogs request
ethGetLogsResultLimit?: number;
// ETH JSON RPC server config
ethRPC: EthRPCConfig;
}

export interface FundingAmountsConfig {
Expand Down
107 changes: 90 additions & 17 deletions packages/util/src/eth-rpc-handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ const ERROR_CONTRACT_METHOD_NOT_FOUND = 'Contract method not found';
const ERROR_METHOD_NOT_IMPLEMENTED = 'Method not implemented';
const ERROR_INVALID_BLOCK_TAG = 'Invalid block tag';
const ERROR_INVALID_BLOCK_HASH = 'Invalid block hash';
const ERROR_INVALID_CONTRACT_ADDRESS = 'Invalid contract address';
const ERROR_INVALID_TOPICS = 'Invalid topics';
const ERROR_BLOCK_NOT_FOUND = 'Block not found';
const ERROR_TOPICS_FILTER_NOT_SUPPORTED = 'Topics filter not supported';
const ERROR_LIMIT_EXCEEDED = 'Query results exceeds limit';

const DEFAULT_BLOCK_TAG = 'latest';
Expand Down Expand Up @@ -114,20 +115,14 @@ export const createEthRPCHandlers = async (
// Parse arg params into where options
const where: FindConditions<EventInterface> = {};

// TODO: Support topics filter
if (params.topics) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, ERROR_TOPICS_FILTER_NOT_SUPPORTED);
}

// Address filter, address or a list of addresses
if (params.address) {
if (Array.isArray(params.address)) {
if (params.address.length > 0) {
where.contract = In(params.address);
}
} else {
where.contract = Equal(params.address);
}
buildAddressFilter(params.address, where);
}

// Topics filter
if (params.topics) {
buildTopicsFilter(params.topics, where);
}

// Block hash takes precedence over fromBlock / toBlock if provided
Expand Down Expand Up @@ -158,8 +153,14 @@ export const createEthRPCHandlers = async (

// Fetch events from the db
// Load block relation
const resultLimit = indexer.serverConfig.ethGetLogsResultLimit || DEFAULT_ETH_GET_LOGS_RESULT_LIMIT;
const events = await indexer.getEvents({ where, relations: ['block'], take: resultLimit + 1 });
const resultLimit = indexer.serverConfig.ethRPC.getLogsResultLimit || DEFAULT_ETH_GET_LOGS_RESULT_LIMIT;
const events = await indexer.getEvents({
where,
relations: ['block'],
// TODO: Use querybuilder to order by block number
order: { block: 'ASC', index: 'ASC' },
take: resultLimit + 1
});

// Limit number of results can be returned by a single query
if (events.length > resultLimit) {
Expand Down Expand Up @@ -229,19 +230,91 @@ const parseEthGetLogsBlockTag = async (indexer: IndexerInterface, blockTag: stri
throw new ErrorWithCode(CODE_INVALID_PARAMS, ERROR_INVALID_BLOCK_TAG);
};

const buildAddressFilter = (address: any, where: FindConditions<EventInterface>): void => {
if (Array.isArray(address)) {
// Validate input addresses
address.forEach((add: string) => {
if (!utils.isHexString(add, 20)) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, `${ERROR_INVALID_CONTRACT_ADDRESS}: expected hex string of size 20`);
}
});

if (address.length > 0) {
where.contract = In(address);
}
} else {
// Validate input address
if (!utils.isHexString(address, 20)) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, `${ERROR_INVALID_CONTRACT_ADDRESS}: expected hex string of size 20`);
}

where.contract = Equal(address);
}
};

type TopicColumn = 'topic0' | 'topic1' | 'topic2' | 'topic3';

const buildTopicsFilter = (topics: any, where: FindConditions<EventInterface>): void => {
// Check that topics is an array of size <= 4
if (!Array.isArray(topics)) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, ERROR_INVALID_TOPICS);
}

if (topics.length > 4) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, `${ERROR_INVALID_TOPICS}: exceeds max topics`);
}

for (let i = 0; i < topics.length; i++) {
addTopicCondition(topics[i], `topic${i}` as TopicColumn, where);
}
};

const addTopicCondition = (
topicFilter: string[] | string,
topicIndex: TopicColumn,
where: FindConditions<EventInterface>
): any => {
if (Array.isArray(topicFilter)) {
// Validate input topics
topicFilter.forEach((topic: string) => {
if (!utils.isHexString(topic, 32)) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, `${ERROR_INVALID_TOPICS}: expected hex string of size 32 for ${topicIndex}`);
}
});

if (topicFilter.length > 0) {
where[topicIndex] = In(topicFilter);
}
} else {
// Validate input address
if (!utils.isHexString(topicFilter, 32)) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, `${ERROR_INVALID_TOPICS}: expected hex string of size 32 for ${topicIndex}`);
}

where[topicIndex] = Equal(topicFilter);
}
};

const transformEventsToLogs = async (events: Array<EventInterface>): Promise<any[]> => {
return events.map(event => {
const parsedExtraInfo = JSON.parse(event.extraInfo);

const topics: string[] = [];
[event.topic0, event.topic1, event.topic2, event.topic3].forEach(topic => {
if (topic) {
topics.push(topic);
}
});

return {
address: event.contract.toLowerCase(),
blockHash: event.block.blockHash,
blockNumber: `0x${event.block.blockNumber.toString(16)}`,
transactionHash: event.txHash,
transactionIndex: `0x${parsedExtraInfo.tx.index.toString(16)}`,
logIndex: `0x${parsedExtraInfo.logIndex.toString(16)}`,
data: parsedExtraInfo.data,
topics: parsedExtraInfo.topics,
data: event.data,
topics,
removed: event.block.isPruned
};
});
Expand Down
9 changes: 8 additions & 1 deletion packages/util/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,9 @@ export class Indexer {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
const tx = transactionMap[txHash];
const extraInfo: { [key: string]: any } = { topics, data, tx, logIndex };
const extraInfo: { [key: string]: any } = { tx, logIndex };

const [topic0, topic1, topic2, topic3] = topics as string[];

const contract = ethers.utils.getAddress(address);
const watchedContracts = this.isContractAddressWatched(contract);
Expand All @@ -694,6 +696,11 @@ export class Indexer {
txHash,
contract,
eventName,
topic0,
topic1,
topic2,
topic3,
data,
eventInfo: JSONbigNative.stringify(eventInfo),
extraInfo: JSONbigNative.stringify(extraInfo),
proof: JSONbigNative.stringify({
Expand Down
12 changes: 7 additions & 5 deletions packages/util/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { PaymentsManager, paymentsPlugin } from './payments';
const log = debug('vulcanize:server');

const DEFAULT_GQL_PATH = '/graphql';
const ETH_RPC_PATH = '/rpc';
const DEFAULT_ETH_RPC_PATH = '/rpc';

export const createAndStartServer = async (
app: Application,
Expand Down Expand Up @@ -102,13 +102,15 @@ export const createAndStartServer = async (
path: gqlPath
});

if (serverConfig.enableEthRPCServer) {
const rpcPath = serverConfig.ethRPC?.path ?? DEFAULT_ETH_RPC_PATH;

if (serverConfig.ethRPC?.enabled) {
// Create a JSON-RPC server to handle ETH RPC calls
const rpcServer = jayson.Server(ethRPCHandlers);

// Mount the JSON-RPC server to ETH_RPC_PATH
app.use(
ETH_RPC_PATH,
rpcPath,
jsonParser(),
(req: any, res: any, next: () => void) => {
// Convert all GET requests to POST to avoid getting rejected from jayson server middleware
Expand All @@ -124,8 +126,8 @@ export const createAndStartServer = async (
httpServer.listen(port, host, () => {
log(`GQL server is listening on http://${host}:${port}${server.graphqlPath}`);

if (serverConfig.enableEthRPCServer) {
log(`ETH JSON RPC server is listening on http://${host}:${port}${ETH_RPC_PATH}`);
if (serverConfig.ethRPC?.enabled) {
log(`ETH JSON RPC server is listening on http://${host}:${port}${rpcPath}`);
}
});

Expand Down
5 changes: 5 additions & 0 deletions packages/util/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ export interface EventInterface {
index: number;
contract: string;
eventName: string;
topic0: string;
topic1: string | null;
topic2: string | null;
topic3: string | null;
data: string;
eventInfo: string;
extraInfo: string;
proof: string;
Expand Down

0 comments on commit a585500

Please sign in to comment.