Skip to content

Commit

Permalink
fix jito reconnect logic
Browse files Browse the repository at this point in the history
  • Loading branch information
wphan committed Oct 9, 2024
1 parent 9473f90 commit 7057dfa
Showing 1 changed file with 63 additions and 40 deletions.
103 changes: 63 additions & 40 deletions src/bundleSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ export type BundleStats = {
droppedBlockhashNotFound: number;
};

const initBundleStats: BundleStats = {
accepted: 0,
stateAuctionBidRejected: 0,
winningBatchBidRejected: 0,
simulationFailure: 0,
internalError: 0,
droppedBundle: 0,
droppedPruned: 0,
droppedBlockhashExpired: 0,
droppedBlockhashNotFound: 0,
};

export class BundleSender {
private ws: WebSocket | undefined;
private searcherClient: SearcherClient;
Expand Down Expand Up @@ -85,19 +97,7 @@ export class BundleSender {
private countDroppedbundles = 0;

private lastTipStream: TipStream | undefined;
private bundleStats: BundleStats = {
accepted: 0,
stateAuctionBidRejected: 0,
winningBatchBidRejected: 0,
simulationFailure: 0,
internalError: 0,
droppedBundle: 0,

// custom stats
droppedPruned: 0,
droppedBlockhashExpired: 0,
droppedBlockhashNotFound: 0,
};
private bundleStats: BundleStats = initBundleStats;

constructor(
private connection: Connection,
Expand Down Expand Up @@ -210,34 +210,57 @@ export class BundleSender {
`${logPrefix} Called connectJitoTipStream but this.ws is already connected, disconnecting it...`
);
this.ws.close();
return;
this.ws = undefined;
}

this.ws = new WebSocket(jitoBundlePriceEndpoint);
this.bundlesSent = 0;
this.bundleResultsReceived = 0;
logger.info(
`${logPrefix} establishing jito ws connection (${jitoBundlePriceEndpoint})`
);

this.ws.on('message', (data: string) => {
const tipStream = JSON.parse(data) as Array<TipStream>;
if (tipStream.length > 0) {
tipStream[0].ts = new Date(tipStream[0].time).getTime();
this.lastTipStream = tipStream[0];
}
});
this.ws.on('close', () => {
logger.info(
`${logPrefix}: jito ws closed ${
this.shuttingDown ? 'shutting down...' : 'reconnecting in 5s...'
}`
);
this.ws = undefined;
if (!this.shuttingDown) {
setTimeout(this.connectJitoTipStream.bind(this), 5000);
}
});
this.ws.on('error', (e) => {
logger.error(`${logPrefix}: jito ws error: ${JSON.stringify(e)}`);
});
const connect = () => {
this.ws = new WebSocket(jitoBundlePriceEndpoint);

this.ws.on('open', () => {
logger.info(
`${logPrefix} WebSocket connection established, resetting bundle stats`
);
this.bundleStats = initBundleStats;
this.bundlesSent = 0;
this.bundleResultsReceived = 0;
});

this.ws.on('message', (data: string) => {
try {
const tipStream = JSON.parse(data) as Array<TipStream>;
if (tipStream.length > 0) {
tipStream[0].ts = new Date(tipStream[0].time).getTime();
this.lastTipStream = tipStream[0];
}
} catch (error) {
logger.error(
`${logPrefix} Error parsing WebSocket message: ${error}`
);
}
});

this.ws.on('close', (code: number, reason: string) => {
logger.info(
`${logPrefix}: jito ws closed (code: ${code}, reason: ${reason}) ${
this.shuttingDown ? 'shutting down...' : 'reconnecting in 5s...'
}`
);
this.ws = undefined;
if (!this.shuttingDown) {
setTimeout(connect, 5000);
}
});

this.ws.on('error', (e) => {
logger.error(`${logPrefix}: jito ws error: ${e.message}`);
});
};

connect();
}

async subscribe() {
Expand Down Expand Up @@ -436,10 +459,9 @@ export class BundleSender {
logger.warn(
`${logPrefix} sent ${this.bundlesSent} bundles but only redeived ${this.bundleResultsReceived} results, disconnecting jito ws...`
);
// reconnect will happen on ws close message
this.ws?.close();
return;
}
this.bundlesSent++;

// +1 for tip tx, jito max is 5
let b: Bundle | Error = new Bundle(
Expand Down Expand Up @@ -474,6 +496,7 @@ export class BundleSender {
logger.info(
`${logPrefix} sent bundle with uuid ${bundleId} (tx: ${txSig}: ${ts}) ${metadata}`
);
this.bundlesSent++;
} catch (e) {
const err = e as Error;
logger.error(
Expand Down

0 comments on commit 7057dfa

Please sign in to comment.