Skip to content

Commit

Permalink
Merge pull request #686 from drift-labs/sdk-unsubscribe-fix
Browse files Browse the repository at this point in the history
fix unsubscribe race condition
  • Loading branch information
NourAlharithi authored Nov 8, 2023
2 parents 54009dc + f50419f commit 6755a9e
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 12 deletions.
26 changes: 20 additions & 6 deletions sdk/src/accounts/webSocketAccountSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export class WebSocketAccountSubscriber<T> implements AccountSubscriber<T> {
onChange: (data: T) => void;
listenerId?: number;
resubTimeoutMs?: number;
isUnsubscribing = false;

timeoutId?: NodeJS.Timeout;

receivingData: boolean;
Expand All @@ -34,7 +36,7 @@ export class WebSocketAccountSubscriber<T> implements AccountSubscriber<T> {
}

async subscribe(onChange: (data: T) => void): Promise<void> {
if (this.listenerId) {
if (this.listenerId || this.isUnsubscribing) {
return;
}

Expand Down Expand Up @@ -80,6 +82,11 @@ export class WebSocketAccountSubscriber<T> implements AccountSubscriber<T> {
throw new Error('onChange callback function must be set');
}
this.timeoutId = setTimeout(async () => {
if (this.isUnsubscribing) {
// If we are in the process of unsubscribing, do not attempt to resubscribe
return;
}

if (this.receivingData) {
console.log(
`No ws data from ${this.accountName} in ${this.resubTimeoutMs}ms, resubscribing`
Expand Down Expand Up @@ -154,13 +161,20 @@ export class WebSocketAccountSubscriber<T> implements AccountSubscriber<T> {
}

unsubscribe(): Promise<void> {
this.isUnsubscribing = true;
clearTimeout(this.timeoutId);
this.timeoutId = undefined;

if (this.listenerId) {
const promise =
this.program.provider.connection.removeAccountChangeListener(
this.listenerId
);
this.listenerId = undefined;
const promise = this.program.provider.connection
.removeAccountChangeListener(this.listenerId)
.then(() => {
this.listenerId = undefined;
this.isUnsubscribing = false;
});
return promise;
} else {
this.isUnsubscribing = false;
}
}
}
25 changes: 19 additions & 6 deletions sdk/src/accounts/webSocketProgramAccountSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export class WebSocketProgramAccountSubscriber<T>
onChange: (accountId: PublicKey, data: T, context: Context) => void;
listenerId?: number;
resubTimeoutMs?: number;
isUnsubscribing = false;
timeoutId?: NodeJS.Timeout;
options: { filters: MemcmpFilter[]; commitment?: Commitment };

Expand Down Expand Up @@ -48,7 +49,7 @@ export class WebSocketProgramAccountSubscriber<T>
async subscribe(
onChange: (accountId: PublicKey, data: T, context: Context) => void
): Promise<void> {
if (this.listenerId) {
if (this.listenerId || this.isUnsubscribing) {
return;
}

Expand Down Expand Up @@ -81,6 +82,11 @@ export class WebSocketProgramAccountSubscriber<T>
throw new Error('onChange callback function must be set');
}
this.timeoutId = setTimeout(async () => {
if (this.isUnsubscribing) {
// If we are in the process of unsubscribing, do not attempt to resubscribe
return;
}

if (this.receivingData) {
console.log(
`No ws data from ${this.subscriptionName} in ${this.resubTimeoutMs}ms, resubscribing`
Expand Down Expand Up @@ -140,13 +146,20 @@ export class WebSocketProgramAccountSubscriber<T>
}

unsubscribe(): Promise<void> {
this.isUnsubscribing = true;
clearTimeout(this.timeoutId);
this.timeoutId = undefined;

if (this.listenerId) {
const promise =
this.program.provider.connection.removeAccountChangeListener(
this.listenerId
);
this.listenerId = undefined;
const promise = this.program.provider.connection
.removeAccountChangeListener(this.listenerId)
.then(() => {
this.listenerId = undefined;
this.isUnsubscribing = false;
});
return promise;
} else {
this.isUnsubscribing = false;
}
}
}

0 comments on commit 6755a9e

Please sign in to comment.