Skip to content

Commit

Permalink
feat: Added fulfil_request
Browse files Browse the repository at this point in the history
  • Loading branch information
xlassix committed Aug 23, 2024
1 parent 5d8da34 commit 5dc62db
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 21 deletions.
3 changes: 3 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ ROOCH_CHAIN_ID="testnet"
ROOCH_ORACLE_ADDRESS="0x0000000000000000000000000000000000000000"
ROOCH_INDEXER_CRON="*/5 * * * * *"
DATABASE_URL="file:./.db"
X_API_KEY=""
X_API_SECRET=""


# Optionals
SENTRY_DSN=""
Expand Down
8 changes: 8 additions & 0 deletions orchestrator/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const baseConfig = {
sentryDSN: process.env.SENTRY_DSN ?? "",
ecdsaPrivateKey: process.env.SENTRY_DSN ?? "",
batchSize: process.env.BATCH_SIZE ?? 1000,
xApiSecret: process.env.X_API_SECRET ?? "",
xApiKey: process.env.X_API_KEY ?? "",
};
interface IEnvVars {
preferredChain: SupportedChain;
Expand All @@ -21,6 +23,8 @@ interface IEnvVars {
sentryDSN?: string;
ecdsaPrivateKey?: string;
batchSize: number;
xApiKey: string;
xApiSecret: string;
}

const envVarsSchema = Joi.object({
Expand All @@ -42,6 +46,8 @@ const envVarsSchema = Joi.object({
}),
SupportedChain.ROOCH,
),
xApiSecret: Joi.string().required(),
xApiKey: Joi.string().required(),
roochIndexerCron: Joi.string().default("*/5 * * * * *"),
sentryDSN: Joi.string().allow("", null),
ecdsaPrivateKey: Joi.string().allow("", null),
Expand All @@ -62,6 +68,8 @@ export default {
chain: envVars.preferredChain,
ecdsaPrivateKey: envVars.ecdsaPrivateKey,
sentryDSN: envVars.sentryDSN,
xApiSecret: envVars.xApiSecret,
xApiKey: envVars.xApiKey,
rooch: {
chainId: envVars.roochChainId,
oracleAddress: envVars.roochOracleAddress,
Expand Down
2 changes: 2 additions & 0 deletions orchestrator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import "dotenv/config";

import env from "./env";
import RoochIndexer from "./indexer/rooch";
import { xInstance } from "./request/twitter";
// import { log } from "./logger";

(async () => {
// Check env variables to determine which chains to subscribe to for events.
// Start cron job to check for new events from Rooch Oracles
await xInstance.requestAccessToken();

if (env.rooch.privateKey) {
// https://www.npmjs.com/package/cron#cronjob-class
Expand Down
158 changes: 143 additions & 15 deletions orchestrator/src/indexer/rooch.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,40 @@
import env from "@/env";
import { log } from "@/logger";
import { xInstance } from "@/request/twitter";
import { type IRequestAdded, type JsonRpcResponse, RequestStatus, type RoochNetwork } from "@/types";
import { getRoochNodeUrl } from "@roochnetwork/rooch-sdk";
import { Args, RoochClient, Secp256k1Keypair, Transaction, getRoochNodeUrl } from "@roochnetwork/rooch-sdk";
import axios from "axios";
import prismaClient from "../../prisma";

const ALLOWED_HOST = ["x.com", "api.x.com", "twitter.com", "api.twitter.com"];

function isValidJson(jsonString: string): boolean {
if (jsonString.trim().length === 0) {
return true;
}
try {
JSON.parse(jsonString);
return true;
} catch {
return false;
}
}

export default class RoochIndexer {
private keyPair: Secp256k1Keypair;
private orchestrator: string;

constructor(
private privateKey: string,
private chainId: RoochNetwork,
private oracleAddress: string,
) {
this.keyPair = Secp256k1Keypair.fromSecretKey(this.privateKey);
this.orchestrator = `0x${this.keyPair.getSchnorrPublicKey()}`.toLowerCase();
log.info(`Rooch Indexer initialized`);
log.info(`Chain ID: ${this.chainId}`);
log.info(`Oracle Address: ${this.oracleAddress}`);
log.info(`Orchestrator Address: ${this.orchestrator}`);
}

async fetchEvents<T>(
Expand Down Expand Up @@ -42,7 +63,7 @@ export default class RoochIndexer {
},
);

log.info("Events fetched successfully", response.data);
// log.info("Events fetched successfully", response.data);

return response.data;
} catch (error) {
Expand All @@ -51,6 +72,88 @@ export default class RoochIndexer {
}
}

async sendFulfillment(data: IRequestAdded, result: string) {
const client = new RoochClient({
url: getRoochNodeUrl(this.chainId),
});
const session = await client.createSession({
sessionArgs: {
appName: "your app name",
appUrl: "your app url",
scopes: [`${this.oracleAddress}::oracles::fulfil_request`],
},
signer: this.keyPair,
});

const tx = new Transaction();
tx.callFunction({
target: `${this.oracleAddress}::oracles::fulfil_request`,
args: [Args.objectId(data.request_id), Args.string(result)],
});

return await client.signAndExecuteTransaction({
transaction: tx,
signer: session,
});
}

async processRequestAddedEvent(data: IRequestAdded) {
const token = xInstance.getAccessToken();

if (data.oracle.toLowerCase() !== this.orchestrator) {
return null;
}
const url = data.params.value.url?.includes("http") ? data.params.value.url : `https://${data.params.value.url}`;
try {
const _url = new URL(url);

if (!ALLOWED_HOST.includes(_url.hostname.toLowerCase())) {
return { status: 406, message: `${_url.hostname} is supposed by this orchestrator` };
}
} catch (err) {
return { status: 406, message: `Invalid Domain Name` };
}

try {
if (isValidJson(data.params.value.headers)) {
const request = await axios({
method: data.params.value.method,
data: data.params.value.body,
url: url,
headers: {
...JSON.parse(data.params.value.headers),
Authorization: `Bearer ${token}`,
},
});
return { status: request.status, message: request.data };
} else {
const request = await axios({
method: data.params.value.method,
data: data.params.value.body,
url: url,
headers: {
Authorization: `Bearer ${token}`,
},
});
return { status: request.status, message: request.data };
}
} catch (error) {
if (axios.isAxiosError(error)) {
// Handle Axios-specific errors
if (error.response) {
// Server responded with a status other than 2xx
return { status: error.response.status, message: error.response.data };
} else if (error.request) {
// No response received
return { status: 504, message: "No response received" };
}
} else {
// Handle non-Axios errors
return { status: 500, message: "Unexpected error" };
}
}
}

async run() {
log.info("Rooch indexer running...", Date.now());

Expand All @@ -65,23 +168,48 @@ export default class RoochIndexer {
const newRequestsEvents = await this.fetchEvents<IRequestAdded>("RequestAdded", latestCommit?.eventSeq ?? null);

if (!newRequestsEvents || "data" in newRequestsEvents) {
log.error(newRequestsEvents);
//TODO: HANDLE ERROR
return;
}

await prismaClient.events.createMany({
data: newRequestsEvents?.result.data.map((request) => ({
eventHandleId: request.event_id.event_handle_id,
eventSeq: +request.event_id.event_seq,
eventData: request.event_data,
eventType: request.event_type,
eventIndex: request.event_index,
decoded_event_data: JSON.stringify(request.decoded_event_data),
retries: 0,
status: RequestStatus.INDEXED,
})),
});

await Promise.all(
newRequestsEvents.result.data.map(async (event) => {
const data = await this.processRequestAddedEvent(event.decoded_event_data.value);
if (data) {
try {
const temp = await this.sendFulfillment(event.decoded_event_data.value, JSON.stringify(data));
await prismaClient.events.create({
data: {
eventHandleId: event.event_id.event_handle_id,
eventSeq: +event.event_id.event_seq,
eventData: event.event_data,
eventType: event.event_type,
eventIndex: event.event_index,
decoded_event_data: JSON.stringify(event.decoded_event_data),
retries: 0,
status: RequestStatus.SUCCESS,
response: JSON.stringify(data),
},
});
} catch (err) {
await prismaClient.events.create({
data: {
eventHandleId: event.event_id.event_handle_id,
eventSeq: +event.event_id.event_seq,
eventData: event.event_data,
eventType: event.event_type,
eventIndex: event.event_index,
decoded_event_data: JSON.stringify(event.decoded_event_data),
retries: 0,
status: RequestStatus.FAILED,
response: JSON.stringify(data),
},
});
}
}
}),
);
// const newFulfilmentEvents = await this.fetchEvents("FulfilmentAdded");

// Filter the events to if they're only relevant to this Oracle (Orchestrator)
Expand Down
51 changes: 51 additions & 0 deletions orchestrator/src/request/twitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import env from "@/env";
import axios from "axios";

class Twitter {
private initialized = false;
private accessToken: string | null = null;
private SERVER_DOMAIN = "api.twitter.com";

constructor(
private apiKey: string,
private apiKeySecret: string,
) {}

async requestAccessToken() {
try {
// Fetch bearer token
const response = await axios.post(
`https://${this.SERVER_DOMAIN}/oauth2/token`,
new URLSearchParams({
grant_type: "client_credentials",
}).toString(),
{
headers: {
"Content-Type": "application/x-www-form-urlencoded",
},
auth: {
username: this.apiKey,
password: this.apiKeySecret,
},
},
);
const accessToken = response.data.access_token;

this.accessToken = accessToken;
this.initialized = true;
return accessToken;
} catch (error: any) {
console.error("Error fetching bearer token:", typeof error, error.message);
throw error;
}
}

getAccessToken(): string | null {
if (!this.initialized) {
throw new Error("Class not initialized");
}
return this.accessToken;
}
}

export const xInstance = new Twitter(env.xApiKey, env.xApiSecret);
11 changes: 7 additions & 4 deletions orchestrator/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export type RoochEnv = {
oracleAddress: string;
indexerCron?: string;
};
export const ALLOWED_HOST = ["x.com", "api.x.com", "twitter.com", "api.twitter.com"];

export const RoochNetworkList = ["testnet", "devnet", "localnet"] as const;

Expand Down Expand Up @@ -41,17 +42,18 @@ interface VecValue {
vec: string[];
}

interface Value {
export interface IRequestAdded {
notify: NotifyValue;
oracle: string;
params: ParamsValue;
pick: string;
request_id: string;
}

export interface IRequestAdded {
interface IDecoded<T> {
abilities: number;
type: string;
value: Value;
value: T;
}

export interface IEvent<T> {
Expand All @@ -62,7 +64,7 @@ export interface IEvent<T> {
event_type: string;
event_data: string;
event_index: string;
decoded_event_data: T;
decoded_event_data: IDecoded<T>;
}

interface Result<T> {
Expand All @@ -80,4 +82,5 @@ export const RequestStatus = {
INVALID_URL: 3,
INVALID_PAYLOAD: 4,
UNREACHABLE: 5,
FAILED: 6,
};
7 changes: 5 additions & 2 deletions rooch/sources/oracles.move
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ module verity::oracles {
pick: String, // An optional JQ string to pick the value from the response JSON data structure.
oracle: address,
response: Option<String>
// recommendation include a optional Int field for status to contain errors_code/statuscode
}

// Global params for the oracle system
Expand All @@ -48,7 +49,8 @@ module verity::oracles {
params: HTTPRequest,
pick: String, // An optional JQ string to pick the value from the response JSON data structure.
oracle: address,
notify: Option<vector<u8>>
notify: Option<vector<u8>>,
request_id: ObjectID
}

struct Fulfilment has copy, drop {
Expand Down Expand Up @@ -137,7 +139,8 @@ module verity::oracles {
params,
pick,
oracle,
notify
notify,
request_id
});

request_id
Expand Down

0 comments on commit 5dc62db

Please sign in to comment.