diff --git a/README.md b/README.md index 6c4c279..179928e 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,9 @@ This `GET` request returns a pubkey's NWC capabilities (if any) ### Publish NWC Request -Returns the response event directly or to the Webhook URL if provided. +Publishes the NWC request event and returns the response + +#### Without webhook
@@ -62,7 +64,6 @@ Returns the response event directly or to the Webhook URL if provided. | name | type | data type | description | |-----------|-----------|-------------------------|-----------------------------------------------------------------------| | relayUrl | optional | string | If no relay is provided, it uses the default relay (wss://relay.getalby.com/v1) | -| webhookUrl | optional | string | Webhook URL to publish the response event, returns the event directly if not provided | | walletPubkey | required | string | Pubkey of the NWC Wallet Provider | | event | required | JSON object (see [example](#event-example)) | **Signed** request event | @@ -87,15 +88,7 @@ Returns the response event directly or to the Webhook URL if provided. // Source: https://pkg.go.dev/github.com/nbd-wtf/go-nostr@v0.30.0#Event ``` -#### Response (with webhook) - -```json -{ - "state": "WEBHOOK_RECEIVED" -} -``` - -#### Response (without webhook) +#### Response ```json { @@ -122,6 +115,55 @@ Returns the response event directly or to the Webhook URL if provided. ```
+#### With webhook + +
+ +POST /nip47/webhook + + +#### Request Body + +| name | type | data type | description | +|-----------|-----------|-------------------------|-----------------------------------------------------------------------| +| relayUrl | optional | string | If no relay is provided, it uses the default relay (wss://relay.getalby.com/v1) | +| webhookUrl | required | string | Webhook URL to publish the response event | +| walletPubkey | required | string | Pubkey of the NWC Wallet Provider | +| event | required | JSON object (see [example](#event-example)) | **Signed** request event | + + +#### Response + +```json +{ + "state": "WEBHOOK_RECEIVED" +} +``` + +#### Response to webhook + +```json +{ + "id": "a16ycf4a01bcxx........xxxxx", + "pubkey": "a16y69effexxxx........xxxxx", + "created_at": 1709033612, + "kind": 23195, + "tags": [ + [ + "p", + "f490f5xxxxx........xxxxx" + ], + [ + "e", + "a41aefxxxxx........xxxxx" + ] + ], + "content": "", + "sig": "", +} +``` +
+ ------------------------------------------------------------------------------------------ ### Publish Event diff --git a/cmd/server/main.go b/cmd/server/main.go index 168762a..92fc8fe 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -33,6 +33,7 @@ func main() { e.POST("/nip47/info", svc.InfoHandler) e.POST("/nip47", svc.NIP47Handler) + e.POST("/nip47/webhook", svc.NIP47WebhookHandler) e.POST("/nip47/notifications", svc.NIP47NotificationHandler) e.POST("/publish", svc.PublishHandler) e.POST("/subscriptions", svc.SubscriptionHandler) diff --git a/internal/nostr/models.go b/internal/nostr/models.go index c347a06..76e4a16 100644 --- a/internal/nostr/models.go +++ b/internal/nostr/models.go @@ -1,6 +1,7 @@ package nostr import ( + "context" "encoding/json" "time" @@ -9,37 +10,45 @@ import ( ) const ( - NIP_47_INFO_EVENT_KIND = 13194 - NIP_47_REQUEST_KIND = 23194 - NIP_47_RESPONSE_KIND = 23195 - - // state of request event - REQUEST_EVENT_PUBLISH_CONFIRMED = "confirmed" - REQUEST_EVENT_PUBLISH_FAILED = "failed" + NIP_47_INFO_EVENT_KIND = 13194 + NIP_47_REQUEST_KIND = 23194 + NIP_47_RESPONSE_KIND = 23195 + NIP_47_NOTIFICATION_KIND = 23196 + + REQUEST_EVENT_PUBLISH_CONFIRMED = "CONFIRMED" + REQUEST_EVENT_PUBLISH_FAILED = "FAILED" + EVENT_PUBLISHED = "PUBLISHED" + EVENT_ALREADY_PROCESSED = "ALREADY_PROCESSED" + WEBHOOK_RECEIVED = "WEBHOOK_RECEIVED" + SUBSCRIPTION_CLOSED = "CLOSED" + SUBSCRIPTION_ALREADY_CLOSED = "ALREADY_CLOSED" ) type Subscription struct { - ID uint - RelayUrl string `validate:"required"` - WebhookUrl string - Open bool - Ids *[]string `gorm:"-"` - Kinds *[]int `gorm:"-"` - Authors *[]string `gorm:"-"` // WalletPubkey is included in this - Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag - Since time.Time - Until time.Time - Limit int - Search string - CreatedAt time.Time - UpdatedAt time.Time - Uuid string `gorm:"type:uuid;default:gen_random_uuid()"` + ID uint + RelayUrl string + WebhookUrl string + Open bool + Ids *[]string `gorm:"-"` + Kinds *[]int `gorm:"-"` + Authors *[]string `gorm:"-"` // WalletPubkey is included in this + Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag + Since time.Time + Until time.Time + Limit int + Search string + CreatedAt time.Time + UpdatedAt time.Time + Uuid string `gorm:"type:uuid;default:gen_random_uuid()"` + EventChan chan *nostr.Event `gorm:"-"` + RequestEvent *nostr.Event `gorm:"-"` + RequestEventDB RequestEvent `gorm:"-"` // TODO: fix an elegant solution to store datatypes - IdsString string - KindsString string - AuthorsString string - TagsString string + IdsString string + KindsString string + AuthorsString string + TagsString string } func (s *Subscription) BeforeSave(tx *gorm.DB) error { @@ -116,9 +125,13 @@ func (s *Subscription) AfterFind(tx *gorm.DB) error { return nil } +type OnReceiveEOSFunc func(ctx context.Context, subscription *Subscription) + +type HandleEventFunc func(event *nostr.Event, subscription *Subscription) + type RequestEvent struct { ID uint - SubscriptionId uint `validate:"required"` + SubscriptionId *uint NostrId string `validate:"required"` Content string State string @@ -129,7 +142,7 @@ type RequestEvent struct { type ResponseEvent struct { ID uint RequestId *uint - SubscriptionId uint `validate:"required"` + SubscriptionId *uint NostrId string `validate:"required"` Content string RepliedAt time.Time @@ -152,6 +165,12 @@ type InfoResponse struct { } type NIP47Request struct { + RelayUrl string `json:"relayUrl"` + WalletPubkey string `json:"walletPubkey"` + SignedEvent *nostr.Event `json:"event"` +} + +type NIP47WebhookRequest struct { RelayUrl string `json:"relayUrl"` WalletPubkey string `json:"walletPubkey"` WebhookUrl string `json:"webhookUrl"` @@ -159,10 +178,10 @@ type NIP47Request struct { } type NIP47NotificationRequest struct { - RelayUrl string `json:"relayUrl"` - WebhookUrl string `json:"webhookUrl"` - WalletPubkey string `json:"walletPubkey"` - ConnPubkey string `json:"connectionPubkey"` + RelayUrl string `json:"relayUrl"` + WebhookUrl string `json:"webhookUrl"` + WalletPubkey string `json:"walletPubkey"` + ConnPubkey string `json:"connectionPubkey"` } type NIP47Response struct { diff --git a/internal/nostr/nostr.go b/internal/nostr/nostr.go index 0cb17f6..13ea71d 100644 --- a/internal/nostr/nostr.go +++ b/internal/nostr/nostr.go @@ -5,7 +5,7 @@ import ( "context" "database/sql" "encoding/json" - "fmt" + "errors" "http-nostr/migrations" "net/http" "os" @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/joho/godotenv" "github.com/kelseyhightower/envconfig" "github.com/labstack/echo/v4" @@ -44,7 +45,7 @@ type Service struct { Relay *nostr.Relay Cfg *Config Logger *logrus.Logger - subscriptions map[uint]*nostr.Subscription + subscriptions map[string]*nostr.Subscription subscriptionsMutex sync.Mutex relayMutex sync.Mutex } @@ -112,7 +113,7 @@ func NewService(ctx context.Context) (*Service, error) { return nil, err } - subscriptions := make(map[uint]*nostr.Subscription) + subscriptions := make(map[string]*nostr.Subscription) var wg sync.WaitGroup svc := &Service{ @@ -137,7 +138,7 @@ func NewService(ctx context.Context) (*Service, error) { // Create a copy of the loop variable to // avoid passing address of the same variable subscription := sub - go svc.startSubscription(svc.Ctx, &subscription) + go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEvent) } return svc, nil @@ -270,7 +271,7 @@ func (svc *Service) PublishHandler(c echo.Context) error { return c.JSON(http.StatusOK, PublishResponse{ EventId: requestData.SignedEvent.ID, RelayUrl: requestData.RelayUrl, - State: "PUBLISHED", + State: EVENT_PUBLISHED, }) } @@ -298,103 +299,153 @@ func (svc *Service) NIP47Handler(c echo.Context) error { } svc.Logger.WithFields(logrus.Fields{ - "eventId": requestData.SignedEvent.ID, - "walletPubkey": requestData.WalletPubkey, - "relayUrl": requestData.RelayUrl, - "webhookUrl": requestData.WebhookUrl, - }).Info("Checking for duplicate request event") + "requestEventId": requestData.SignedEvent.ID, + "walletPubkey": requestData.WalletPubkey, + "relayUrl": requestData.RelayUrl, + }).Info("Processing request event") + + if svc.db.Where("nostr_id = ?", requestData.SignedEvent.ID).First(&RequestEvent{}).RowsAffected != 0 { + return c.JSON(http.StatusBadRequest, NIP47Response{ + State: EVENT_ALREADY_PROCESSED, + }) + } - subscription := Subscription{} - requestEvent := RequestEvent{} - findRequestResult := svc.db.Where("nostr_id = ?", requestData.SignedEvent.ID).Find(&requestEvent) - - if findRequestResult.RowsAffected != 0 { - responseEvent := ResponseEvent{} - findResponseResult := svc.db.Where("request_id = ?", requestEvent.ID).Find(&responseEvent) - if findResponseResult.RowsAffected != 0 { - return c.JSON(http.StatusBadRequest, NIP47Response{ - Event: &nostr.Event{ - ID: responseEvent.NostrId, - CreatedAt: nostr.Timestamp(responseEvent.RepliedAt.Unix()), - Content: responseEvent.Content, - }, - State: "ALREADY_PUBLISHED", + requestEvent := RequestEvent{ + NostrId: requestData.SignedEvent.ID, + Content: requestData.SignedEvent.Content, + } + + if err := svc.db.Create(&requestEvent).Error; err != nil { + return c.JSON(http.StatusInternalServerError, ErrorResponse{ + Message: "Failed to store request event", + Error: err.Error(), + }) + } + + subscription := svc.prepareNIP47Subscription(NIP47WebhookRequest{ + RelayUrl: requestData.RelayUrl, + WalletPubkey: requestData.WalletPubkey, + SignedEvent: requestData.SignedEvent, + }, requestEvent) + + ctx, cancel := context.WithTimeout(c.Request().Context(), 90*time.Second) + defer cancel() + go svc.startSubscription(ctx, &subscription, svc.publishEvent, svc.handleResponseEvent) + + select { + case <-ctx.Done(): + svc.Logger.WithFields(logrus.Fields{ + "requestEventId": requestData.SignedEvent.ID, + "walletPubkey": requestData.WalletPubkey, + "relayUrl": requestData.RelayUrl, + }).Info("Stopped subscription without receiving event") + if ctx.Err() == context.DeadlineExceeded { + return c.JSON(http.StatusGatewayTimeout, ErrorResponse{ + Message: "Request timed out", + Error: ctx.Err().Error(), }) } + return c.JSON(http.StatusInternalServerError, ErrorResponse{ + Message: "Context cancelled", + Error: ctx.Err().Error(), + }) + case event := <-subscription.EventChan: + svc.Logger.WithFields(logrus.Fields{ + "requestEventId": requestData.SignedEvent.ID, + "responseEventId": event.ID, + "walletPubkey": requestData.WalletPubkey, + "relayUrl": requestData.RelayUrl, + }).Info("Received response event") + return c.JSON(http.StatusOK, NIP47Response{ + Event: event, + State: EVENT_PUBLISHED, + }) } +} - svc.Logger.WithFields(logrus.Fields{ - "eventId": requestData.SignedEvent.ID, - "walletPubkey": requestData.WalletPubkey, - "relayUrl": requestData.RelayUrl, - "webhookUrl": requestData.WebhookUrl, - }).Info("Storing request event and subscription") +func (svc *Service) NIP47WebhookHandler(c echo.Context) error { + var requestData NIP47WebhookRequest + if err := c.Bind(&requestData); err != nil { + return c.JSON(http.StatusBadRequest, ErrorResponse{ + Message: "Error decoding nip-47 request", + Error: err.Error(), + }) + } - subscription = Subscription{ - RelayUrl: requestData.RelayUrl, - WebhookUrl: requestData.WebhookUrl, - Open: true, - Authors: &[]string{requestData.WalletPubkey}, - Kinds: &[]int{NIP_47_RESPONSE_KIND}, - Tags: &nostr.TagMap{"e": []string{requestData.SignedEvent.ID}}, - Since: time.Now(), - Limit: 1, + if (requestData.WalletPubkey == "") { + return c.JSON(http.StatusBadRequest, ErrorResponse{ + Message: "Wallet pubkey is empty", + Error: "no wallet pubkey in request data", + }) } - err := svc.db.Create(&subscription).Error - if err != nil { - return c.JSON(http.StatusInternalServerError, ErrorResponse{ - Message: "Failed to store subscription", - Error: err.Error(), + if (requestData.SignedEvent == nil) { + return c.JSON(http.StatusBadRequest, ErrorResponse{ + Message: "Signed event is empty", + Error: "no signed event in request data", + }) + } + + if (requestData.WebhookUrl == "") { + return c.JSON(http.StatusBadRequest, ErrorResponse{ + Message: "Webhook URL is empty", + Error: "no webhook url in request data", }) } - requestEvent = RequestEvent{ + svc.Logger.WithFields(logrus.Fields{ + "requestEventId": requestData.SignedEvent.ID, + "walletPubkey": requestData.WalletPubkey, + "relayUrl": requestData.RelayUrl, + "webhookUrl": requestData.WebhookUrl, + }).Info("Processing request event") + + if svc.db.Where("nostr_id = ?", requestData.SignedEvent.ID).First(&RequestEvent{}).RowsAffected != 0 { + return c.JSON(http.StatusBadRequest, NIP47Response{ + State: EVENT_ALREADY_PROCESSED, + }) + } + + requestEvent := RequestEvent{ NostrId: requestData.SignedEvent.ID, Content: requestData.SignedEvent.Content, - SubscriptionId: subscription.ID, } - err = svc.db.Create(&requestEvent).Error - if err != nil { + if err := svc.db.Create(&requestEvent).Error; err != nil { return c.JSON(http.StatusInternalServerError, ErrorResponse{ Message: "Failed to store request event", Error: err.Error(), }) } - if subscription.WebhookUrl != "" { - go func() { - ctx, cancel := context.WithTimeout(svc.Ctx, 90*time.Second) - defer cancel() - event, _, err := svc.processRequest(ctx, &subscription, &requestEvent, &requestData) - if err != nil { - svc.Logger.WithError(err).Error("Failed to process request for webhook") - // what to pass to the webhook? - return - } - svc.postEventToWebhook(event, requestData.WebhookUrl) - }() - return c.JSON(http.StatusOK, NIP47Response{ - State: "WEBHOOK_RECEIVED", - }) - } + subscription := svc.prepareNIP47Subscription(requestData, requestEvent) - ctx, cancel := context.WithTimeout(c.Request().Context(), 90*time.Second) + ctx, cancel := context.WithTimeout(svc.Ctx, 90*time.Second) defer cancel() - event, httpStatusCode, err := svc.processRequest(ctx, &subscription, &requestEvent, &requestData) - if err != nil { - return c.JSON(httpStatusCode, ErrorResponse{ - Message: "Failed to process nip-47 request", - Error: err.Error(), - }) - } + + go svc.startSubscription(ctx, &subscription, svc.publishEvent, svc.handleResponseEvent) return c.JSON(http.StatusOK, NIP47Response{ - Event: event, - State: "PUBLISHED", + State: WEBHOOK_RECEIVED, }) } +func (svc *Service) prepareNIP47Subscription(requestData NIP47WebhookRequest, requestEvent RequestEvent) (Subscription) { + return Subscription{ + RelayUrl: requestData.RelayUrl, + WebhookUrl: requestData.WebhookUrl, + Open: true, + Authors: &[]string{requestData.WalletPubkey}, + Kinds: &[]int{NIP_47_RESPONSE_KIND}, + Tags: &nostr.TagMap{"e": []string{requestData.SignedEvent.ID}}, + Since: time.Now(), + Limit: 1, + RequestEvent: requestData.SignedEvent, + RequestEventDB: requestEvent, + EventChan: make(chan *nostr.Event, 1), + Uuid: uuid.New().String(), + } +} + func (svc *Service) NIP47NotificationHandler(c echo.Context) error { var requestData NIP47NotificationRequest // send in a pubkey and authenticate by signing @@ -430,16 +481,15 @@ func (svc *Service) NIP47NotificationHandler(c echo.Context) error { RelayUrl: requestData.RelayUrl, WebhookUrl: requestData.WebhookUrl, Open: true, + Since: time.Now(), Authors: &[]string{requestData.WalletPubkey}, - Kinds: &[]int{23196}, + Kinds: &[]int{NIP_47_NOTIFICATION_KIND}, } - tags := new(nostr.TagMap) - *tags = make(nostr.TagMap) - (*tags)["p"] = []string{requestData.ConnPubkey} - - subscription.Tags = tags + tags := make(nostr.TagMap) + (tags)["p"] = []string{requestData.ConnPubkey} + subscription.Tags = &tags err := svc.db.Create(&subscription).Error @@ -450,7 +500,7 @@ func (svc *Service) NIP47NotificationHandler(c echo.Context) error { }) } - go svc.startSubscription(svc.Ctx, &subscription) + go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEvent) return c.JSON(http.StatusOK, SubscriptionResponse{ SubscriptionId: subscription.Uuid, @@ -509,7 +559,7 @@ func (svc *Service) SubscriptionHandler(c echo.Context) error { }) } - go svc.startSubscription(svc.Ctx, &subscription) + go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEvent) return c.JSON(http.StatusOK, SubscriptionResponse{ SubscriptionId: subscription.Uuid, @@ -539,22 +589,15 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error { "subscriptionId": subscription.ID, }).Info("Stopping subscription") - svc.subscriptionsMutex.Lock() - sub, exists := svc.subscriptions[subscription.ID] - if exists { - sub.Unsub() - delete(svc.subscriptions, subscription.ID) - } - svc.subscriptionsMutex.Unlock() - - if (!exists && !subscription.Open) { + err := svc.stopSubscription(&subscription) + if err != nil { svc.Logger.WithFields(logrus.Fields{ "subscriptionId": subscription.ID, }).Info("Subscription is stopped already") return c.JSON(http.StatusAlreadyReported, StopSubscriptionResponse{ Message: "Subscription is already closed", - State: "ALREADY_CLOSED", + State: SUBSCRIPTION_ALREADY_CLOSED, }) } @@ -568,42 +611,45 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error { return c.JSON(http.StatusOK, StopSubscriptionResponse{ Message: "Subscription stopped successfully", - State: "CLOSED", + State: SUBSCRIPTION_CLOSED, }) } -func (svc *Service) startSubscription(ctx context.Context, subscription *Subscription) { +func (svc *Service) stopSubscription(subscription *Subscription) error { + svc.subscriptionsMutex.Lock() + sub, exists := svc.subscriptions[subscription.Uuid] + if exists { + sub.Unsub() + delete(svc.subscriptions, subscription.Uuid) + } + svc.subscriptionsMutex.Unlock() + + if (!exists && !subscription.Open) { + return errors.New(SUBSCRIPTION_ALREADY_CLOSED) + } + + return nil +} + +func (svc *Service) startSubscription(ctx context.Context, subscription *Subscription, onReceiveEOS OnReceiveEOSFunc, handleEvent HandleEventFunc) { svc.Logger.WithFields(logrus.Fields{ "subscriptionId": subscription.ID, }).Info("Starting subscription") - filter := nostr.Filter{ - Limit: subscription.Limit, - Search: subscription.Search, - } - if subscription.Ids != nil { - filter.IDs = *subscription.Ids - } - if subscription.Kinds != nil { - filter.Kinds = *subscription.Kinds - } - if subscription.Authors != nil { - filter.Authors = *subscription.Authors - } - if subscription.Tags != nil { - filter.Tags = *subscription.Tags - } - if !subscription.Since.IsZero() { - since := nostr.Timestamp(subscription.Since.Unix()) - filter.Since = &since - } - if !subscription.Until.IsZero() { - until := nostr.Timestamp(subscription.Until.Unix()) - filter.Until = &until - } + filter := svc.subscriptionToFilter(subscription) + + var relay *nostr.Relay + var isCustomRelay bool + var err error for { - relay, isCustomRelay, err := svc.getRelayConnection(ctx, subscription.RelayUrl) + // close relays with connection errors before connecting again + // because context expiration has no effect on relays + // TODO: Call relay.Connect on already initialized relays + if relay != nil && isCustomRelay { + relay.Close() + } + relay, isCustomRelay, err = svc.getRelayConnection(ctx, subscription.RelayUrl) if err != nil { // TODO: notify user about relay failure svc.Logger.WithError(err).WithFields(logrus.Fields{ @@ -614,81 +660,142 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri continue } - for { - sub, err := relay.Subscribe(ctx, []nostr.Filter{filter}) - if err != nil { - // TODO: notify user about subscription failure - svc.Logger.WithError(err).WithFields(logrus.Fields{ - "subscriptionId": subscription.ID, - "relayUrl": subscription.RelayUrl, - }).Error("Failed to subscribe to relay, retrying in 5s...") - time.Sleep(5 * time.Second) // sleep for 5 seconds - break - } + sub, err := relay.Subscribe(ctx, []nostr.Filter{*filter}) + if err != nil { + // TODO: notify user about subscription failure + svc.Logger.WithError(err).WithFields(logrus.Fields{ + "subscriptionId": subscription.ID, + "relayUrl": subscription.RelayUrl, + }).Error("Failed to subscribe to relay, retrying in 5s...") + time.Sleep(5 * time.Second) // sleep for 5 seconds + continue + } - svc.subscriptionsMutex.Lock() - svc.subscriptions[subscription.ID] = sub - svc.subscriptionsMutex.Unlock() + svc.subscriptionsMutex.Lock() + svc.subscriptions[subscription.Uuid] = sub + svc.subscriptionsMutex.Unlock() - svc.Logger.WithFields(logrus.Fields{ - "subscriptionId": subscription.ID, - }).Info("Started subscription") + svc.Logger.WithFields(logrus.Fields{ + "subscriptionId": subscription.ID, + }).Info("Started subscription") - err = svc.processEvents(ctx, subscription, sub) - // closing relay as we reach here due to either - // halting subscription or relay error + err = svc.processEvents(ctx, subscription, onReceiveEOS, handleEvent) + + if err != nil { + // TODO: notify user about subscription failure + svc.Logger.WithError(err).WithFields(logrus.Fields{ + "subscriptionId": subscription.ID, + "relayUrl": subscription.RelayUrl, + }).Error("Subscription stopped due to relay error, reconnecting in 5s...") + time.Sleep(5 * time.Second) // sleep for 5 seconds + continue + } else { if isCustomRelay { relay.Close() } - - if err != nil { - // TODO: notify user about subscription failure - svc.Logger.WithError(err).WithFields(logrus.Fields{ - "subscriptionId": subscription.ID, - "relayUrl": subscription.RelayUrl, - }).Error("Subscription stopped due to relay error, reconnecting in 5s...") - time.Sleep(5 * time.Second) // sleep for 5 seconds - break - } else { - svc.Logger.WithFields(logrus.Fields{ - "subscriptionId": subscription.ID, - "relayUrl": subscription.RelayUrl, - }).Info("Stopping subscription") - return + // Save the request event state and stop the + // subscription if it's an NIP47 request + if (subscription.RequestEvent != nil) { + if (subscription.RequestEventDB.State == "") { + subscription.RequestEventDB.State = REQUEST_EVENT_PUBLISH_FAILED + } + svc.db.Save(&subscription.RequestEventDB) + // stop the subscription as it is one time + svc.stopSubscription(subscription) } + svc.Logger.WithFields(logrus.Fields{ + "subscriptionId": subscription.ID, + "relayUrl": subscription.RelayUrl, + }).Info("Stopping subscription") + break } } } -func (svc *Service) processEvents(ctx context.Context, subscription *Subscription, sub *nostr.Subscription) error { - receivedEOS := false - // Do not process historic events - go func() { +func (svc *Service) publishEvent(ctx context.Context, subscription *Subscription) { + svc.subscriptionsMutex.Lock() + sub := svc.subscriptions[subscription.Uuid] + svc.subscriptionsMutex.Unlock() + err := sub.Relay.Publish(ctx, *subscription.RequestEvent) + if err != nil { + // TODO: notify user about publish failure + svc.Logger.WithError(err).WithFields(logrus.Fields{ + "subscriptionId": subscription.ID, + "relayUrl": subscription.RelayUrl, + }).Error("Failed to publish to relay") + sub.Unsub() + } else { + svc.Logger.WithFields(logrus.Fields{ + "status": REQUEST_EVENT_PUBLISH_CONFIRMED, + "eventId": subscription.RequestEvent.ID, + }).Info("Published request event successfully") + subscription.RequestEventDB.State = REQUEST_EVENT_PUBLISH_CONFIRMED + } +} + +func (svc *Service) handleResponseEvent(event *nostr.Event, subscription *Subscription) { + svc.Logger.WithFields(logrus.Fields{ + "eventId": event.ID, + "eventKind": event.Kind, + "requestEventId": subscription.RequestEvent.ID, + }).Info("Received response event") + responseEvent := ResponseEvent{ + NostrId: event.ID, + Content: event.Content, + RepliedAt: event.CreatedAt.Time(), + RequestId: &subscription.RequestEventDB.ID, + } + svc.db.Save(&responseEvent) + if subscription.WebhookUrl != "" { + svc.postEventToWebhook(event, subscription.WebhookUrl) + } else { + subscription.EventChan <- event + } + svc.subscriptionsMutex.Lock() + svc.subscriptions[subscription.Uuid].Unsub() + svc.subscriptionsMutex.Unlock() +} + +func (svc *Service) handleSubscribedEvent(event *nostr.Event, subscription *Subscription) { + svc.Logger.WithFields(logrus.Fields{ + "eventId": event.ID, + "eventKind": event.Kind, + "subscriptionId": subscription.ID, + }).Info("Received event") + responseEvent := ResponseEvent{ + NostrId: event.ID, + Content: event.Content, + RepliedAt: event.CreatedAt.Time(), + SubscriptionId: &subscription.ID, + } + svc.db.Save(&responseEvent) + svc.postEventToWebhook(event, subscription.WebhookUrl) +} + +func (svc *Service) processEvents(ctx context.Context, subscription *Subscription, onReceiveEOS OnReceiveEOSFunc, handleEvent HandleEventFunc) error { + svc.subscriptionsMutex.Lock() + sub := svc.subscriptions[subscription.Uuid] + svc.subscriptionsMutex.Unlock() + + go func(){ + // block till EOS is received <-sub.EndOfStoredEvents svc.Logger.WithFields(logrus.Fields{ "subscriptionId": subscription.ID, }).Info("Received EOS") - receivedEOS = true - }() + + if (onReceiveEOS != nil) { + onReceiveEOS(ctx, subscription) + } - go func(){ + // loop through incoming events for event := range sub.Events { - if receivedEOS { - svc.Logger.WithFields(logrus.Fields{ - "eventId": event.ID, - "eventKind": event.Kind, - "subscriptionId": subscription.ID, - }).Info("Received event") - responseEvent := ResponseEvent{ - SubscriptionId: subscription.ID, - NostrId: event.ID, - Content: event.Content, - RepliedAt: event.CreatedAt.Time(), - } - svc.db.Save(&responseEvent) - svc.postEventToWebhook(event, subscription.WebhookUrl) - } + go handleEvent(event, subscription) } + + svc.Logger.WithFields(logrus.Fields{ + "subscriptionId": subscription.ID, + }).Info("Relay subscription events channel ended") }() select { @@ -726,98 +833,6 @@ func (svc *Service) getRelayConnection(ctx context.Context, customRelayURL strin } } -func (svc *Service) processRequest(ctx context.Context, subscription *Subscription, requestEvent *RequestEvent, requestData *NIP47Request) (*nostr.Event, int, error) { - publishState := REQUEST_EVENT_PUBLISH_FAILED - defer func() { - subscription.Open = false - requestEvent.State = publishState - svc.db.Save(subscription) - svc.db.Save(requestEvent) - }() - relay, isCustomRelay, err := svc.getRelayConnection(ctx, subscription.RelayUrl) - if err != nil { - return &nostr.Event{}, http.StatusBadRequest, fmt.Errorf("error connecting to relay: %w", err) - } - if isCustomRelay { - defer relay.Close() - } - - since := nostr.Timestamp(subscription.Since.Unix()) - filter := nostr.Filter{ - Kinds: *subscription.Kinds, - Authors: *subscription.Authors, - Tags: *subscription.Tags, - Since: &since, - Limit: subscription.Limit, - Search: subscription.Search, - } - - if subscription.Ids != nil { - filter.IDs = *subscription.Ids - } - if !subscription.Until.IsZero() { - until := nostr.Timestamp(subscription.Until.Unix()) - filter.Until = &until - } - - svc.Logger.WithFields(logrus.Fields{ - "eventId": requestData.SignedEvent.ID, - "walletPubkey": requestData.WalletPubkey, - "relayUrl": requestData.RelayUrl, - "webhookUrl": requestData.WebhookUrl, - }).Info("Subscribing to the relay for response event") - - sub, err := relay.Subscribe(ctx, []nostr.Filter{filter}) - if err != nil { - return &nostr.Event{}, http.StatusBadRequest, fmt.Errorf("error subscribing to relay: %w", err) - } - - svc.Logger.WithFields(logrus.Fields{ - "eventId": requestData.SignedEvent.ID, - "walletPubkey": requestData.WalletPubkey, - "relayUrl": requestData.RelayUrl, - "webhookUrl": requestData.WebhookUrl, - }).Info("Publishing request event to the relay") - - err = relay.Publish(ctx, *requestData.SignedEvent) - if err != nil { - return &nostr.Event{}, http.StatusBadRequest, fmt.Errorf("error publishing request event: %w", err) - } - - if err == nil { - publishState = REQUEST_EVENT_PUBLISH_CONFIRMED - svc.Logger.WithFields(logrus.Fields{ - "status": publishState, - "eventId": requestEvent.ID, - }).Info("Published request event successfully") - } else { - svc.Logger.WithFields(logrus.Fields{ - "status": publishState, - "eventId": requestEvent.ID, - }).Info("Failed to publish request event") - return &nostr.Event{}, http.StatusBadRequest, fmt.Errorf("error publishing request event: %s", err.Error()) - } - - select { - case <-ctx.Done(): - return &nostr.Event{}, http.StatusRequestTimeout, fmt.Errorf("request canceled or timed out") - case event := <-sub.Events: - svc.Logger.WithFields(logrus.Fields{ - "eventId": event.ID, - "eventKind": event.Kind, - }).Infof("Successfully received event") - responseEvent := ResponseEvent{ - SubscriptionId: subscription.ID, - RequestId: &requestEvent.ID, - NostrId: event.ID, - Content: event.Content, - RepliedAt: event.CreatedAt.Time(), - } - svc.db.Save(&responseEvent) - return event, http.StatusOK, nil - } -} - func (svc *Service) postEventToWebhook(event *nostr.Event, webhookURL string) { eventData, err := json.Marshal(event) if err != nil { @@ -845,3 +860,31 @@ func (svc *Service) postEventToWebhook(event *nostr.Event, webhookURL string) { "webhookUrl": webhookURL, }).Infof("Successfully posted event to webhook") } + +func (svc *Service) subscriptionToFilter(subscription *Subscription) (*nostr.Filter){ + filter := nostr.Filter{ + Limit: subscription.Limit, + Search: subscription.Search, + } + if subscription.Ids != nil { + filter.IDs = *subscription.Ids + } + if subscription.Kinds != nil { + filter.Kinds = *subscription.Kinds + } + if subscription.Authors != nil { + filter.Authors = *subscription.Authors + } + if subscription.Tags != nil { + filter.Tags = *subscription.Tags + } + if !subscription.Since.IsZero() { + since := nostr.Timestamp(subscription.Since.Unix()) + filter.Since = &since + } + if !subscription.Until.IsZero() { + until := nostr.Timestamp(subscription.Until.Unix()) + filter.Until = &until + } + return &filter +} diff --git a/internal/nostr/nostr_test.go b/internal/nostr/nostr_test.go index 27e744b..0046c6d 100644 --- a/internal/nostr/nostr_test.go +++ b/internal/nostr/nostr_test.go @@ -77,7 +77,7 @@ func setupTestService() *Service { Wg: &wg, Logger: logger, Relay: relay, - subscriptions: make(map[uint]*nostr.Subscription), + subscriptions: make(map[string]*nostr.Subscription), } privateKey = nostr.GeneratePrivateKey()