From b3095ad62fa6ba488c32b79274736882c666d61a Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Tue, 10 Dec 2024 17:42:21 -0300 Subject: [PATCH 1/8] feat: db strategy repository, registry, cached version --- apps/processing/src/config/env.ts | 1 + apps/processing/src/index.ts | 2 +- .../src/services/processing.service.ts | 10 ++- .../services/sharedDependencies.service.ts | 24 ++++-- .../test/unit/processing.service.spec.ts | 4 +- .../unit/sharedDependencies.service.spec.ts | 4 +- .../interfaces/strategyRegistry.interface.ts | 30 +++++-- packages/data-flow/src/internal.ts | 2 +- packages/data-flow/src/orchestrator.ts | 31 +++++--- .../src/registries/cachedStrategyRegistry.ts | 78 +++++++++++++++++++ .../src/registries/dbStrategyRegistry.ts | 44 +++++++++++ packages/data-flow/src/registries/index.ts | 3 + .../src/registries/strategyRegistry.ts | 58 ++++++++++++++ packages/data-flow/src/strategyRegistry.ts | 25 ------ .../data-flow/test/unit/orchestrator.spec.ts | 29 ++++++- .../test/unit/strategyRegistry.spec.ts | 41 +++++++--- packages/repository/src/db/connection.ts | 2 + packages/repository/src/external.ts | 4 + packages/repository/src/interfaces/index.ts | 1 + .../strategyRepository.interface.ts | 12 +++ .../src/repositories/kysely/index.ts | 1 + .../kysely/strategy.repository.ts | 48 ++++++++++++ packages/repository/src/types/index.ts | 1 + .../repository/src/types/strategy.types.ts | 10 +++ 24 files changed, 391 insertions(+), 74 deletions(-) create mode 100644 packages/data-flow/src/registries/cachedStrategyRegistry.ts create mode 100644 packages/data-flow/src/registries/dbStrategyRegistry.ts create mode 100644 packages/data-flow/src/registries/index.ts create mode 100644 packages/data-flow/src/registries/strategyRegistry.ts delete mode 100644 packages/data-flow/src/strategyRegistry.ts create mode 100644 packages/repository/src/interfaces/strategyRepository.interface.ts create mode 100644 packages/repository/src/repositories/kysely/strategy.repository.ts create mode 100644 packages/repository/src/types/strategy.types.ts diff --git a/apps/processing/src/config/env.ts b/apps/processing/src/config/env.ts index 4a201e2..543ec86 100644 --- a/apps/processing/src/config/env.ts +++ b/apps/processing/src/config/env.ts @@ -30,6 +30,7 @@ const baseSchema = z.object({ }, "Chain IDs must be unique"), DATABASE_URL: z.string(), DATABASE_SCHEMA: z.string().default("public"), + DATABASE_REGISTRIES_SCHEMA: z.string().default("public"), INDEXER_GRAPHQL_URL: z.string().url(), INDEXER_ADMIN_SECRET: z.string(), PRICING_SOURCE: z.enum(["dummy", "coingecko"]).default("coingecko"), diff --git a/apps/processing/src/index.ts b/apps/processing/src/index.ts index 97d77ad..9705da5 100644 --- a/apps/processing/src/index.ts +++ b/apps/processing/src/index.ts @@ -6,7 +6,7 @@ import { ProcessingService } from "./services/processing.service.js"; let processor: ProcessingService; const main = async (): Promise => { - processor = new ProcessingService(environment); + processor = await ProcessingService.initialize(environment); await processor.start(); }; diff --git a/apps/processing/src/services/processing.service.ts b/apps/processing/src/services/processing.service.ts index 285cd8f..cf75f58 100644 --- a/apps/processing/src/services/processing.service.ts +++ b/apps/processing/src/services/processing.service.ts @@ -23,10 +23,9 @@ export class ProcessingService { private readonly logger = new Logger({ className: "ProcessingService" }); private readonly kyselyDatabase: SharedDependencies["kyselyDatabase"]; - constructor(private readonly env: Environment) { + private constructor(env: Environment, sharedDependencies: SharedDependencies) { const { CHAINS: chains } = env; - const { core, registries, indexerClient, kyselyDatabase } = - SharedDependenciesService.initialize(env); + const { core, registries, indexerClient, kyselyDatabase } = sharedDependencies; this.kyselyDatabase = kyselyDatabase; for (const chain of chains) { @@ -49,6 +48,11 @@ export class ProcessingService { } } + static async initialize(env: Environment): Promise { + const sharedDependencies = await SharedDependenciesService.initialize(env); + return new ProcessingService(env, sharedDependencies); + } + /** * Start the processor service * diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index 6e1bc25..240d43b 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -1,8 +1,13 @@ import { CoreDependencies, InMemoryEventsRegistry, - InMemoryStrategyRegistry, + IStrategyRegistry, } from "@grants-stack-indexer/data-flow"; +import { + DatabaseStrategyRegistry, + IEventsRegistry, + InMemoryCachedStrategyRegistry, +} from "@grants-stack-indexer/data-flow/dist/src/internal.js"; import { EnvioIndexerClient } from "@grants-stack-indexer/indexer-client"; import { IpfsProvider } from "@grants-stack-indexer/metadata"; import { PricingProviderFactory } from "@grants-stack-indexer/pricing"; @@ -13,6 +18,7 @@ import { KyselyDonationRepository, KyselyProjectRepository, KyselyRoundRepository, + KyselyStrategyRepository, } from "@grants-stack-indexer/repository"; import { Logger } from "@grants-stack-indexer/shared"; @@ -21,8 +27,8 @@ import { Environment } from "../config/index.js"; export type SharedDependencies = { core: Omit; registries: { - eventsRegistry: InMemoryEventsRegistry; - strategyRegistry: InMemoryStrategyRegistry; + eventsRegistry: IEventsRegistry; + strategyRegistry: IStrategyRegistry; }; indexerClient: EnvioIndexerClient; kyselyDatabase: ReturnType; @@ -35,7 +41,7 @@ export type SharedDependencies = { * - Initializes indexer client */ export class SharedDependenciesService { - static initialize(env: Environment): SharedDependencies { + static async initialize(env: Environment): Promise { // Initialize repositories const kyselyDatabase = createKyselyDatabase({ connectionString: env.DATABASE_URL, @@ -68,8 +74,16 @@ export class SharedDependenciesService { const eventsRegistry = new InMemoryEventsRegistry( new Logger({ className: "InMemoryEventsRegistry" }), ); - const strategyRegistry = new InMemoryStrategyRegistry( + const strategyRepository = new KyselyStrategyRepository( + kyselyDatabase, + env.DATABASE_REGISTRIES_SCHEMA, + ); + const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize( new Logger({ className: "InMemoryStrategyRegistry" }), + new DatabaseStrategyRegistry( + new Logger({ className: "DatabaseStrategyRegistry" }), + strategyRepository, + ), ); // Initialize indexer client diff --git a/apps/processing/test/unit/processing.service.spec.ts b/apps/processing/test/unit/processing.service.spec.ts index 802ed72..6623fde 100644 --- a/apps/processing/test/unit/processing.service.spec.ts +++ b/apps/processing/test/unit/processing.service.spec.ts @@ -52,9 +52,9 @@ describe("ProcessingService", () => { DATABASE_SCHEMA: "public", }; - beforeEach(() => { + beforeEach(async () => { vi.clearAllMocks(); - processingService = new ProcessingService(mockEnv as Environment); + processingService = await ProcessingService.initialize(mockEnv as Environment); }); afterEach(() => { diff --git a/apps/processing/test/unit/sharedDependencies.service.spec.ts b/apps/processing/test/unit/sharedDependencies.service.spec.ts index fc633dd..3452ad3 100644 --- a/apps/processing/test/unit/sharedDependencies.service.spec.ts +++ b/apps/processing/test/unit/sharedDependencies.service.spec.ts @@ -51,8 +51,8 @@ describe("SharedDependenciesService", () => { PRICING_SOURCE: "dummy", }; - it("initializes all dependencies correctly", () => { - const dependencies = SharedDependenciesService.initialize(mockEnv as Environment); + it("initializes all dependencies correctly", async () => { + const dependencies = await SharedDependenciesService.initialize(mockEnv as Environment); // Verify database initialization expect(createKyselyDatabase).toHaveBeenCalledWith({ diff --git a/packages/data-flow/src/interfaces/strategyRegistry.interface.ts b/packages/data-flow/src/interfaces/strategyRegistry.interface.ts index 8d3aa44..cb187c3 100644 --- a/packages/data-flow/src/interfaces/strategyRegistry.interface.ts +++ b/packages/data-flow/src/interfaces/strategyRegistry.interface.ts @@ -1,22 +1,38 @@ import { Address, Hex } from "viem"; +import { Strategy } from "@grants-stack-indexer/repository"; +import { ChainId } from "@grants-stack-indexer/shared"; + /** * The strategy registry saves the mapping between the strategy address and the strategy id. Serves as a Cache * to avoid having to read from the chain to get the strategy id every time. */ -//TODO: implement a mechanism to record Strategy that we still don't have a corresponding handler -// we need to store and mark that this strategy is not handled yet, so when it's supported we can process all of the pending events for it export interface IStrategyRegistry { /** - * Get the strategy id by the strategy address + * Get the strategy id by the strategy address and chain id + * + * @param chainId - The chain id * @param strategyAddress - The strategy address - * @returns The strategy id or undefined if the strategy address is not registered + * @returns The strategy or undefined if the strategy address is not registered */ - getStrategyId(strategyAddress: Address): Promise; + getStrategyId(chainId: ChainId, strategyAddress: Address): Promise; /** - * Save the strategy id by the strategy address + * Save the strategy id by the strategy address and chain id + * @param chainId - The chain id * @param strategyAddress - The strategy address * @param strategyId - The strategy id + * @param handled - Whether the strategy is handled + */ + saveStrategyId( + chainId: ChainId, + strategyAddress: Address, + strategyId: Hex, + handled: boolean, + ): Promise; + + /** + * Get all the strategies + * @returns The strategies */ - saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise; + getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise; } diff --git a/packages/data-flow/src/internal.ts b/packages/data-flow/src/internal.ts index accf61a..cb9a1a9 100644 --- a/packages/data-flow/src/internal.ts +++ b/packages/data-flow/src/internal.ts @@ -5,7 +5,7 @@ export * from "./abis/index.js"; export * from "./utils/index.js"; export * from "./data-loader/index.js"; export * from "./eventsFetcher.js"; -export * from "./strategyRegistry.js"; +export * from "./registries/index.js"; export * from "./eventsRegistry.js"; export * from "./eventsProcessor.js"; export * from "./orchestrator.js"; diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index ede93c6..39fd1c1 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -115,14 +115,20 @@ export class Orchestrator { event = await this.enhanceStrategyId(event); if (event.contractName === "Strategy" && "strategyId" in event) { if (!existsHandler(event.strategyId)) { - //TODO: save to registry as unsupported strategy, so when the strategy is handled it will be backwards compatible and process all of the events - //TODO: decide if we want to log this - // this.logger.info( - // `No handler found for strategyId: ${event.strategyId}. Event: ${stringify( - // event, - // )}`, - // ); + await this.strategyRegistry.saveStrategyId( + this.chainId, + event.srcAddress, + event.strategyId, + false, + ); continue; + } else { + await this.strategyRegistry.saveStrategyId( + this.chainId, + event.srcAddress, + event.strategyId, + true, + ); } } @@ -216,9 +222,12 @@ export class Orchestrator { * @returns The strategy id */ private async getOrFetchStrategyId(strategyAddress: Address): Promise { - const existingId = await this.strategyRegistry.getStrategyId(strategyAddress); - if (existingId) { - return existingId; + const cachedStrategy = await this.strategyRegistry.getStrategyId( + this.chainId, + strategyAddress, + ); + if (cachedStrategy) { + return cachedStrategy.id; } const strategyId = await this.dependencies.evmProvider.readContract( @@ -227,8 +236,6 @@ export class Orchestrator { "getStrategyId", ); - await this.strategyRegistry.saveStrategyId(strategyAddress, strategyId); - return strategyId; } diff --git a/packages/data-flow/src/registries/cachedStrategyRegistry.ts b/packages/data-flow/src/registries/cachedStrategyRegistry.ts new file mode 100644 index 0000000..2babeac --- /dev/null +++ b/packages/data-flow/src/registries/cachedStrategyRegistry.ts @@ -0,0 +1,78 @@ +import { Strategy } from "@grants-stack-indexer/repository"; +import { Address, ChainId, Hex, ILogger } from "@grants-stack-indexer/shared"; + +import { IStrategyRegistry } from "../internal.js"; + +/** + * Proxy class to cache the strategy ids in memory or fallback to another strategy registry + */ +export class InMemoryCachedStrategyRegistry implements IStrategyRegistry { + private cache: Map>; + + private constructor( + private readonly logger: ILogger, + private readonly strategyRegistry: IStrategyRegistry, + cache: Map>, + ) { + this.cache = structuredClone(cache); + } + + async getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise { + return this.strategyRegistry.getStrategies(params); + } + + static async initialize( + logger: ILogger, + strategyRegistry: IStrategyRegistry, + ): Promise { + const strategies = await strategyRegistry.getStrategies(); + const cache = new Map>(); + + logger.debug(`Loading strategies into memory...`); + + for (const strategy of strategies) { + if (!cache.has(strategy.chainId)) { + cache.set(strategy.chainId, new Map()); + } + cache.get(strategy.chainId)?.set(strategy.address, strategy); + } + + return new InMemoryCachedStrategyRegistry(logger, strategyRegistry, cache); + } + + async getStrategyId(chainId: ChainId, strategyAddress: Address): Promise { + const cache = this.cache.get(chainId)?.get(strategyAddress); + if (cache) { + return cache; + } + + const strategy = await this.strategyRegistry.getStrategyId(chainId, strategyAddress); + if (strategy) { + this.cache.get(chainId)?.set(strategyAddress, strategy); + } + return strategy; + } + + async saveStrategyId( + chainId: ChainId, + strategyAddress: Address, + strategyId: Hex, + handled: boolean, + ): Promise { + if (this.cache.get(chainId)?.get(strategyAddress)?.handled === handled) { + return; + } + + this.logger.debug( + `Saving strategy id ${strategyId} for address ${strategyAddress} and chainId ${chainId}`, + ); + await this.strategyRegistry.saveStrategyId(chainId, strategyAddress, strategyId, handled); + + this.cache.get(chainId)?.set(strategyAddress, { + address: strategyAddress, + id: strategyId, + chainId, + handled, + }); + } +} diff --git a/packages/data-flow/src/registries/dbStrategyRegistry.ts b/packages/data-flow/src/registries/dbStrategyRegistry.ts new file mode 100644 index 0000000..318e6be --- /dev/null +++ b/packages/data-flow/src/registries/dbStrategyRegistry.ts @@ -0,0 +1,44 @@ +import type { Address, Hex } from "viem"; + +import { IStrategyRepository, Strategy } from "@grants-stack-indexer/repository"; +import { ChainId, ILogger } from "@grants-stack-indexer/shared"; + +import { IStrategyRegistry } from "../internal.js"; + +/** + * Class to store strategy ids in Database + */ +export class DatabaseStrategyRegistry implements IStrategyRegistry { + constructor( + private logger: ILogger, + private strategyRepository: IStrategyRepository, + ) {} + + /** @inheritdoc */ + async getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise { + return this.strategyRepository.getStrategies(params); + } + + /** @inheritdoc */ + async getStrategyId(chainId: ChainId, strategyAddress: Address): Promise { + return this.strategyRepository.getStrategyByChainIdAndAddress(chainId, strategyAddress); + } + + /** @inheritdoc */ + async saveStrategyId( + chainId: ChainId, + strategyAddress: Address, + strategyId: Hex, + handled: boolean, + ): Promise { + this.logger.debug( + `Saving strategy id ${strategyId} for address ${strategyAddress} and chainId ${chainId} in Database`, + ); + await this.strategyRepository.saveStrategy({ + chainId, + address: strategyAddress, + id: strategyId, + handled, + }); + } +} diff --git a/packages/data-flow/src/registries/index.ts b/packages/data-flow/src/registries/index.ts new file mode 100644 index 0000000..3431897 --- /dev/null +++ b/packages/data-flow/src/registries/index.ts @@ -0,0 +1,3 @@ +export * from "./cachedStrategyRegistry.js"; +export * from "./dbStrategyRegistry.js"; +export * from "./strategyRegistry.js"; diff --git a/packages/data-flow/src/registries/strategyRegistry.ts b/packages/data-flow/src/registries/strategyRegistry.ts new file mode 100644 index 0000000..1ff55cf --- /dev/null +++ b/packages/data-flow/src/registries/strategyRegistry.ts @@ -0,0 +1,58 @@ +import type { Address, Hex } from "viem"; + +import { Strategy } from "@grants-stack-indexer/repository"; +import { ChainId, ILogger } from "@grants-stack-indexer/shared"; + +import type { IStrategyRegistry } from "../internal.js"; + +/** + * Class to store strategy ids in memory + */ +export class InMemoryStrategyRegistry implements IStrategyRegistry { + private strategiesMap: Map> = new Map(); + constructor(private logger: ILogger) {} + + /** @inheritdoc */ + async getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise { + return Array.from(this.strategiesMap.entries()) + .filter(([chainId]) => params?.chainId === undefined || chainId === params.chainId) + .map(([chainId, strategies]) => + Array.from(strategies.entries()) + .filter( + ([_address, strategy]) => + params?.handled === undefined || strategy.handled === params.handled, + ) + .map(([address, strategy]) => ({ + id: strategy.id, + address, + chainId, + handled: strategy.handled, + })), + ) + .flat(); + } + + /** @inheritdoc */ + async getStrategyId(chainId: ChainId, strategyAddress: Address): Promise { + return this.strategiesMap.get(chainId)?.get(strategyAddress); + } + + /** @inheritdoc */ + async saveStrategyId( + chainId: ChainId, + strategyAddress: Address, + strategyId: Hex, + handled: boolean, + ): Promise { + this.logger.debug(`Saving strategy id ${strategyId} for address ${strategyAddress}`); + if (!this.strategiesMap.has(chainId)) { + this.strategiesMap.set(chainId, new Map()); + } + this.strategiesMap.get(chainId)!.set(strategyAddress, { + address: strategyAddress, + id: strategyId, + chainId, + handled, + }); + } +} diff --git a/packages/data-flow/src/strategyRegistry.ts b/packages/data-flow/src/strategyRegistry.ts deleted file mode 100644 index d32fd59..0000000 --- a/packages/data-flow/src/strategyRegistry.ts +++ /dev/null @@ -1,25 +0,0 @@ -import type { Address, Hex } from "viem"; - -import { ILogger } from "@grants-stack-indexer/shared"; - -import type { IStrategyRegistry } from "./internal.js"; - -/** - * Class to store strategy ids in memory - */ -//TODO: Implement storage to persist strategies. since we're using address, do we need ChainId? -export class InMemoryStrategyRegistry implements IStrategyRegistry { - private strategiesMap: Map = new Map(); - constructor(private logger: ILogger) {} - - /** @inheritdoc */ - async getStrategyId(strategyAddress: Address): Promise { - return this.strategiesMap.get(strategyAddress); - } - - /** @inheritdoc */ - async saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise { - this.logger.debug(`Saving strategy id ${strategyId} for address ${strategyAddress}`); - this.strategiesMap.set(strategyAddress, strategyId); - } -} diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index 6943c78..e1bd5d7 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -83,6 +83,7 @@ describe("Orchestrator", { sequential: true }, () => { mockStrategyRegistry = { getStrategyId: vi.fn(), saveStrategyId: vi.fn(), + getStrategies: vi.fn(), }; mockEvmProvider = { @@ -321,7 +322,12 @@ describe("Orchestrator", { sequential: true }, () => { .mockResolvedValueOnce([mockEvent]) .mockResolvedValue([]); - vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(strategyId); + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue({ + id: strategyId, + address: strategyAddress, + chainId, + handled: true, + }); eventsProcessorSpy.mockResolvedValue(changesets); vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { return Promise.resolve(); @@ -375,7 +381,12 @@ describe("Orchestrator", { sequential: true }, () => { .mockResolvedValueOnce([mockEvent]) .mockResolvedValue([]); - vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(unhandledStrategyId); + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue({ + id: unhandledStrategyId, + address: strategyAddress, + chainId, + handled: false, + }); vi.spyOn(mockEvmProvider, "readContract").mockResolvedValue(unhandledStrategyId); runPromise = orchestrator.run(abortController.signal); @@ -418,7 +429,12 @@ describe("Orchestrator", { sequential: true }, () => { vi.spyOn(mockStrategyRegistry, "getStrategyId") .mockResolvedValueOnce(undefined) - .mockResolvedValue(strategyId); + .mockResolvedValue({ + id: strategyId, + address: strategyAddress, + chainId, + handled: true, + }); vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") .mockResolvedValueOnce([poolCreatedEvent]) @@ -580,7 +596,12 @@ describe("Orchestrator", { sequential: true }, () => { ); const error = new UnsupportedStrategy(strategyId); - vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(strategyId); + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue({ + id: strategyId, + address: mockEvent.srcAddress, + chainId, + handled: true, + }); vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") .mockResolvedValueOnce([mockEvent]) .mockResolvedValue([]); diff --git a/packages/data-flow/test/unit/strategyRegistry.spec.ts b/packages/data-flow/test/unit/strategyRegistry.spec.ts index 01e4806..650a2a2 100644 --- a/packages/data-flow/test/unit/strategyRegistry.spec.ts +++ b/packages/data-flow/test/unit/strategyRegistry.spec.ts @@ -1,9 +1,9 @@ import { Address, Hex } from "viem"; import { describe, expect, it, vi } from "vitest"; -import { ILogger } from "@grants-stack-indexer/shared"; +import { ChainId, ILogger } from "@grants-stack-indexer/shared"; -import { InMemoryStrategyRegistry } from "../../src/strategyRegistry.js"; +import { InMemoryStrategyRegistry } from "../../src/registries/strategyRegistry.js"; describe("InMemoryStrategyRegistry", () => { const logger: ILogger = { @@ -12,11 +12,13 @@ describe("InMemoryStrategyRegistry", () => { info: vi.fn(), warn: vi.fn(), }; + const chainId = 1 as ChainId; + it("return undefined for non-existent strategy address", async () => { const registry = new InMemoryStrategyRegistry(logger); const strategyAddress = "0x123" as Address; - const strategyId = await registry.getStrategyId(strategyAddress); + const strategyId = await registry.getStrategyId(chainId, strategyAddress); expect(strategyId).toBeUndefined(); }); @@ -25,10 +27,15 @@ describe("InMemoryStrategyRegistry", () => { const strategyAddress = "0x123" as Address; const strategyId = "0xabc" as Hex; - await registry.saveStrategyId(strategyAddress, strategyId); - const retrievedId = await registry.getStrategyId(strategyAddress); + await registry.saveStrategyId(chainId, strategyAddress, strategyId, true); + const retrieved = await registry.getStrategyId(chainId, strategyAddress); - expect(retrievedId).toBe(strategyId); + expect(retrieved).toEqual({ + id: strategyId, + address: strategyAddress, + chainId, + handled: true, + }); }); it("handle multiple strategy addresses independently", async () => { @@ -38,13 +45,23 @@ describe("InMemoryStrategyRegistry", () => { const firstStrategyId = "0xabc" as Hex; const secondStrategyId = "0xdef" as Hex; - await registry.saveStrategyId(firstAddress, firstStrategyId); - await registry.saveStrategyId(secondAddress, secondStrategyId); + await registry.saveStrategyId(chainId, firstAddress, firstStrategyId, true); + await registry.saveStrategyId(chainId, secondAddress, secondStrategyId, true); - const retrievedFirstId = await registry.getStrategyId(firstAddress); - const retrievedSecondId = await registry.getStrategyId(secondAddress); + const retrievedFirst = await registry.getStrategyId(chainId, firstAddress); + const retrievedSecond = await registry.getStrategyId(chainId, secondAddress); - expect(retrievedFirstId).toBe(firstStrategyId); - expect(retrievedSecondId).toBe(secondStrategyId); + expect(retrievedFirst).toEqual({ + id: firstStrategyId, + address: firstAddress, + chainId, + handled: true, + }); + expect(retrievedSecond).toEqual({ + id: secondStrategyId, + address: secondAddress, + chainId, + handled: true, + }); }); }); diff --git a/packages/repository/src/db/connection.ts b/packages/repository/src/db/connection.ts index 8b0a352..d9e5568 100644 --- a/packages/repository/src/db/connection.ts +++ b/packages/repository/src/db/connection.ts @@ -20,6 +20,7 @@ import { Round, RoundRole as RoundRoleTable, StatusSnapshot, + Strategy as StrategyTable, } from "../internal.js"; const { Pool } = pg; @@ -59,6 +60,7 @@ export interface Database { applications: ApplicationTable; donations: DonationTable; applicationsPayouts: ApplicationPayoutTable; + strategies: StrategyTable; } /** diff --git a/packages/repository/src/external.ts b/packages/repository/src/external.ts index 8d10c3c..b8f9d36 100644 --- a/packages/repository/src/external.ts +++ b/packages/repository/src/external.ts @@ -8,6 +8,7 @@ export type { IApplicationReadRepository, IDonationRepository, IApplicationPayoutRepository, + IStrategyRepository, DatabaseConfig, } from "./internal.js"; @@ -35,6 +36,8 @@ export type { Donation, NewDonation } from "./types/index.js"; export type { NewApplicationPayout, ApplicationPayout } from "./types/index.js"; +export type { Strategy, NewStrategy } from "./types/index.js"; + export type { Changeset, ProjectChangeset, @@ -50,6 +53,7 @@ export { KyselyApplicationRepository, KyselyDonationRepository, KyselyApplicationPayoutRepository, + KyselyStrategyRepository, } from "./repositories/kysely/index.js"; export { diff --git a/packages/repository/src/interfaces/index.ts b/packages/repository/src/interfaces/index.ts index 26a47f1..3562351 100644 --- a/packages/repository/src/interfaces/index.ts +++ b/packages/repository/src/interfaces/index.ts @@ -3,3 +3,4 @@ export * from "./roundRepository.interface.js"; export * from "./applicationRepository.interface.js"; export * from "./donationRepository.interface.js"; export * from "./applicationPayoutRepository.interface.js"; +export * from "./strategyRepository.interface.js"; diff --git a/packages/repository/src/interfaces/strategyRepository.interface.ts b/packages/repository/src/interfaces/strategyRepository.interface.ts new file mode 100644 index 0000000..e5332f9 --- /dev/null +++ b/packages/repository/src/interfaces/strategyRepository.interface.ts @@ -0,0 +1,12 @@ +import { Address, ChainId } from "@grants-stack-indexer/shared"; + +import { Strategy } from "../internal.js"; + +export interface IStrategyRepository { + getStrategyByChainIdAndAddress( + chainId: ChainId, + strategyAddress: Address, + ): Promise; + saveStrategy(strategy: Strategy): Promise; + getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise; +} diff --git a/packages/repository/src/repositories/kysely/index.ts b/packages/repository/src/repositories/kysely/index.ts index 6b5d977..5d79682 100644 --- a/packages/repository/src/repositories/kysely/index.ts +++ b/packages/repository/src/repositories/kysely/index.ts @@ -3,3 +3,4 @@ export * from "./round.repository.js"; export * from "./application.repository.js"; export * from "./donation.repository.js"; export * from "./applicationPayout.repository.js"; +export * from "./strategy.repository.js"; diff --git a/packages/repository/src/repositories/kysely/strategy.repository.ts b/packages/repository/src/repositories/kysely/strategy.repository.ts new file mode 100644 index 0000000..e6cb91c --- /dev/null +++ b/packages/repository/src/repositories/kysely/strategy.repository.ts @@ -0,0 +1,48 @@ +import { Kysely } from "kysely"; + +import { Address, ChainId } from "@grants-stack-indexer/shared"; + +import { IStrategyRepository } from "../../interfaces/index.js"; +import { Database, Strategy } from "../../internal.js"; + +export class KyselyStrategyRepository implements IStrategyRepository { + constructor( + private readonly db: Kysely, + private readonly schemaName: string, + ) {} + + async getStrategyByChainIdAndAddress( + chainId: ChainId, + strategyAddress: Address, + ): Promise { + return this.db + .withSchema(this.schemaName) + .selectFrom("strategies") + .where("chainId", "=", chainId) + .where("address", "=", strategyAddress) + .selectAll() + .executeTakeFirst(); + } + async saveStrategy(strategy: Strategy): Promise { + await this.db + .withSchema(this.schemaName) + .insertInto("strategies") + .values(strategy) + .onConflict((oc) => oc.columns(["chainId", "address"]).doUpdateSet(strategy)) + .execute(); + } + + async getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise { + const query = this.db.withSchema(this.schemaName).selectFrom("strategies"); + + if (params?.chainId) { + query.where("chainId", "=", params.chainId); + } + + if (params?.handled) { + query.where("handled", "=", params.handled); + } + + return query.selectAll().execute(); + } +} diff --git a/packages/repository/src/types/index.ts b/packages/repository/src/types/index.ts index f573b25..c34de7d 100644 --- a/packages/repository/src/types/index.ts +++ b/packages/repository/src/types/index.ts @@ -4,3 +4,4 @@ export * from "./application.types.js"; export * from "./changeset.types.js"; export * from "./donation.types.js"; export * from "./applicationPayout.types.js"; +export * from "./strategy.types.js"; diff --git a/packages/repository/src/types/strategy.types.ts b/packages/repository/src/types/strategy.types.ts new file mode 100644 index 0000000..8bee12f --- /dev/null +++ b/packages/repository/src/types/strategy.types.ts @@ -0,0 +1,10 @@ +import { Address, ChainId, Hex } from "@grants-stack-indexer/shared"; + +export type Strategy = { + address: Address; + id: Hex; + chainId: ChainId; + handled: boolean; +}; + +export type NewStrategy = Strategy; From a851079f846bffd1c13c8b1307f3fced1b2aafd3 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Wed, 11 Dec 2024 10:25:12 -0300 Subject: [PATCH 2/8] feat: update migrations script to handle different schemas --- package.json | 6 ++- pnpm-lock.yaml | 24 +++++++++ scripts/migrations/package.json | 8 ++- scripts/migrations/src/migrateDb.script.ts | 22 ++++----- .../20241029T120000_initial.ts | 2 +- .../20241210T175001_strategy_registry.ts | 33 +++++++++++++ scripts/migrations/src/resetDb.script.ts | 22 ++++----- scripts/migrations/src/schemas/index.ts | 1 - scripts/migrations/src/utils/index.ts | 1 + scripts/migrations/src/utils/parsing.ts | 49 +++++++++++++++++++ 10 files changed, 140 insertions(+), 28 deletions(-) rename scripts/migrations/src/migrations/{ => chainData}/20241029T120000_initial.ts (99%) create mode 100644 scripts/migrations/src/migrations/registries/20241210T175001_strategy_registry.ts create mode 100644 scripts/migrations/src/utils/parsing.ts diff --git a/package.json b/package.json index 0ec53b2..9679466 100644 --- a/package.json +++ b/package.json @@ -16,9 +16,11 @@ "preinstall": "npx only-allow pnpm", "lint": "turbo run lint", "lint:fix": "turbo run lint:fix", + "migrate:chain-data": "pnpm run --filter @grants-stack-indexer/migrations migrate:chain-data", + "migrate:registries": "pnpm run --filter @grants-stack-indexer/migrations migrate:registries", "prepare": "husky", - "script:db:migrate": "pnpm run --filter @grants-stack-indexer/migrations script:db:migrate", - "script:db:reset": "pnpm run --filter @grants-stack-indexer/migrations script:db:reset", + "reset:chain-data": "pnpm run --filter @grants-stack-indexer/migrations reset:chain-data", + "reset:registries": "pnpm run --filter @grants-stack-indexer/migrations reset:registries", "start": "turbo run start", "test": "turbo run test", "test:cov": "turbo run test:cov", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index eeca826..92721dd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -279,10 +279,16 @@ importers: kysely: specifier: 0.27.4 version: 0.27.4 + yargs: + specifier: 17.7.2 + version: 17.7.2 zod: specifier: 3.23.8 version: 3.23.8 devDependencies: + "@types/yargs": + specifier: 17.0.33 + version: 17.0.33 tsx: specifier: 4.19.2 version: 4.19.2 @@ -1527,6 +1533,18 @@ packages: integrity: sha512-6WaYesThRMCl19iryMYP7/x2OVgCtbIVflDGFpWnb9irXI3UjYE4AzmYuiUKY1AJstGijoY+MgUszMgRxIYTYw==, } + "@types/yargs-parser@21.0.3": + resolution: + { + integrity: sha512-I4q9QU9MQv4oEOz4tAHJtNz1cwuLxn2F3xcc2iV5WdqLPpUnj30aUuxt1mAxYTG+oe8CZMV/+6rU4S4gRDzqtQ==, + } + + "@types/yargs@17.0.33": + resolution: + { + integrity: sha512-WpxBCKWPLr4xSsHgz511rFJAM+wS28w2zEO1QDNY5zM/S8ok70NNfztH0xwhqKyaK0OHCbN98LDAZuy1ctxDkA==, + } + "@typescript-eslint/eslint-plugin@5.62.0": resolution: { @@ -5770,6 +5788,12 @@ snapshots: "@types/triple-beam@1.3.5": {} + "@types/yargs-parser@21.0.3": {} + + "@types/yargs@17.0.33": + dependencies: + "@types/yargs-parser": 21.0.3 + "@typescript-eslint/eslint-plugin@5.62.0(@typescript-eslint/parser@5.62.0(eslint@8.56.0)(typescript@5.5.4))(eslint@8.56.0)(typescript@5.5.4)": dependencies: "@eslint-community/regexpp": 4.12.1 diff --git a/scripts/migrations/package.json b/scripts/migrations/package.json index 737e796..ffe8e67 100644 --- a/scripts/migrations/package.json +++ b/scripts/migrations/package.json @@ -20,8 +20,10 @@ "format:fix": "prettier --write \"{src,test}/**/*.{js,ts,json}\"", "lint": "eslint \"{src,test}/**/*.{js,ts,json}\"", "lint:fix": "pnpm lint --fix", - "script:db:migrate": "tsx src/migrateDb.script.ts", - "script:db:reset": "tsx src/resetDb.script.ts", + "migrate:chain-data": "tsx src/migrateDb.script.ts --folder chainData", + "migrate:registries": "tsx src/migrateDb.script.ts --folder registries", + "reset:chain-data": "tsx src/resetDb.script.ts --folder chainData", + "reset:registries": "tsx src/resetDb.script.ts --folder registries", "test": "vitest run --config vitest.config.ts --passWithNoTests", "test:cov": "vitest run --config vitest.config.ts --coverage --passWithNoTests" }, @@ -29,9 +31,11 @@ "@grants-stack-indexer/repository": "workspace:*", "dotenv": "16.4.5", "kysely": "0.27.4", + "yargs": "17.7.2", "zod": "3.23.8" }, "devDependencies": { + "@types/yargs": "17.0.33", "tsx": "4.19.2" } } diff --git a/scripts/migrations/src/migrateDb.script.ts b/scripts/migrations/src/migrateDb.script.ts index 3d7245b..90bc5fa 100644 --- a/scripts/migrations/src/migrateDb.script.ts +++ b/scripts/migrations/src/migrateDb.script.ts @@ -1,10 +1,9 @@ -import path from "path"; import { configDotenv } from "dotenv"; import { createKyselyDatabase } from "@grants-stack-indexer/repository"; import { getDatabaseConfigFromEnv } from "./schemas/index.js"; -import { migrateToLatest } from "./utils/index.js"; +import { getMigrationsFolder, migrateToLatest, parseArguments } from "./utils/index.js"; configDotenv(); @@ -21,7 +20,10 @@ configDotenv(); * * Environment variables required: * - DATABASE_URL: PostgreSQL connection string - * - DATABASE_SCHEMA: Schema name to migrate (e.g. "grants_stack") + * + * Script arguments: + * - folder: Folder name to migrate (e.g. "chainData" or "registries") + * - schema: Database schema name where migrations are applied * * The script will: * - Create the schema if it doesn't exist @@ -31,22 +33,20 @@ configDotenv(); */ export const main = async (): Promise => { - const { DATABASE_URL, DATABASE_SCHEMA } = getDatabaseConfigFromEnv(); + const { DATABASE_URL } = getDatabaseConfigFromEnv(); + const { folder, schema } = parseArguments(); const db = createKyselyDatabase({ connectionString: DATABASE_URL, - withSchema: DATABASE_SCHEMA, + withSchema: schema, }); - console.log(`Migrating database schema '${DATABASE_SCHEMA}'...`); + console.log(`Migrating database schema '${schema}'...`); const migrationResults = await migrateToLatest({ db, - schema: DATABASE_SCHEMA, - migrationsFolder: path.join( - path.dirname(new URL(import.meta.url).pathname), - "./migrations", - ), + schema, + migrationsFolder: getMigrationsFolder(folder), }); if (migrationResults && migrationResults?.length > 0) { diff --git a/scripts/migrations/src/migrations/20241029T120000_initial.ts b/scripts/migrations/src/migrations/chainData/20241029T120000_initial.ts similarity index 99% rename from scripts/migrations/src/migrations/20241029T120000_initial.ts rename to scripts/migrations/src/migrations/chainData/20241029T120000_initial.ts index 778d0e8..904a139 100644 --- a/scripts/migrations/src/migrations/20241029T120000_initial.ts +++ b/scripts/migrations/src/migrations/chainData/20241029T120000_initial.ts @@ -1,6 +1,6 @@ import { Kysely, sql } from "kysely"; -import { getSchemaName } from "../utils/index.js"; +import { getSchemaName } from "../../utils/index.js"; /** * The up function is called when you update your database schema to the next version and down when you go back to previous version. diff --git a/scripts/migrations/src/migrations/registries/20241210T175001_strategy_registry.ts b/scripts/migrations/src/migrations/registries/20241210T175001_strategy_registry.ts new file mode 100644 index 0000000..704a75d --- /dev/null +++ b/scripts/migrations/src/migrations/registries/20241210T175001_strategy_registry.ts @@ -0,0 +1,33 @@ +import { Kysely } from "kysely"; + +import { getSchemaName } from "../../utils/index.js"; + +/** + * The up function is called when you update your database schema to the next version and down when you go back to previous version. + * The only argument for the functions is an instance of Kysely. It's important to use Kysely and not Kysely. + * ref: https://kysely.dev/docs/migrations#migration-files + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export async function up(db: Kysely): Promise { + const ADDRESS_TYPE = "text"; + const CHAIN_ID_TYPE = "integer"; + + const schema = getSchemaName(db.schema); + + console.log("schema", schema); + + await db.schema + .createTable("strategies") + .addColumn("address", ADDRESS_TYPE) + .addColumn("id", "text") + .addColumn("chainId", CHAIN_ID_TYPE) + .addColumn("handled", "boolean") + .addPrimaryKeyConstraint("strategies_pkey", ["address", "chainId"]) + .execute(); +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export async function down(db: Kysely): Promise { + // Drop everything in reverse order + await db.schema.dropTable("strategies").execute(); +} diff --git a/scripts/migrations/src/resetDb.script.ts b/scripts/migrations/src/resetDb.script.ts index 905c9eb..b55f6a2 100644 --- a/scripts/migrations/src/resetDb.script.ts +++ b/scripts/migrations/src/resetDb.script.ts @@ -1,10 +1,9 @@ -import path from "path"; import { configDotenv } from "dotenv"; import { createKyselyDatabase } from "@grants-stack-indexer/repository"; import { getDatabaseConfigFromEnv } from "./schemas/index.js"; -import { resetDatabase } from "./utils/index.js"; +import { getMigrationsFolder, parseArguments, resetDatabase } from "./utils/index.js"; configDotenv(); @@ -21,7 +20,10 @@ configDotenv(); * * Environment variables required: * - DATABASE_URL: PostgreSQL connection string - * - DATABASE_SCHEMA: Schema name to reset (e.g. "grants_stack") + * + * Script arguments: + * - folder: Folder name to migrate (e.g. "chainData" or "registries") + * - schema: Database schema name where migrations are applied * * The script will: * - Drop the schema if it exists @@ -34,22 +36,20 @@ configDotenv(); */ const main = async (): Promise => { - const { DATABASE_URL, DATABASE_SCHEMA } = getDatabaseConfigFromEnv(); + const { DATABASE_URL } = getDatabaseConfigFromEnv(); + const { folder, schema } = parseArguments(); const db = createKyselyDatabase({ connectionString: DATABASE_URL, - withSchema: DATABASE_SCHEMA, + withSchema: schema, }); - console.log(`Resetting database schema '${DATABASE_SCHEMA}'...`); + console.log(`Resetting database schema '${schema}'...`); const resetResults = await resetDatabase({ db, - schema: DATABASE_SCHEMA, - migrationsFolder: path.join( - path.dirname(new URL(import.meta.url).pathname), - "./migrations", - ), + schema, + migrationsFolder: getMigrationsFolder(folder), }); if (resetResults && resetResults?.length > 0) { diff --git a/scripts/migrations/src/schemas/index.ts b/scripts/migrations/src/schemas/index.ts index a18141a..5b5ec4e 100644 --- a/scripts/migrations/src/schemas/index.ts +++ b/scripts/migrations/src/schemas/index.ts @@ -2,7 +2,6 @@ import { z } from "zod"; const dbEnvSchema = z.object({ DATABASE_URL: z.string().url(), - DATABASE_SCHEMA: z.string().min(1), }); export type DbEnvConfig = z.infer; diff --git a/scripts/migrations/src/utils/index.ts b/scripts/migrations/src/utils/index.ts index 7b157b3..e720a9c 100644 --- a/scripts/migrations/src/utils/index.ts +++ b/scripts/migrations/src/utils/index.ts @@ -1 +1,2 @@ export * from "./kysely.js"; +export * from "./parsing.js"; diff --git a/scripts/migrations/src/utils/parsing.ts b/scripts/migrations/src/utils/parsing.ts new file mode 100644 index 0000000..c062589 --- /dev/null +++ b/scripts/migrations/src/utils/parsing.ts @@ -0,0 +1,49 @@ +import { existsSync } from "fs"; +import path from "path"; +import yargs from "yargs"; +import { hideBin } from "yargs/helpers"; +import { z } from "zod"; + +const zodSchema = z.object({ + folder: z + .string() + .refine((value) => ["chainData", "registries"].includes(value), { + message: "Schema name must be either 'chainData' or 'registries'", + }) + .describe("Folder name to migrate"), + schema: z.string().describe("Database schema name where migrations are applied"), +}); + +export const parseArguments = (): z.infer => { + return yargs(hideBin(process.argv)) + .option("folder", { + type: "string", + demandOption: true, + description: "Folder name to migrate", + }) + .option("schema", { + alias: "s", + type: "string", + demandOption: true, + description: "Database schema name where migrations are applied", + }) + .check((argv) => { + zodSchema.parse(argv); + return true; + }) + .parseSync(); +}; + +export const getMigrationsFolder = (folder: string): string => { + const migrationsFolder = path.join( + path.dirname(new URL(import.meta.url).pathname), + `../migrations/${folder}`, + ); + console.log("migrationsFolder", migrationsFolder); + + if (!existsSync(migrationsFolder)) { + throw new Error(`Migrations folder '${folder}' not found`); + } + + return migrationsFolder; +}; From 6b7e3ab80d55e418d5e9fc5e16361409a9c1df30 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Wed, 11 Dec 2024 16:58:18 -0300 Subject: [PATCH 3/8] test: add unit tests --- .../services/sharedDependencies.service.ts | 2 +- .../unit/sharedDependencies.service.spec.ts | 29 +++ packages/data-flow/src/orchestrator.ts | 30 ++- .../registries/cachedStrategyRegistry.spec.ts | 140 ++++++++++++++ .../registries/dbStrategyRegistry.spec.ts | 94 ++++++++++ .../test/registries/strategyRegistry.spec.ts | 171 ++++++++++++++++++ .../data-flow/test/unit/orchestrator.spec.ts | 88 ++++++++- .../test/unit/strategyRegistry.spec.ts | 67 ------- 8 files changed, 539 insertions(+), 82 deletions(-) create mode 100644 packages/data-flow/test/registries/cachedStrategyRegistry.spec.ts create mode 100644 packages/data-flow/test/registries/dbStrategyRegistry.spec.ts create mode 100644 packages/data-flow/test/registries/strategyRegistry.spec.ts delete mode 100644 packages/data-flow/test/unit/strategyRegistry.spec.ts diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index 240d43b..52e0c4e 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -79,7 +79,7 @@ export class SharedDependenciesService { env.DATABASE_REGISTRIES_SCHEMA, ); const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize( - new Logger({ className: "InMemoryStrategyRegistry" }), + new Logger({ className: "InMemoryCachedStrategyRegistry" }), new DatabaseStrategyRegistry( new Logger({ className: "DatabaseStrategyRegistry" }), strategyRepository, diff --git a/apps/processing/test/unit/sharedDependencies.service.spec.ts b/apps/processing/test/unit/sharedDependencies.service.spec.ts index 3452ad3..31c662f 100644 --- a/apps/processing/test/unit/sharedDependencies.service.spec.ts +++ b/apps/processing/test/unit/sharedDependencies.service.spec.ts @@ -17,6 +17,11 @@ vi.mock("@grants-stack-indexer/repository", () => ({ KyselyApplicationRepository: vi.fn(), KyselyDonationRepository: vi.fn(), KyselyApplicationPayoutRepository: vi.fn(), + KyselyStrategyRepository: vi.fn().mockImplementation(() => ({ + getStrategies: vi.fn().mockResolvedValue([]), + getStrategyId: vi.fn(), + saveStrategyId: vi.fn(), + })), })); vi.mock("@grants-stack-indexer/pricing", () => ({ @@ -33,6 +38,27 @@ vi.mock("@grants-stack-indexer/indexer-client", () => ({ EnvioIndexerClient: vi.fn(), })); +// Update the mock to handle async initialization +vi.mock("@grants-stack-indexer/data-flow", () => { + const mockStrategyRegistry = { + getStrategies: vi.fn(), + getStrategyId: vi.fn(), + saveStrategyId: vi.fn(), + }; + + return { + InMemoryEventsRegistry: vi.fn(), + InMemoryCachedStrategyRegistry: { + initialize: vi.fn().mockResolvedValue(mockStrategyRegistry), + }, + DatabaseStrategyRegistry: vi.fn().mockImplementation(() => ({ + getStrategies: vi.fn(), + getStrategyId: vi.fn(), + saveStrategyId: vi.fn(), + })), + }; +}); + describe("SharedDependenciesService", () => { const mockEnv: Pick< Environment, @@ -89,5 +115,8 @@ describe("SharedDependenciesService", () => { // Verify registries expect(dependencies.registries).toHaveProperty("eventsRegistry"); expect(dependencies.registries).toHaveProperty("strategyRegistry"); + + // Verify InMemoryCachedStrategyRegistry initialization + expect(dependencies.registries.strategyRegistry).toBeDefined(); }); }); diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 39fd1c1..b648cbd 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -113,14 +113,17 @@ export class Orchestrator { await this.eventsRegistry.saveLastProcessedEvent(this.chainId, event); event = await this.enhanceStrategyId(event); - if (event.contractName === "Strategy" && "strategyId" in event) { + if (this.isPoolCreated(event)) { + // we save the strategy id with handled = false because we don't know if the strategy is handled yet + await this.strategyRegistry.saveStrategyId( + this.chainId, + event.srcAddress, + event.strategyId, + false, + ); + } else if (event.contractName === "Strategy" && "strategyId" in event) { if (!existsHandler(event.strategyId)) { - await this.strategyRegistry.saveStrategyId( - this.chainId, - event.srcAddress, - event.strategyId, - false, - ); + // we skip the event if the strategy id is not handled yet continue; } else { await this.strategyRegistry.saveStrategyId( @@ -239,6 +242,17 @@ export class Orchestrator { return strategyId; } + /** + * Check if the event is a PoolCreated event from Allo contract + * @param event - The event + * @returns True if the event is a PoolCreated event from Allo contract, false otherwise + */ + private isPoolCreated( + event: ProcessorEvent, + ): event is ProcessorEvent<"Allo", "PoolCreated"> { + return isAlloEvent(event) && event.eventName === "PoolCreated"; + } + /** * Check if the event requires a strategy id * @param event - The event @@ -247,6 +261,6 @@ export class Orchestrator { private requiresStrategyId( event: ProcessorEvent, ): event is ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent> { - return (isAlloEvent(event) && event.eventName === "PoolCreated") || isStrategyEvent(event); + return this.isPoolCreated(event) || isStrategyEvent(event); } } diff --git a/packages/data-flow/test/registries/cachedStrategyRegistry.spec.ts b/packages/data-flow/test/registries/cachedStrategyRegistry.spec.ts new file mode 100644 index 0000000..7829786 --- /dev/null +++ b/packages/data-flow/test/registries/cachedStrategyRegistry.spec.ts @@ -0,0 +1,140 @@ +import { Address, Hex } from "viem"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { Strategy } from "@grants-stack-indexer/repository"; +import { ChainId, ILogger } from "@grants-stack-indexer/shared"; + +import { IStrategyRegistry } from "../../src/internal.js"; +import { InMemoryCachedStrategyRegistry } from "../../src/registries/cachedStrategyRegistry.js"; + +describe("InMemoryCachedStrategyRegistry", () => { + const logger: ILogger = { + debug: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + }; + + const mockStrategyRegistry: IStrategyRegistry = { + getStrategies: vi.fn(), + getStrategyId: vi.fn(), + saveStrategyId: vi.fn(), + }; + + const chainId = 1 as ChainId; + const strategyAddress = "0x123" as Address; + const strategyId = "0xabc" as Hex; + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("initialize with existing strategies", async () => { + const strategies: Strategy[] = [ + { + id: strategyId, + address: strategyAddress, + chainId, + handled: true, + }, + ]; + + vi.mocked(mockStrategyRegistry.getStrategies).mockResolvedValue(strategies); + + const registry = await InMemoryCachedStrategyRegistry.initialize( + logger, + mockStrategyRegistry, + ); + const cached = await registry.getStrategyId(chainId, strategyAddress); + + expect(cached).toEqual(strategies[0]); + expect(mockStrategyRegistry.getStrategyId).not.toHaveBeenCalled(); + }); + + it("fetch from underlying registry when not in cache", async () => { + const strategy: Strategy = { + id: strategyId, + address: strategyAddress, + chainId, + handled: true, + }; + + vi.mocked(mockStrategyRegistry.getStrategies).mockResolvedValue([]); + vi.mocked(mockStrategyRegistry.getStrategyId).mockResolvedValue(strategy); + + const registry = await InMemoryCachedStrategyRegistry.initialize( + logger, + mockStrategyRegistry, + ); + const result = await registry.getStrategyId(chainId, strategyAddress); + + expect(result).toEqual(strategy); + expect(mockStrategyRegistry.getStrategyId).toHaveBeenCalledWith(chainId, strategyAddress); + }); + + it("save strategy and update cache", async () => { + vi.mocked(mockStrategyRegistry.getStrategies).mockResolvedValue([]); + + const registry = await InMemoryCachedStrategyRegistry.initialize( + logger, + mockStrategyRegistry, + ); + await registry.saveStrategyId(chainId, strategyAddress, strategyId, true); + + const cached = await registry.getStrategyId(chainId, strategyAddress); + expect(cached).toEqual({ + id: strategyId, + address: strategyAddress, + chainId, + handled: true, + }); + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledWith( + chainId, + strategyAddress, + strategyId, + true, + ); + }); + + it("don't save if strategy already exists with same handled status", async () => { + const strategy: Strategy = { + id: strategyId, + address: strategyAddress, + chainId, + handled: true, + }; + + vi.mocked(mockStrategyRegistry.getStrategies).mockResolvedValue([strategy]); + + const registry = await InMemoryCachedStrategyRegistry.initialize( + logger, + mockStrategyRegistry, + ); + await registry.saveStrategyId(chainId, strategyAddress, strategyId, true); + + expect(mockStrategyRegistry.saveStrategyId).not.toHaveBeenCalled(); + }); + + it("delegate getStrategies to underlying registry", async () => { + const strategies: Strategy[] = [ + { + id: strategyId, + address: strategyAddress, + chainId, + handled: true, + }, + ]; + + vi.mocked(mockStrategyRegistry.getStrategies).mockResolvedValue(strategies); + + const registry = await InMemoryCachedStrategyRegistry.initialize( + logger, + mockStrategyRegistry, + ); + const params = { handled: true, chainId }; + const result = await registry.getStrategies(params); + + expect(result).toEqual(strategies); + expect(mockStrategyRegistry.getStrategies).toHaveBeenCalledWith(params); + }); +}); diff --git a/packages/data-flow/test/registries/dbStrategyRegistry.spec.ts b/packages/data-flow/test/registries/dbStrategyRegistry.spec.ts new file mode 100644 index 0000000..d782444 --- /dev/null +++ b/packages/data-flow/test/registries/dbStrategyRegistry.spec.ts @@ -0,0 +1,94 @@ +import { Address, Hex } from "viem"; +import { describe, expect, it, vi } from "vitest"; + +import { IStrategyRepository, Strategy } from "@grants-stack-indexer/repository"; +import { ChainId, ILogger } from "@grants-stack-indexer/shared"; + +import { DatabaseStrategyRegistry } from "../../src/registries/dbStrategyRegistry.js"; + +describe("DatabaseStrategyRegistry", () => { + const logger: ILogger = { + debug: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + }; + + const mockStrategyRepository: IStrategyRepository = { + getStrategies: vi.fn(), + getStrategyByChainIdAndAddress: vi.fn(), + saveStrategy: vi.fn(), + }; + + const chainId = 1 as ChainId; + + it("return undefined for non-existent strategy address", async () => { + const registry = new DatabaseStrategyRegistry(logger, mockStrategyRepository); + const strategyAddress = "0x123" as Address; + + vi.mocked(mockStrategyRepository.getStrategyByChainIdAndAddress).mockResolvedValue( + undefined, + ); + + const strategy = await registry.getStrategyId(chainId, strategyAddress); + expect(strategy).toBeUndefined(); + expect(mockStrategyRepository.getStrategyByChainIdAndAddress).toHaveBeenCalledWith( + chainId, + strategyAddress, + ); + }); + + it("save and retrieve strategy id", async () => { + const registry = new DatabaseStrategyRegistry(logger, mockStrategyRepository); + const strategyAddress = "0x123" as Address; + const strategyId = "0xabc" as Hex; + const strategy: Strategy = { + id: strategyId, + address: strategyAddress, + chainId, + handled: true, + }; + + vi.mocked(mockStrategyRepository.saveStrategy).mockResolvedValue(); + vi.mocked(mockStrategyRepository.getStrategyByChainIdAndAddress).mockResolvedValue( + strategy, + ); + + await registry.saveStrategyId(chainId, strategyAddress, strategyId, true); + const retrieved = await registry.getStrategyId(chainId, strategyAddress); + + expect(mockStrategyRepository.saveStrategy).toHaveBeenCalledWith(strategy); + expect(retrieved).toEqual(strategy); + }); + + it("get all strategies", async () => { + const registry = new DatabaseStrategyRegistry(logger, mockStrategyRepository); + const strategies: Strategy[] = [ + { + id: "0xabc" as Hex, + address: "0x123" as Address, + chainId: 1 as ChainId, + handled: true, + }, + { + id: "0xdef" as Hex, + address: "0x456" as Address, + chainId: 1 as ChainId, + handled: false, + }, + ]; + + vi.mocked(mockStrategyRepository.getStrategies).mockResolvedValue(strategies); + + const result = await registry.getStrategies(); + expect(result).toEqual(strategies); + }); + + it("get strategies with filters", async () => { + const registry = new DatabaseStrategyRegistry(logger, mockStrategyRepository); + const params = { handled: true, chainId: 1 as ChainId }; + + await registry.getStrategies(params); + expect(mockStrategyRepository.getStrategies).toHaveBeenCalledWith(params); + }); +}); diff --git a/packages/data-flow/test/registries/strategyRegistry.spec.ts b/packages/data-flow/test/registries/strategyRegistry.spec.ts new file mode 100644 index 0000000..80f3c2f --- /dev/null +++ b/packages/data-flow/test/registries/strategyRegistry.spec.ts @@ -0,0 +1,171 @@ +import { Address, Hex } from "viem"; +import { describe, expect, it, vi } from "vitest"; + +import { ChainId, ILogger } from "@grants-stack-indexer/shared"; + +import { InMemoryStrategyRegistry } from "../../src/registries/strategyRegistry.js"; + +describe("InMemoryStrategyRegistry", () => { + const logger: ILogger = { + debug: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + }; + const chainId = 1 as ChainId; + + it("return undefined for non-existent strategy address", async () => { + const registry = new InMemoryStrategyRegistry(logger); + const strategyAddress = "0x123" as Address; + + const strategyId = await registry.getStrategyId(chainId, strategyAddress); + expect(strategyId).toBeUndefined(); + }); + + it("save and retrieve strategy id", async () => { + const registry = new InMemoryStrategyRegistry(logger); + const strategyAddress = "0x123" as Address; + const strategyId = "0xabc" as Hex; + + await registry.saveStrategyId(chainId, strategyAddress, strategyId, true); + const retrieved = await registry.getStrategyId(chainId, strategyAddress); + + expect(retrieved).toEqual({ + id: strategyId, + address: strategyAddress, + chainId, + handled: true, + }); + }); + + it("handle multiple strategy addresses independently", async () => { + const registry = new InMemoryStrategyRegistry(logger); + const firstAddress = "0x123" as Address; + const secondAddress = "0x456" as Address; + const firstStrategyId = "0xabc" as Hex; + const secondStrategyId = "0xdef" as Hex; + + await registry.saveStrategyId(chainId, firstAddress, firstStrategyId, true); + await registry.saveStrategyId(chainId, secondAddress, secondStrategyId, true); + + const retrievedFirst = await registry.getStrategyId(chainId, firstAddress); + const retrievedSecond = await registry.getStrategyId(chainId, secondAddress); + + expect(retrievedFirst).toEqual({ + id: firstStrategyId, + address: firstAddress, + chainId, + handled: true, + }); + expect(retrievedSecond).toEqual({ + id: secondStrategyId, + address: secondAddress, + chainId, + handled: true, + }); + }); + + it("get all strategies without filters", async () => { + const registry = new InMemoryStrategyRegistry(logger); + const firstChainId = 1 as ChainId; + const secondChainId = 5 as ChainId; + + // Add strategies to different chains with different handled status + await registry.saveStrategyId(firstChainId, "0x123" as Address, "0xabc" as Hex, true); + await registry.saveStrategyId(firstChainId, "0x456" as Address, "0xdef" as Hex, false); + await registry.saveStrategyId(secondChainId, "0x789" as Address, "0xghi" as Hex, true); + + const strategies = await registry.getStrategies(); + expect(strategies).toHaveLength(3); + expect(strategies).toEqual( + expect.arrayContaining([ + { + id: "0xabc" as Hex, + address: "0x123" as Address, + chainId: firstChainId, + handled: true, + }, + { + id: "0xdef" as Hex, + address: "0x456" as Address, + chainId: firstChainId, + handled: false, + }, + { + id: "0xghi" as Hex, + address: "0x789" as Address, + chainId: secondChainId, + handled: true, + }, + ]), + ); + }); + + it("filter strategies by chainId", async () => { + const registry = new InMemoryStrategyRegistry(logger); + const firstChainId = 1 as ChainId; + const secondChainId = 5 as ChainId; + + await registry.saveStrategyId(firstChainId, "0x123" as Address, "0xabc" as Hex, true); + await registry.saveStrategyId(secondChainId, "0x456" as Address, "0xdef" as Hex, true); + + const strategies = await registry.getStrategies({ chainId: firstChainId }); + expect(strategies).toHaveLength(1); + expect(strategies[0]).toEqual({ + id: "0xabc" as Hex, + address: "0x123" as Address, + chainId: firstChainId, + handled: true, + }); + }); + + it("filter strategies by handled status", async () => { + const registry = new InMemoryStrategyRegistry(logger); + const chainId = 1 as ChainId; + + await registry.saveStrategyId(chainId, "0x123" as Address, "0xabc" as Hex, true); + await registry.saveStrategyId(chainId, "0x456" as Address, "0xdef" as Hex, false); + + const handledStrategies = await registry.getStrategies({ handled: true }); + expect(handledStrategies).toHaveLength(1); + expect(handledStrategies[0]).toEqual({ + id: "0xabc" as Hex, + address: "0x123" as Address, + chainId, + handled: true, + }); + + const unhandledStrategies = await registry.getStrategies({ handled: false }); + expect(unhandledStrategies).toHaveLength(1); + expect(unhandledStrategies[0]).toEqual({ + id: "0xdef" as Hex, + address: "0x456" as Address, + chainId, + handled: false, + }); + }); + + it("filter strategies by both chainId and handled status", async () => { + const registry = new InMemoryStrategyRegistry(logger); + const firstChainId = 1 as ChainId; + const secondChainId = 5 as ChainId; + + // Add mix of strategies with different chains and handled status + await registry.saveStrategyId(firstChainId, "0x123" as Address, "0xabc" as Hex, true); + await registry.saveStrategyId(firstChainId, "0x456" as Address, "0xdef" as Hex, false); + await registry.saveStrategyId(secondChainId, "0x789" as Address, "0xghi" as Hex, true); + + const strategies = await registry.getStrategies({ + chainId: firstChainId, + handled: true, + }); + + expect(strategies).toHaveLength(1); + expect(strategies[0]).toEqual({ + id: "0xabc" as Hex, + address: "0x123" as Address, + chainId: firstChainId, + handled: true, + }); + }); +}); diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index e1bd5d7..a0c4a08 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -259,10 +259,6 @@ describe("Orchestrator", { sequential: true }, () => { }); expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledTimes(1); - expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledWith( - strategyAddress, - strategyId, - ); expect(orchestrator["eventsProcessor"].processEvent).toHaveBeenCalledWith({ ...mockEvent, strategyId, @@ -275,6 +271,79 @@ describe("Orchestrator", { sequential: true }, () => { ); }); + it("save strategyId to registry on PoolCreated event", async () => { + const strategyAddress = "0x123" as Address; + const strategyId = + "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; + + const mockEvent = createMockEvent("Allo", "PoolCreated", 1, { + strategy: strategyAddress, + poolId: "1", + profileId: "0x123", + token: "0x123", + amount: "100", + metadata: ["1", "1"], + }); + + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(undefined); + vi.spyOn(mockEvmProvider, "readContract").mockResolvedValue(strategyId); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledWith( + chainId, + strategyAddress, + strategyId, + false, + ); + }); + + it("updates strategyId as handled=true on handled Strategy event", async () => { + const strategyAddress = "0x123" as Address; + const strategyId = + "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; + + const mockEvent = createMockEvent("Strategy", "RegisteredWithSender", 1, { + recipientId: "0x123", + data: "0x123", + sender: "0x123", + }); + + const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); + vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") + .mockResolvedValueOnce([mockEvent]) + .mockResolvedValue([]); + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue({ + id: strategyId, + address: strategyAddress, + chainId, + handled: false, + }); + + runPromise = orchestrator.run(abortController.signal); + + await vi.waitFor(() => { + if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); + }); + + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledWith( + chainId, + strategyAddress, + strategyId, + true, + ); + }); + const strategyEvents: Record = { RegisteredWithSender: "", DistributedWithRecipientAddress: "", @@ -351,7 +420,10 @@ describe("Orchestrator", { sequential: true }, () => { ...mockEvent, strategyId, }); - expect(mockStrategyRegistry.getStrategyId).toHaveBeenCalledWith(strategyAddress); + expect(mockStrategyRegistry.getStrategyId).toHaveBeenCalledWith( + chainId, + strategyAddress, + ); expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledWith(changesets); expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith( @@ -399,6 +471,7 @@ describe("Orchestrator", { sequential: true }, () => { expect(orchestrator["eventsProcessor"].processEvent).not.toHaveBeenCalled(); expect(orchestrator["dataLoader"].applyChanges).not.toHaveBeenCalled(); expect(mockEventsRegistry.saveLastProcessedEvent).not.toHaveBeenCalled(); + expect(mockStrategyRegistry.saveStrategyId).not.toHaveBeenCalled(); }); it("uses cached strategy ID from registry", async () => { @@ -467,7 +540,10 @@ describe("Orchestrator", { sequential: true }, () => { ); expect(mockEvmProvider.readContract).toHaveBeenCalledTimes(1); - expect(mockStrategyRegistry.getStrategyId).toHaveBeenLastCalledWith(strategyAddress); + expect(mockStrategyRegistry.getStrategyId).toHaveBeenLastCalledWith( + chainId, + strategyAddress, + ); expect(eventsProcessorSpy).toHaveBeenLastCalledWith({ ...registeredEvent, strategyId, diff --git a/packages/data-flow/test/unit/strategyRegistry.spec.ts b/packages/data-flow/test/unit/strategyRegistry.spec.ts deleted file mode 100644 index 650a2a2..0000000 --- a/packages/data-flow/test/unit/strategyRegistry.spec.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { Address, Hex } from "viem"; -import { describe, expect, it, vi } from "vitest"; - -import { ChainId, ILogger } from "@grants-stack-indexer/shared"; - -import { InMemoryStrategyRegistry } from "../../src/registries/strategyRegistry.js"; - -describe("InMemoryStrategyRegistry", () => { - const logger: ILogger = { - debug: vi.fn(), - error: vi.fn(), - info: vi.fn(), - warn: vi.fn(), - }; - const chainId = 1 as ChainId; - - it("return undefined for non-existent strategy address", async () => { - const registry = new InMemoryStrategyRegistry(logger); - const strategyAddress = "0x123" as Address; - - const strategyId = await registry.getStrategyId(chainId, strategyAddress); - expect(strategyId).toBeUndefined(); - }); - - it("save and retrieve strategy id", async () => { - const registry = new InMemoryStrategyRegistry(logger); - const strategyAddress = "0x123" as Address; - const strategyId = "0xabc" as Hex; - - await registry.saveStrategyId(chainId, strategyAddress, strategyId, true); - const retrieved = await registry.getStrategyId(chainId, strategyAddress); - - expect(retrieved).toEqual({ - id: strategyId, - address: strategyAddress, - chainId, - handled: true, - }); - }); - - it("handle multiple strategy addresses independently", async () => { - const registry = new InMemoryStrategyRegistry(logger); - const firstAddress = "0x123" as Address; - const secondAddress = "0x456" as Address; - const firstStrategyId = "0xabc" as Hex; - const secondStrategyId = "0xdef" as Hex; - - await registry.saveStrategyId(chainId, firstAddress, firstStrategyId, true); - await registry.saveStrategyId(chainId, secondAddress, secondStrategyId, true); - - const retrievedFirst = await registry.getStrategyId(chainId, firstAddress); - const retrievedSecond = await registry.getStrategyId(chainId, secondAddress); - - expect(retrievedFirst).toEqual({ - id: firstStrategyId, - address: firstAddress, - chainId, - handled: true, - }); - expect(retrievedSecond).toEqual({ - id: secondStrategyId, - address: secondAddress, - chainId, - handled: true, - }); - }); -}); From 3978835188af5b75becb0c8ab88b5bea24f8ce6c Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Wed, 11 Dec 2024 17:04:58 -0300 Subject: [PATCH 4/8] docs: update readme files --- apps/processing/README.md | 3 ++- packages/data-flow/README.md | 7 ++++++- scripts/migrations/README.md | 28 +++++++++++++--------------- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/apps/processing/README.md b/apps/processing/README.md index 96f7b4f..c405a5c 100644 --- a/apps/processing/README.md +++ b/apps/processing/README.md @@ -28,7 +28,8 @@ Available options: |-----------------------------|--------------------------------------------------------------------------------------------------------------------------------|-----------|----------------------------------|-----------------------------------------------------------------| | `CHAINS` | JSON array of chain configurations | N/A | Yes | Each chain object requires: `id` (number), `name` (string), `rpcUrls` (string array). Optional: `fetchLimit` (default: 500), `fetchDelayMs` (default: 1000) | | `DATABASE_URL` | PostgreSQL Data Layer database connection URL | N/A | Yes | | -| `DATABASE_SCHEMA` | PostgreSQL Data Layer database schema name | public | Yes | | +| `DATABASE_SCHEMA` | PostgreSQL Data Layer database chain-data schema name | public | Yes | | +| `DATABASE_REGISTRIES_SCHEMA` | PostgreSQL Data Layer database registries schema name | public | Yes | | | `INDEXER_GRAPHQL_URL` | GraphQL endpoint for the indexer | N/A | Yes | | | `INDEXER_ADMIN_SECRET` | Admin secret for indexer authentication | N/A | Yes | | | `IPFS_GATEWAYS_URL` | Array of IPFS gateway URLs | N/A | Yes | Multiple gateways for redundancy | diff --git a/packages/data-flow/README.md b/packages/data-flow/README.md index 0ab5fe7..af31563 100644 --- a/packages/data-flow/README.md +++ b/packages/data-flow/README.md @@ -90,9 +90,14 @@ The `EventsProcessor` class is responsible for processing events in the processi The `EventsFetcher` class is responsible for fetching events from the blockchain. -### [StrategyRegistry](./src/strategyRegistry.ts) +### [StrategyRegistry](./src/registries/) The `StrategyRegistry` stores strategy IDs to populate strategy events with them given the Strategy address. +There are 3 implementations: + +- `InMemoryStrategyRegistry`: stores map in-memory +- `DatabaseStrategyRegistry`: persists data to database using IStrategyRepository +- `InMemoryCachedStrategyRegistry`: stores map in-memory as cache and persists to database ### [DataLoader](./src/data-loader/dataLoader.ts) diff --git a/scripts/migrations/README.md b/scripts/migrations/README.md index fb442af..1d06bcf 100644 --- a/scripts/migrations/README.md +++ b/scripts/migrations/README.md @@ -4,10 +4,12 @@ This package contains scripts for managing the database schema and migrations. ## Available Scripts -| Script | Description | -| ------------------- | --------------------------------------- | -| `script:db:migrate` | Runs all pending database migrations | -| `script:db:reset` | Drops and recreates the database schema | +| Script | Description | +| -------------------- | ------------------------------------------------------------ | +| `migrate:chain-data` | Runs all pending database migrations on chainData tables | +| `migrate:registries` | Runs all pending database migrations on registries tables | +| `reset:chain-data` | Drops and recreates the database schema on chainData tables | +| `reset:registries` | Drops and recreates the database schema on registries tables | ## Environment Setup @@ -16,17 +18,13 @@ This package contains scripts for managing the database schema and migrations. ```env # Database connection URL DATABASE_URL=postgresql://user:password@localhost:5432/mydb - -# Schema name to manage -DATABASE_SCHEMA=grants_stack ``` ### Environment Variables -| Variable | Description | Example | -| ----------------- | ------------------------- | ------------------------------------------------ | -| `DATABASE_URL` | PostgreSQL connection URL | `postgresql://user:password@localhost:5432/mydb` | -| `DATABASE_SCHEMA` | Database schema name | `grants_stack` | +| Variable | Description | Example | +| -------------- | ------------------------- | ------------------------------------------------ | +| `DATABASE_URL` | PostgreSQL connection URL | `postgresql://user:password@localhost:5432/mydb` | ## Usage @@ -41,7 +39,7 @@ pnpm install To apply all pending migrations: ```bash -pnpm script:db:migrate +pnpm migrate:chain-data --schema=schema_name ``` This will: @@ -57,7 +55,7 @@ This will: To completely reset the database schema: ```bash -pnpm script:db:reset +pnpm reset:chain-data --schema=schema_name ``` **Warning**: This will: @@ -70,10 +68,10 @@ pnpm script:db:reset ### Adding New Migrations -1. Create a new migration file in [`packages/repository/src/migrations`](../../packages//repository//migrations) +1. Create a new migration file in [`packages/repository/src/migrations`](../../packages//repository/migrations) 2. Name it using the format: `YYYYMMDDTHHmmss_description.ts` 3. Implement the `up` and `down` functions -4. Run `pnpm script:db:migrate` to apply the new migration +4. Run `pnpm migrate:chain-data` to apply the new migration Example migration file: From 6adb6664b3d5f9d7d68ca2277756f9eaa0dd43a2 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Thu, 12 Dec 2024 15:17:30 -0300 Subject: [PATCH 5/8] feat: revert multiple schemas setup --- apps/processing/README.md | 3 +-- apps/processing/src/config/env.ts | 1 - .../services/sharedDependencies.service.ts | 2 +- package.json | 6 ++--- scripts/migrations/README.md | 20 +++++++++------- scripts/migrations/package.json | 6 ++--- scripts/migrations/src/migrateDb.script.ts | 5 ++-- .../20241029T120000_initial.ts | 2 +- .../20241210T175001_strategy_registry.ts | 2 +- scripts/migrations/src/resetDb.script.ts | 5 ++-- scripts/migrations/src/utils/parsing.ts | 24 +++++++------------ 11 files changed, 32 insertions(+), 44 deletions(-) rename scripts/migrations/src/migrations/{chainData => }/20241029T120000_initial.ts (99%) rename scripts/migrations/src/migrations/{registries => }/20241210T175001_strategy_registry.ts (95%) diff --git a/apps/processing/README.md b/apps/processing/README.md index c405a5c..96f7b4f 100644 --- a/apps/processing/README.md +++ b/apps/processing/README.md @@ -28,8 +28,7 @@ Available options: |-----------------------------|--------------------------------------------------------------------------------------------------------------------------------|-----------|----------------------------------|-----------------------------------------------------------------| | `CHAINS` | JSON array of chain configurations | N/A | Yes | Each chain object requires: `id` (number), `name` (string), `rpcUrls` (string array). Optional: `fetchLimit` (default: 500), `fetchDelayMs` (default: 1000) | | `DATABASE_URL` | PostgreSQL Data Layer database connection URL | N/A | Yes | | -| `DATABASE_SCHEMA` | PostgreSQL Data Layer database chain-data schema name | public | Yes | | -| `DATABASE_REGISTRIES_SCHEMA` | PostgreSQL Data Layer database registries schema name | public | Yes | | +| `DATABASE_SCHEMA` | PostgreSQL Data Layer database schema name | public | Yes | | | `INDEXER_GRAPHQL_URL` | GraphQL endpoint for the indexer | N/A | Yes | | | `INDEXER_ADMIN_SECRET` | Admin secret for indexer authentication | N/A | Yes | | | `IPFS_GATEWAYS_URL` | Array of IPFS gateway URLs | N/A | Yes | Multiple gateways for redundancy | diff --git a/apps/processing/src/config/env.ts b/apps/processing/src/config/env.ts index 543ec86..4a201e2 100644 --- a/apps/processing/src/config/env.ts +++ b/apps/processing/src/config/env.ts @@ -30,7 +30,6 @@ const baseSchema = z.object({ }, "Chain IDs must be unique"), DATABASE_URL: z.string(), DATABASE_SCHEMA: z.string().default("public"), - DATABASE_REGISTRIES_SCHEMA: z.string().default("public"), INDEXER_GRAPHQL_URL: z.string().url(), INDEXER_ADMIN_SECRET: z.string(), PRICING_SOURCE: z.enum(["dummy", "coingecko"]).default("coingecko"), diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index 52e0c4e..612d8b6 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -76,7 +76,7 @@ export class SharedDependenciesService { ); const strategyRepository = new KyselyStrategyRepository( kyselyDatabase, - env.DATABASE_REGISTRIES_SCHEMA, + env.DATABASE_SCHEMA, ); const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize( new Logger({ className: "InMemoryCachedStrategyRegistry" }), diff --git a/package.json b/package.json index 9679466..68e8a44 100644 --- a/package.json +++ b/package.json @@ -10,17 +10,15 @@ "build": "turbo run build", "check-types": "turbo run check-types", "clean": "turbo run clean", + "db:migrate": "pnpm run --filter @grants-stack-indexer/migrations db:migrate", + "db:reset": "pnpm run --filter @grants-stack-indexer/migrations db:reset", "dev": "turbo run dev", "format": "turbo run format", "format:fix": "turbo run format:fix", "preinstall": "npx only-allow pnpm", "lint": "turbo run lint", "lint:fix": "turbo run lint:fix", - "migrate:chain-data": "pnpm run --filter @grants-stack-indexer/migrations migrate:chain-data", - "migrate:registries": "pnpm run --filter @grants-stack-indexer/migrations migrate:registries", "prepare": "husky", - "reset:chain-data": "pnpm run --filter @grants-stack-indexer/migrations reset:chain-data", - "reset:registries": "pnpm run --filter @grants-stack-indexer/migrations reset:registries", "start": "turbo run start", "test": "turbo run test", "test:cov": "turbo run test:cov", diff --git a/scripts/migrations/README.md b/scripts/migrations/README.md index 1d06bcf..1d9736b 100644 --- a/scripts/migrations/README.md +++ b/scripts/migrations/README.md @@ -4,12 +4,10 @@ This package contains scripts for managing the database schema and migrations. ## Available Scripts -| Script | Description | -| -------------------- | ------------------------------------------------------------ | -| `migrate:chain-data` | Runs all pending database migrations on chainData tables | -| `migrate:registries` | Runs all pending database migrations on registries tables | -| `reset:chain-data` | Drops and recreates the database schema on chainData tables | -| `reset:registries` | Drops and recreates the database schema on registries tables | +| Script | Description | +| ------------ | --------------------------------------- | +| `db:migrate` | Runs all pending database migrations | +| `db:reset` | Drops and recreates the database schema | ## Environment Setup @@ -39,9 +37,13 @@ pnpm install To apply all pending migrations: ```bash -pnpm migrate:chain-data --schema=schema_name +pnpm db:migrate --schema=schema_name ``` +Optional arguments: + +- `--schema` or `-s`: Database schema name where migrations are applied. Defaults to `public`. + This will: 1. Load environment variables @@ -55,7 +57,7 @@ This will: To completely reset the database schema: ```bash -pnpm reset:chain-data --schema=schema_name +pnpm db:reset --schema=schema_name ``` **Warning**: This will: @@ -71,7 +73,7 @@ pnpm reset:chain-data --schema=schema_name 1. Create a new migration file in [`packages/repository/src/migrations`](../../packages//repository/migrations) 2. Name it using the format: `YYYYMMDDTHHmmss_description.ts` 3. Implement the `up` and `down` functions -4. Run `pnpm migrate:chain-data` to apply the new migration +4. Run `pnpm db:migrate` to apply the new migration Example migration file: diff --git a/scripts/migrations/package.json b/scripts/migrations/package.json index ffe8e67..c4c3180 100644 --- a/scripts/migrations/package.json +++ b/scripts/migrations/package.json @@ -16,14 +16,12 @@ "build": "tsc -p tsconfig.build.json", "check-types": "tsc --noEmit -p ./tsconfig.json", "clean": "rm -rf dist/", + "db:migrate": "tsx src/migrateDb.script.ts", + "db:reset": "tsx src/resetDb.script.ts", "format": "prettier --check \"{src,test}/**/*.{js,ts,json}\"", "format:fix": "prettier --write \"{src,test}/**/*.{js,ts,json}\"", "lint": "eslint \"{src,test}/**/*.{js,ts,json}\"", "lint:fix": "pnpm lint --fix", - "migrate:chain-data": "tsx src/migrateDb.script.ts --folder chainData", - "migrate:registries": "tsx src/migrateDb.script.ts --folder registries", - "reset:chain-data": "tsx src/resetDb.script.ts --folder chainData", - "reset:registries": "tsx src/resetDb.script.ts --folder registries", "test": "vitest run --config vitest.config.ts --passWithNoTests", "test:cov": "vitest run --config vitest.config.ts --coverage --passWithNoTests" }, diff --git a/scripts/migrations/src/migrateDb.script.ts b/scripts/migrations/src/migrateDb.script.ts index 90bc5fa..bef2362 100644 --- a/scripts/migrations/src/migrateDb.script.ts +++ b/scripts/migrations/src/migrateDb.script.ts @@ -22,7 +22,6 @@ configDotenv(); * - DATABASE_URL: PostgreSQL connection string * * Script arguments: - * - folder: Folder name to migrate (e.g. "chainData" or "registries") * - schema: Database schema name where migrations are applied * * The script will: @@ -34,7 +33,7 @@ configDotenv(); export const main = async (): Promise => { const { DATABASE_URL } = getDatabaseConfigFromEnv(); - const { folder, schema } = parseArguments(); + const { schema } = parseArguments(); const db = createKyselyDatabase({ connectionString: DATABASE_URL, @@ -46,7 +45,7 @@ export const main = async (): Promise => { const migrationResults = await migrateToLatest({ db, schema, - migrationsFolder: getMigrationsFolder(folder), + migrationsFolder: getMigrationsFolder(), }); if (migrationResults && migrationResults?.length > 0) { diff --git a/scripts/migrations/src/migrations/chainData/20241029T120000_initial.ts b/scripts/migrations/src/migrations/20241029T120000_initial.ts similarity index 99% rename from scripts/migrations/src/migrations/chainData/20241029T120000_initial.ts rename to scripts/migrations/src/migrations/20241029T120000_initial.ts index 904a139..778d0e8 100644 --- a/scripts/migrations/src/migrations/chainData/20241029T120000_initial.ts +++ b/scripts/migrations/src/migrations/20241029T120000_initial.ts @@ -1,6 +1,6 @@ import { Kysely, sql } from "kysely"; -import { getSchemaName } from "../../utils/index.js"; +import { getSchemaName } from "../utils/index.js"; /** * The up function is called when you update your database schema to the next version and down when you go back to previous version. diff --git a/scripts/migrations/src/migrations/registries/20241210T175001_strategy_registry.ts b/scripts/migrations/src/migrations/20241210T175001_strategy_registry.ts similarity index 95% rename from scripts/migrations/src/migrations/registries/20241210T175001_strategy_registry.ts rename to scripts/migrations/src/migrations/20241210T175001_strategy_registry.ts index 704a75d..85595e2 100644 --- a/scripts/migrations/src/migrations/registries/20241210T175001_strategy_registry.ts +++ b/scripts/migrations/src/migrations/20241210T175001_strategy_registry.ts @@ -1,6 +1,6 @@ import { Kysely } from "kysely"; -import { getSchemaName } from "../../utils/index.js"; +import { getSchemaName } from "../utils/index.js"; /** * The up function is called when you update your database schema to the next version and down when you go back to previous version. diff --git a/scripts/migrations/src/resetDb.script.ts b/scripts/migrations/src/resetDb.script.ts index b55f6a2..d5ca249 100644 --- a/scripts/migrations/src/resetDb.script.ts +++ b/scripts/migrations/src/resetDb.script.ts @@ -22,7 +22,6 @@ configDotenv(); * - DATABASE_URL: PostgreSQL connection string * * Script arguments: - * - folder: Folder name to migrate (e.g. "chainData" or "registries") * - schema: Database schema name where migrations are applied * * The script will: @@ -37,7 +36,7 @@ configDotenv(); const main = async (): Promise => { const { DATABASE_URL } = getDatabaseConfigFromEnv(); - const { folder, schema } = parseArguments(); + const { schema } = parseArguments(); const db = createKyselyDatabase({ connectionString: DATABASE_URL, @@ -49,7 +48,7 @@ const main = async (): Promise => { const resetResults = await resetDatabase({ db, schema, - migrationsFolder: getMigrationsFolder(folder), + migrationsFolder: getMigrationsFolder(), }); if (resetResults && resetResults?.length > 0) { diff --git a/scripts/migrations/src/utils/parsing.ts b/scripts/migrations/src/utils/parsing.ts index c062589..664bed0 100644 --- a/scripts/migrations/src/utils/parsing.ts +++ b/scripts/migrations/src/utils/parsing.ts @@ -4,28 +4,23 @@ import yargs from "yargs"; import { hideBin } from "yargs/helpers"; import { z } from "zod"; +const DEFAULT_SCHEMA = "public"; + const zodSchema = z.object({ - folder: z + schema: z .string() - .refine((value) => ["chainData", "registries"].includes(value), { - message: "Schema name must be either 'chainData' or 'registries'", - }) - .describe("Folder name to migrate"), - schema: z.string().describe("Database schema name where migrations are applied"), + .default(DEFAULT_SCHEMA) + .describe("Database schema name where migrations are applied"), }); export const parseArguments = (): z.infer => { return yargs(hideBin(process.argv)) - .option("folder", { - type: "string", - demandOption: true, - description: "Folder name to migrate", - }) .option("schema", { alias: "s", type: "string", demandOption: true, description: "Database schema name where migrations are applied", + default: DEFAULT_SCHEMA, }) .check((argv) => { zodSchema.parse(argv); @@ -34,15 +29,14 @@ export const parseArguments = (): z.infer => { .parseSync(); }; -export const getMigrationsFolder = (folder: string): string => { +export const getMigrationsFolder = (): string => { const migrationsFolder = path.join( path.dirname(new URL(import.meta.url).pathname), - `../migrations/${folder}`, + `../migrations`, ); - console.log("migrationsFolder", migrationsFolder); if (!existsSync(migrationsFolder)) { - throw new Error(`Migrations folder '${folder}' not found`); + throw new Error(`Migrations folder not found`); } return migrationsFolder; From aa3ee2776048258d2dccdc6b2be57f5dcdc5b8c1 Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Fri, 13 Dec 2024 15:02:32 -0300 Subject: [PATCH 6/8] fix: imports, renamings and natspecs --- .../services/sharedDependencies.service.ts | 8 +- .../unit/sharedDependencies.service.spec.ts | 1 - packages/data-flow/README.md | 1 - packages/data-flow/src/external.ts | 3 +- packages/data-flow/src/registries/index.ts | 4 +- .../{ => strategy}/cachedStrategyRegistry.ts | 13 +- .../{ => strategy}/dbStrategyRegistry.ts | 2 +- .../src/registries/strategy/index.ts | 2 + .../src/registries/strategyRegistry.ts | 58 ------ .../registries/cachedStrategyRegistry.spec.ts | 2 +- .../registries/dbStrategyRegistry.spec.ts | 2 +- .../test/registries/strategyRegistry.spec.ts | 171 ------------------ .../strategyRepository.interface.ts | 19 ++ .../kysely/strategy.repository.ts | 3 + .../20241210T175001_strategy_registry.ts | 6 - 15 files changed, 45 insertions(+), 250 deletions(-) rename packages/data-flow/src/registries/{ => strategy}/cachedStrategyRegistry.ts (83%) rename packages/data-flow/src/registries/{ => strategy}/dbStrategyRegistry.ts (96%) create mode 100644 packages/data-flow/src/registries/strategy/index.ts delete mode 100644 packages/data-flow/src/registries/strategyRegistry.ts delete mode 100644 packages/data-flow/test/registries/strategyRegistry.spec.ts diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index 612d8b6..92a9eba 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -1,13 +1,11 @@ import { CoreDependencies, - InMemoryEventsRegistry, - IStrategyRegistry, -} from "@grants-stack-indexer/data-flow"; -import { DatabaseStrategyRegistry, IEventsRegistry, InMemoryCachedStrategyRegistry, -} from "@grants-stack-indexer/data-flow/dist/src/internal.js"; + InMemoryEventsRegistry, + IStrategyRegistry, +} from "@grants-stack-indexer/data-flow"; import { EnvioIndexerClient } from "@grants-stack-indexer/indexer-client"; import { IpfsProvider } from "@grants-stack-indexer/metadata"; import { PricingProviderFactory } from "@grants-stack-indexer/pricing"; diff --git a/apps/processing/test/unit/sharedDependencies.service.spec.ts b/apps/processing/test/unit/sharedDependencies.service.spec.ts index 31c662f..437c27e 100644 --- a/apps/processing/test/unit/sharedDependencies.service.spec.ts +++ b/apps/processing/test/unit/sharedDependencies.service.spec.ts @@ -38,7 +38,6 @@ vi.mock("@grants-stack-indexer/indexer-client", () => ({ EnvioIndexerClient: vi.fn(), })); -// Update the mock to handle async initialization vi.mock("@grants-stack-indexer/data-flow", () => { const mockStrategyRegistry = { getStrategies: vi.fn(), diff --git a/packages/data-flow/README.md b/packages/data-flow/README.md index af31563..d6ab601 100644 --- a/packages/data-flow/README.md +++ b/packages/data-flow/README.md @@ -95,7 +95,6 @@ The `EventsFetcher` class is responsible for fetching events from the blockchain The `StrategyRegistry` stores strategy IDs to populate strategy events with them given the Strategy address. There are 3 implementations: -- `InMemoryStrategyRegistry`: stores map in-memory - `DatabaseStrategyRegistry`: persists data to database using IStrategyRepository - `InMemoryCachedStrategyRegistry`: stores map in-memory as cache and persists to database diff --git a/packages/data-flow/src/external.ts b/packages/data-flow/src/external.ts index 28cb58e..4a22f36 100644 --- a/packages/data-flow/src/external.ts +++ b/packages/data-flow/src/external.ts @@ -1,7 +1,8 @@ export { DataLoader, InMemoryEventsRegistry, - InMemoryStrategyRegistry, + InMemoryCachedStrategyRegistry, + DatabaseStrategyRegistry, Orchestrator, } from "./internal.js"; diff --git a/packages/data-flow/src/registries/index.ts b/packages/data-flow/src/registries/index.ts index 3431897..c6319bf 100644 --- a/packages/data-flow/src/registries/index.ts +++ b/packages/data-flow/src/registries/index.ts @@ -1,3 +1 @@ -export * from "./cachedStrategyRegistry.js"; -export * from "./dbStrategyRegistry.js"; -export * from "./strategyRegistry.js"; +export * from "./strategy/index.js"; diff --git a/packages/data-flow/src/registries/cachedStrategyRegistry.ts b/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts similarity index 83% rename from packages/data-flow/src/registries/cachedStrategyRegistry.ts rename to packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts index 2babeac..c89c9f9 100644 --- a/packages/data-flow/src/registries/cachedStrategyRegistry.ts +++ b/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts @@ -1,7 +1,7 @@ import { Strategy } from "@grants-stack-indexer/repository"; import { Address, ChainId, Hex, ILogger } from "@grants-stack-indexer/shared"; -import { IStrategyRegistry } from "../internal.js"; +import { IStrategyRegistry } from "../../internal.js"; /** * Proxy class to cache the strategy ids in memory or fallback to another strategy registry @@ -17,10 +17,19 @@ export class InMemoryCachedStrategyRegistry implements IStrategyRegistry { this.cache = structuredClone(cache); } + /** @inheritdoc */ async getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise { return this.strategyRegistry.getStrategies(params); } + /** + * Creates a new cached strategy registry instance. It will load the strategies into memory and cache them and + * fallback to the strategy registry if the strategy is not found in the cache. + * + * @param logger - The logger instance + * @param strategyRegistry - The strategy registry instance + * @returns The initialized cached strategy registry + */ static async initialize( logger: ILogger, strategyRegistry: IStrategyRegistry, @@ -40,6 +49,7 @@ export class InMemoryCachedStrategyRegistry implements IStrategyRegistry { return new InMemoryCachedStrategyRegistry(logger, strategyRegistry, cache); } + /** @inheritdoc */ async getStrategyId(chainId: ChainId, strategyAddress: Address): Promise { const cache = this.cache.get(chainId)?.get(strategyAddress); if (cache) { @@ -53,6 +63,7 @@ export class InMemoryCachedStrategyRegistry implements IStrategyRegistry { return strategy; } + /** @inheritdoc */ async saveStrategyId( chainId: ChainId, strategyAddress: Address, diff --git a/packages/data-flow/src/registries/dbStrategyRegistry.ts b/packages/data-flow/src/registries/strategy/dbStrategyRegistry.ts similarity index 96% rename from packages/data-flow/src/registries/dbStrategyRegistry.ts rename to packages/data-flow/src/registries/strategy/dbStrategyRegistry.ts index 318e6be..5dcfe50 100644 --- a/packages/data-flow/src/registries/dbStrategyRegistry.ts +++ b/packages/data-flow/src/registries/strategy/dbStrategyRegistry.ts @@ -3,7 +3,7 @@ import type { Address, Hex } from "viem"; import { IStrategyRepository, Strategy } from "@grants-stack-indexer/repository"; import { ChainId, ILogger } from "@grants-stack-indexer/shared"; -import { IStrategyRegistry } from "../internal.js"; +import { IStrategyRegistry } from "../../internal.js"; /** * Class to store strategy ids in Database diff --git a/packages/data-flow/src/registries/strategy/index.ts b/packages/data-flow/src/registries/strategy/index.ts new file mode 100644 index 0000000..8ab6469 --- /dev/null +++ b/packages/data-flow/src/registries/strategy/index.ts @@ -0,0 +1,2 @@ +export * from "./cachedStrategyRegistry.js"; +export * from "./dbStrategyRegistry.js"; diff --git a/packages/data-flow/src/registries/strategyRegistry.ts b/packages/data-flow/src/registries/strategyRegistry.ts deleted file mode 100644 index 1ff55cf..0000000 --- a/packages/data-flow/src/registries/strategyRegistry.ts +++ /dev/null @@ -1,58 +0,0 @@ -import type { Address, Hex } from "viem"; - -import { Strategy } from "@grants-stack-indexer/repository"; -import { ChainId, ILogger } from "@grants-stack-indexer/shared"; - -import type { IStrategyRegistry } from "../internal.js"; - -/** - * Class to store strategy ids in memory - */ -export class InMemoryStrategyRegistry implements IStrategyRegistry { - private strategiesMap: Map> = new Map(); - constructor(private logger: ILogger) {} - - /** @inheritdoc */ - async getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise { - return Array.from(this.strategiesMap.entries()) - .filter(([chainId]) => params?.chainId === undefined || chainId === params.chainId) - .map(([chainId, strategies]) => - Array.from(strategies.entries()) - .filter( - ([_address, strategy]) => - params?.handled === undefined || strategy.handled === params.handled, - ) - .map(([address, strategy]) => ({ - id: strategy.id, - address, - chainId, - handled: strategy.handled, - })), - ) - .flat(); - } - - /** @inheritdoc */ - async getStrategyId(chainId: ChainId, strategyAddress: Address): Promise { - return this.strategiesMap.get(chainId)?.get(strategyAddress); - } - - /** @inheritdoc */ - async saveStrategyId( - chainId: ChainId, - strategyAddress: Address, - strategyId: Hex, - handled: boolean, - ): Promise { - this.logger.debug(`Saving strategy id ${strategyId} for address ${strategyAddress}`); - if (!this.strategiesMap.has(chainId)) { - this.strategiesMap.set(chainId, new Map()); - } - this.strategiesMap.get(chainId)!.set(strategyAddress, { - address: strategyAddress, - id: strategyId, - chainId, - handled, - }); - } -} diff --git a/packages/data-flow/test/registries/cachedStrategyRegistry.spec.ts b/packages/data-flow/test/registries/cachedStrategyRegistry.spec.ts index 7829786..d3edecb 100644 --- a/packages/data-flow/test/registries/cachedStrategyRegistry.spec.ts +++ b/packages/data-flow/test/registries/cachedStrategyRegistry.spec.ts @@ -5,7 +5,7 @@ import { Strategy } from "@grants-stack-indexer/repository"; import { ChainId, ILogger } from "@grants-stack-indexer/shared"; import { IStrategyRegistry } from "../../src/internal.js"; -import { InMemoryCachedStrategyRegistry } from "../../src/registries/cachedStrategyRegistry.js"; +import { InMemoryCachedStrategyRegistry } from "../../src/registries/strategy/cachedStrategyRegistry.js"; describe("InMemoryCachedStrategyRegistry", () => { const logger: ILogger = { diff --git a/packages/data-flow/test/registries/dbStrategyRegistry.spec.ts b/packages/data-flow/test/registries/dbStrategyRegistry.spec.ts index d782444..c80fa93 100644 --- a/packages/data-flow/test/registries/dbStrategyRegistry.spec.ts +++ b/packages/data-flow/test/registries/dbStrategyRegistry.spec.ts @@ -4,7 +4,7 @@ import { describe, expect, it, vi } from "vitest"; import { IStrategyRepository, Strategy } from "@grants-stack-indexer/repository"; import { ChainId, ILogger } from "@grants-stack-indexer/shared"; -import { DatabaseStrategyRegistry } from "../../src/registries/dbStrategyRegistry.js"; +import { DatabaseStrategyRegistry } from "../../src/registries/strategy/dbStrategyRegistry.js"; describe("DatabaseStrategyRegistry", () => { const logger: ILogger = { diff --git a/packages/data-flow/test/registries/strategyRegistry.spec.ts b/packages/data-flow/test/registries/strategyRegistry.spec.ts deleted file mode 100644 index 80f3c2f..0000000 --- a/packages/data-flow/test/registries/strategyRegistry.spec.ts +++ /dev/null @@ -1,171 +0,0 @@ -import { Address, Hex } from "viem"; -import { describe, expect, it, vi } from "vitest"; - -import { ChainId, ILogger } from "@grants-stack-indexer/shared"; - -import { InMemoryStrategyRegistry } from "../../src/registries/strategyRegistry.js"; - -describe("InMemoryStrategyRegistry", () => { - const logger: ILogger = { - debug: vi.fn(), - error: vi.fn(), - info: vi.fn(), - warn: vi.fn(), - }; - const chainId = 1 as ChainId; - - it("return undefined for non-existent strategy address", async () => { - const registry = new InMemoryStrategyRegistry(logger); - const strategyAddress = "0x123" as Address; - - const strategyId = await registry.getStrategyId(chainId, strategyAddress); - expect(strategyId).toBeUndefined(); - }); - - it("save and retrieve strategy id", async () => { - const registry = new InMemoryStrategyRegistry(logger); - const strategyAddress = "0x123" as Address; - const strategyId = "0xabc" as Hex; - - await registry.saveStrategyId(chainId, strategyAddress, strategyId, true); - const retrieved = await registry.getStrategyId(chainId, strategyAddress); - - expect(retrieved).toEqual({ - id: strategyId, - address: strategyAddress, - chainId, - handled: true, - }); - }); - - it("handle multiple strategy addresses independently", async () => { - const registry = new InMemoryStrategyRegistry(logger); - const firstAddress = "0x123" as Address; - const secondAddress = "0x456" as Address; - const firstStrategyId = "0xabc" as Hex; - const secondStrategyId = "0xdef" as Hex; - - await registry.saveStrategyId(chainId, firstAddress, firstStrategyId, true); - await registry.saveStrategyId(chainId, secondAddress, secondStrategyId, true); - - const retrievedFirst = await registry.getStrategyId(chainId, firstAddress); - const retrievedSecond = await registry.getStrategyId(chainId, secondAddress); - - expect(retrievedFirst).toEqual({ - id: firstStrategyId, - address: firstAddress, - chainId, - handled: true, - }); - expect(retrievedSecond).toEqual({ - id: secondStrategyId, - address: secondAddress, - chainId, - handled: true, - }); - }); - - it("get all strategies without filters", async () => { - const registry = new InMemoryStrategyRegistry(logger); - const firstChainId = 1 as ChainId; - const secondChainId = 5 as ChainId; - - // Add strategies to different chains with different handled status - await registry.saveStrategyId(firstChainId, "0x123" as Address, "0xabc" as Hex, true); - await registry.saveStrategyId(firstChainId, "0x456" as Address, "0xdef" as Hex, false); - await registry.saveStrategyId(secondChainId, "0x789" as Address, "0xghi" as Hex, true); - - const strategies = await registry.getStrategies(); - expect(strategies).toHaveLength(3); - expect(strategies).toEqual( - expect.arrayContaining([ - { - id: "0xabc" as Hex, - address: "0x123" as Address, - chainId: firstChainId, - handled: true, - }, - { - id: "0xdef" as Hex, - address: "0x456" as Address, - chainId: firstChainId, - handled: false, - }, - { - id: "0xghi" as Hex, - address: "0x789" as Address, - chainId: secondChainId, - handled: true, - }, - ]), - ); - }); - - it("filter strategies by chainId", async () => { - const registry = new InMemoryStrategyRegistry(logger); - const firstChainId = 1 as ChainId; - const secondChainId = 5 as ChainId; - - await registry.saveStrategyId(firstChainId, "0x123" as Address, "0xabc" as Hex, true); - await registry.saveStrategyId(secondChainId, "0x456" as Address, "0xdef" as Hex, true); - - const strategies = await registry.getStrategies({ chainId: firstChainId }); - expect(strategies).toHaveLength(1); - expect(strategies[0]).toEqual({ - id: "0xabc" as Hex, - address: "0x123" as Address, - chainId: firstChainId, - handled: true, - }); - }); - - it("filter strategies by handled status", async () => { - const registry = new InMemoryStrategyRegistry(logger); - const chainId = 1 as ChainId; - - await registry.saveStrategyId(chainId, "0x123" as Address, "0xabc" as Hex, true); - await registry.saveStrategyId(chainId, "0x456" as Address, "0xdef" as Hex, false); - - const handledStrategies = await registry.getStrategies({ handled: true }); - expect(handledStrategies).toHaveLength(1); - expect(handledStrategies[0]).toEqual({ - id: "0xabc" as Hex, - address: "0x123" as Address, - chainId, - handled: true, - }); - - const unhandledStrategies = await registry.getStrategies({ handled: false }); - expect(unhandledStrategies).toHaveLength(1); - expect(unhandledStrategies[0]).toEqual({ - id: "0xdef" as Hex, - address: "0x456" as Address, - chainId, - handled: false, - }); - }); - - it("filter strategies by both chainId and handled status", async () => { - const registry = new InMemoryStrategyRegistry(logger); - const firstChainId = 1 as ChainId; - const secondChainId = 5 as ChainId; - - // Add mix of strategies with different chains and handled status - await registry.saveStrategyId(firstChainId, "0x123" as Address, "0xabc" as Hex, true); - await registry.saveStrategyId(firstChainId, "0x456" as Address, "0xdef" as Hex, false); - await registry.saveStrategyId(secondChainId, "0x789" as Address, "0xghi" as Hex, true); - - const strategies = await registry.getStrategies({ - chainId: firstChainId, - handled: true, - }); - - expect(strategies).toHaveLength(1); - expect(strategies[0]).toEqual({ - id: "0xabc" as Hex, - address: "0x123" as Address, - chainId: firstChainId, - handled: true, - }); - }); -}); diff --git a/packages/repository/src/interfaces/strategyRepository.interface.ts b/packages/repository/src/interfaces/strategyRepository.interface.ts index e5332f9..b9aa195 100644 --- a/packages/repository/src/interfaces/strategyRepository.interface.ts +++ b/packages/repository/src/interfaces/strategyRepository.interface.ts @@ -3,10 +3,29 @@ import { Address, ChainId } from "@grants-stack-indexer/shared"; import { Strategy } from "../internal.js"; export interface IStrategyRepository { + /** + * Retrieves a strategy by its chain ID and address. + * @param chainId - The chain ID of the strategy. + * @param strategyAddress - The address of the strategy. + * @returns A promise that resolves to the strategy object or undefined if not found. + */ getStrategyByChainIdAndAddress( chainId: ChainId, strategyAddress: Address, ): Promise; + + /** + * Saves a strategy to the repository. + * @param strategy - The strategy to save. + */ saveStrategy(strategy: Strategy): Promise; + + /** + * Retrieves all strategies from the repository. + * @param params - The parameters to filter the strategies. + * @param params.handled - Whether to include handled strategies. + * @param params.chainId - The chain ID to filter the strategies. + * @returns A promise that resolves to an array of strategies. + */ getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise; } diff --git a/packages/repository/src/repositories/kysely/strategy.repository.ts b/packages/repository/src/repositories/kysely/strategy.repository.ts index e6cb91c..d8a6124 100644 --- a/packages/repository/src/repositories/kysely/strategy.repository.ts +++ b/packages/repository/src/repositories/kysely/strategy.repository.ts @@ -11,6 +11,7 @@ export class KyselyStrategyRepository implements IStrategyRepository { private readonly schemaName: string, ) {} + /** @inheritdoc */ async getStrategyByChainIdAndAddress( chainId: ChainId, strategyAddress: Address, @@ -23,6 +24,8 @@ export class KyselyStrategyRepository implements IStrategyRepository { .selectAll() .executeTakeFirst(); } + + /** @inheritdoc */ async saveStrategy(strategy: Strategy): Promise { await this.db .withSchema(this.schemaName) diff --git a/scripts/migrations/src/migrations/20241210T175001_strategy_registry.ts b/scripts/migrations/src/migrations/20241210T175001_strategy_registry.ts index 85595e2..45a5936 100644 --- a/scripts/migrations/src/migrations/20241210T175001_strategy_registry.ts +++ b/scripts/migrations/src/migrations/20241210T175001_strategy_registry.ts @@ -1,7 +1,5 @@ import { Kysely } from "kysely"; -import { getSchemaName } from "../utils/index.js"; - /** * The up function is called when you update your database schema to the next version and down when you go back to previous version. * The only argument for the functions is an instance of Kysely. It's important to use Kysely and not Kysely. @@ -12,10 +10,6 @@ export async function up(db: Kysely): Promise { const ADDRESS_TYPE = "text"; const CHAIN_ID_TYPE = "integer"; - const schema = getSchemaName(db.schema); - - console.log("schema", schema); - await db.schema .createTable("strategies") .addColumn("address", ADDRESS_TYPE) From fe62481ed3ce1ac2d5abcd8d897a552977a2c6bf Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Fri, 13 Dec 2024 19:13:33 -0300 Subject: [PATCH 7/8] fix: rename class and argument --- .../src/services/sharedDependencies.service.ts | 4 ++-- .../src/interfaces/strategyRegistry.interface.ts | 2 +- .../registries/strategy/cachedStrategyRegistry.ts | 4 ++-- .../src/registries/strategy/dbStrategyRegistry.ts | 8 ++++---- .../test/registries/dbStrategyRegistry.spec.ts | 4 ++-- packages/repository/src/external.ts | 4 ++-- .../interfaces/strategyRepository.interface.ts | 10 +++++----- .../repository/src/repositories/kysely/index.ts | 2 +- ...pository.ts => strategyRegistry.repository.ts} | 15 ++++++++------- 9 files changed, 27 insertions(+), 26 deletions(-) rename packages/repository/src/repositories/kysely/{strategy.repository.ts => strategyRegistry.repository.ts} (72%) diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index 92a9eba..b2e05ae 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -16,7 +16,7 @@ import { KyselyDonationRepository, KyselyProjectRepository, KyselyRoundRepository, - KyselyStrategyRepository, + KyselyStrategyRegistryRepository, } from "@grants-stack-indexer/repository"; import { Logger } from "@grants-stack-indexer/shared"; @@ -72,7 +72,7 @@ export class SharedDependenciesService { const eventsRegistry = new InMemoryEventsRegistry( new Logger({ className: "InMemoryEventsRegistry" }), ); - const strategyRepository = new KyselyStrategyRepository( + const strategyRepository = new KyselyStrategyRegistryRepository( kyselyDatabase, env.DATABASE_SCHEMA, ); diff --git a/packages/data-flow/src/interfaces/strategyRegistry.interface.ts b/packages/data-flow/src/interfaces/strategyRegistry.interface.ts index cb187c3..965ad44 100644 --- a/packages/data-flow/src/interfaces/strategyRegistry.interface.ts +++ b/packages/data-flow/src/interfaces/strategyRegistry.interface.ts @@ -34,5 +34,5 @@ export interface IStrategyRegistry { * Get all the strategies * @returns The strategies */ - getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise; + getStrategies(filters?: { handled?: boolean; chainId?: ChainId }): Promise; } diff --git a/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts b/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts index c89c9f9..16899cc 100644 --- a/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts +++ b/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts @@ -18,8 +18,8 @@ export class InMemoryCachedStrategyRegistry implements IStrategyRegistry { } /** @inheritdoc */ - async getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise { - return this.strategyRegistry.getStrategies(params); + async getStrategies(filters?: { handled?: boolean; chainId?: ChainId }): Promise { + return this.strategyRegistry.getStrategies(filters); } /** diff --git a/packages/data-flow/src/registries/strategy/dbStrategyRegistry.ts b/packages/data-flow/src/registries/strategy/dbStrategyRegistry.ts index 5dcfe50..e29fa3f 100644 --- a/packages/data-flow/src/registries/strategy/dbStrategyRegistry.ts +++ b/packages/data-flow/src/registries/strategy/dbStrategyRegistry.ts @@ -1,6 +1,6 @@ import type { Address, Hex } from "viem"; -import { IStrategyRepository, Strategy } from "@grants-stack-indexer/repository"; +import { IStrategyRegistryRepository, Strategy } from "@grants-stack-indexer/repository"; import { ChainId, ILogger } from "@grants-stack-indexer/shared"; import { IStrategyRegistry } from "../../internal.js"; @@ -11,12 +11,12 @@ import { IStrategyRegistry } from "../../internal.js"; export class DatabaseStrategyRegistry implements IStrategyRegistry { constructor( private logger: ILogger, - private strategyRepository: IStrategyRepository, + private strategyRepository: IStrategyRegistryRepository, ) {} /** @inheritdoc */ - async getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise { - return this.strategyRepository.getStrategies(params); + async getStrategies(filters?: { handled?: boolean; chainId?: ChainId }): Promise { + return this.strategyRepository.getStrategies(filters); } /** @inheritdoc */ diff --git a/packages/data-flow/test/registries/dbStrategyRegistry.spec.ts b/packages/data-flow/test/registries/dbStrategyRegistry.spec.ts index c80fa93..1974cdb 100644 --- a/packages/data-flow/test/registries/dbStrategyRegistry.spec.ts +++ b/packages/data-flow/test/registries/dbStrategyRegistry.spec.ts @@ -1,7 +1,7 @@ import { Address, Hex } from "viem"; import { describe, expect, it, vi } from "vitest"; -import { IStrategyRepository, Strategy } from "@grants-stack-indexer/repository"; +import { IStrategyRegistryRepository, Strategy } from "@grants-stack-indexer/repository"; import { ChainId, ILogger } from "@grants-stack-indexer/shared"; import { DatabaseStrategyRegistry } from "../../src/registries/strategy/dbStrategyRegistry.js"; @@ -14,7 +14,7 @@ describe("DatabaseStrategyRegistry", () => { warn: vi.fn(), }; - const mockStrategyRepository: IStrategyRepository = { + const mockStrategyRepository: IStrategyRegistryRepository = { getStrategies: vi.fn(), getStrategyByChainIdAndAddress: vi.fn(), saveStrategy: vi.fn(), diff --git a/packages/repository/src/external.ts b/packages/repository/src/external.ts index b8f9d36..fec981b 100644 --- a/packages/repository/src/external.ts +++ b/packages/repository/src/external.ts @@ -8,7 +8,7 @@ export type { IApplicationReadRepository, IDonationRepository, IApplicationPayoutRepository, - IStrategyRepository, + IStrategyRegistryRepository, DatabaseConfig, } from "./internal.js"; @@ -53,7 +53,7 @@ export { KyselyApplicationRepository, KyselyDonationRepository, KyselyApplicationPayoutRepository, - KyselyStrategyRepository, + KyselyStrategyRegistryRepository, } from "./repositories/kysely/index.js"; export { diff --git a/packages/repository/src/interfaces/strategyRepository.interface.ts b/packages/repository/src/interfaces/strategyRepository.interface.ts index b9aa195..c58d344 100644 --- a/packages/repository/src/interfaces/strategyRepository.interface.ts +++ b/packages/repository/src/interfaces/strategyRepository.interface.ts @@ -2,7 +2,7 @@ import { Address, ChainId } from "@grants-stack-indexer/shared"; import { Strategy } from "../internal.js"; -export interface IStrategyRepository { +export interface IStrategyRegistryRepository { /** * Retrieves a strategy by its chain ID and address. * @param chainId - The chain ID of the strategy. @@ -22,10 +22,10 @@ export interface IStrategyRepository { /** * Retrieves all strategies from the repository. - * @param params - The parameters to filter the strategies. - * @param params.handled - Whether to include handled strategies. - * @param params.chainId - The chain ID to filter the strategies. + * @param filters - The parameters to filter the strategies. + * @param filters.handled - Whether to include handled strategies. + * @param filters.chainId - The chain ID to filter the strategies. * @returns A promise that resolves to an array of strategies. */ - getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise; + getStrategies(filters?: { handled?: boolean; chainId?: ChainId }): Promise; } diff --git a/packages/repository/src/repositories/kysely/index.ts b/packages/repository/src/repositories/kysely/index.ts index 5d79682..ae4143a 100644 --- a/packages/repository/src/repositories/kysely/index.ts +++ b/packages/repository/src/repositories/kysely/index.ts @@ -3,4 +3,4 @@ export * from "./round.repository.js"; export * from "./application.repository.js"; export * from "./donation.repository.js"; export * from "./applicationPayout.repository.js"; -export * from "./strategy.repository.js"; +export * from "./strategyRegistry.repository.js"; diff --git a/packages/repository/src/repositories/kysely/strategy.repository.ts b/packages/repository/src/repositories/kysely/strategyRegistry.repository.ts similarity index 72% rename from packages/repository/src/repositories/kysely/strategy.repository.ts rename to packages/repository/src/repositories/kysely/strategyRegistry.repository.ts index d8a6124..a251f42 100644 --- a/packages/repository/src/repositories/kysely/strategy.repository.ts +++ b/packages/repository/src/repositories/kysely/strategyRegistry.repository.ts @@ -2,10 +2,10 @@ import { Kysely } from "kysely"; import { Address, ChainId } from "@grants-stack-indexer/shared"; -import { IStrategyRepository } from "../../interfaces/index.js"; +import { IStrategyRegistryRepository } from "../../interfaces/index.js"; import { Database, Strategy } from "../../internal.js"; -export class KyselyStrategyRepository implements IStrategyRepository { +export class KyselyStrategyRegistryRepository implements IStrategyRegistryRepository { constructor( private readonly db: Kysely, private readonly schemaName: string, @@ -35,15 +35,16 @@ export class KyselyStrategyRepository implements IStrategyRepository { .execute(); } - async getStrategies(params?: { handled?: boolean; chainId?: ChainId }): Promise { + /** @inheritdoc */ + async getStrategies(filters?: { handled?: boolean; chainId?: ChainId }): Promise { const query = this.db.withSchema(this.schemaName).selectFrom("strategies"); - if (params?.chainId) { - query.where("chainId", "=", params.chainId); + if (filters?.chainId) { + query.where("chainId", "=", filters.chainId); } - if (params?.handled) { - query.where("handled", "=", params.handled); + if (filters?.handled) { + query.where("handled", "=", filters.handled); } return query.selectAll().execute(); From 9223013d93cbf362996699d7701917a1f39a170c Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Fri, 13 Dec 2024 19:56:12 -0300 Subject: [PATCH 8/8] feat: improve logic on saving strategy id --- .../unit/sharedDependencies.service.spec.ts | 2 +- packages/data-flow/src/orchestrator.ts | 11 +--- .../data-flow/test/unit/orchestrator.spec.ts | 65 +++++++------------ 3 files changed, 28 insertions(+), 50 deletions(-) diff --git a/apps/processing/test/unit/sharedDependencies.service.spec.ts b/apps/processing/test/unit/sharedDependencies.service.spec.ts index 437c27e..9d309b9 100644 --- a/apps/processing/test/unit/sharedDependencies.service.spec.ts +++ b/apps/processing/test/unit/sharedDependencies.service.spec.ts @@ -17,7 +17,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({ KyselyApplicationRepository: vi.fn(), KyselyDonationRepository: vi.fn(), KyselyApplicationPayoutRepository: vi.fn(), - KyselyStrategyRepository: vi.fn().mockImplementation(() => ({ + KyselyStrategyRegistryRepository: vi.fn().mockImplementation(() => ({ getStrategies: vi.fn().mockResolvedValue([]), getStrategyId: vi.fn(), saveStrategyId: vi.fn(), diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index b648cbd..7b41b3c 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -114,24 +114,17 @@ export class Orchestrator { event = await this.enhanceStrategyId(event); if (this.isPoolCreated(event)) { - // we save the strategy id with handled = false because we don't know if the strategy is handled yet + const handleable = existsHandler(event.strategyId); await this.strategyRegistry.saveStrategyId( this.chainId, event.srcAddress, event.strategyId, - false, + handleable, ); } else if (event.contractName === "Strategy" && "strategyId" in event) { if (!existsHandler(event.strategyId)) { // we skip the event if the strategy id is not handled yet continue; - } else { - await this.strategyRegistry.saveStrategyId( - this.chainId, - event.srcAddress, - event.strategyId, - true, - ); } } diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index a0c4a08..0412e5a 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -141,6 +141,12 @@ describe("Orchestrator", { sequential: true }, () => { const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); + vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue({ + id: "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf", + address: "0x123", + chainId, + handled: false, + }); vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") .mockResolvedValueOnce(mockEvents) .mockResolvedValue([]); @@ -155,6 +161,9 @@ describe("Orchestrator", { sequential: true }, () => { vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { return Promise.resolve(); }); + vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockImplementation(() => { + return Promise.resolve(); + }); runPromise = orchestrator.run(abortController.signal); @@ -273,7 +282,8 @@ describe("Orchestrator", { sequential: true }, () => { it("save strategyId to registry on PoolCreated event", async () => { const strategyAddress = "0x123" as Address; - const strategyId = + const strategyId = "0xunknown" as Hex; + const existingStrategyId = "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; const mockEvent = createMockEvent("Allo", "PoolCreated", 1, { @@ -287,10 +297,12 @@ describe("Orchestrator", { sequential: true }, () => { const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue(undefined); - vi.spyOn(mockEvmProvider, "readContract").mockResolvedValue(strategyId); + vi.spyOn(mockEvmProvider, "readContract") + .mockResolvedValueOnce(strategyId) + .mockResolvedValueOnce(existingStrategyId); vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") - .mockResolvedValueOnce([mockEvent]) + .mockResolvedValueOnce([mockEvent, mockEvent]) .mockResolvedValue([]); runPromise = orchestrator.run(abortController.signal); @@ -299,47 +311,18 @@ describe("Orchestrator", { sequential: true }, () => { if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); }); - expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledWith( + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenNthCalledWith( + 1, chainId, strategyAddress, strategyId, false, ); - }); - - it("updates strategyId as handled=true on handled Strategy event", async () => { - const strategyAddress = "0x123" as Address; - const strategyId = - "0x6f9291df02b2664139cec5703c124e4ebce32879c74b6297faa1468aa5ff9ebf" as Hex; - - const mockEvent = createMockEvent("Strategy", "RegisteredWithSender", 1, { - recipientId: "0x123", - data: "0x123", - sender: "0x123", - }); - - const eventsProcessorSpy = vi.spyOn(orchestrator["eventsProcessor"], "processEvent"); - vi.spyOn(mockEventsRegistry, "getLastProcessedEvent").mockResolvedValue(undefined); - vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") - .mockResolvedValueOnce([mockEvent]) - .mockResolvedValue([]); - vi.spyOn(mockStrategyRegistry, "getStrategyId").mockResolvedValue({ - id: strategyId, - address: strategyAddress, - chainId, - handled: false, - }); - - runPromise = orchestrator.run(abortController.signal); - - await vi.waitFor(() => { - if (eventsProcessorSpy.mock.calls.length < 1) throw new Error("Not yet called"); - }); - - expect(mockStrategyRegistry.saveStrategyId).toHaveBeenCalledWith( + expect(mockStrategyRegistry.saveStrategyId).toHaveBeenNthCalledWith( + 2, chainId, strategyAddress, - strategyId, + existingStrategyId, true, ); }); @@ -508,11 +491,10 @@ describe("Orchestrator", { sequential: true }, () => { chainId, handled: true, }); + vi.spyOn(mockEvmProvider, "readContract").mockResolvedValue(strategyId); vi.spyOn(mockIndexerClient, "getEventsAfterBlockNumberAndLogIndex") - .mockResolvedValueOnce([poolCreatedEvent]) - .mockResolvedValueOnce([]) - .mockResolvedValueOnce([registeredEvent]) + .mockResolvedValueOnce([poolCreatedEvent, registeredEvent]) .mockResolvedValue([]); eventsProcessorSpy.mockResolvedValue([]); @@ -526,6 +508,9 @@ describe("Orchestrator", { sequential: true }, () => { vi.spyOn(mockEventsRegistry, "saveLastProcessedEvent").mockImplementation(() => { return Promise.resolve(); }); + vi.spyOn(mockStrategyRegistry, "saveStrategyId").mockImplementation(() => { + return Promise.resolve(); + }); runPromise = orchestrator.run(abortController.signal);