From 2e77c8825240b21314657aa0493150e1505c569a Mon Sep 17 00:00:00 2001 From: im-adithya Date: Thu, 22 Aug 2024 20:03:45 +0530 Subject: [PATCH 1/4] feat: add exponential backoff to subscriptions --- internal/nostr/nostr.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/internal/nostr/nostr.go b/internal/nostr/nostr.go index 8a240c6..6062ada 100644 --- a/internal/nostr/nostr.go +++ b/internal/nostr/nostr.go @@ -699,6 +699,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri var relay *nostr.Relay var isCustomRelay bool var err error + waitToReconnectSeconds := 0 for { // close relays with connection errors before connecting again @@ -707,14 +708,16 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri if relay != nil && isCustomRelay { relay.Close() } + time.Sleep(time.Duration(waitToReconnectSeconds) * time.Second) relay, isCustomRelay, err = svc.getRelayConnection(ctx, subscription.RelayUrl) if err != nil { // TODO: notify user about relay failure svc.Logger.WithError(err).WithFields(logrus.Fields{ "subscription_id": subscription.ID, "relay_url": subscription.RelayUrl, - }).Error("Failed get relay connection, retrying in 5s...") - time.Sleep(5 * time.Second) // sleep for 5 seconds + }).Errorf("Failed to subscribe to relay, retrying in %vs...", waitToReconnectSeconds) + waitToReconnectSeconds = max(waitToReconnectSeconds, 1) + waitToReconnectSeconds = min(waitToReconnectSeconds * 2, 900) continue } @@ -724,8 +727,9 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri svc.Logger.WithError(err).WithFields(logrus.Fields{ "subscription_id": subscription.ID, "relay_url": subscription.RelayUrl, - }).Error("Failed to subscribe to relay, retrying in 5s...") - time.Sleep(5 * time.Second) // sleep for 5 seconds + }).Errorf("Failed to subscribe to relay, retrying in %vs...", waitToReconnectSeconds) + waitToReconnectSeconds = max(waitToReconnectSeconds, 1) + waitToReconnectSeconds = min(waitToReconnectSeconds * 2, 900) continue } @@ -745,8 +749,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri svc.Logger.WithError(err).WithFields(logrus.Fields{ "subscription_id": subscription.ID, "relay_url": subscription.RelayUrl, - }).Error("Subscription stopped due to relay error, reconnecting in 5s...") - time.Sleep(5 * time.Second) // sleep for 5 seconds + }).Error("Subscription stopped due to relay error, reconnecting...") continue } else { if isCustomRelay { From d2f55fdcb56690a9880d882e7907d97d041d9a1e Mon Sep 17 00:00:00 2001 From: im-adithya Date: Tue, 27 Aug 2024 15:35:26 +0530 Subject: [PATCH 2/4] chore: reset waitToReconnectSeconds to 0 --- internal/nostr/nostr.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/nostr/nostr.go b/internal/nostr/nostr.go index 6062ada..2f61bd8 100644 --- a/internal/nostr/nostr.go +++ b/internal/nostr/nostr.go @@ -715,7 +715,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri svc.Logger.WithError(err).WithFields(logrus.Fields{ "subscription_id": subscription.ID, "relay_url": subscription.RelayUrl, - }).Errorf("Failed to subscribe to relay, retrying in %vs...", waitToReconnectSeconds) + }).Errorf("Failed to connect to relay, retrying in %vs...", waitToReconnectSeconds) waitToReconnectSeconds = max(waitToReconnectSeconds, 1) waitToReconnectSeconds = min(waitToReconnectSeconds * 2, 900) continue @@ -742,6 +742,8 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri "relay_url": subscription.RelayUrl, }).Debug("Started subscription") + waitToReconnectSeconds = 0 + err = svc.processEvents(ctx, subscription, onReceiveEOS, handleEvent) if err != nil { From d529dcc8cea0e7c97b62ef94d196df90dd5bd60a Mon Sep 17 00:00:00 2001 From: im-adithya Date: Tue, 27 Aug 2024 15:55:47 +0530 Subject: [PATCH 3/4] fix: check relay.connection before closing --- internal/nostr/nostr.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/nostr/nostr.go b/internal/nostr/nostr.go index 2f61bd8..9ca3280 100644 --- a/internal/nostr/nostr.go +++ b/internal/nostr/nostr.go @@ -705,31 +705,31 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri // close relays with connection errors before connecting again // because context expiration has no effect on relays // TODO: Call relay.Connect on already initialized relays - if relay != nil && isCustomRelay { + if relay != nil && relay.Connection != nil && isCustomRelay { relay.Close() } time.Sleep(time.Duration(waitToReconnectSeconds) * time.Second) relay, isCustomRelay, err = svc.getRelayConnection(ctx, subscription.RelayUrl) if err != nil { // TODO: notify user about relay failure + waitToReconnectSeconds = max(waitToReconnectSeconds, 1) + waitToReconnectSeconds = min(waitToReconnectSeconds * 2, 900) svc.Logger.WithError(err).WithFields(logrus.Fields{ "subscription_id": subscription.ID, "relay_url": subscription.RelayUrl, - }).Errorf("Failed to connect to relay, retrying in %vs...", waitToReconnectSeconds) - waitToReconnectSeconds = max(waitToReconnectSeconds, 1) - waitToReconnectSeconds = min(waitToReconnectSeconds * 2, 900) + }).Errorf("Failed to connect to relay, retrying in %vs...", waitToReconnectSeconds) continue } sub, err := relay.Subscribe(ctx, []nostr.Filter{*filter}) if err != nil { // TODO: notify user about subscription failure + waitToReconnectSeconds = max(waitToReconnectSeconds, 1) + waitToReconnectSeconds = min(waitToReconnectSeconds * 2, 900) svc.Logger.WithError(err).WithFields(logrus.Fields{ "subscription_id": subscription.ID, "relay_url": subscription.RelayUrl, }).Errorf("Failed to subscribe to relay, retrying in %vs...", waitToReconnectSeconds) - waitToReconnectSeconds = max(waitToReconnectSeconds, 1) - waitToReconnectSeconds = min(waitToReconnectSeconds * 2, 900) continue } From 333d4ffedd454099464ee32aaeb3c761b60c72c1 Mon Sep 17 00:00:00 2001 From: im-adithya Date: Tue, 27 Aug 2024 15:57:03 +0530 Subject: [PATCH 4/4] fix: stop subscription if ctx is cancelled during connection --- internal/nostr/nostr.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/internal/nostr/nostr.go b/internal/nostr/nostr.go index 9ca3280..301af7c 100644 --- a/internal/nostr/nostr.go +++ b/internal/nostr/nostr.go @@ -702,12 +702,18 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri waitToReconnectSeconds := 0 for { - // close relays with connection errors before connecting again - // because context expiration has no effect on relays - // TODO: Call relay.Connect on already initialized relays + // context expiration has no effect on relays if relay != nil && relay.Connection != nil && isCustomRelay { relay.Close() } + if ctx.Err() != nil { + svc.Logger.WithFields(logrus.Fields{ + "subscription_id": subscription.ID, + "relay_url": subscription.RelayUrl, + }).Debug("Context canceled, stopping subscription") + svc.stopSubscription(subscription) + return + } time.Sleep(time.Duration(waitToReconnectSeconds) * time.Second) relay, isCustomRelay, err = svc.getRelayConnection(ctx, subscription.RelayUrl) if err != nil {