Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/parallel to sequential #12

Merged
merged 2 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions orchestrator/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ const baseConfig = {
ecdsaPrivateKey: process.env.SENTRY_DSN ?? "",
batchSize: process.env.BATCH_SIZE ?? 1000,
// Integrations
xApiSecret: process.env.X_API_SECRET ?? "",
xApiKey: process.env.X_API_KEY ?? "",
xBearerToken: process.env.X_BEARER_TOKEN ?? "",
};

interface IEnvVars {
Expand All @@ -41,8 +40,7 @@ interface IEnvVars {
sentryDSN?: string;
ecdsaPrivateKey?: string;
batchSize: number;
xApiKey: string;
xApiSecret: string;
xBearerToken: string;
}

const envVarsSchema = Joi.object({
Expand Down Expand Up @@ -91,8 +89,7 @@ const envVarsSchema = Joi.object({
aptosIndexerCron: Joi.string().default("*/5 * * * * *"),

// Integrations
xApiSecret: Joi.string().allow("").required(),
xApiKey: Joi.string().allow("").required(),
xBearerToken: Joi.string().allow("").required(),

// Common
sentryDSN: Joi.string().allow("", null),
Expand All @@ -115,8 +112,7 @@ export default {
ecdsaPrivateKey: envVars.ecdsaPrivateKey,
sentryDSN: envVars.sentryDSN,
integrations: {
xApiSecret: envVars.xApiSecret,
xApiKey: envVars.xApiKey,
xBearerToken: envVars.xBearerToken,
},
rooch: {
chainId: envVars.roochChainId,
Expand Down
6 changes: 1 addition & 5 deletions orchestrator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,19 @@ import "dotenv/config";
import env from "./env";
import AptosIndexer from "./indexer/aptos";
import RoochIndexer from "./indexer/rooch";
import { instance as xInstance } from "./integrations/xtwitter";
import { log } from "./logger";

(async () => {
// Check env variables to determine which chains to subscribe to for events.
// Start cron job to check for new events from Rooch Oracles
if (xInstance.isAvailable()) {
await xInstance.requestAccessToken();
}

if (env.rooch.privateKey && env.rooch.chainId.length > 0 && env.rooch.oracleAddress && env.chains.includes("ROOCH")) {
// https://www.npmjs.com/package/cron#cronjob-class

env.rooch.chainId.map((chain) => {
const rooch = new RoochIndexer(env.rooch.privateKey, chain, env.rooch.oracleAddress);
new CronJob(
"0 * * * *",
"*/15 * * * *",
() => {
rooch.sendUnfulfilledRequests();
},
Expand Down
32 changes: 26 additions & 6 deletions orchestrator/src/indexer/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export abstract class Indexer {
}

applyAuthorizationHeader(hostname: string): string | undefined {
if (ALLOWED_HOST.includes(hostname) && xTwitterInstance.isInitialized()) {
xlassix marked this conversation as resolved.
Show resolved Hide resolved
if (ALLOWED_HOST.includes(hostname)) {
const token = xTwitterInstance.getAccessToken();
return `Bearer ${token}`;
}
Expand Down Expand Up @@ -164,21 +164,41 @@ export abstract class Indexer {

// Fetch the latest events from the Aptos Oracles Contract
const newRequestsEvents = await this.fetchRequestAddedEvents(Number(latestCommit?.eventSeq ?? 0) ?? 0);
for (let i = 0; i < newRequestsEvents.length; i++) {
try {
if (i > 0) await new Promise((resolve) => setTimeout(resolve, xTwitterInstance.getRequestRate));

await Promise.all(
newRequestsEvents.map(async (event) => {
const event = newRequestsEvents[i];
const data = await this.processRequestAddedEvent(event);

if (data) {
try {
await this.sendFulfillment(event, data.status, JSON.stringify(data.message));
// TODO: Use the notify parameter to send transaction to the contract and function to marked in the request event
await this.save(event, data, RequestStatus.SUCCESS);
} catch (err: any) {
log.error({ err: err.message });
await this.save(event, data, RequestStatus.FAILED);
}
}
}),
);
} catch (error) {
console.error(`Error processing event ${i}:`, error);
}
}

// await Promise.all(
// newRequestsEvents.map(async (event) => {
// const data = await this.processRequestAddedEvent(event);
// if (data) {
// try {
// await this.sendFulfillment(event, data.status, JSON.stringify(data.message));
// // TODO: Use the notify parameter to send transaction to the contract and function to marked in the request event
// await this.save(event, data, RequestStatus.SUCCESS);
// } catch (err: any) {
// log.error({ err: err.message });
// await this.save(event, data, RequestStatus.FAILED);
// }
// }
// }),
// );
}
}
30 changes: 25 additions & 5 deletions orchestrator/src/indexer/rooch.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import env from "@/env";
import { instance as xTwitterInstance } from "@/integrations/xtwitter";
import { log } from "@/logger";
import type { IEvent, IRequestAdded, JsonRpcResponse, ProcessedRequestAdded, RoochNetwork } from "@/types";
import { decodeNotifyValue } from "@/util";
Expand Down Expand Up @@ -95,9 +96,26 @@ export default class RoochIndexer extends Indexer {
}
}

// Process all skipped requests concurrently
await Promise.all(
skippedRequests.map(async (event) => {
// // Process all skipped requests concurrently
// await Promise.all(
// skippedRequests.map(async (event) => {
// const data = await this.processRequestAddedEvent(event);
// if (data) {
// try {
// // Send fulfillment response
// const response = await this.sendFulfillment(event, data.status, JSON.stringify(data.message));
// log.debug({ response }); // Log the response
// } catch (err) {
// log.error({ err }); // Log any errors during fulfillment
// }
// }
// }),
// );

for (let i = 0; i < skippedRequests.length; i++) {
try {
await new Promise((resolve) => setTimeout(resolve, xTwitterInstance.getRequestRate));
const event = skippedRequests[i];
const data = await this.processRequestAddedEvent(event);
if (data) {
try {
Expand All @@ -108,8 +126,10 @@ export default class RoochIndexer extends Indexer {
log.error({ err }); // Log any errors during fulfillment
}
}
}),
);
} catch (err) {
log.error({ err }); // Log any errors during fulfillment
}
}

return skippedRequests; // Return the list of processed requests
}
Expand Down
54 changes: 4 additions & 50 deletions orchestrator/src/integrations/xtwitter.ts
Original file line number Diff line number Diff line change
@@ -1,66 +1,20 @@
import env from "@/env";
import axios from "axios";

class XfkaTwitter {
private initialized = false;
private accessToken: string | null = null;
private SERVER_DOMAIN = "api.twitter.com";

constructor(
private apiKey: string,
private apiKeySecret: string,
) {}
constructor(private accessToken: string) {}

get hosts() {
return ["x.com", "api.x.com", "twitter.com", "api.twitter.com"];
}

isAvailable(): boolean {
if (this.apiKey && this.apiKeySecret) {
return true;
}
return false;
get getRequestRate() {
return 60 * 1000; //
}

isInitialized(): boolean {
return this.initialized;
}

async requestAccessToken() {
try {
// Fetch bearer token
const response = await axios.post(
`https://${this.SERVER_DOMAIN}/oauth2/token`,
new URLSearchParams({
grant_type: "client_credentials",
}).toString(),
{
headers: {
"Content-Type": "application/x-www-form-urlencoded",
},
auth: {
username: this.apiKey,
password: this.apiKeySecret,
},
},
);
const accessToken = response.data.access_token;

this.accessToken = accessToken;
this.initialized = true;
return accessToken;
} catch (error: any) {
console.error("Error fetching bearer token:", typeof error, error.message);
throw error;
}
}

getAccessToken(): string | null {
if (!this.initialized) {
throw new Error("Class not initialized");
}
return this.accessToken;
}
}

export const instance = new XfkaTwitter(env.integrations.xApiKey, env.integrations.xApiSecret);
export const instance = new XfkaTwitter(env.integrations.xBearerToken);
Loading