From d0b270d8783e44411c06d87bd4bc257f02e4006e Mon Sep 17 00:00:00 2001 From: Roland Bewick Date: Fri, 5 Jan 2024 15:25:19 +0700 Subject: [PATCH 1/3] fix: remove infinite loop in subscription causing high cpu --- service.go | 144 ++++++++++++++++++++++++++--------------------------- 1 file changed, 72 insertions(+), 72 deletions(-) diff --git a/service.go b/service.go index f6def421..871d9966 100644 --- a/service.go +++ b/service.go @@ -54,87 +54,87 @@ func (svc *Service) GetUser(c echo.Context) (user *User, err error) { } func (svc *Service) StartSubscription(ctx context.Context, sub *nostr.Subscription) error { - for { - if sub.Relay.ConnectionError != nil { - return sub.Relay.ConnectionError - } - select { - case <-ctx.Done(): - svc.Logger.Info("Exiting subscription.") - return nil - case <-sub.EndOfStoredEvents: - if !svc.ReceivedEOS { - svc.Logger.Info("Received EOS") + go func() { + <-sub.EndOfStoredEvents + svc.ReceivedEOS = true + svc.Logger.Info("Received EOS") + }() + + go func() { + for event := range sub.Events { + resp, err := svc.HandleEvent(ctx, event) + if err != nil { + svc.Logger.WithFields(logrus.Fields{ + "eventId": event.ID, + "eventKind": event.Kind, + }).Errorf("Failed to process event: %v", err) } - svc.ReceivedEOS = true - case event := <-sub.Events: - go func() { - resp, err := svc.HandleEvent(ctx, event) + if resp != nil { + status, err := sub.Relay.Publish(ctx, *resp) if err != nil { svc.Logger.WithFields(logrus.Fields{ - "eventId": event.ID, - "eventKind": event.Kind, - }).Errorf("Failed to process event: %v", err) + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + }).Errorf("Failed to publish reply: %v", err) + return } - if resp != nil { - status, err := sub.Relay.Publish(ctx, *resp) - if err != nil { - svc.Logger.WithFields(logrus.Fields{ - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - }).Errorf("Failed to publish reply: %v", err) - return - } - nostrEvent := NostrEvent{} - result := svc.db.Where("nostr_id = ?", event.ID).First(&nostrEvent) - if result.Error != nil { - svc.Logger.WithFields(logrus.Fields{ - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - }).Error(result.Error) - return - } - nostrEvent.ReplyId = resp.ID + nostrEvent := NostrEvent{} + result := svc.db.Where("nostr_id = ?", event.ID).First(&nostrEvent) + if result.Error != nil { + svc.Logger.WithFields(logrus.Fields{ + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + }).Error(result.Error) + return + } + nostrEvent.ReplyId = resp.ID - if status == nostr.PublishStatusSucceeded { - nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_CONFIRMED - nostrEvent.RepliedAt = time.Now() - svc.db.Save(&nostrEvent) - svc.Logger.WithFields(logrus.Fields{ - "nostrEventId": nostrEvent.ID, - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - "appId": nostrEvent.AppId, - }).Info("Published reply") - } else if status == nostr.PublishStatusFailed { - nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_FAILED - svc.db.Save(&nostrEvent) - svc.Logger.WithFields(logrus.Fields{ - "nostrEventId": nostrEvent.ID, - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - "appId": nostrEvent.AppId, - }).Info("Failed to publish reply") - } else { - nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_UNCONFIRMED - svc.db.Save(&nostrEvent) - svc.Logger.WithFields(logrus.Fields{ - "nostrEventId": nostrEvent.ID, - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - "appId": nostrEvent.AppId, - }).Info("Reply sent but no response from relay (timeout)") - } + if status == nostr.PublishStatusSucceeded { + nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_CONFIRMED + nostrEvent.RepliedAt = time.Now() + svc.db.Save(&nostrEvent) + svc.Logger.WithFields(logrus.Fields{ + "nostrEventId": nostrEvent.ID, + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + "appId": nostrEvent.AppId, + }).Info("Published reply") + } else if status == nostr.PublishStatusFailed { + nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_FAILED + svc.db.Save(&nostrEvent) + svc.Logger.WithFields(logrus.Fields{ + "nostrEventId": nostrEvent.ID, + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + "appId": nostrEvent.AppId, + }).Info("Failed to publish reply") + } else { + nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_UNCONFIRMED + svc.db.Save(&nostrEvent) + svc.Logger.WithFields(logrus.Fields{ + "nostrEventId": nostrEvent.ID, + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + "appId": nostrEvent.AppId, + }).Info("Reply sent but no response from relay (timeout)") } - }() + } } + }() + + <-ctx.Done() + if ctx.Err() != context.Canceled { + svc.Logger.Errorf("Subscription error %v", ctx.Err()) + return ctx.Err() } + svc.Logger.Info("Exiting subscription.") + return nil } func (svc *Service) HandleEvent(ctx context.Context, event *nostr.Event) (result *nostr.Event, err error) { From dc6a62c0555d67dd71a0e1a946f5809110b3f0f2 Mon Sep 17 00:00:00 2001 From: Roland Bewick Date: Fri, 5 Jan 2024 15:43:32 +0700 Subject: [PATCH 2/3] fix: add check for relay connection error --- service.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/service.go b/service.go index 871d9966..5efa4b1f 100644 --- a/service.go +++ b/service.go @@ -129,6 +129,10 @@ func (svc *Service) StartSubscription(ctx context.Context, sub *nostr.Subscripti }() <-ctx.Done() + if sub.Relay.ConnectionError != nil { + svc.Logger.Errorf("Relay error %v", ctx.Err()) + return sub.Relay.ConnectionError + } if ctx.Err() != context.Canceled { svc.Logger.Errorf("Subscription error %v", ctx.Err()) return ctx.Err() From dd59887a3d8aaf178e4df05c4f68e9716fec7b97 Mon Sep 17 00:00:00 2001 From: Roland Bewick Date: Fri, 5 Jan 2024 19:52:05 +0700 Subject: [PATCH 3/3] fix: listen to relay context done message --- .env.example | 1 + service.go | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/.env.example b/.env.example index e0e7c094..5aa2ed4f 100644 --- a/.env.example +++ b/.env.example @@ -2,6 +2,7 @@ DATABASE_URI=file:nwc.db NOSTR_PRIVKEY= COOKIE_SECRET=secretsecret RELAY=wss://relay.getalby.com/v1 +#RELAY=ws://localhost:7447/v1 PUBLIC_RELAY= PORT=8080 diff --git a/service.go b/service.go index 5efa4b1f..8707bdf4 100644 --- a/service.go +++ b/service.go @@ -128,17 +128,18 @@ func (svc *Service) StartSubscription(ctx context.Context, sub *nostr.Subscripti } }() - <-ctx.Done() - if sub.Relay.ConnectionError != nil { - svc.Logger.Errorf("Relay error %v", ctx.Err()) + select { + case <-sub.Relay.Context().Done(): + svc.Logger.Errorf("Relay error %v", sub.Relay.ConnectionError) return sub.Relay.ConnectionError + case <-ctx.Done(): + if ctx.Err() != context.Canceled { + svc.Logger.Errorf("Subscription error %v", ctx.Err()) + return ctx.Err() + } + svc.Logger.Info("Exiting subscription.") + return nil } - if ctx.Err() != context.Canceled { - svc.Logger.Errorf("Subscription error %v", ctx.Err()) - return ctx.Err() - } - svc.Logger.Info("Exiting subscription.") - return nil } func (svc *Service) HandleEvent(ctx context.Context, event *nostr.Event) (result *nostr.Event, err error) {