From a8ae0ac93f619e50b651e7541508ab43b93a3d0f Mon Sep 17 00:00:00 2001
From: maura fortino
Date: Wed, 11 Dec 2024 11:26:28 -0500
Subject: [PATCH] added logic for the v2 webhooks including srv lookup and hashing

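This adds a WebhookV2 sink and a WebhookSink container alongside the existing
WebhookV1 path. A v2 registration that carries receiver URLs keeps the v1
behavior (a ring of receiver URLs). A registration that carries DNS SRV
records instead resolves each FQDN with net.LookupSRV, orders the records by
weight or priority according to the registration's load-balancing scheme, and
builds an HTTP client per domain. When a hash field is configured, each event
is routed to a single webhook chosen from a hash ring keyed on that field;
otherwise delivery falls back to fanning out to every webhook.

Roughly, the SRV selection amounts to the standalone sketch below (stdlib
only; example.internal is a placeholder domain, and the empty service and
proto arguments mirror the lookup call in this patch). A second sketch of the
hash-based routing in WebhookSink.Send is appended after the diff for
reviewers.

    package main

    import (
        "fmt"
        "net"
        "sort"
    )

    func main() {
        // Resolve SRV records for a bare domain, as Dialer.lookup does.
        _, addrs, err := net.LookupSRV("", "", "example.internal")
        if err != nil {
            fmt.Println("lookup failed:", err)
            return
        }
        // "weight" scheme: heavier targets first; anything else: lowest priority first.
        sortBy := "weight"
        if sortBy == "weight" {
            sort.Slice(addrs, func(i, j int) bool { return addrs[i].Weight > addrs[j].Weight })
        } else {
            sort.Slice(addrs, func(i, j int) bool { return addrs[i].Priority < addrs[j].Priority })
        }
        for _, a := range addrs {
            fmt.Printf("%s:%d (priority %d, weight %d)\n", a.Target, a.Port, a.Priority, a.Weight)
        }
    }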
---
 internal/sink/sink.go | 326 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 312 insertions(+), 14 deletions(-)

diff --git a/internal/sink/sink.go b/internal/sink/sink.go
index 5c2a5d0..f3ebf60 100644
--- a/internal/sink/sink.go
+++ b/internal/sink/sink.go
@@ -5,6 +5,7 @@ package sink
 import (
     "bytes"
     "container/ring"
+    "context"
     "crypto/hmac"
     "crypto/sha1"
     "encoding/hex"
@@ -12,8 +13,10 @@ import (
     "fmt"
     "io"
     "math/rand"
+    "net"
     "net/http"
     "net/url"
+    "sort"
     "strconv"
     "strings"
     "sync"
@@ -37,11 +40,22 @@ type WebhookV1 struct {
     urls *ring.Ring
     CommonWebhook
     //TODO: need to determine best way to add client and client middleware to WebhooV1
-    // client http.Client
+    // client           http.Client
     // clientMiddleware func(http.Client) http.Client
 }
 
-type Webhooks []*WebhookV1
+type WebhookV2 struct {
+    urls    *ring.Ring
+    dialers []*Dialer
+    CommonWebhook
+    //TODO: need to determine best way to add client and client middleware to WebhookV2
+    // clientMiddleware func(http.Client) http.Client
+}
+type WebhookSink struct {
+    webhooks  map[string]*WebhookV2
+    Hash      *HashRing
+    HashField string
+}
 type CommonWebhook struct {
     id         string
     failureUrl string
@@ -71,12 +85,43 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
         return v1
     case *ancla.RegistryV2:
         if len(l.Registration.Webhooks) > 0 {
-            var whs Webhooks
-            for _, wh := range l.Registration.Webhooks {
-                v1 := &WebhookV1{}
-                v1.Update(c, logger, wh.ReceiverURLs[1:], l.GetId(), l.Registration.FailureURL, wh.ReceiverURLs[0])
-                whs = append(whs, v1)
+            r := &HashRing{}
+            whs := WebhookSink{webhooks: make(map[string]*WebhookV2), Hash: r}
+            whs.HashField = l.Registration.Hash.Field
+            for i, wh := range l.Registration.Webhooks {
+                v2 := &WebhookV2{
+                    CommonWebhook: CommonWebhook{
+                        id:               l.GetId(),
+                        logger:           logger,
+                        failureUrl:       l.Registration.FailureURL,
+                        deliveryInterval: c.DeliveryInterval,
+                        deliveryRetries:  c.DeliveryRetries,
+                    },
+                }
+                if len(wh.ReceiverURLs) > 0 {
+                    urlCount, err := getUrls(wh.ReceiverURLs[1:])
+                    if err != nil {
+                        v2.logger.Error("error received parsing urls", zap.Error(err))
+                    }
+                    v2.updateUrls(urlCount, wh.ReceiverURLs[0], wh.ReceiverURLs[1:])
+                } else {
+                    sortBy := wh.DNSSrvRecord.LoadBalancingScheme
+                    for _, domain := range wh.DNSSrvRecord.FQDNs {
+                        dialer := new(Dialer)
+                        err := dialer.lookup(domain, sortBy)
+                        if err != nil {
+                            v2.logger.Error("error received looking up service records", zap.Error(err))
+                        }
+                        dialer.NewClient()
+                        v2.dialers = append(v2.dialers, dialer)
+                    }
+                }
+                whs.webhooks[strconv.Itoa(i)] = v2
+                if l.Registration.Hash.Field != "" {
+                    r.Add(strconv.Itoa(i))
+                }
             }
+            return whs
         }
 
         if len(l.Registration.Kafkas) > 0 {
@@ -85,7 +130,7 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
             sink.HashField = l.Registration.Hash.Field
             for i, k := range l.Registration.Kafkas {
                 kafka := &Kafka{}
-                err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger)
+                err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) //TODO: quickstart-events need to become variable/configurable
                 if err != nil {
                     return nil
                 }
@@ -162,13 +207,24 @@ func getUrls(urls []string) (int, error) {
 
 }
 
-func (whs Webhooks) Send(secret, acceptType string, msg *wrp.Message) error {
+func (whs WebhookSink) Send(secret, acceptType string, msg *wrp.Message) error {
     var errs error
-
-    for _, wh := range whs {
-        err := wh.Send(secret, acceptType, msg)
-        if err != nil {
-            errs = errors.Join(errs, err)
+    if len(*whs.Hash) == len(whs.webhooks) {
+        //TODO: flesh out the error handling
+        if v2, ok := whs.webhooks[whs.Hash.Get(GetKey(whs.HashField, msg))]; ok {
+            err := v2.send(secret, acceptType, msg)
+            if err != nil {
+                errs = errors.Join(errs, err)
+            }
+        }
+    } else {
+        //TODO: discuss with wes and john the default hashing logic
+        //for now: when no hash is given we will just loop through all the webhooks
+        for _, v2 := range whs.webhooks {
+            err := v2.send(secret, acceptType, msg)
+            if err != nil {
+                errs = errors.Join(errs, err)
+            }
         }
     }
     return errs
@@ -585,3 +641,245 @@ func AddMessageHeaders(kafkaMsg *sarama.ProducerMessage, m *wrp.Message) {
         })
     }
 }
+
+func (v2 *WebhookV2) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) {
+    //TODO: do we need to return an error? if not, we should get rid of the error return
+    v2.id = id
+    v2.failureUrl = failureUrl
+    v2.deliveryInterval = c.DeliveryInterval
+    v2.deliveryRetries = c.DeliveryRetries
+    v2.logger = l
+
+    urlCount, err := getUrls(altUrls)
+    if err != nil {
+        l.Error("error received parsing urls", zap.Error(err))
+    }
+    v2.updateUrls(urlCount, receiverUrl, altUrls)
+
+}
+
+func (v2 *WebhookV2) updateUrls(urlCount int, url string, urls []string) {
+    v2.mutex.Lock()
+    defer v2.mutex.Unlock()
+
+    if urlCount == 0 {
+        v2.urls = ring.New(1)
+        v2.urls.Value = url
+    } else {
+        ring := ring.New(urlCount)
+        for i := 0; i < urlCount; i++ {
+            ring.Value = urls[i]
+            ring = ring.Next()
+        }
+        v2.urls = ring
+    }
+
+    // Randomize where we start so all the instances don't synchronize
+    rand := rand.New(rand.NewSource(time.Now().UnixNano()))
+    offset := rand.Intn(v2.urls.Len())
+    for 0 < offset {
+        v2.urls = v2.urls.Next()
+        offset--
+    }
+}
+
+// send is the routine that actually takes the queued messages and delivers
+// them to the listeners outside webpa
+func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error {
+    defer func() {
+        if r := recover(); nil != r {
+            // s.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
+            v2.logger.Error("goroutine send() panicked", zap.String("id", v2.id), zap.Any("panic", r))
+        }
+        // s.workers.Release()
+        // s.currentWorkersGauge.Add(-1.0)
+    }()
+
+    //TODO: is there a reason we are setting it up like this?
+    payload := msg.Payload
+    body := payload
+    var payloadReader *bytes.Reader
+
+    // Use the internal content type unless the accept type is wrp
+    contentType := msg.ContentType
+    switch acceptType {
+    case "wrp", wrp.MimeTypeMsgpack, wrp.MimeTypeWrp:
+        // WTS - We should pass the original, raw WRP event instead of
+        // re-encoding it.
+        contentType = wrp.MimeTypeMsgpack
+        buffer := bytes.NewBuffer([]byte{})
+        encoder := wrp.NewEncoder(buffer, wrp.Msgpack)
+        encoder.Encode(msg)
+        body = buffer.Bytes()
+    }
+    payloadReader = bytes.NewReader(body)
+
+    req, err := http.NewRequest("POST", v2.urls.Value.(string), payloadReader)
+    if err != nil {
+        // Report drop
+        // s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "invalid_config"}).Add(1.0)
+        v2.logger.Error("Invalid URL", zap.String(metrics.UrlLabel, v2.urls.Value.(string)), zap.String("id", v2.id), zap.Error(err))
+        return err
+    }
+
+    req.Header.Set("Content-Type", contentType)
+
+    // Add x-Midt-* headers
+    wrphttp.AddMessageHeaders(req.Header, msg)
+
+    // Provide the old headers for now
+    req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:"))
+    req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID)
+
+    // Add the device id without the trailing service
+    id, _ := wrp.ParseDeviceID(msg.Source)
+    req.Header.Set("X-Webpa-Device-Id", string(id))
+    req.Header.Set("X-Webpa-Device-Name", string(id))
+
+    // Apply the secret
+
+    if secret != "" {
+        s := hmac.New(sha1.New, []byte(secret))
+        s.Write(body)
+        sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(s.Sum(nil)))
+        req.Header.Set("X-Webpa-Signature", sig)
+    }
+
+    // find the event "short name"
+    event := msg.FindEventStringSubMatch()
+
+    // Send it
+    v2.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))
+
+    var resp *http.Response
+    if len(v2.dialers) != 0 {
+        for _, d := range v2.dialers {
+            resp, err = d.client.Do(req)
+        }
+
+    } else {
+        client, _ := retryhttp.NewClient(
+            // retryhttp.WithHTTPClient(s.clientMiddleware(s.client)),
+            retryhttp.WithRunner(v2.addRunner(req, event)),
+            retryhttp.WithRequesters(v2.updateRequest(v2.urls)),
+        )
+        resp, err = client.Do(req)
+    }
+
+    var deliveryCounterLabels []string
+    code := metrics.MessageDroppedCode
+    reason := metrics.NoErrReason
+    logger := v2.logger
+    if err != nil {
+        // Report failure
+        //TODO: add SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) to webhook metrics and remove from sink sender?
+        // v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).Add(1.0)
+        reason = metrics.GetDoErrReason(err)
+        if resp != nil {
+            code = strconv.Itoa(resp.StatusCode)
+        }
+
+        logger = v2.logger.With(zap.String(metrics.ReasonLabel, reason), zap.Error(err))
+        deliveryCounterLabels = []string{metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason, metrics.CodeLabel, code, metrics.EventLabel, event}
+        fmt.Print(deliveryCounterLabels)
+        // v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).With(metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason).Add(1)
+        logger.Error("Dropped Network Error", zap.Error(err))
+        return err
+    } else {
+        // Report Result
+        code = strconv.Itoa(resp.StatusCode)
+
+        // read until the response is complete before closing to allow
+        // connection reuse
+        if resp.Body != nil {
+            io.Copy(io.Discard, resp.Body)
+            resp.Body.Close()
+        }
+
+        deliveryCounterLabels = []string{metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason, metrics.CodeLabel, code, metrics.EventLabel, event}
+    }
+    fmt.Print(deliveryCounterLabels)
+
+    //TODO: do we add deliveryCounter to webhook metrics and remove from sink sender?
+    // v1.deliveryCounter.With(prometheus.Labels{deliveryCounterLabels}).Add(1.0)
+    logger.Debug("event sent-ish", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination), zap.String(metrics.CodeLabel, code), zap.String(metrics.UrlLabel, req.URL.String()))
+    return nil
+}
+
+func (v2 *WebhookV2) addRunner(request *http.Request, event string) retry.Runner[*http.Response] {
+    runner, _ := retry.NewRunner[*http.Response](
+        retry.WithPolicyFactory[*http.Response](retry.Config{
+            Interval:   v2.deliveryInterval,
+            MaxRetries: v2.deliveryRetries,
+        }),
+        retry.WithOnAttempt[*http.Response](v2.onAttempt(request, event)),
+    )
+    return runner
+}
+
+func (v2 *WebhookV2) updateRequest(urls *ring.Ring) func(*http.Request) *http.Request {
+    return func(request *http.Request) *http.Request {
+        urls = urls.Next()
+        tmp, err := url.Parse(urls.Value.(string))
+        if err != nil {
+            v2.logger.Error("failed to update url", zap.String(metrics.UrlLabel, urls.Value.(string)), zap.Error(err))
+            //TODO: do we add SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) metric to webhook and remove from sink sender?
+            // v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).With(metrics.UrlLabel, request.URL.String(), metrics.ReasonLabel, metrics.UpdateRequestURLFailedReason).Add(1)
+        }
+        request.URL = tmp
+        return request
+    }
+}
+
+func (v2 *WebhookV2) onAttempt(request *http.Request, event string) retry.OnAttempt[*http.Response] {
+
+    return func(attempt retry.Attempt[*http.Response]) {
+        if attempt.Retries > 0 {
+            fmt.Print(event)
+            // s.DeliveryRetryCounter.With(prometheus.Labels{UrlLabel: v1.id, EventLabel: event}).Add(1.0)
+            v2.logger.Debug("retrying HTTP transaction", zap.String(metrics.UrlLabel, request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode))
+        }
+
+    }
+}
+
+type Dialer struct {
+    srvs   []*net.SRV
+    client *http.Client
+}
+
+type DialContext func(ctx context.Context, network, address string) (net.Conn, error)
+
+func (d *Dialer) CustomDial() DialContext {
+    return func(ctx context.Context, network, address string) (net.Conn, error) {
+        return net.Dial(network, address)
+    }
+}
+
+func (d *Dialer) NewClient() {
+    d.client = &http.Client{
+        Transport: &http.Transport{
+            DialContext: d.CustomDial(),
+        },
+    }
+}
+
+func (d *Dialer) lookup(domain, sortBy string) error {
+    _, addrs, err := net.LookupSRV("", "", domain)
+    if err != nil {
+        return err
+    }
+
+    if sortBy == "weight" {
+        sort.Slice(addrs, func(i, j int) bool {
+            return addrs[i].Weight > addrs[j].Weight
+        })
+    } else {
+        sort.Slice(addrs, func(i, j int) bool {
+            return addrs[i].Priority < addrs[j].Priority
+        })
+    }
+
+    d.srvs = addrs
+    return nil
+}
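
Note for reviewers: the hash-based routing that WebhookSink.Send performs is
easy to lose inside the diff, so here it is in isolation. This is only an
approximation of the selection logic; miniRing and the inline send functions
are simplified stand-ins for the repository's HashRing, GetKey, and
WebhookV2.send, not the real implementations.

    package main

    import "fmt"

    // miniRing stands in for HashRing: it maps a key to one of the
    // registered webhook indices (here by a toy checksum of the key).
    type miniRing []string

    func (r miniRing) Get(key string) string {
        sum := 0
        for _, c := range key {
            sum += int(c)
        }
        return r[sum%len(r)]
    }

    func main() {
        webhooks := map[string]func(msg string){
            "0": func(msg string) { fmt.Println("webhook 0 got", msg) },
            "1": func(msg string) { fmt.Println("webhook 1 got", msg) },
        }
        // The ring is only populated when Registration.Hash.Field is set.
        ring := miniRing{"0", "1"}

        // Stand-in for GetKey(HashField, msg): the value of the configured hash field.
        key := "mac:112233445566"

        if len(ring) == len(webhooks) {
            // Hash field configured: exactly one webhook receives the event.
            webhooks[ring.Get(key)](key)
        } else {
            // No hash field: fall back to fanning out to every webhook.
            for _, send := range webhooks {
                send(key)
            }
        }
    }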