Skip to content

Commit

Permalink
Merge pull request #6 from etherisc/feature/refactor-event-stream
Browse files Browse the repository at this point in the history
Feature/refactor-event-stream
  • Loading branch information
doerfli authored Nov 15, 2024
2 parents 0e6d89f + 4d197db commit 9cb5bc0
Show file tree
Hide file tree
Showing 13 changed files with 479 additions and 146 deletions.
File renamed without changes.
23 changes: 23 additions & 0 deletions prisma/migrations/20241115163701_init/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- CreateTable
CREATE TABLE "Policy" (
"nftId" BIGINT NOT NULL,
"productNftId" BIGINT NOT NULL,
"bundleNftId" BIGINT NOT NULL,
"riskId" TEXT NOT NULL,
"referralId" TEXT NOT NULL,
"sumInsuredAmount" BIGINT NOT NULL,
"premiumAmount" BIGINT NOT NULL,
"premiumPaid" BIGINT NOT NULL,
"lifetime" INTEGER NOT NULL,
"activateAt" INTEGER NOT NULL,
"created_blockNumber" INTEGER NOT NULL,
"created_timestamp" INTEGER NOT NULL,
"created_txHash" TEXT NOT NULL,
"created_from" TEXT NOT NULL,
"modified_blockNumber" INTEGER NOT NULL,
"modified_timestamp" INTEGER NOT NULL,
"modified_txHash" TEXT NOT NULL,
"modified_from" TEXT NOT NULL,

CONSTRAINT "Policy_pkey" PRIMARY KEY ("nftId")
);
3 changes: 3 additions & 0 deletions prisma/migrations/20241115164122_init/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- AlterTable
ALTER TABLE "Policy" ALTER COLUMN "lifetime" SET DATA TYPE BIGINT,
ALTER COLUMN "activateAt" SET DATA TYPE BIGINT;
3 changes: 3 additions & 0 deletions prisma/migrations/20241115164640_init/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- AlterTable
ALTER TABLE "Policy" ALTER COLUMN "created_timestamp" SET DATA TYPE BIGINT,
ALTER COLUMN "modified_timestamp" SET DATA TYPE BIGINT;
23 changes: 22 additions & 1 deletion prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,25 @@ model Instance {
modified_blockNumber Int
modified_txHash String
modified_from String
}
}

model Policy {
nftId BigInt @id
productNftId BigInt
bundleNftId BigInt
riskId String
referralId String
sumInsuredAmount BigInt
premiumAmount BigInt
premiumPaid BigInt
lifetime BigInt
activateAt BigInt
created_blockNumber Int
created_timestamp BigInt
created_txHash String
created_from String
modified_blockNumber Int
modified_timestamp BigInt
modified_txHash String
modified_from String
}
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export const DUNE_QUERY_ID_BASE_LATEST_BLOCK = process.env.DUNE_QUERY_ID_BASE_LA
export const DUNE_QUERY_ID_NFT_REGISTRATION_EVENTS = process.env.DUNE_QUERY_ID_NFT_REGISTRATION_EVENTS || "4283531";
export const DUNE_QUERY_ID_NFT_TRANSFER_EVENTS = process.env.DUNE_QUERY_ID_NFT_TRANSFER_EVENTS || "4283862";
export const DUNE_QUERY_ID_INSTANCE_SERVICE_EVENTS = process.env.DUNE_QUERY_ID_INSTANCE_SERVICE_EVENTS || "4284748";
export const DUNE_QUERY_ID_GIF_EVENTS = process.env.DUNE_QUERY_ID_GIF_EVENTS || "4287183";
9 changes: 8 additions & 1 deletion src/dune.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,16 @@ export default class DuneApi {
if (totalRowCount === 0) {
totalRowCount = response.data.result.metadata.total_row_count;
}
const rowCount = response.data.result.metadata.row_count;
if (rowCount === 0) {
break;
}
if (rowCount !== response.data.result.rows.length) {
throw new Error(`Row count mismatch expected: ${rowCount} effective: ${response.data.result.rows.length}`);
}
rows.push(...response.data.result.rows);
offset += limit;
} while (totalRowCount > 0 && offset * limit < totalRowCount)
} while (totalRowCount > 0 && offset < totalRowCount)

logger.info(`Fetched ${rows.length} rows`);

Expand Down
63 changes: 34 additions & 29 deletions src/instance_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,36 +35,41 @@ export default class InstanceProcessor {
}
}

async processInstanceServiceEvents(instanceEvents: Array<DecodedLogEntry>): Promise<Array<Instance>> {
return instanceEvents.map(event => {
logger.info(`Processing instance service event ${event.tx_hash} - ${event.event_name} - ${event.data}`);
const data = this.decodeIInstanceServiceEvent(event);
if (data === null || data === undefined) {
logger.error(`Failed to decode event ${event.tx_hash} - ${event.event_name} - ${event.data}`);
return null as unknown as Instance;
}
if (data.name !== 'LogInstanceServiceInstanceCreated') {
return null as unknown as Instance;
}
async processInstanceServiceEvent(event: DecodedLogEntry, instances: Map<BigInt, Instance>): Promise<Map<BigInt, Instance>> {
if (event.event_name !== 'LogInstanceServiceInstanceCreated') {
throw new Error(`Invalid event type ${event.event_name}`);
}

logger.debug(`args: ${JSON.stringify(data.args)}`);
const nftId = data.args[0] as BigInt;
const instanceAddress = data.args[1] as string;
return {
nftId,
instanceAddress,
created: {
blockNumber: event.block_number,
txHash: event.tx_hash,
from: event.tx_from
},
modified: {
blockNumber: event.block_number,
txHash: event.tx_hash,
from: event.tx_from
}
} as Instance;
}).filter(event => event !== null);
logger.debug(`Processing instance service instance created event`);

const data = this.decodeIInstanceServiceEvent(event);
if (data === null || data === undefined) {
logger.error(`Failed to decode event ${event.tx_hash} - ${event.event_name} - ${event.data}`);
return instances;
}
if (data.name !== 'LogInstanceServiceInstanceCreated') {
throw new Error(`Invalid event name ${data.name}`);
}

logger.debug(`args: ${JSON.stringify(data.args)}`);
const nftId = data.args[0] as BigInt;
const instanceAddress = data.args[1] as string;
const instance = {
nftId,
instanceAddress,
created: {
blockNumber: event.block_number,
txHash: event.tx_hash,
from: event.tx_from
},
modified: {
blockNumber: event.block_number,
txHash: event.tx_hash,
from: event.tx_from
}
} as Instance;
instances.set(nftId, instance);
return instances;
}


Expand Down
82 changes: 60 additions & 22 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import { PrismaClient } from '@prisma/client';
import axios from 'axios';
import * as dotenv from 'dotenv';
import { DUNE_API_BASE_URL, DUNE_API_KEY, DUNE_QUERY_ID_INSTANCE_SERVICE_EVENTS, DUNE_QUERY_ID_NFT_REGISTRATION_EVENTS, DUNE_QUERY_ID_NFT_TRANSFER_EVENTS } from './constants';
import { DUNE_API_BASE_URL, DUNE_API_KEY, DUNE_QUERY_ID_GIF_EVENTS, DUNE_QUERY_ID_INSTANCE_SERVICE_EVENTS, DUNE_QUERY_ID_NFT_REGISTRATION_EVENTS, DUNE_QUERY_ID_NFT_TRANSFER_EVENTS } from './constants';
import InstanceProcessor from './instance_processor';
import { logger } from './logger';
import NftProcessor from './nft_processor';
import { ObjectType } from './types/objecttype';
import DuneApi from './dune';
import { DecodedLogEntry } from './types/logdata';
import { Nft } from './types/nft';
import { Instance } from './types/instance';
import PolicyProcessor from './policy_processor';
import { Policy } from './types/policy';

dotenv.config();

Expand All @@ -20,37 +25,70 @@ class Main {
private dune: DuneApi;
private nftProcessor: NftProcessor;
private instanceProcessor: InstanceProcessor;
private policyProcessor: PolicyProcessor;

constructor(prisma: PrismaClient) {
this.dune = new DuneApi();
this.nftProcessor = new NftProcessor(prisma);
this.instanceProcessor = new InstanceProcessor(prisma);

this.policyProcessor = new PolicyProcessor(prisma);
}

public async main(): Promise<void> {
const nftRegistrationEvents = await this.dune.getLatestResult(DUNE_QUERY_ID_NFT_REGISTRATION_EVENTS, 0);
const nftTransferEvents = await this.dune.getLatestResult(DUNE_QUERY_ID_NFT_TRANSFER_EVENTS, 0);

let nfts = await this.nftProcessor.processNftRegistrationEvents(nftRegistrationEvents);
nfts = await this.nftProcessor.processNftTransferEvents(nftTransferEvents, nfts);
await this.nftProcessor.persistNfts(nfts);

// print one log per event
nfts.forEach(event => {
logger.info(`NFT: ${event.nftId} - ${ObjectType[event.objectType]} - ${event.objectAddress} - ${event.owner}`);
});

const instanceEvents = await this.dune.getLatestResult(DUNE_QUERY_ID_INSTANCE_SERVICE_EVENTS, 0);
const instances = await this.instanceProcessor.processInstanceServiceEvents(instanceEvents);
await this.instanceProcessor.persistInstances(instances);

// print one log per event
instances.forEach(event => {
logger.info(`Instance: ${event.nftId} - ${event.instanceAddress}`);
});
const gifEvents = await this.dune.getLatestResult(DUNE_QUERY_ID_GIF_EVENTS, 0);
const { nfts, instances, policies } = await this.parseGifEvents(gifEvents);

await this.nftProcessor.persistNfts(Array.from(nfts.values()));
await this.instanceProcessor.persistInstances(Array.from(instances.values()));
await this.policyProcessor.persistPolicies(Array.from(policies.values()));

for (const nft of nfts.values()) {
logger.info(`NFT: ${nft.nftId} - ${ObjectType[nft.objectType]} - ${nft.objectAddress} - ${nft.owner}`);
};

for (const instance of instances.values()) {
logger.info(`Instance: ${instance.nftId} - ${instance.instanceAddress}`);
}

for (const policy of policies.values()) {
logger.info(`Policy: ${policy.nftId} - ${policy.riskId} - ${policy.sumInsuredAmount}`);
}
}

async parseGifEvents(gifEvents: Array<DecodedLogEntry>)
: Promise<{ nfts: Map<BigInt, Nft>, instances: Map<BigInt, Instance>, policies: Map<BigInt, Policy> }>
{
const nfts = new Map<BigInt, Nft>();
const instances = new Map<BigInt, Instance>();
const policies = new Map<BigInt, Policy>();

for (const event of gifEvents) {
// logger.debug(`Processing gif event ${event.tx_hash} - ${event.block_number} - ${event.event_name}`);

switch (event.event_name) {
case 'Transfer':
await this.nftProcessor.processNftTransferEvent(event, nfts);
break;
case 'LogRegistryObjectRegistered':
await this.nftProcessor.processNftRegistrationEvent(event, nfts);
break;
case 'LogInstanceServiceInstanceCreated':
await this.instanceProcessor.processInstanceServiceEvent(event, instances);
break;
case 'LogApplicationServiceApplicationCreated':
await this.policyProcessor.processApplicationCreatedEvent(event, policies);
break;
case 'LogPolicyServicePolicyCreated':
await this.policyProcessor.processPolicyCreatedEvent(event, policies);
break;
case 'LogPolicyServicePolicyPremiumCollected':
await this.policyProcessor.processPolicyPremiumCollectedEvent(event, policies);
break;
}
}

return { nfts, instances, policies };
}
}

const prisma = new PrismaClient()
Expand Down
Loading

0 comments on commit 9cb5bc0

Please sign in to comment.