From d0b270d8783e44411c06d87bd4bc257f02e4006e Mon Sep 17 00:00:00 2001 From: Roland Bewick Date: Fri, 5 Jan 2024 15:25:19 +0700 Subject: [PATCH] fix: remove infinite loop in subscription causing high cpu --- service.go | 144 ++++++++++++++++++++++++++--------------------------- 1 file changed, 72 insertions(+), 72 deletions(-) diff --git a/service.go b/service.go index f6def421..871d9966 100644 --- a/service.go +++ b/service.go @@ -54,87 +54,87 @@ func (svc *Service) GetUser(c echo.Context) (user *User, err error) { } func (svc *Service) StartSubscription(ctx context.Context, sub *nostr.Subscription) error { - for { - if sub.Relay.ConnectionError != nil { - return sub.Relay.ConnectionError - } - select { - case <-ctx.Done(): - svc.Logger.Info("Exiting subscription.") - return nil - case <-sub.EndOfStoredEvents: - if !svc.ReceivedEOS { - svc.Logger.Info("Received EOS") + go func() { + <-sub.EndOfStoredEvents + svc.ReceivedEOS = true + svc.Logger.Info("Received EOS") + }() + + go func() { + for event := range sub.Events { + resp, err := svc.HandleEvent(ctx, event) + if err != nil { + svc.Logger.WithFields(logrus.Fields{ + "eventId": event.ID, + "eventKind": event.Kind, + }).Errorf("Failed to process event: %v", err) } - svc.ReceivedEOS = true - case event := <-sub.Events: - go func() { - resp, err := svc.HandleEvent(ctx, event) + if resp != nil { + status, err := sub.Relay.Publish(ctx, *resp) if err != nil { svc.Logger.WithFields(logrus.Fields{ - "eventId": event.ID, - "eventKind": event.Kind, - }).Errorf("Failed to process event: %v", err) + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + }).Errorf("Failed to publish reply: %v", err) + return } - if resp != nil { - status, err := sub.Relay.Publish(ctx, *resp) - if err != nil { - svc.Logger.WithFields(logrus.Fields{ - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - }).Errorf("Failed to publish reply: %v", err) - return - } - nostrEvent := NostrEvent{} - result := svc.db.Where("nostr_id = ?", event.ID).First(&nostrEvent) - if result.Error != nil { - svc.Logger.WithFields(logrus.Fields{ - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - }).Error(result.Error) - return - } - nostrEvent.ReplyId = resp.ID + nostrEvent := NostrEvent{} + result := svc.db.Where("nostr_id = ?", event.ID).First(&nostrEvent) + if result.Error != nil { + svc.Logger.WithFields(logrus.Fields{ + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + }).Error(result.Error) + return + } + nostrEvent.ReplyId = resp.ID - if status == nostr.PublishStatusSucceeded { - nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_CONFIRMED - nostrEvent.RepliedAt = time.Now() - svc.db.Save(&nostrEvent) - svc.Logger.WithFields(logrus.Fields{ - "nostrEventId": nostrEvent.ID, - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - "appId": nostrEvent.AppId, - }).Info("Published reply") - } else if status == nostr.PublishStatusFailed { - nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_FAILED - svc.db.Save(&nostrEvent) - svc.Logger.WithFields(logrus.Fields{ - "nostrEventId": nostrEvent.ID, - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - "appId": nostrEvent.AppId, - }).Info("Failed to publish reply") - } else { - nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_UNCONFIRMED - svc.db.Save(&nostrEvent) - svc.Logger.WithFields(logrus.Fields{ - "nostrEventId": nostrEvent.ID, - "eventId": event.ID, - "status": status, - "replyEventId": resp.ID, - "appId": nostrEvent.AppId, - }).Info("Reply sent but no response from relay (timeout)") - } + if status == nostr.PublishStatusSucceeded { + nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_CONFIRMED + nostrEvent.RepliedAt = time.Now() + svc.db.Save(&nostrEvent) + svc.Logger.WithFields(logrus.Fields{ + "nostrEventId": nostrEvent.ID, + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + "appId": nostrEvent.AppId, + }).Info("Published reply") + } else if status == nostr.PublishStatusFailed { + nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_FAILED + svc.db.Save(&nostrEvent) + svc.Logger.WithFields(logrus.Fields{ + "nostrEventId": nostrEvent.ID, + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + "appId": nostrEvent.AppId, + }).Info("Failed to publish reply") + } else { + nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_UNCONFIRMED + svc.db.Save(&nostrEvent) + svc.Logger.WithFields(logrus.Fields{ + "nostrEventId": nostrEvent.ID, + "eventId": event.ID, + "status": status, + "replyEventId": resp.ID, + "appId": nostrEvent.AppId, + }).Info("Reply sent but no response from relay (timeout)") } - }() + } } + }() + + <-ctx.Done() + if ctx.Err() != context.Canceled { + svc.Logger.Errorf("Subscription error %v", ctx.Err()) + return ctx.Err() } + svc.Logger.Info("Exiting subscription.") + return nil } func (svc *Service) HandleEvent(ctx context.Context, event *nostr.Event) (result *nostr.Event, err error) {