Skip to content

Commit

Permalink
fix: remove infinite loop in subscription causing high cpu
Browse files Browse the repository at this point in the history
  • Loading branch information
rolznz committed Jan 5, 2024
1 parent e6c4236 commit d0b270d
Showing 1 changed file with 72 additions and 72 deletions.
144 changes: 72 additions & 72 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,87 +54,87 @@ func (svc *Service) GetUser(c echo.Context) (user *User, err error) {
}

func (svc *Service) StartSubscription(ctx context.Context, sub *nostr.Subscription) error {
for {
if sub.Relay.ConnectionError != nil {
return sub.Relay.ConnectionError
}
select {
case <-ctx.Done():
svc.Logger.Info("Exiting subscription.")
return nil
case <-sub.EndOfStoredEvents:
if !svc.ReceivedEOS {
svc.Logger.Info("Received EOS")
go func() {
<-sub.EndOfStoredEvents
svc.ReceivedEOS = true
svc.Logger.Info("Received EOS")
}()

go func() {
for event := range sub.Events {
resp, err := svc.HandleEvent(ctx, event)
if err != nil {
svc.Logger.WithFields(logrus.Fields{
"eventId": event.ID,
"eventKind": event.Kind,
}).Errorf("Failed to process event: %v", err)
}
svc.ReceivedEOS = true
case event := <-sub.Events:
go func() {
resp, err := svc.HandleEvent(ctx, event)
if resp != nil {
status, err := sub.Relay.Publish(ctx, *resp)
if err != nil {
svc.Logger.WithFields(logrus.Fields{
"eventId": event.ID,
"eventKind": event.Kind,
}).Errorf("Failed to process event: %v", err)
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
}).Errorf("Failed to publish reply: %v", err)
return
}
if resp != nil {
status, err := sub.Relay.Publish(ctx, *resp)
if err != nil {
svc.Logger.WithFields(logrus.Fields{
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
}).Errorf("Failed to publish reply: %v", err)
return
}

nostrEvent := NostrEvent{}
result := svc.db.Where("nostr_id = ?", event.ID).First(&nostrEvent)
if result.Error != nil {
svc.Logger.WithFields(logrus.Fields{
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
}).Error(result.Error)
return
}
nostrEvent.ReplyId = resp.ID
nostrEvent := NostrEvent{}
result := svc.db.Where("nostr_id = ?", event.ID).First(&nostrEvent)
if result.Error != nil {
svc.Logger.WithFields(logrus.Fields{
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
}).Error(result.Error)
return
}
nostrEvent.ReplyId = resp.ID

if status == nostr.PublishStatusSucceeded {
nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_CONFIRMED
nostrEvent.RepliedAt = time.Now()
svc.db.Save(&nostrEvent)
svc.Logger.WithFields(logrus.Fields{
"nostrEventId": nostrEvent.ID,
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
"appId": nostrEvent.AppId,
}).Info("Published reply")
} else if status == nostr.PublishStatusFailed {
nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_FAILED
svc.db.Save(&nostrEvent)
svc.Logger.WithFields(logrus.Fields{
"nostrEventId": nostrEvent.ID,
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
"appId": nostrEvent.AppId,
}).Info("Failed to publish reply")
} else {
nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_UNCONFIRMED
svc.db.Save(&nostrEvent)
svc.Logger.WithFields(logrus.Fields{
"nostrEventId": nostrEvent.ID,
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
"appId": nostrEvent.AppId,
}).Info("Reply sent but no response from relay (timeout)")
}
if status == nostr.PublishStatusSucceeded {
nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_CONFIRMED
nostrEvent.RepliedAt = time.Now()
svc.db.Save(&nostrEvent)
svc.Logger.WithFields(logrus.Fields{
"nostrEventId": nostrEvent.ID,
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
"appId": nostrEvent.AppId,
}).Info("Published reply")
} else if status == nostr.PublishStatusFailed {
nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_FAILED
svc.db.Save(&nostrEvent)
svc.Logger.WithFields(logrus.Fields{
"nostrEventId": nostrEvent.ID,
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
"appId": nostrEvent.AppId,
}).Info("Failed to publish reply")
} else {
nostrEvent.State = NOSTR_EVENT_STATE_PUBLISH_UNCONFIRMED
svc.db.Save(&nostrEvent)
svc.Logger.WithFields(logrus.Fields{
"nostrEventId": nostrEvent.ID,
"eventId": event.ID,
"status": status,
"replyEventId": resp.ID,
"appId": nostrEvent.AppId,
}).Info("Reply sent but no response from relay (timeout)")
}
}()
}
}
}()

<-ctx.Done()
if ctx.Err() != context.Canceled {
svc.Logger.Errorf("Subscription error %v", ctx.Err())
return ctx.Err()
}
svc.Logger.Info("Exiting subscription.")
return nil
}

func (svc *Service) HandleEvent(ctx context.Context, event *nostr.Event) (result *nostr.Event, err error) {
Expand Down

0 comments on commit d0b270d

Please sign in to comment.