filter by tableId during sync
holic committed Sep 22, 2023
1 parent 8ef49ae commit 0507729
Showing 2 changed files with 20 additions and 3 deletions.
packages/store-sync/src/common.ts (4 additions, 0 deletions)
@@ -99,6 +99,10 @@ export type SyncOptions<TConfig extends StoreConfig = StoreConfig> = {
    * Optional maximum block range, if your RPC limits the amount of blocks fetched at a time.
    */
   maxBlockRange?: bigint;
+  /**
+   * Optional table IDs to filter indexer state and RPC state.
+   */
+  tableIds?: Hex[];
   /**
    * Optional MUD tRPC indexer URL to fetch initial state from.
    */
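The new `tableIds` option extends the existing `SyncOptions` type, letting a caller narrow sync to a subset of tables. A minimal usage sketch, assuming `createStoreSync` and its options type are exported from the package root; the wrapper function and placeholder ID below are illustrative, not part of this commit:

```ts
import type { Hex } from "viem";
// Assumed exports; this commit only shows files under packages/store-sync/src.
import { createStoreSync, type CreateStoreSyncOptions } from "@latticexyz/store-sync";
import type { StoreConfig } from "@latticexyz/store";

// Placeholder bytes32 table ID; real IDs come from the app's MUD store config.
const tableIds: Hex[] = [
  "0x0000000000000000000000000000000000000000000000000000000000000001",
];

// Forwards every other required option untouched and pins only the option
// added in this commit.
async function syncFilteredTables<TConfig extends StoreConfig>(
  options: Omit<CreateStoreSyncOptions<TConfig>, "tableIds">
) {
  // Per the doc comment above, both the indexer snapshot and the RPC log
  // stream are narrowed to these table IDs.
  return createStoreSync({ ...options, tableIds });
}
```

Per the doc comment, the same list applies to both hydration paths (indexer state and RPC state), which keeps a filtered client consistent regardless of where its data comes from.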
packages/store-sync/src/createStoreSync.ts (16 additions, 3 deletions)
@@ -1,6 +1,6 @@
 import { ConfigToKeyPrimitives, ConfigToValuePrimitives, StoreConfig, storeEventsAbi } from "@latticexyz/store";
 import { Hex, TransactionReceiptNotFoundError } from "viem";
-import { SetRecordOperation, StorageAdapter, SyncOptions, SyncResult, TableWithRecords } from "./common";
+import { BlockLogs, SetRecordOperation, StorageAdapter, SyncOptions, SyncResult, TableWithRecords } from "./common";
 import { createBlockStream, blockRangeToLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
 import {
   filter,
@@ -19,6 +19,7 @@ import {
   combineLatest,
   scan,
   identity,
+  Observable,
 } from "rxjs";
 import { BlockStorageOperations, blockLogsToStorage } from "./blockLogsToStorage";
 import { debug as parentDebug } from "./debug";
@@ -48,6 +49,7 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
   publicClient,
   startBlock: initialStartBlock = 0n,
   maxBlockRange,
+  tableIds,
   initialState,
   indexerUrl,
 }: CreateStoreSyncOptions<TConfig>): Promise<CreateStoreSyncResult<TConfig>> {
@@ -74,7 +76,7 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
 
       const indexer = createIndexerClient({ url: indexerUrl });
       const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());
-      const result = await indexer.findAll.query({ chainId, address });
+      const result = await indexer.findAll.query({ chainId, address, tableIds });
 
       onProgress?.({
         step: SyncStep.SNAPSHOT,
@@ -114,6 +116,13 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
       (initialState): initialState is { blockNumber: bigint; tables: TableWithRecords[] } =>
         initialState != null && initialState.blockNumber != null && initialState.tables.length > 0
     ),
+    // Initial state from indexer should already be filtered by table IDs, but we should
+    // still attempt to filter in case initialState was passed in as an argument or the
+    // indexer is being silly.
+    map(({ blockNumber, tables }) => ({
+      blockNumber,
+      tables: tables.filter((table) => tableIds != null && tableIds.includes(table.tableId)),
+    })),
     concatMap(async ({ blockNumber, tables }) => {
       debug("hydrating from initial state to block", blockNumber);
 
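The added `map` stage is a plain array filter over the snapshot's tables. A self-contained sketch of the same predicate, with a pared-down stand-in for `TableWithRecords`, shows the semantics; note that, as this hunk is written, an undefined `tableIds` keeps nothing, so the stage presumes the caller supplied a filter:

```ts
import type { Hex } from "viem";

// Pared-down stand-in for TableWithRecords; the real type carries more fields.
type TableStub = { tableId: Hex; name: string };

// Same predicate as the hunk above: keep a table only when tableIds is
// defined AND contains its ID.
function filterTables(tables: TableStub[], tableIds?: Hex[]): TableStub[] {
  return tables.filter((table) => tableIds != null && tableIds.includes(table.tableId));
}

const tables: TableStub[] = [
  { tableId: "0x01", name: "Position" },
  { tableId: "0x02", name: "Inventory" },
];

console.log(filterTables(tables, ["0x02"])); // [{ tableId: "0x02", name: "Inventory" }]
console.log(filterTables(tables)); // [] — with no tableIds, every table is dropped
```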
@@ -179,7 +188,7 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
 
   let startBlock: bigint | null = null;
   let endBlock: bigint | null = null;
-  const blockLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
+  const blockLogs$: Observable<BlockLogs> = combineLatest([startBlock$, latestBlockNumber$]).pipe(
     map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
     tap((range) => {
       startBlock = range.startBlock;
@@ -192,6 +201,10 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
       maxBlockRange,
     }),
     mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))),
+    map(({ blockNumber, logs }) => ({
+      blockNumber,
+      logs: logs.filter((log) => tableIds != null && tableIds.includes(log.args.table)),
+    })),
     share()
   );
 
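The live path gets the same guard: after `groupLogsByBlockNumber`, each block's logs are narrowed to the configured tables before the observable is shared. A standalone RxJS sketch of that pipeline shape, using simplified stand-ins for `BlockLogs` and the store event log type:

```ts
import { from, map, share } from "rxjs";

type Hex = `0x${string}`;
// Simplified stand-ins; the real types come from "./common" and viem.
type StoreLog = { args: { table: Hex } };
type BlockLogs = { blockNumber: bigint; logs: StoreLog[] };

const tableIds: Hex[] | undefined = ["0xaa"];

const blocks: BlockLogs[] = [
  { blockNumber: 1n, logs: [{ args: { table: "0xaa" } }, { args: { table: "0xbb" } }] },
  { blockNumber: 2n, logs: [{ args: { table: "0xbb" } }] },
];

const blockLogs$ = from(blocks).pipe(
  // Same shape as the hunk above: rebuild each block with only the logs
  // whose table ID is in the allow-list.
  map(({ blockNumber, logs }) => ({
    blockNumber,
    logs: logs.filter((log) => tableIds != null && tableIds.includes(log.args.table)),
  })),
  share()
);

blockLogs$.subscribe(({ blockNumber, logs }) => console.log(blockNumber, logs.length));
// Logs "1n 1" then "2n 0": blocks whose logs are all filtered out still emit.
```

Because filtered-out blocks still emit (with an empty `logs` array), downstream consumers can keep tracking sync progress by block number even when no relevant logs arrive.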
