From 5cefa3f187cb05420f6f89678ff5907aa06e87c8 Mon Sep 17 00:00:00 2001 From: bry Date: Thu, 14 Nov 2024 18:11:49 -0600 Subject: [PATCH] dont kill node process --- .../src/services/yellowstone.ts | 254 +++++++++--------- 1 file changed, 132 insertions(+), 122 deletions(-) diff --git a/packages/account-postgres-sink-service/src/services/yellowstone.ts b/packages/account-postgres-sink-service/src/services/yellowstone.ts index 95c790311..eb1f98e0d 100644 --- a/packages/account-postgres-sink-service/src/services/yellowstone.ts +++ b/packages/account-postgres-sink-service/src/services/yellowstone.ts @@ -5,7 +5,6 @@ import Client, { SubscribeUpdate, SubscribeUpdateAccount, } from "@triton-one/yellowstone-grpc"; -import retry, { Options as RetryOptions } from "async-retry"; import { FastifyInstance } from "fastify"; import { YELLOWSTONE_TOKEN, YELLOWSTONE_URL } from "../env"; import { getPluginsByAccountTypeByProgram } from "../plugins"; @@ -14,6 +13,7 @@ import { convertYellowstoneTransaction } from "../utils/convertYellowstoneTransa import { handleAccountWebhook } from "../utils/handleAccountWebhook"; import { handleTransactionWebhook } from "../utils/handleTransactionWebhook"; +const MAX_RECONNECT_ATTEMPTS = 5; export const setupYellowstone = async ( server: FastifyInstance, configs: IConfig[] @@ -22,142 +22,152 @@ export const setupYellowstone = async ( throw new Error("YELLOWSTONE_TOKEN undefined"); } + let isReconnecting = false; const pluginsByAccountTypeByProgram = await getPluginsByAccountTypeByProgram( configs ); - const connect = async () => { - await retry( - async (bail, attempt) => { - const client = new Client(YELLOWSTONE_URL, YELLOWSTONE_TOKEN, { - "grpc.max_receive_message_length": 2065853043, - "grpc.keepalive_time_ms": 10000, - "grpc.keepalive_timeout_ms": 5000, - "grpc.keepalive_permit_without_calls": 1, - }); + const connect = async (attemptCount = 0) => { + if (attemptCount >= MAX_RECONNECT_ATTEMPTS) { + console.error( + `Yellowstone failed to connect after ${MAX_RECONNECT_ATTEMPTS} attempts.` + ); + process.exit(1); + } + const client = new Client(YELLOWSTONE_URL, YELLOWSTONE_TOKEN, { + "grpc.max_receive_message_length": 2065853043, + "grpc.keepalive_time_ms": 10000, + "grpc.keepalive_timeout_ms": 5000, + "grpc.keepalive_permit_without_calls": 1, + }); + + try { + const stream = await client.subscribe(); + console.log("Connected to Yellowstone"); + attemptCount = 0; + isReconnecting = false; + + stream.on("data", async (data: SubscribeUpdate) => { try { - const stream = await client.subscribe(); - console.log("Connected to Yellowstone"); - - stream.on("data", async (data: SubscribeUpdate) => { - try { - if (data.transaction) { - const transaction = await convertYellowstoneTransaction( - data.transaction.transaction - ); - - if (transaction) { - try { - await handleTransactionWebhook({ - fastify: server, - configs, - transaction, - }); - } catch (err) { - console.error(err); - } - } + if (data.transaction) { + const transaction = await convertYellowstoneTransaction( + data.transaction.transaction + ); + + if (transaction) { + try { + await handleTransactionWebhook({ + fastify: server, + configs, + transaction, + }); + } catch (err) { + console.error(err); } + } + } - if (data.account) { - const account = (data.account as SubscribeUpdateAccount) - ?.account; - if (account && configs) { - const owner = new PublicKey(account.owner).toBase58(); - const config = configs.find((x) => x.programId === owner); - - if (config) { - try { - await handleAccountWebhook({ - fastify: server, - programId: new PublicKey(config.programId), - accounts: config.accounts, - account: { - ...account, - pubkey: new PublicKey(account.pubkey).toBase58(), - data: [account.data], - }, - pluginsByAccountType: - pluginsByAccountTypeByProgram[owner] || {}, - }); - } catch (err) { - console.error(err); - } - } + if (data.account) { + const account = (data.account as SubscribeUpdateAccount)?.account; + if (account && configs) { + const owner = new PublicKey(account.owner).toBase58(); + const config = configs.find((x) => x.programId === owner); + + if (config) { + try { + await handleAccountWebhook({ + fastify: server, + programId: new PublicKey(config.programId), + accounts: config.accounts, + account: { + ...account, + pubkey: new PublicKey(account.pubkey).toBase58(), + data: [account.data], + }, + pluginsByAccountType: + pluginsByAccountTypeByProgram[owner] || {}, + }); + } catch (err) { + console.error(err); } } - } catch (err) { - console.error("Yellowstone: Error processing data:", err); - } - }); - - const request: SubscribeRequest = { - accounts: { - client: { - owner: configs.map((c) => c.programId), - account: [], - filters: [], - }, - }, - slots: {}, - transactions: { - client: { - vote: false, - failed: false, - accountInclude: configs.map((c) => c.programId), - accountExclude: [], - accountRequired: [], - }, - }, - entry: {}, - blocks: {}, - blocksMeta: {}, - accountsDataSlice: [], - ping: undefined, - commitment: CommitmentLevel.CONFIRMED, - }; - - stream.write(request, (err: any) => { - if (err) { - console.error(`Failed to write initial request: ${err}`); - throw err; } - }); - - stream.on("error", (err) => { - console.error("Yellowstone stream error:", err); - throw err; - }); - - stream.on("end", () => { - console.log("Yellowstone stream ended"); - throw new Error("Stream ended"); - }); - - stream.on("close", () => { - console.log("Yellowstone stream closed"); - throw new Error("Stream closed"); - }); - } catch (err) { - console.log( - `Yellowstone connection error on attempt ${attempt}:`, - err - ); - if (attempt >= 5) { - bail(new Error(`Yellowstone failed to connect after 5 attempts.`)); } + } catch (err) { + console.error("Yellowstone: Error processing data:", err); } - }, - { - retries: 5, - factor: 1.1, // Small incremental delay - minTimeout: 0, // First retry with no delay - maxTimeout: 2000, // Cap max delay at 2 seconds - onRetry: (_, attempt) => - console.log(`Attempting to reconnect (attempt ${attempt} of 5)`), + }); + + const request: SubscribeRequest = { + accounts: { + client: { + owner: configs.map((c) => c.programId), + account: [], + filters: [], + }, + }, + slots: {}, + transactions: { + client: { + vote: false, + failed: false, + accountInclude: configs.map((c) => c.programId), + accountExclude: [], + accountRequired: [], + }, + }, + entry: {}, + blocks: {}, + blocksMeta: {}, + accountsDataSlice: [], + ping: undefined, + commitment: CommitmentLevel.CONFIRMED, + }; + + stream.write(request, (err: any) => { + if (err) { + console.error(`Failed to write initial request: ${err}`); + stream.end(); + } + }); + + stream.on("error", (err) => { + console.error("Yellowstone stream error:", err); + stream.end(); + }); + + stream.on("end", () => { + console.log("Yellowstone stream ended"); + if (!isReconnecting) { + isReconnecting = true; + handleReconnect(attemptCount + 1); + } + }); + + stream.on("close", () => { + console.log("Yellowstone stream closed"); + if (!isReconnecting) { + isReconnecting = true; + handleReconnect(attemptCount + 1); + } + }); + } catch (err) { + console.log("Yellowstone connection error:", err); + if (!isReconnecting) { + isReconnecting = true; + handleReconnect(attemptCount + 1); } + } + }; + + const handleReconnect = async (nextAttempt: number) => { + console.log( + `Attempting to reconnect (attempt ${nextAttempt} of ${MAX_RECONNECT_ATTEMPTS})...` ); + + const delay = nextAttempt === 1 ? 0 : 1000; + setTimeout(() => connect(nextAttempt), delay); }; await connect();