diff --git a/go.mod b/go.mod
index 1a16bd6..0953392 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,7 @@ require (
 	github.com/xmidt-org/retry v0.0.3
 	github.com/xmidt-org/sallust v0.2.2
 	github.com/xmidt-org/touchstone v0.1.5
-	github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9
+	github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd
 	github.com/xmidt-org/webpa-common/v2 v2.2.2
 	github.com/xmidt-org/wrp-go/v3 v3.2.3
 	go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1
diff --git a/go.sum b/go.sum
index 8c18465..fee356a 100644
--- a/go.sum
+++ b/go.sum
@@ -1712,8 +1712,8 @@ github.com/xmidt-org/touchstone v0.1.5 h1:Afm3P0EzCOWD1ITyVLsEDPVQkSE0t2ZhHyV+kO
 github.com/xmidt-org/touchstone v0.1.5/go.mod h1:Dz0fA1eWjm/2WrsdEeaQZMevkmfdYTsAbQfLaTrB8Eo=
 github.com/xmidt-org/urlegit v0.1.12 h1:qlwTgELD2ufKKH4vuioG/BWZ3293Cbx1f1viMDMaLV0=
 github.com/xmidt-org/urlegit v0.1.12/go.mod h1:wEEFUdBOEK3bQNb5LHLMfGnTtGn8WwEKgFPk8p6lhIM=
-github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9 h1:2Q7f7IVLXCckvmDjSSVEvUXcpHX1gfGkF9xg3K/X87c=
-github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9/go.mod h1:CskwqDhcXKRHeCvw9qnu/+v4d+gkzzX4WfG7GCtMPFA=
+github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd h1:AMGhtnmbuqzJVRW1uyxKRVRu9rEGJcUDReiRE+R835M=
+github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd/go.mod h1:CskwqDhcXKRHeCvw9qnu/+v4d+gkzzX4WfG7GCtMPFA=
 github.com/xmidt-org/webpa-common v1.1.0/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI=
 github.com/xmidt-org/webpa-common v1.3.2/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI=
 github.com/xmidt-org/webpa-common v1.10.2-0.20200604164000-f07406b4eb63/go.mod h1:Fmt3wIxBzwJY0KeRHX6RaLZx2xpKTbXCLEA3Xtd6kq8=
diff --git a/internal/sink/sink.go b/internal/sink/sink.go
index 5c2a5d0..db236b5 100644
--- a/internal/sink/sink.go
+++ b/internal/sink/sink.go
@@ -37,11 +37,22 @@ type WebhookV1 struct {
 	urls *ring.Ring
 	CommonWebhook
 	//TODO: need to determine best way to add client and client middleware to WebhooV1
-	// client http.Client
+	// client           http.Client
 	// clientMiddleware func(http.Client) http.Client
 }
 
-type Webhooks []*WebhookV1
+type WebhookV2 struct {
+	urls   *ring.Ring
+	client *http.Client
+	CommonWebhook
+	//TODO: need to determine best way to add client and client middleware to WebhookV2
+	// clientMiddleware func(http.Client) http.Client
+}
+type WebhookSink struct {
+	webhooks  map[string]*WebhookV2
+	Hash      *HashRing
+	HashField string
+}
 type CommonWebhook struct {
 	id         string
 	failureUrl string
@@ -63,31 +74,72 @@ type Kafka struct {
 	CommonWebhook
 }
 
-func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
+func NewSink(c Config, logger *zap.Logger, listener ancla.Register) (Sink, error) {
 	switch l := listener.(type) {
 	case *ancla.RegistryV1:
 		v1 := &WebhookV1{}
 		v1.Update(c, logger, l.Registration.Config.AlternativeURLs, l.GetId(), l.Registration.FailureURL, l.Registration.Config.ReceiverURL)
-		return v1
+		return v1, nil
 	case *ancla.RegistryV2:
+		if len(l.Registration.Kafkas) == 0 && len(l.Registration.Webhooks) == 0 {
+			return nil, fmt.Errorf("either `Kafkas` or `Webhooks` must be used")
+		} else if len(l.Registration.Kafkas) > 0 && len(l.Registration.Webhooks) > 0 {
+			return nil, fmt.Errorf("either `Kafkas` or `Webhooks` must be used but not both")
+		}
+
 		if len(l.Registration.Webhooks) > 0 {
-			var whs Webhooks
-			for _, wh := range l.Registration.Webhooks {
-				v1 := &WebhookV1{}
-				v1.Update(c, logger, wh.ReceiverURLs[1:], l.GetId(), l.Registration.FailureURL, wh.ReceiverURLs[0])
-				whs = append(whs, v1)
+			whs := WebhookSink{webhooks: make(map[string]*WebhookV2)}
+			r := &HashRing{}
+			whs.Hash, whs.HashField = r, l.Registration.Hash.Field
+			for i, wh := range l.Registration.Webhooks {
+				v2 := &WebhookV2{
+					CommonWebhook: CommonWebhook{
+						id:               l.GetId(),
+						logger:           logger,
+						failureUrl:       l.Registration.FailureURL,
+						deliveryInterval: c.DeliveryInterval,
+						deliveryRetries:  c.DeliveryRetries,
+					},
+					client: &http.Client{}, //TODO: do we want to expose configuration for this client via config?
+				}
+				if len(wh.ReceiverURLs) == 0 && len(wh.DNSSrvRecord.FQDNs) == 0 {
+					return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used")
+				} else if len(wh.ReceiverURLs) > 0 && len(wh.DNSSrvRecord.FQDNs) > 0 {
+					return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used but not both")
+				}
+
+				if len(wh.ReceiverURLs) > 0 {
+					urlCount, err := getUrls(wh.ReceiverURLs[1:])
+					if err != nil {
+						return nil, errors.Join(err, fmt.Errorf("error received parsing urls"))
+					}
+					v2.updateUrls(urlCount, wh.ReceiverURLs[0], wh.ReceiverURLs[1:])
+				}
+
+				transport, err := NewSRVRecordDialer(wh.DNSSrvRecord)
+				if err != nil {
+					return nil, errors.Join(err, fmt.Errorf("error received creating SRV record dialer"))
+				}
+
+				v2.client.Transport = transport
+				whs.webhooks[strconv.Itoa(i)] = v2
+				if l.Registration.Hash.Field != "" {
+					r.Add(strconv.Itoa(i))
+				}
 			}
-			return whs
+
+			return whs, nil
 		}
+
 		if len(l.Registration.Kafkas) > 0 {
 			var sink KafkaSink
 			r := &HashRing{}
 			sink.HashField = l.Registration.Hash.Field
 			for i, k := range l.Registration.Kafkas {
 				kafka := &Kafka{}
-				err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger)
+				err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) //TODO: quickstart-events needs to become variable/configurable
 				if err != nil {
-					return nil
+					return nil, err
 				}
 				sink.Kafkas[strconv.Itoa(i)] = kafka
 				if l.Registration.Hash.Field != "" {
@@ -95,13 +147,14 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
 				}
 			}
+
 			sink.Hash = r
-			return sink
+
+			return sink, nil
 		}
-	default:
-		return nil
 	}
-	return nil
+
+	return nil, fmt.Errorf("unknown webhook registry type")
 }
 
 func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) {
@@ -162,18 +215,6 @@ func getUrls(urls []string) (int, error) {
 }
 
-func (whs Webhooks) Send(secret, acceptType string, msg *wrp.Message) error {
-	var errs error
-
-	for _, wh := range whs {
-		err := wh.Send(secret, acceptType, msg)
-		if err != nil {
-			errs = errors.Join(errs, err)
-		}
-	}
-	return errs
-}
-
 // worker is the routine that actually takes the queued messages and delivers
 // them to the listeners outside webpa
 func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
@@ -585,3 +626,203 @@ func AddMessageHeaders(kafkaMsg *sarama.ProducerMessage, m *wrp.Message) {
 		})
 	}
 }
+
+func (v2 *WebhookV2) updateUrls(urlCount int, url string, urls []string) {
+	v2.mutex.Lock()
+	defer v2.mutex.Unlock()
+
+	if urlCount == 0 {
+		v2.urls = ring.New(1)
+		v2.urls.Value = url
+	} else {
+		ring := ring.New(urlCount)
+		for i := 0; i < urlCount; i++ {
+			ring.Value = urls[i]
+			ring = ring.Next()
+		}
+		v2.urls = ring
+	}
+
+	// Randomize where we start so all the instances don't synchronize
+	rand := rand.New(rand.NewSource(time.Now().UnixNano()))
+	offset := rand.Intn(v2.urls.Len())
+	for 0 < offset {
+		v2.urls = v2.urls.Next()
+		offset--
+	}
+}
+
+func (whs WebhookSink) Send(secret, acceptType string, msg *wrp.Message) error {
+	var errs error
+	if len(*whs.Hash) == len(whs.webhooks) {
+		//TODO: flush out the error handling for webhooks
+		if v2, ok := whs.webhooks[whs.Hash.Get(GetKey(whs.HashField, msg))]; ok {
+			err := v2.send(secret, acceptType, msg)
+			if err != nil {
+				errs = errors.Join(errs, err)
+			}
+		}
+	} else {
+		//TODO: discuss with wes and john the default hashing logic
+		//for now: when no hash is given we will just loop through all the webhooks
+		for _, v2 := range whs.webhooks {
+			err := v2.send(secret, acceptType, msg)
+			if err != nil {
+				errs = errors.Join(errs, err)
+			}
+		}
+	}
+	return errs
+}
+
+// worker is the routine that actually takes the queued messages and delivers
+// them to the listeners outside webpa
+func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error {
+	defer func() {
+		if r := recover(); nil != r {
+			// s.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
+			v2.logger.Error("goroutine send() panicked", zap.String("id", v2.id), zap.Any("panic", r))
+		}
+		// s.workers.Release()
+		// s.currentWorkersGauge.Add(-1.0)
+	}()
+
+	//TODO: is there a reason we are setting it up like this?
+	payload := msg.Payload
+	body := payload
+	var payloadReader *bytes.Reader
+
+	// Use the internal content type unless the accept type is wrp
+	contentType := msg.ContentType
+	switch acceptType {
+	case "wrp", wrp.MimeTypeMsgpack, wrp.MimeTypeWrp:
+		// WTS - We should pass the original, raw WRP event instead of
+		// re-encoding it.
+		contentType = wrp.MimeTypeMsgpack
+		buffer := bytes.NewBuffer([]byte{})
+		encoder := wrp.NewEncoder(buffer, wrp.Msgpack)
+		encoder.Encode(msg)
+		body = buffer.Bytes()
+	}
+	payloadReader = bytes.NewReader(body)
+
+	req, err := http.NewRequest("POST", v2.urls.Value.(string), payloadReader)
+	if err != nil {
+		// Report drop
+		// s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "invalid_config"}).Add(1.0)
+		v2.logger.Error("Invalid URL", zap.String(metrics.UrlLabel, v2.urls.Value.(string)), zap.String("id", v2.id), zap.Error(err))
+		return err
+	}
+
+	req.Header.Set("Content-Type", contentType)
+
+	// Add x-Midt-* headers
+	wrphttp.AddMessageHeaders(req.Header, msg)
+
+	// Provide the old headers for now
+	req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:"))
+	req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID)
+
+	// Add the device id without the trailing service
+	id, _ := wrp.ParseDeviceID(msg.Source)
+	req.Header.Set("X-Webpa-Device-Id", string(id))
+	req.Header.Set("X-Webpa-Device-Name", string(id))
+
+	// Apply the secret
+
+	if secret != "" {
+		s := hmac.New(sha1.New, []byte(secret))
+		s.Write(body)
+		sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(s.Sum(nil)))
+		req.Header.Set("X-Webpa-Signature", sig)
+	}
+
+	// find the event "short name"
+	event := msg.FindEventStringSubMatch()
+
+	// Send it
+	v2.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))
+
+	client, _ := retryhttp.NewClient(
+		retryhttp.WithHTTPClient(v2.client),
+		retryhttp.WithRunner(v2.addRunner(req, event)),
+		retryhttp.WithRequesters(v2.updateRequest(v2.urls)),
+	)
+	resp, err := client.Do(req)
+
+	var deliveryCounterLabels []string
+	code := metrics.MessageDroppedCode
+	reason := metrics.NoErrReason
+	logger := v2.logger
+	if err != nil {
+		// Report failure
+		//TODO: add SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) to webhook metrics and remove from sink sender?
+		// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).Add(1.0)
+		reason = metrics.GetDoErrReason(err)
+		if resp != nil {
+			code = strconv.Itoa(resp.StatusCode)
+		}
+
+		logger = v2.logger.With(zap.String(metrics.ReasonLabel, reason), zap.Error(err))
+		deliveryCounterLabels = []string{metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason, metrics.CodeLabel, code, metrics.EventLabel, event}
+		fmt.Print(deliveryCounterLabels)
+		// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).With(metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason).Add(1)
+		logger.Error("Dropped Network Error", zap.Error(err))
+		return err
+	} else {
+		// Report Result
+		code = strconv.Itoa(resp.StatusCode)
+
+		// read until the response is complete before closing to allow
+		// connection reuse
+		if resp.Body != nil {
+			io.Copy(io.Discard, resp.Body)
+			resp.Body.Close()
+		}
+
+		deliveryCounterLabels = []string{metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason, metrics.CodeLabel, code, metrics.EventLabel, event}
+	}
+	fmt.Print(deliveryCounterLabels)
+
+	//TODO: do we add deliveryCounter to webhook metrics and remove from sink sender?
+	// v1.deliveryCounter.With(prometheus.Labels{deliveryCounterLabels}).Add(1.0)
+	logger.Debug("event sent-ish", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination), zap.String(metrics.CodeLabel, code), zap.String(metrics.UrlLabel, req.URL.String()))
+	return nil
+}
+
+func (v2 *WebhookV2) addRunner(request *http.Request, event string) retry.Runner[*http.Response] {
+	runner, _ := retry.NewRunner[*http.Response](
+		retry.WithPolicyFactory[*http.Response](retry.Config{
+			Interval:   v2.deliveryInterval,
+			MaxRetries: v2.deliveryRetries,
+		}),
+		retry.WithOnAttempt[*http.Response](v2.onAttempt(request, event)),
+	)
+	return runner
+}
+
+func (v2 *WebhookV2) updateRequest(urls *ring.Ring) func(*http.Request) *http.Request {
+	return func(request *http.Request) *http.Request {
+		urls = urls.Next()
+		tmp, err := url.Parse(urls.Value.(string))
+		if err != nil {
+			v2.logger.Error("failed to update url", zap.String(metrics.UrlLabel, urls.Value.(string)), zap.Error(err))
+			//TODO: do we add SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) metric to webhook and remove from sink sender?
+			// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).With(metrics.UrlLabel, request.URL.String(), metrics.ReasonLabel, metrics.UpdateRequestURLFailedReason).Add(1)
+		}
+		request.URL = tmp
+		return request
+	}
+}
+
+func (v2 *WebhookV2) onAttempt(request *http.Request, event string) retry.OnAttempt[*http.Response] {
+
+	return func(attempt retry.Attempt[*http.Response]) {
+		if attempt.Retries > 0 {
+			fmt.Print(event)
+			// s.DeliveryRetryCounter.With(prometheus.Labels{UrlLabel: v1.id, EventLabel: event}).Add(1.0)
+			v2.logger.Debug("retrying HTTP transaction", zap.String(metrics.UrlLabel, request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode))
+		}
+
+	}
+}
diff --git a/internal/sink/sinkSender.go b/internal/sink/sinkSender.go
index cb1d397..4f62584 100644
--- a/internal/sink/sinkSender.go
+++ b/internal/sink/sinkSender.go
@@ -135,8 +135,12 @@ func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {
 
 func (s *Sender) Update(l ancla.Register) (err error) {
 	s.matcher, err = NewMatcher(l, s.logger)
 
-	s.sink = NewSink(s.config, s.logger, l)
+	sink, err := NewSink(s.config, s.logger, l)
+	if err != nil {
+		return err
+	}
+	s.sink = sink
 	s.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(time.Now().Unix()))
 
 	s.mutex.Lock()
diff --git a/internal/sink/srvRecordDialer.go b/internal/sink/srvRecordDialer.go
new file mode 100644
index 0000000..97098fd
--- /dev/null
+++ b/internal/sink/srvRecordDialer.go
@@ -0,0 +1,80 @@
+package sink
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"net/http"
+	"sort"
+
+	"github.com/xmidt-org/webhook-schema"
+)
+
+func NewSRVRecordDialer(dnsSrvRecord webhook.DNSSrvRecord) (http.RoundTripper, error) {
+	if len(dnsSrvRecord.FQDNs) == 0 {
+		return http.DefaultTransport, nil
+	}
+
+	d := SRVRecordDialer{dnsSrvRecord: dnsSrvRecord}
+
+	var errs error
+	for _, fqdn := range d.dnsSrvRecord.FQDNs {
+		_, addrs, err := net.LookupSRV("", "", fqdn)
+		if err != nil {
+			errs = errors.Join(errs,
+				fmt.Errorf("srv lookup failure: `%s`", fqdn),
+				err,
+			)
+			continue
+		}
+
+		d.srvs = append(d.srvs, addrs...)
+	}
+
+	// TODO: ask wes/john whether 1 or more net.LookupSRV errors should trigger an error from NewSRVRecordDialer
+	if len(d.srvs) == 0 {
+		return nil, errors.Join(fmt.Errorf("expected at least 1 srv record from fqdn list `%v`", d.dnsSrvRecord.FQDNs), errs)
+	}
+
+	switch d.dnsSrvRecord.LoadBalancingScheme {
+	case "weight":
+		sort.Slice(d.srvs, func(i, j int) bool {
+			return d.srvs[i].Weight > d.srvs[j].Weight
+		})
+	case "priority":
+		sort.Slice(d.srvs, func(i, j int) bool {
+			return d.srvs[i].Priority < d.srvs[j].Priority
+		})
+	default:
+		return nil, fmt.Errorf("unknown loadBalancingScheme type: %s", d.dnsSrvRecord.LoadBalancingScheme)
+	}
+
+	return &http.Transport{
+		DialContext: (&d).DialContext,
+	}, nil
+
+}
+
+type SRVRecordDialer struct {
+	srvs         []*net.SRV
+	dnsSrvRecord webhook.DNSSrvRecord
+}
+
+func (d *SRVRecordDialer) DialContext(ctx context.Context, _, _ string) (net.Conn, error) {
+	var errs error
+	for _, addr := range d.srvs {
+		host := net.JoinHostPort(addr.Target, fmt.Sprintf("%d", addr.Port))
+		conn, err := net.Dial("tcp", host)
+		if err != nil {
+			errs = errors.Join(errs,
+				fmt.Errorf("%v: host `%s` [weight: %d, priority: %d] from srv record `%v`",
+					err, host, addr.Weight, addr.Priority, d.dnsSrvRecord.FQDNs))
+			continue
+		}
+
+		return conn, nil
+	}
+
+	return nil, errs
+}
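
The following is a standalone sketch, not part of the patch, illustrating the ordering that NewSRVRecordDialer applies to discovered SRV records before DialContext tries them: descending Weight for the "weight" scheme, ascending Priority for the "priority" scheme. The hostnames, ports, and SRV values are invented for illustration.

package main

import (
	"fmt"
	"net"
	"sort"
)

func main() {
	// Made-up records standing in for the result of net.LookupSRV.
	srvs := []*net.SRV{
		{Target: "a.example.net.", Port: 443, Priority: 10, Weight: 5},
		{Target: "b.example.net.", Port: 443, Priority: 5, Weight: 20},
		{Target: "c.example.net.", Port: 443, Priority: 10, Weight: 50},
	}

	scheme := "weight" // the dialer also accepts "priority"
	switch scheme {
	case "weight":
		// Highest weight first.
		sort.Slice(srvs, func(i, j int) bool { return srvs[i].Weight > srvs[j].Weight })
	case "priority":
		// Lowest priority value first.
		sort.Slice(srvs, func(i, j int) bool { return srvs[i].Priority < srvs[j].Priority })
	}

	for _, s := range srvs {
		// net.JoinHostPort expects the port as a decimal string, hence the
		// fmt.Sprintf conversion used in the dialer as well.
		fmt.Println(net.JoinHostPort(s.Target, fmt.Sprintf("%d", s.Port)))
	}
}

With scheme set to "weight" this prints c, b, then a; with "priority" it prints b first, since it carries the lowest Priority value.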