Skip to content

Commit

Permalink
Chore: add sendgrid webhook endpoints to receive webhook events (#1150)
Browse files Browse the repository at this point in the history
* Chore: Add sendgrid webhook receiver to trigger workflow

Signed-off-by: Daishan Peng <[email protected]>
  • Loading branch information
StrongMonkey authored Jan 16, 2025
1 parent c1041f6 commit 6d035af
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 111 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c
github.com/pterm/pterm v0.12.79
github.com/rs/cors v1.11.1
github.com/sendgrid/sendgrid-go v3.16.0+incompatible
github.com/spf13/cobra v1.8.1
golang.org/x/crypto v0.31.0
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sendgrid/sendgrid-go v3.16.0+incompatible h1:i8eE6IMkiCy7vusSdacHHSBUpXyTcTXy/Rl9N9aZ/Qw=
github.com/sendgrid/sendgrid-go v3.16.0+incompatible/go.mod h1:QRQt+LX/NmgVEvmdRw0VT/QgUn499+iza2FnDca9fg8=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 h1:n661drycOFuPLCN3Uc8sB6B/s6Z4t2xvBgU1htSHuq8=
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3/go.mod h1:A0bzQcvG0E7Rwjx0REVgAGH58e96+X0MeOfepqsbeW4=
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/authz/authz.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var staticRules = map[string][]string{
"GET /api/app-oauth/refresh/{id}",
"GET /api/app-oauth/callback/{id}",
"GET /api/app-oauth/get-token",

"POST /webhook/sendgrid",
},
AuthenticatedGroup: {
"/api/oauth/redirect/{service}",
Expand Down
45 changes: 45 additions & 0 deletions pkg/api/handlers/sendgrid/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package sendgrid

import (
"fmt"
"net/http"

"github.com/obot-platform/obot/apiclient/types"
"github.com/obot-platform/obot/pkg/api"
"github.com/obot-platform/obot/pkg/emailtrigger"
"github.com/sendgrid/sendgrid-go/helpers/inbound"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type InboundWebhookHandler struct {
emailTrigger *emailtrigger.EmailHandler
username string
password string
}

func NewInboundWebhookHandler(c kclient.Client, hostname string, username, password string) *InboundWebhookHandler {
emailTrigger := emailtrigger.EmailTrigger(c, hostname)
return &InboundWebhookHandler{emailTrigger: emailTrigger, username: username, password: password}
}

func (h *InboundWebhookHandler) InboundWebhookHandler(req api.Context) error {
if h.username != "" && h.password != "" {
username, password, ok := req.Request.BasicAuth()
if !ok || username != h.username || password != h.password {
return types.NewErrHttp(http.StatusUnauthorized, "Invalid credentials")
}
}

inboundEmail, err := inbound.Parse(req.Request)
if err != nil {
return types.NewErrHttp(http.StatusBadRequest, fmt.Sprintf("Failed to parse inbound email: %v", err))
}

subject := inboundEmail.Headers["Subject"]
if err := h.emailTrigger.Handler(req.Context(), inboundEmail.Envelope.From, inboundEmail.Envelope.To, subject, []byte(inboundEmail.TextBody)); err != nil {
return types.NewErrHttp(http.StatusInternalServerError, fmt.Sprintf("Failed to handle inbound email: %v", err))
}

req.WriteHeader(http.StatusOK)
return nil
}
6 changes: 6 additions & 0 deletions pkg/api/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"

"github.com/obot-platform/obot/pkg/api/handlers"
"github.com/obot-platform/obot/pkg/api/handlers/sendgrid"
"github.com/obot-platform/obot/pkg/services"
"github.com/obot-platform/obot/ui"
)
Expand Down Expand Up @@ -32,6 +33,8 @@ func Router(services *services.Services) (http.Handler, error) {
version := handlers.NewVersionHandler(services.EmailServerName, services.SupportDocker)
tables := handlers.NewTableHandler(services.GPTClient)

sendgridWebhookHandler := sendgrid.NewInboundWebhookHandler(services.StorageClient, services.EmailServerName, services.SendgridWebhookUsername, services.SendgridWebhookPassword)

// Version
mux.HandleFunc("GET /api/version", version.GetVersion)

Expand Down Expand Up @@ -264,6 +267,9 @@ func Router(services *services.Services) (http.Handler, error) {
mux.HandleFunc("POST /api/webhooks/{id}/remove-token", webhooks.RemoveToken)
mux.HandleFunc("POST /api/webhooks/{namespace}/{id}", webhooks.Execute)

// Webhook for third party integration to trigger workflow
mux.HandleFunc("POST /api/sendgrid", sendgridWebhookHandler.InboundWebhookHandler)

// Email Receivers
mux.HandleFunc("POST /api/email-receivers", emailreceiver.Create)
mux.HandleFunc("GET /api/email-receivers", emailreceiver.List)
Expand Down
132 changes: 132 additions & 0 deletions pkg/emailtrigger/emailtrigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package emailtrigger

import (
"context"
"encoding/json"
"fmt"
"net/mail"
"path"
"strings"

"github.com/obot-platform/obot/logger"
"github.com/obot-platform/obot/pkg/alias"
v1 "github.com/obot-platform/obot/pkg/storage/apis/obot.obot.ai/v1"
"github.com/obot-platform/obot/pkg/system"
apierror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

var log = logger.Package()

type EmailHandler struct {
c kclient.Client
hostname string
}

func EmailTrigger(c kclient.Client, hostname string) *EmailHandler {
return &EmailHandler{
c: c,
hostname: hostname,
}
}

func (h *EmailHandler) Handler(ctx context.Context, from string, to []string, subject string, data []byte) error {
for _, to := range to {
toAddr, err := mail.ParseAddress(to)
if err != nil {
return fmt.Errorf("parse to address: %w", err)
}

name, host, ok := strings.Cut(toAddr.Address, "@")
if !ok {
return fmt.Errorf("invalid to address: %s", toAddr.Address)
}

if host != h.hostname {
log.Infof("Skipping mail for %s: not for this host", toAddr.Address)
continue
}

name, ns, _ := strings.Cut(name, "+")
if ns == "" {
ns = system.DefaultNamespace
}

var emailReceiver v1.EmailReceiver
if err = alias.Get(ctx, h.c, &emailReceiver, ns, name); apierror.IsNotFound(err) {
log.Infof("Skipping mail for %s: no receiver found", toAddr.Address)
continue
} else if err != nil {
return fmt.Errorf("get email receiver: %w", err)
}

if !matches(from, emailReceiver) {
log.Infof("Skipping mail for %s: sender not allowed", toAddr.Address)
continue
}

if err = h.dispatchEmail(ctx, emailReceiver, string(data), from, to, subject); err != nil {
return fmt.Errorf("dispatch email: %w", err)
}
}

return nil
}

func (h *EmailHandler) dispatchEmail(ctx context.Context, email v1.EmailReceiver, body string, from, to, subject string) error {
var input struct {
Type string `json:"type"`
From string `json:"from"`
To string `json:"to"`
Subject string `json:"subject"`
Body string `json:"body"`
}

input.Type = "email"
input.From = from
input.To = to
input.Subject = subject
input.Body = body

inputJSON, err := json.Marshal(input)
if err != nil {
return fmt.Errorf("marshal input: %w", err)
}

var workflow v1.Workflow
if err = alias.Get(ctx, h.c, &workflow, email.Namespace, email.Spec.Workflow); err != nil {
return err
}

return h.c.Create(ctx, &v1.WorkflowExecution{
ObjectMeta: metav1.ObjectMeta{
GenerateName: system.WorkflowExecutionPrefix,
Namespace: workflow.Namespace,
},
Spec: v1.WorkflowExecutionSpec{
WorkflowName: workflow.Name,
EmailReceiverName: email.Name,
ThreadName: workflow.Spec.ThreadName,
Input: string(inputJSON),
},
})
}

func matches(address string, email v1.EmailReceiver) bool {
if len(email.Spec.AllowedSenders) == 0 {
return true
}

for _, allowedSender := range email.Spec.AllowedSenders {
if allowedSender == address {
return true
}
matched, _ := path.Match(allowedSender, address)
if matched {
return true
}
}

return false
}
10 changes: 10 additions & 0 deletions pkg/services/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type Config struct {
Docker bool `usage:"Enable Docker support" default:"false" env:"OBOT_DOCKER"`
EnvKeys []string `usage:"The environment keys to pass through to the GPTScript server" env:"OBOT_ENV_KEYS"`

// Sendgrid webhook
SendgridWebhookUsername string `usage:"The username for the sendgrid webhook to authenticate with"`
SendgridWebhookPassword string `usage:"The password for the sendgrid webhook to authenticate with"`

AuthConfig
GatewayConfig
services.Config
Expand All @@ -95,6 +99,10 @@ type Services struct {
ModelProviderDispatcher *dispatcher.Dispatcher
KnowledgeSetIngestionLimit int
SupportDocker bool

// Use basic auth for sendgrid webhook, if being set
SendgridWebhookUsername string
SendgridWebhookPassword string
}

const (
Expand Down Expand Up @@ -362,6 +370,8 @@ func New(ctx context.Context, config Config) (*Services, error) {
EmailServerName: config.EmailServerName,
ModelProviderDispatcher: modelProviderDispatcher,
SupportDocker: config.Docker,
SendgridWebhookUsername: config.SendgridWebhookUsername,
SendgridWebhookPassword: config.SendgridWebhookPassword,
}, nil
}

Expand Down
Loading

0 comments on commit 6d035af

Please sign in to comment.