Skip to content

Commit

Permalink
Merge pull request #602 from drift-labs/ws-resub
Browse files Browse the repository at this point in the history
resub after timeout
  • Loading branch information
NourAlharithi authored Sep 19, 2023
2 parents 2080219 + 4bf8f8c commit 467bc34
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 64 deletions.
9 changes: 8 additions & 1 deletion sdk/src/accounts/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
} from '../types';
import StrictEventEmitter from 'strict-event-emitter-types';
import { EventEmitter } from 'events';
import { PublicKey } from '@solana/web3.js';
import { Context, PublicKey } from '@solana/web3.js';
import { Account } from '@solana/spl-token';
import { OracleInfo, OraclePriceData } from '..';

Expand All @@ -21,6 +21,13 @@ export interface AccountSubscriber<T> {
setData(userAccount: T, slot?: number): void;
}

export interface ProgramAccountSubscriber<T> {
subscribe(
onChange: (accountId: PublicKey, data: T, context: Context) => void
): Promise<void>;
unsubscribe(): Promise<void>;
}

export class NotSubscribedError extends Error {
name = 'NotSubscribedError';
}
Expand Down
38 changes: 36 additions & 2 deletions sdk/src/accounts/webSocketAccountSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,24 @@ export class WebSocketAccountSubscriber<T> implements AccountSubscriber<T> {
decodeBufferFn: (buffer: Buffer) => T;
onChange: (data: T) => void;
listenerId?: number;
resubTimeoutMs?: number;
timeoutId?: NodeJS.Timeout;

receivingData: boolean;

public constructor(
accountName: string,
program: Program,
accountPublicKey: PublicKey,
decodeBuffer?: (buffer: Buffer) => T
decodeBuffer?: (buffer: Buffer) => T,
resubTimeoutMs?: number
) {
this.accountName = accountName;
this.program = program;
this.accountPublicKey = accountPublicKey;
this.decodeBufferFn = decodeBuffer;
this.resubTimeoutMs = resubTimeoutMs;
this.receivingData = false;
}

async subscribe(onChange: (data: T) => void): Promise<void> {
Expand All @@ -39,10 +46,21 @@ export class WebSocketAccountSubscriber<T> implements AccountSubscriber<T> {
this.listenerId = this.program.provider.connection.onAccountChange(
this.accountPublicKey,
(accountInfo, context) => {
this.handleRpcResponse(context, accountInfo);
if (this.resubTimeoutMs) {
this.receivingData = true;
clearTimeout(this.timeoutId);
this.handleRpcResponse(context, accountInfo);
this.setTimeout();
} else {
this.handleRpcResponse(context, accountInfo);
}
},
(this.program.provider as AnchorProvider).opts.commitment
);

if (this.resubTimeoutMs) {
this.setTimeout();
}
}

setData(data: T, slot?: number): void {
Expand All @@ -57,6 +75,22 @@ export class WebSocketAccountSubscriber<T> implements AccountSubscriber<T> {
};
}

private setTimeout(): void {
if (!this.onChange) {
throw new Error('onChange callback function must be set');
}
this.timeoutId = setTimeout(async () => {
if (this.receivingData) {
console.log(
`No ws data from ${this.accountName} in ${this.resubTimeoutMs}ms, resubscribing`
);
await this.unsubscribe();
this.receivingData = false;
await this.subscribe(this.onChange);
}
}, this.resubTimeoutMs);
}

async fetch(): Promise<void> {
const rpcResponse =
await this.program.provider.connection.getAccountInfoAndContext(
Expand Down
20 changes: 15 additions & 5 deletions sdk/src/accounts/webSocketDriftClientAccountSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export class WebSocketDriftClientAccountSubscriber
oracleInfos: OracleInfo[];
oracleClientCache = new OracleClientCache();

resubTimeoutMs?: number;
shouldFindAllMarketsAndOracles: boolean;

eventEmitter: StrictEventEmitter<EventEmitter, DriftClientAccountEvents>;
Expand All @@ -54,7 +55,8 @@ export class WebSocketDriftClientAccountSubscriber
perpMarketIndexes: number[],
spotMarketIndexes: number[],
oracleInfos: OracleInfo[],
shouldFindAllMarketsAndOracles: boolean
shouldFindAllMarketsAndOracles: boolean,
resubTimeoutMs?: number
) {
this.isSubscribed = false;
this.program = program;
Expand All @@ -63,6 +65,7 @@ export class WebSocketDriftClientAccountSubscriber
this.spotMarketIndexes = spotMarketIndexes;
this.oracleInfos = oracleInfos;
this.shouldFindAllMarketsAndOracles = shouldFindAllMarketsAndOracles;
this.resubTimeoutMs = resubTimeoutMs;
}

public async subscribe(): Promise<boolean> {
Expand Down Expand Up @@ -96,7 +99,9 @@ export class WebSocketDriftClientAccountSubscriber
this.stateAccountSubscriber = new WebSocketAccountSubscriber(
'state',
this.program,
statePublicKey
statePublicKey,
undefined,
this.resubTimeoutMs
);
await this.stateAccountSubscriber.subscribe((data: StateAccount) => {
this.eventEmitter.emit('stateAccountUpdate', data);
Expand Down Expand Up @@ -136,7 +141,9 @@ export class WebSocketDriftClientAccountSubscriber
const accountSubscriber = new WebSocketAccountSubscriber<PerpMarketAccount>(
'perpMarket',
this.program,
perpMarketPublicKey
perpMarketPublicKey,
undefined,
this.resubTimeoutMs
);
await accountSubscriber.subscribe((data: PerpMarketAccount) => {
this.eventEmitter.emit('perpMarketAccountUpdate', data);
Expand All @@ -161,7 +168,9 @@ export class WebSocketDriftClientAccountSubscriber
const accountSubscriber = new WebSocketAccountSubscriber<SpotMarketAccount>(
'spotMarket',
this.program,
marketPublicKey
marketPublicKey,
undefined,
this.resubTimeoutMs
);
await accountSubscriber.subscribe((data: SpotMarketAccount) => {
this.eventEmitter.emit('spotMarketAccountUpdate', data);
Expand Down Expand Up @@ -192,7 +201,8 @@ export class WebSocketDriftClientAccountSubscriber
oracleInfo.publicKey,
(buffer: Buffer) => {
return client.getOraclePriceDataFromBuffer(buffer);
}
},
this.resubTimeoutMs
);

await accountSubscriber.subscribe((data: OraclePriceData) => {
Expand Down
152 changes: 152 additions & 0 deletions sdk/src/accounts/webSocketProgramAccountSubscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import { DataAndSlot, BufferAndSlot, ProgramAccountSubscriber } from './types';
import { AnchorProvider, Program } from '@coral-xyz/anchor';
import {
Commitment,
Context,
KeyedAccountInfo,
MemcmpFilter,
PublicKey,
} from '@solana/web3.js';
import * as Buffer from 'buffer';

export class WebSocketProgramAccountSubscriber<T>
implements ProgramAccountSubscriber<T>
{
subscriptionName: string;
accountDiscriminator: string;
dataAndSlot?: DataAndSlot<T> & { accountId: PublicKey };
bufferAndSlot?: BufferAndSlot;
program: Program;
decodeBuffer: (accountName: string, ix: Buffer) => T;
onChange: (accountId: PublicKey, data: T, context: Context) => void;
listenerId?: number;
resubTimeoutMs?: number;
timeoutId?: NodeJS.Timeout;
options: { filters: MemcmpFilter[]; commitment?: Commitment };

receivingData = false;

public constructor(
subscriptionName: string,
accountDiscriminator: string,
program: Program,
decodeBufferFn: (accountName: string, ix: Buffer) => T,
options: { filters: MemcmpFilter[]; commitment?: Commitment } = {
filters: [],
},
resubTimeoutMs?: number
) {
this.subscriptionName = subscriptionName;
this.accountDiscriminator = accountDiscriminator;
this.program = program;
this.decodeBuffer = decodeBufferFn;
this.resubTimeoutMs = resubTimeoutMs;
this.options = options;
this.receivingData = false;
}

async subscribe(
onChange: (accountId: PublicKey, data: T, context: Context) => void
): Promise<void> {
if (this.listenerId) {
return;
}

this.onChange = onChange;

this.listenerId = this.program.provider.connection.onProgramAccountChange(
this.program.programId,
(keyedAccountInfo, context) => {
if (this.resubTimeoutMs) {
this.receivingData = true;
clearTimeout(this.timeoutId);
this.handleRpcResponse(context, keyedAccountInfo);
this.setTimeout();
} else {
this.handleRpcResponse(context, keyedAccountInfo);
}
},
this.options.commitment ??
(this.program.provider as AnchorProvider).opts.commitment,
this.options.filters
);

if (this.resubTimeoutMs) {
this.setTimeout();
}
}

private setTimeout(): void {
if (!this.onChange) {
throw new Error('onChange callback function must be set');
}
this.timeoutId = setTimeout(async () => {
if (this.receivingData) {
console.log(
`No ws data from ${this.subscriptionName} in ${this.resubTimeoutMs}ms, resubscribing`
);
await this.unsubscribe();
this.receivingData = false;
await this.subscribe(this.onChange);
}
}, this.resubTimeoutMs);
}

handleRpcResponse(
context: Context,
keyedAccountInfo: KeyedAccountInfo
): void {
const newSlot = context.slot;
let newBuffer: Buffer | undefined = undefined;
if (keyedAccountInfo) {
newBuffer = keyedAccountInfo.accountInfo.data;
}

if (!this.bufferAndSlot) {
this.bufferAndSlot = {
buffer: newBuffer,
slot: newSlot,
};
if (newBuffer) {
const account = this.decodeBuffer(this.accountDiscriminator, newBuffer);
this.dataAndSlot = {
data: account,
slot: newSlot,
accountId: keyedAccountInfo.accountId,
};
this.onChange(keyedAccountInfo.accountId, account, context);
}
return;
}

if (newSlot <= this.bufferAndSlot.slot) {
return;
}

const oldBuffer = this.bufferAndSlot.buffer;
if (newBuffer && (!oldBuffer || !newBuffer.equals(oldBuffer))) {
this.bufferAndSlot = {
buffer: newBuffer,
slot: newSlot,
};
const account = this.decodeBuffer(this.accountDiscriminator, newBuffer);
this.dataAndSlot = {
data: account,
slot: newSlot,
accountId: keyedAccountInfo.accountId,
};
this.onChange(keyedAccountInfo.accountId, account, context);
}
}

unsubscribe(): Promise<void> {
if (this.listenerId) {
const promise =
this.program.provider.connection.removeAccountChangeListener(
this.listenerId
);
this.listenerId = undefined;
return promise;
}
}
}
12 changes: 10 additions & 2 deletions sdk/src/accounts/webSocketUserAccountSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,23 @@ import { UserAccount } from '../types';

export class WebSocketUserAccountSubscriber implements UserAccountSubscriber {
isSubscribed: boolean;
reconnectTimeoutMs?: number;
program: Program;
eventEmitter: StrictEventEmitter<EventEmitter, UserAccountEvents>;
userAccountPublicKey: PublicKey;

userDataAccountSubscriber: AccountSubscriber<UserAccount>;

public constructor(program: Program, userAccountPublicKey: PublicKey) {
public constructor(
program: Program,
userAccountPublicKey: PublicKey,
reconnectTimeoutMs?: number
) {
this.isSubscribed = false;
this.program = program;
this.userAccountPublicKey = userAccountPublicKey;
this.eventEmitter = new EventEmitter();
this.reconnectTimeoutMs = reconnectTimeoutMs;
}

async subscribe(userAccount?: UserAccount): Promise<boolean> {
Expand All @@ -35,7 +41,9 @@ export class WebSocketUserAccountSubscriber implements UserAccountSubscriber {
this.userDataAccountSubscriber = new WebSocketAccountSubscriber(
'user',
this.program,
this.userAccountPublicKey
this.userAccountPublicKey,
undefined,
this.reconnectTimeoutMs
);

if (userAccount) {
Expand Down
12 changes: 10 additions & 2 deletions sdk/src/accounts/webSocketUserStatsAccountSubsriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@ export class WebSocketUserStatsAccountSubscriber
implements UserStatsAccountSubscriber
{
isSubscribed: boolean;
reconnectTimeoutMs?: number;
program: Program;
eventEmitter: StrictEventEmitter<EventEmitter, UserStatsAccountEvents>;
userStatsAccountPublicKey: PublicKey;

userStatsAccountSubscriber: AccountSubscriber<UserStatsAccount>;

public constructor(program: Program, userStatsAccountPublicKey: PublicKey) {
public constructor(
program: Program,
userStatsAccountPublicKey: PublicKey,
reconnectTimeoutMs?: number
) {
this.isSubscribed = false;
this.program = program;
this.userStatsAccountPublicKey = userStatsAccountPublicKey;
this.eventEmitter = new EventEmitter();
this.reconnectTimeoutMs = reconnectTimeoutMs;
}

async subscribe(userStatsAccount?: UserStatsAccount): Promise<boolean> {
Expand All @@ -37,7 +43,9 @@ export class WebSocketUserStatsAccountSubscriber
this.userStatsAccountSubscriber = new WebSocketAccountSubscriber(
'userStats',
this.program,
this.userStatsAccountPublicKey
this.userStatsAccountPublicKey,
undefined,
this.reconnectTimeoutMs
);

if (userStatsAccount) {
Expand Down
Loading

0 comments on commit 467bc34

Please sign in to comment.