diff --git a/go.mod b/go.mod index 1a16bd6..0953392 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/xmidt-org/retry v0.0.3 github.com/xmidt-org/sallust v0.2.2 github.com/xmidt-org/touchstone v0.1.5 - github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9 + github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd github.com/xmidt-org/webpa-common/v2 v2.2.2 github.com/xmidt-org/wrp-go/v3 v3.2.3 go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1 diff --git a/go.sum b/go.sum index 8c18465..fee356a 100644 --- a/go.sum +++ b/go.sum @@ -1712,8 +1712,8 @@ github.com/xmidt-org/touchstone v0.1.5 h1:Afm3P0EzCOWD1ITyVLsEDPVQkSE0t2ZhHyV+kO github.com/xmidt-org/touchstone v0.1.5/go.mod h1:Dz0fA1eWjm/2WrsdEeaQZMevkmfdYTsAbQfLaTrB8Eo= github.com/xmidt-org/urlegit v0.1.12 h1:qlwTgELD2ufKKH4vuioG/BWZ3293Cbx1f1viMDMaLV0= github.com/xmidt-org/urlegit v0.1.12/go.mod h1:wEEFUdBOEK3bQNb5LHLMfGnTtGn8WwEKgFPk8p6lhIM= -github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9 h1:2Q7f7IVLXCckvmDjSSVEvUXcpHX1gfGkF9xg3K/X87c= -github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9/go.mod h1:CskwqDhcXKRHeCvw9qnu/+v4d+gkzzX4WfG7GCtMPFA= +github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd h1:AMGhtnmbuqzJVRW1uyxKRVRu9rEGJcUDReiRE+R835M= +github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd/go.mod h1:CskwqDhcXKRHeCvw9qnu/+v4d+gkzzX4WfG7GCtMPFA= github.com/xmidt-org/webpa-common v1.1.0/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI= github.com/xmidt-org/webpa-common v1.3.2/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI= github.com/xmidt-org/webpa-common v1.10.2-0.20200604164000-f07406b4eb63/go.mod h1:Fmt3wIxBzwJY0KeRHX6RaLZx2xpKTbXCLEA3Xtd6kq8= diff --git a/internal/sink/sink.go b/internal/sink/sink.go index f3ebf60..d3fb530 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -5,7 +5,6 @@ package sink import ( "bytes" "container/ring" - "context" "crypto/hmac" "crypto/sha1" "encoding/hex" @@ -13,10 +12,8 @@ import ( "fmt" "io" "math/rand" - "net" "net/http" "net/url" - "sort" "strconv" "strings" "sync" @@ -45,8 +42,8 @@ type WebhookV1 struct { } type WebhookV2 struct { - urls *ring.Ring - dialers []*Dialer + urls *ring.Ring + client *http.Client CommonWebhook //TODO: need to determine best way to add client and client middleware to WebhooV1 // clientMiddleware func(http.Client) http.Client @@ -77,13 +74,19 @@ type Kafka struct { CommonWebhook } -func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { +func NewSink(c Config, logger *zap.Logger, listener ancla.Register) (Sink, error) { switch l := listener.(type) { case *ancla.RegistryV1: v1 := &WebhookV1{} v1.Update(c, logger, l.Registration.Config.AlternativeURLs, l.GetId(), l.Registration.FailureURL, l.Registration.Config.ReceiverURL) - return v1 + return v1, nil case *ancla.RegistryV2: + if len(l.Registration.Kafkas) == 0 && len(l.Registration.Webhooks) == 0 { + return nil, fmt.Errorf("either `Kafkas` or `Webhooks` must be used") + } else if len(l.Registration.Kafkas) > 0 && len(l.Registration.Webhooks) > 0 { + return nil, fmt.Errorf("either `Kafkas` or `Webhooks` must be used but not both") + } + if len(l.Registration.Webhooks) > 0 { var whs WebhookSink r := &HashRing{} @@ -97,33 +100,37 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { deliveryInterval: c.DeliveryInterval, deliveryRetries: c.DeliveryRetries, }, + client: &http.Client{}, } + if len(wh.ReceiverURLs) == 0 && len(wh.DNSSrvRecord.FQDNs) == 0 { + return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used") + } else if len(wh.ReceiverURLs) > 0 && len(wh.DNSSrvRecord.FQDNs) > 0 { + return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used but not both") + } + if len(wh.ReceiverURLs) > 0 { urlCount, err := getUrls(wh.ReceiverURLs[1:]) if err != nil { - v2.logger.Error("error recevied parsing urls", zap.Error(err)) + return nil, errors.Join(err, fmt.Errorf("error recevied parsing urls")) } v2.updateUrls(urlCount, wh.ReceiverURLs[0], wh.ReceiverURLs[1:]) - } else { - sortBy := wh.DNSSrvRecord.LoadBalancingScheme - for _, domain := range wh.DNSSrvRecord.FQDNs { - dialer := new(Dialer) - err := dialer.lookup(domain, sortBy) - if err != nil { - v2.logger.Error("error received looking up service records", zap.Error(err)) - } - dialer.NewClient() - v2.dialers = append(v2.dialers, dialer) - } } + + transport, err := NewSRVRecordDailer(wh.DNSSrvRecord) + if err != nil { + return nil, errors.Join(err, fmt.Errorf("error recevied parsing urls")) + } + + v2.client.Transport = transport whs.webooks[strconv.Itoa(i)] = v2 if l.Registration.Hash.Field != "" { r.Add(strconv.Itoa(i)) } } - return whs + return whs, nil } + if len(l.Registration.Kafkas) > 0 { var sink KafkaSink r := &HashRing{} @@ -132,7 +139,7 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { kafka := &Kafka{} err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) //TODO: quickstart-events need to become variable/configurable if err != nil { - return nil + return nil, err } sink.Kafkas[strconv.Itoa(i)] = kafka if l.Registration.Hash.Field != "" { @@ -140,13 +147,14 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { } } + sink.Hash = r - return sink + + return sink, nil } - default: - return nil } - return nil + + return nil, fmt.Errorf("unknow webhook registry type") } func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) { @@ -752,11 +760,8 @@ func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error { v2.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) var resp *http.Response - if len(v2.dialers) != 0 { - for _, d := range v2.dialers { - resp, err = d.client.Do(req) - } - + if v2.client != nil { + resp, err = v2.client.Do(req) } else { client, _ := retryhttp.NewClient( // retryhttp.WithHTTPClient(s.clientMiddleware(s.client)), @@ -842,44 +847,3 @@ func (v2 *WebhookV2) onAttempt(request *http.Request, event string) retry.OnAtte } } - -type Dialer struct { - srvs []*net.SRV - client *http.Client -} - -type DialContext func(ctx context.Context, network, address string) (net.Conn, error) - -func (d *Dialer) CustomDial() DialContext { - return func(ctx context.Context, network, address string) (net.Conn, error) { - return net.Dial(network, address) - } -} - -func (d *Dialer) NewClient() { - d.client = &http.Client{ - Transport: &http.Transport{ - DialContext: d.CustomDial(), - }, - } -} - -func (d *Dialer) lookup(domain, sortBy string) error { - _, addrs, err := net.LookupSRV("", "", domain) - if err != nil { - return err - } - - if sortBy == "weight" { - sort.Slice(addrs, func(i, j int) bool { - return addrs[i].Weight > addrs[j].Weight - }) - } else { - sort.Slice(addrs, func(i, j int) bool { - return addrs[i].Priority < addrs[j].Priority - }) - } - - d.srvs = addrs - return nil -} diff --git a/internal/sink/sinkSender.go b/internal/sink/sinkSender.go index cb1d397..4f62584 100644 --- a/internal/sink/sinkSender.go +++ b/internal/sink/sinkSender.go @@ -135,8 +135,12 @@ func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) { func (s *Sender) Update(l ancla.Register) (err error) { s.matcher, err = NewMatcher(l, s.logger) - s.sink = NewSink(s.config, s.logger, l) + sink, err := NewSink(s.config, s.logger, l) + if err != nil { + return err + } + s.sink = sink s.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(time.Now().Unix())) s.mutex.Lock() diff --git a/internal/sink/srvRecordDailer.go b/internal/sink/srvRecordDailer.go new file mode 100644 index 0000000..0ded74f --- /dev/null +++ b/internal/sink/srvRecordDailer.go @@ -0,0 +1,80 @@ +package sink + +import ( + "context" + "errors" + "fmt" + "net" + "net/http" + "sort" + + "github.com/xmidt-org/webhook-schema" +) + +func NewSRVRecordDailer(dnsSrvRecord webhook.DNSSrvRecord) (http.RoundTripper, error) { + if len(dnsSrvRecord.FQDNs) == 0 { + return http.DefaultTransport, nil + } + + d := SRVRecordDialer{dnsSrvRecord: dnsSrvRecord} + + var errs error + for _, fqdn := range d.dnsSrvRecord.FQDNs { + _, addrs, err := net.LookupSRV("", "", fqdn) + if err != nil { + errs = errors.Join(errs, + fmt.Errorf("srv lookup failure: `%s`", fqdn), + err, + ) + continue + } + + d.srvs = append(d.srvs, addrs...) + } + + // TODO: ask wes/john whether 1 or more net.LookupSRV error should trigger an error from NewSRVRecordDailer + if len(d.srvs) == 0 { + return nil, errors.Join(fmt.Errorf("expected atleast 1 srv record from fqdn list `%v`", d.dnsSrvRecord.FQDNs), errs) + } + + switch d.dnsSrvRecord.LoadBalancingScheme { + case "weight": + sort.Slice(d.srvs, func(i, j int) bool { + return d.srvs[i].Weight > d.srvs[j].Weight + }) + case "priortiy": + sort.Slice(d.srvs, func(i, j int) bool { + return d.srvs[i].Priority < d.srvs[j].Priority + }) + default: + return nil, fmt.Errorf("unknwon loadBalancingScheme type: %s", d.dnsSrvRecord.LoadBalancingScheme) + } + + return &http.Transport{ + DialContext: (&d).DialContext, + }, nil + +} + +type SRVRecordDialer struct { + srvs []*net.SRV + dnsSrvRecord webhook.DNSSrvRecord +} + +func (d *SRVRecordDialer) DialContext(ctx context.Context, _, _ string) (net.Conn, error) { + var errs error + for _, addr := range d.srvs { + host := net.JoinHostPort(addr.Target, string(addr.Port)) + conn, err := net.Dial("", host) + if err != nil { + errs = errors.Join(errs, + fmt.Errorf("%v: host `%s` [weight: %s, priortiy: %s] from srv record `%v`", + err, host, addr.Weight, addr.Priority, d.dnsSrvRecord.FQDNs)) + continue + } + + return conn, nil + } + + return nil, errs +}