Skip to content

Commit

Permalink
Chore: add sendgrid webhook endpoints to receive webhook events
Browse files Browse the repository at this point in the history
Signed-off-by: Daishan Peng <[email protected]>
  • Loading branch information
StrongMonkey committed Jan 8, 2025
1 parent 85ab705 commit 6663101
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 128 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.6.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sendgrid/rest v2.6.9+incompatible // indirect
github.com/sendgrid/sendgrid-go v3.16.0+incompatible // indirect
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/skeema/knownhosts v1.3.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,10 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sendgrid/rest v2.6.9+incompatible h1:1EyIcsNdn9KIisLW50MKwmSRSK+ekueiEMJ7NEoxJo0=
github.com/sendgrid/rest v2.6.9+incompatible/go.mod h1:kXX7q3jZtJXK5c5qK83bSGMdV6tsOE70KbHoqJls4lE=
github.com/sendgrid/sendgrid-go v3.16.0+incompatible h1:i8eE6IMkiCy7vusSdacHHSBUpXyTcTXy/Rl9N9aZ/Qw=
github.com/sendgrid/sendgrid-go v3.16.0+incompatible/go.mod h1:QRQt+LX/NmgVEvmdRw0VT/QgUn499+iza2FnDca9fg8=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 h1:n661drycOFuPLCN3Uc8sB6B/s6Z4t2xvBgU1htSHuq8=
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3/go.mod h1:A0bzQcvG0E7Rwjx0REVgAGH58e96+X0MeOfepqsbeW4=
Expand Down
40 changes: 40 additions & 0 deletions pkg/api/handlers/sendgrid/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package sendgrid

import (
"fmt"
"net/http"

"github.com/obot-platform/obot/apiclient/types"
"github.com/obot-platform/obot/pkg/api"
"github.com/obot-platform/obot/pkg/emailtrigger"
"github.com/sendgrid/sendgrid-go/helpers/inbound"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type InboundWebhookHandler struct {
emailTrigger *emailtrigger.EmailHandler
}

func NewInboundWebhookHandler(c kclient.Client, hostname string) *InboundWebhookHandler {
emailTrigger := emailtrigger.EmailTrigger(c, hostname)
return &InboundWebhookHandler{emailTrigger: emailTrigger}
}

func (h *InboundWebhookHandler) InboundWebhookHandler(req api.Context) error {
if req.Request.Method != http.MethodPost {
return types.NewErrHttp(http.StatusMethodNotAllowed, "Invalid request method")
}

inboundEmail, err := inbound.Parse(req.Request)
if err != nil {
return types.NewErrHttp(http.StatusBadRequest, fmt.Sprintf("Failed to parse inbound email: %v", err))
}

subject := inboundEmail.Headers["Subject"]
if err := h.emailTrigger.Handler(req.Context(), inboundEmail.Envelope.From, inboundEmail.Envelope.To, subject, []byte(inboundEmail.TextBody)); err != nil {
return types.NewErrHttp(http.StatusInternalServerError, fmt.Sprintf("Failed to handle inbound email: %v", err))
}

req.WriteHeader(http.StatusOK)
return nil
}
6 changes: 6 additions & 0 deletions pkg/api/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"

"github.com/obot-platform/obot/pkg/api/handlers"
"github.com/obot-platform/obot/pkg/api/handlers/sendgrid"
"github.com/obot-platform/obot/pkg/services"
"github.com/obot-platform/obot/ui"
)
Expand Down Expand Up @@ -31,6 +32,8 @@ func Router(services *services.Services) (http.Handler, error) {
version := handlers.NewVersionHandler(services.EmailServerName, services.SupportDocker)
tables := handlers.NewTableHandler(services.GPTClient)

sendgridWebhookHandler := sendgrid.NewInboundWebhookHandler(services.StorageClient, services.EmailServerName)

// Version
mux.HandleFunc("GET /api/version", version.GetVersion)

Expand Down Expand Up @@ -256,6 +259,9 @@ func Router(services *services.Services) (http.Handler, error) {
mux.HandleFunc("POST /api/webhooks/{id}/remove-token", webhooks.RemoveToken)
mux.HandleFunc("POST /api/webhooks/{namespace}/{id}", webhooks.Execute)

// Webhook for third party integration to trigger workflow
mux.HandleFunc("POST /webhook/sendgrid", sendgridWebhookHandler.InboundWebhookHandler)

// Email Receivers
mux.HandleFunc("POST /api/email-receivers", emailreceiver.Create)
mux.HandleFunc("GET /api/email-receivers", emailreceiver.List)
Expand Down
50 changes: 35 additions & 15 deletions pkg/api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,25 @@ import (
"github.com/obot-platform/obot/pkg/proxy"
"github.com/obot-platform/obot/pkg/storage"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apiserver/pkg/authentication/user"
)

// authDisabledPaths is a list of paths that do not require authentication.
var (
authDisabledPaths = []string{
"/webhook",
}
)

func isAuthDisabledPath(path string) bool {
for _, p := range authDisabledPaths {
if strings.HasPrefix(path, p) {
return true
}
}
return false
}

type Server struct {
storageClient storage.Client
gptClient *gptscript.GPTScript
Expand Down Expand Up @@ -69,26 +86,29 @@ func (s *Server) wrap(f api.HandlerFunc) http.HandlerFunc {
rw.Header().Set("Pragma", "no-cache")
rw.Header().Set("Expires", "0")

user, err := s.authenticator.Authenticate(req)
if err != nil {
http.Error(rw, err.Error(), http.StatusUnauthorized)
return
}

isOAuthPath := strings.HasPrefix(req.URL.Path, "/oauth2/")
if isOAuthPath || strings.HasPrefix(req.URL.Path, "/api/") && !s.authorizer.Authorize(req, user) {
// If this is not a request coming from browser or the proxy is not enabled, then return 403.
if !isOAuthPath && (s.proxyServer == nil || req.Method != http.MethodGet || slices.Contains(user.GetGroups(), authz.AuthenticatedGroup) || !strings.Contains(strings.ToLower(req.UserAgent()), "mozilla")) {
http.Error(rw, "forbidden", http.StatusForbidden)
var user user.Info
if !isAuthDisabledPath(req.URL.Path) {
user, err := s.authenticator.Authenticate(req)
if err != nil {
http.Error(rw, err.Error(), http.StatusUnauthorized)
return
}

req.Header.Set("X-Otto-Auth-Required", "true")
s.proxyServer.ServeHTTP(rw, req)
return
isOAuthPath := strings.HasPrefix(req.URL.Path, "/oauth2/")
if isOAuthPath || strings.HasPrefix(req.URL.Path, "/api/") && !s.authorizer.Authorize(req, user) {
// If this is not a request coming from browser or the proxy is not enabled, then return 403.
if !isOAuthPath && (s.proxyServer == nil || req.Method != http.MethodGet || slices.Contains(user.GetGroups(), authz.AuthenticatedGroup) || !strings.Contains(strings.ToLower(req.UserAgent()), "mozilla")) {
http.Error(rw, "forbidden", http.StatusForbidden)
return
}

req.Header.Set("X-Otto-Auth-Required", "true")
s.proxyServer.ServeHTTP(rw, req)
return
}
}

err = f(api.Context{
err := f(api.Context{
ResponseWriter: rw,
Request: req,
GPTClient: s.gptClient,
Expand Down
132 changes: 132 additions & 0 deletions pkg/emailtrigger/emailtrigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package emailtrigger

import (
"context"
"encoding/json"
"fmt"
"net/mail"
"path"
"strings"

"github.com/obot-platform/obot/logger"
"github.com/obot-platform/obot/pkg/alias"
v1 "github.com/obot-platform/obot/pkg/storage/apis/otto.otto8.ai/v1"
"github.com/obot-platform/obot/pkg/system"
apierror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

var log = logger.Package()

type EmailHandler struct {
c kclient.Client
hostname string
}

func EmailTrigger(c kclient.Client, hostname string) *EmailHandler {
return &EmailHandler{
c: c,
hostname: hostname,
}
}

func (h *EmailHandler) Handler(ctx context.Context, from string, to []string, subject string, data []byte) error {
for _, to := range to {
toAddr, err := mail.ParseAddress(to)
if err != nil {
return fmt.Errorf("parse to address: %w", err)
}

name, host, ok := strings.Cut(toAddr.Address, "@")
if !ok {
return fmt.Errorf("invalid to address: %s", toAddr.Address)
}

if host != h.hostname {
log.Infof("Skipping mail for %s: not for this host", toAddr.Address)
continue
}

name, ns, _ := strings.Cut(name, "+")
if ns == "" {
ns = system.DefaultNamespace
}

var emailReceiver v1.EmailReceiver
if err = alias.Get(ctx, h.c, &emailReceiver, ns, name); apierror.IsNotFound(err) {
log.Infof("Skipping mail for %s: no receiver found", toAddr.Address)
continue
} else if err != nil {
return fmt.Errorf("get email receiver: %w", err)
}

if !matches(from, emailReceiver) {
log.Infof("Skipping mail for %s: sender not allowed", toAddr.Address)
continue
}

if err = h.dispatchEmail(ctx, emailReceiver, string(data), from, to, subject); err != nil {
return fmt.Errorf("dispatch email: %w", err)
}
}

return nil
}

func (h *EmailHandler) dispatchEmail(ctx context.Context, email v1.EmailReceiver, body string, from, to, subject string) error {
var input struct {
Type string `json:"type"`
From string `json:"from"`
To string `json:"to"`
Subject string `json:"subject"`
Body string `json:"body"`
}

input.Type = "email"
input.From = from
input.To = to
input.Subject = subject
input.Body = body

inputJSON, err := json.Marshal(input)
if err != nil {
return fmt.Errorf("marshal input: %w", err)
}

var workflow v1.Workflow
if err = alias.Get(ctx, h.c, &workflow, email.Namespace, email.Spec.Workflow); err != nil {
return err
}

return h.c.Create(ctx, &v1.WorkflowExecution{
ObjectMeta: metav1.ObjectMeta{
GenerateName: system.WorkflowExecutionPrefix,
Namespace: workflow.Namespace,
},
Spec: v1.WorkflowExecutionSpec{
WorkflowName: workflow.Name,
EmailReceiverName: email.Name,
ThreadName: workflow.Spec.ThreadName,
Input: string(inputJSON),
},
})
}

func matches(address string, email v1.EmailReceiver) bool {
if len(email.Spec.AllowedSenders) == 0 {
return true
}

for _, allowedSender := range email.Spec.AllowedSenders {
if allowedSender == address {
return true
}
matched, _ := path.Match(allowedSender, address)
if matched {
return true
}
}

return false
}
Loading

0 comments on commit 6663101

Please sign in to comment.