Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/refactor-event-stream #6

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
23 changes: 23 additions & 0 deletions prisma/migrations/20241115163701_init/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- CreateTable
CREATE TABLE "Policy" (
"nftId" BIGINT NOT NULL,
"productNftId" BIGINT NOT NULL,
"bundleNftId" BIGINT NOT NULL,
"riskId" TEXT NOT NULL,
"referralId" TEXT NOT NULL,
"sumInsuredAmount" BIGINT NOT NULL,
"premiumAmount" BIGINT NOT NULL,
"premiumPaid" BIGINT NOT NULL,
"lifetime" INTEGER NOT NULL,
"activateAt" INTEGER NOT NULL,
"created_blockNumber" INTEGER NOT NULL,
"created_timestamp" INTEGER NOT NULL,
"created_txHash" TEXT NOT NULL,
"created_from" TEXT NOT NULL,
"modified_blockNumber" INTEGER NOT NULL,
"modified_timestamp" INTEGER NOT NULL,
"modified_txHash" TEXT NOT NULL,
"modified_from" TEXT NOT NULL,

CONSTRAINT "Policy_pkey" PRIMARY KEY ("nftId")
);
3 changes: 3 additions & 0 deletions prisma/migrations/20241115164122_init/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- AlterTable
ALTER TABLE "Policy" ALTER COLUMN "lifetime" SET DATA TYPE BIGINT,
ALTER COLUMN "activateAt" SET DATA TYPE BIGINT;
3 changes: 3 additions & 0 deletions prisma/migrations/20241115164640_init/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- AlterTable
ALTER TABLE "Policy" ALTER COLUMN "created_timestamp" SET DATA TYPE BIGINT,
ALTER COLUMN "modified_timestamp" SET DATA TYPE BIGINT;
23 changes: 22 additions & 1 deletion prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,25 @@ model Instance {
modified_blockNumber Int
modified_txHash String
modified_from String
}
}

model Policy {
nftId BigInt @id
productNftId BigInt
bundleNftId BigInt
riskId String
referralId String
sumInsuredAmount BigInt
premiumAmount BigInt
premiumPaid BigInt
lifetime BigInt
activateAt BigInt
created_blockNumber Int
created_timestamp BigInt
created_txHash String
created_from String
modified_blockNumber Int
modified_timestamp BigInt
modified_txHash String
modified_from String
}
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export const DUNE_QUERY_ID_BASE_LATEST_BLOCK = process.env.DUNE_QUERY_ID_BASE_LA
export const DUNE_QUERY_ID_NFT_REGISTRATION_EVENTS = process.env.DUNE_QUERY_ID_NFT_REGISTRATION_EVENTS || "4283531";
export const DUNE_QUERY_ID_NFT_TRANSFER_EVENTS = process.env.DUNE_QUERY_ID_NFT_TRANSFER_EVENTS || "4283862";
export const DUNE_QUERY_ID_INSTANCE_SERVICE_EVENTS = process.env.DUNE_QUERY_ID_INSTANCE_SERVICE_EVENTS || "4284748";
export const DUNE_QUERY_ID_GIF_EVENTS = process.env.DUNE_QUERY_ID_GIF_EVENTS || "4287183";
9 changes: 8 additions & 1 deletion src/dune.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,16 @@ export default class DuneApi {
if (totalRowCount === 0) {
totalRowCount = response.data.result.metadata.total_row_count;
}
const rowCount = response.data.result.metadata.row_count;
if (rowCount === 0) {
break;
}
if (rowCount !== response.data.result.rows.length) {
throw new Error(`Row count mismatch expected: ${rowCount} effective: ${response.data.result.rows.length}`);
}
rows.push(...response.data.result.rows);
offset += limit;
} while (totalRowCount > 0 && offset * limit < totalRowCount)
} while (totalRowCount > 0 && offset < totalRowCount)

logger.info(`Fetched ${rows.length} rows`);

Expand Down
63 changes: 34 additions & 29 deletions src/instance_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,36 +35,41 @@ export default class InstanceProcessor {
}
}

async processInstanceServiceEvents(instanceEvents: Array<DecodedLogEntry>): Promise<Array<Instance>> {
return instanceEvents.map(event => {
logger.info(`Processing instance service event ${event.tx_hash} - ${event.event_name} - ${event.data}`);
const data = this.decodeIInstanceServiceEvent(event);
if (data === null || data === undefined) {
logger.error(`Failed to decode event ${event.tx_hash} - ${event.event_name} - ${event.data}`);
return null as unknown as Instance;
}
if (data.name !== 'LogInstanceServiceInstanceCreated') {
return null as unknown as Instance;
}
async processInstanceServiceEvent(event: DecodedLogEntry, instances: Map<BigInt, Instance>): Promise<Map<BigInt, Instance>> {
if (event.event_name !== 'LogInstanceServiceInstanceCreated') {
throw new Error(`Invalid event type ${event.event_name}`);
}

logger.debug(`args: ${JSON.stringify(data.args)}`);
const nftId = data.args[0] as BigInt;
const instanceAddress = data.args[1] as string;
return {
nftId,
instanceAddress,
created: {
blockNumber: event.block_number,
txHash: event.tx_hash,
from: event.tx_from
},
modified: {
blockNumber: event.block_number,
txHash: event.tx_hash,
from: event.tx_from
}
} as Instance;
}).filter(event => event !== null);
logger.debug(`Processing instance service instance created event`);

const data = this.decodeIInstanceServiceEvent(event);
if (data === null || data === undefined) {
logger.error(`Failed to decode event ${event.tx_hash} - ${event.event_name} - ${event.data}`);
return instances;
}
if (data.name !== 'LogInstanceServiceInstanceCreated') {
throw new Error(`Invalid event name ${data.name}`);
}

logger.debug(`args: ${JSON.stringify(data.args)}`);
const nftId = data.args[0] as BigInt;
const instanceAddress = data.args[1] as string;
const instance = {
nftId,
instanceAddress,
created: {
blockNumber: event.block_number,
txHash: event.tx_hash,
from: event.tx_from
},
modified: {
blockNumber: event.block_number,
txHash: event.tx_hash,
from: event.tx_from
}
} as Instance;
instances.set(nftId, instance);
return instances;
}


Expand Down
82 changes: 60 additions & 22 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import { PrismaClient } from '@prisma/client';
import axios from 'axios';
import * as dotenv from 'dotenv';
import { DUNE_API_BASE_URL, DUNE_API_KEY, DUNE_QUERY_ID_INSTANCE_SERVICE_EVENTS, DUNE_QUERY_ID_NFT_REGISTRATION_EVENTS, DUNE_QUERY_ID_NFT_TRANSFER_EVENTS } from './constants';
import { DUNE_API_BASE_URL, DUNE_API_KEY, DUNE_QUERY_ID_GIF_EVENTS, DUNE_QUERY_ID_INSTANCE_SERVICE_EVENTS, DUNE_QUERY_ID_NFT_REGISTRATION_EVENTS, DUNE_QUERY_ID_NFT_TRANSFER_EVENTS } from './constants';
import InstanceProcessor from './instance_processor';
import { logger } from './logger';
import NftProcessor from './nft_processor';
import { ObjectType } from './types/objecttype';
import DuneApi from './dune';
import { DecodedLogEntry } from './types/logdata';
import { Nft } from './types/nft';
import { Instance } from './types/instance';
import PolicyProcessor from './policy_processor';
import { Policy } from './types/policy';

dotenv.config();

Expand All @@ -20,37 +25,70 @@ class Main {
private dune: DuneApi;
private nftProcessor: NftProcessor;
private instanceProcessor: InstanceProcessor;
private policyProcessor: PolicyProcessor;

constructor(prisma: PrismaClient) {
this.dune = new DuneApi();
this.nftProcessor = new NftProcessor(prisma);
this.instanceProcessor = new InstanceProcessor(prisma);

this.policyProcessor = new PolicyProcessor(prisma);
}

public async main(): Promise<void> {
const nftRegistrationEvents = await this.dune.getLatestResult(DUNE_QUERY_ID_NFT_REGISTRATION_EVENTS, 0);
const nftTransferEvents = await this.dune.getLatestResult(DUNE_QUERY_ID_NFT_TRANSFER_EVENTS, 0);

let nfts = await this.nftProcessor.processNftRegistrationEvents(nftRegistrationEvents);
nfts = await this.nftProcessor.processNftTransferEvents(nftTransferEvents, nfts);
await this.nftProcessor.persistNfts(nfts);

// print one log per event
nfts.forEach(event => {
logger.info(`NFT: ${event.nftId} - ${ObjectType[event.objectType]} - ${event.objectAddress} - ${event.owner}`);
});

const instanceEvents = await this.dune.getLatestResult(DUNE_QUERY_ID_INSTANCE_SERVICE_EVENTS, 0);
const instances = await this.instanceProcessor.processInstanceServiceEvents(instanceEvents);
await this.instanceProcessor.persistInstances(instances);

// print one log per event
instances.forEach(event => {
logger.info(`Instance: ${event.nftId} - ${event.instanceAddress}`);
});
const gifEvents = await this.dune.getLatestResult(DUNE_QUERY_ID_GIF_EVENTS, 0);
const { nfts, instances, policies } = await this.parseGifEvents(gifEvents);

await this.nftProcessor.persistNfts(Array.from(nfts.values()));
await this.instanceProcessor.persistInstances(Array.from(instances.values()));
await this.policyProcessor.persistPolicies(Array.from(policies.values()));

for (const nft of nfts.values()) {
logger.info(`NFT: ${nft.nftId} - ${ObjectType[nft.objectType]} - ${nft.objectAddress} - ${nft.owner}`);
};

for (const instance of instances.values()) {
logger.info(`Instance: ${instance.nftId} - ${instance.instanceAddress}`);
}

for (const policy of policies.values()) {
logger.info(`Policy: ${policy.nftId} - ${policy.riskId} - ${policy.sumInsuredAmount}`);
}
}

async parseGifEvents(gifEvents: Array<DecodedLogEntry>)
: Promise<{ nfts: Map<BigInt, Nft>, instances: Map<BigInt, Instance>, policies: Map<BigInt, Policy> }>
{
const nfts = new Map<BigInt, Nft>();
const instances = new Map<BigInt, Instance>();
const policies = new Map<BigInt, Policy>();

for (const event of gifEvents) {
// logger.debug(`Processing gif event ${event.tx_hash} - ${event.block_number} - ${event.event_name}`);

switch (event.event_name) {
case 'Transfer':
await this.nftProcessor.processNftTransferEvent(event, nfts);
break;
case 'LogRegistryObjectRegistered':
await this.nftProcessor.processNftRegistrationEvent(event, nfts);
break;
case 'LogInstanceServiceInstanceCreated':
await this.instanceProcessor.processInstanceServiceEvent(event, instances);
break;
case 'LogApplicationServiceApplicationCreated':
await this.policyProcessor.processApplicationCreatedEvent(event, policies);
break;
case 'LogPolicyServicePolicyCreated':
await this.policyProcessor.processPolicyCreatedEvent(event, policies);
break;
case 'LogPolicyServicePolicyPremiumCollected':
await this.policyProcessor.processPolicyPremiumCollectedEvent(event, policies);
break;
}
}

return { nfts, instances, policies };
}
}

const prisma = new PrismaClient()
Expand Down
Loading
Loading