From be22d96f0ac701c996ed101b99f40a2f6cdf75bf Mon Sep 17 00:00:00 2001 From: Nour Alharithi Date: Wed, 8 Nov 2023 10:37:10 -0800 Subject: [PATCH 1/2] fix race condition --- sdk/src/accounts/webSocketAccountSubscriber.ts | 13 ++++++++++++- .../accounts/webSocketProgramAccountSubscriber.ts | 13 ++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/sdk/src/accounts/webSocketAccountSubscriber.ts b/sdk/src/accounts/webSocketAccountSubscriber.ts index 50779d8e1..d996f7e5b 100644 --- a/sdk/src/accounts/webSocketAccountSubscriber.ts +++ b/sdk/src/accounts/webSocketAccountSubscriber.ts @@ -14,6 +14,7 @@ export class WebSocketAccountSubscriber implements AccountSubscriber { onChange: (data: T) => void; listenerId?: number; resubTimeoutMs?: number; + isUnsubscribing = false; timeoutId?: NodeJS.Timeout; receivingData: boolean; @@ -34,7 +35,7 @@ export class WebSocketAccountSubscriber implements AccountSubscriber { } async subscribe(onChange: (data: T) => void): Promise { - if (this.listenerId) { + if (this.listenerId || this.isUnsubscribing) { return; } @@ -80,6 +81,11 @@ export class WebSocketAccountSubscriber implements AccountSubscriber { throw new Error('onChange callback function must be set'); } this.timeoutId = setTimeout(async () => { + if (this.isUnsubscribing) { + // If we are in the process of unsubscribing, do not attempt to resubscribe + return; + } + if (this.receivingData) { console.log( `No ws data from ${this.accountName} in ${this.resubTimeoutMs}ms, resubscribing` @@ -154,12 +160,17 @@ export class WebSocketAccountSubscriber implements AccountSubscriber { } unsubscribe(): Promise { + this.isUnsubscribing = true; + clearTimeout(this.timeoutId); + this.timeoutId = undefined; + if (this.listenerId) { const promise = this.program.provider.connection.removeAccountChangeListener( this.listenerId ); this.listenerId = undefined; + this.isUnsubscribing = false; return promise; } } diff --git a/sdk/src/accounts/webSocketProgramAccountSubscriber.ts b/sdk/src/accounts/webSocketProgramAccountSubscriber.ts index bcb7b9a2f..0a16cc177 100644 --- a/sdk/src/accounts/webSocketProgramAccountSubscriber.ts +++ b/sdk/src/accounts/webSocketProgramAccountSubscriber.ts @@ -21,6 +21,7 @@ export class WebSocketProgramAccountSubscriber onChange: (accountId: PublicKey, data: T, context: Context) => void; listenerId?: number; resubTimeoutMs?: number; + isUnsubscribing = false; timeoutId?: NodeJS.Timeout; options: { filters: MemcmpFilter[]; commitment?: Commitment }; @@ -48,7 +49,7 @@ export class WebSocketProgramAccountSubscriber async subscribe( onChange: (accountId: PublicKey, data: T, context: Context) => void ): Promise { - if (this.listenerId) { + if (this.listenerId || this.isUnsubscribing) { return; } @@ -81,6 +82,11 @@ export class WebSocketProgramAccountSubscriber throw new Error('onChange callback function must be set'); } this.timeoutId = setTimeout(async () => { + if (this.isUnsubscribing) { + // If we are in the process of unsubscribing, do not attempt to resubscribe + return; + } + if (this.receivingData) { console.log( `No ws data from ${this.subscriptionName} in ${this.resubTimeoutMs}ms, resubscribing` @@ -140,12 +146,17 @@ export class WebSocketProgramAccountSubscriber } unsubscribe(): Promise { + this.isUnsubscribing = true; + clearTimeout(this.timeoutId); + this.timeoutId = undefined; + if (this.listenerId) { const promise = this.program.provider.connection.removeAccountChangeListener( this.listenerId ); this.listenerId = undefined; + this.isUnsubscribing = false; return promise; } } From f50419f9f71c2c5d8d71e6b6bb24a452f78f44c5 Mon Sep 17 00:00:00 2001 From: Nour Alharithi Date: Wed, 8 Nov 2023 11:20:19 -0800 Subject: [PATCH 2/2] unsubscribing is false after remove listener resolves --- sdk/src/accounts/webSocketAccountSubscriber.ts | 15 +++++++++------ .../accounts/webSocketProgramAccountSubscriber.ts | 14 ++++++++------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/sdk/src/accounts/webSocketAccountSubscriber.ts b/sdk/src/accounts/webSocketAccountSubscriber.ts index d996f7e5b..0303147b6 100644 --- a/sdk/src/accounts/webSocketAccountSubscriber.ts +++ b/sdk/src/accounts/webSocketAccountSubscriber.ts @@ -15,6 +15,7 @@ export class WebSocketAccountSubscriber implements AccountSubscriber { listenerId?: number; resubTimeoutMs?: number; isUnsubscribing = false; + timeoutId?: NodeJS.Timeout; receivingData: boolean; @@ -165,13 +166,15 @@ export class WebSocketAccountSubscriber implements AccountSubscriber { this.timeoutId = undefined; if (this.listenerId) { - const promise = - this.program.provider.connection.removeAccountChangeListener( - this.listenerId - ); - this.listenerId = undefined; - this.isUnsubscribing = false; + const promise = this.program.provider.connection + .removeAccountChangeListener(this.listenerId) + .then(() => { + this.listenerId = undefined; + this.isUnsubscribing = false; + }); return promise; + } else { + this.isUnsubscribing = false; } } } diff --git a/sdk/src/accounts/webSocketProgramAccountSubscriber.ts b/sdk/src/accounts/webSocketProgramAccountSubscriber.ts index 0a16cc177..327521887 100644 --- a/sdk/src/accounts/webSocketProgramAccountSubscriber.ts +++ b/sdk/src/accounts/webSocketProgramAccountSubscriber.ts @@ -151,13 +151,15 @@ export class WebSocketProgramAccountSubscriber this.timeoutId = undefined; if (this.listenerId) { - const promise = - this.program.provider.connection.removeAccountChangeListener( - this.listenerId - ); - this.listenerId = undefined; - this.isUnsubscribing = false; + const promise = this.program.provider.connection + .removeAccountChangeListener(this.listenerId) + .then(() => { + this.listenerId = undefined; + this.isUnsubscribing = false; + }); return promise; + } else { + this.isUnsubscribing = false; } } }