Skip to content

Commit

Permalink
chore: feedback for @maurafortino 's #587 PR
Browse files Browse the repository at this point in the history
  • Loading branch information
denopink committed Dec 16, 2024
1 parent a8ae0ac commit b3a7a34
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 75 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/xmidt-org/retry v0.0.3
github.com/xmidt-org/sallust v0.2.2
github.com/xmidt-org/touchstone v0.1.5
github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9
github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd
github.com/xmidt-org/webpa-common/v2 v2.2.2
github.com/xmidt-org/wrp-go/v3 v3.2.3
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1712,8 +1712,8 @@ github.com/xmidt-org/touchstone v0.1.5 h1:Afm3P0EzCOWD1ITyVLsEDPVQkSE0t2ZhHyV+kO
github.com/xmidt-org/touchstone v0.1.5/go.mod h1:Dz0fA1eWjm/2WrsdEeaQZMevkmfdYTsAbQfLaTrB8Eo=
github.com/xmidt-org/urlegit v0.1.12 h1:qlwTgELD2ufKKH4vuioG/BWZ3293Cbx1f1viMDMaLV0=
github.com/xmidt-org/urlegit v0.1.12/go.mod h1:wEEFUdBOEK3bQNb5LHLMfGnTtGn8WwEKgFPk8p6lhIM=
github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9 h1:2Q7f7IVLXCckvmDjSSVEvUXcpHX1gfGkF9xg3K/X87c=
github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9/go.mod h1:CskwqDhcXKRHeCvw9qnu/+v4d+gkzzX4WfG7GCtMPFA=
github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd h1:AMGhtnmbuqzJVRW1uyxKRVRu9rEGJcUDReiRE+R835M=
github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd/go.mod h1:CskwqDhcXKRHeCvw9qnu/+v4d+gkzzX4WfG7GCtMPFA=
github.com/xmidt-org/webpa-common v1.1.0/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI=
github.com/xmidt-org/webpa-common v1.3.2/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI=
github.com/xmidt-org/webpa-common v1.10.2-0.20200604164000-f07406b4eb63/go.mod h1:Fmt3wIxBzwJY0KeRHX6RaLZx2xpKTbXCLEA3Xtd6kq8=
Expand Down
106 changes: 35 additions & 71 deletions internal/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,15 @@ package sink
import (
"bytes"
"container/ring"
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/hex"
"errors"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -45,8 +42,8 @@ type WebhookV1 struct {
}

type WebhookV2 struct {
urls *ring.Ring
dialers []*Dialer
urls *ring.Ring
client *http.Client
CommonWebhook
//TODO: need to determine best way to add client and client middleware to WebhooV1
// clientMiddleware func(http.Client) http.Client
Expand Down Expand Up @@ -77,13 +74,19 @@ type Kafka struct {
CommonWebhook
}

func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
func NewSink(c Config, logger *zap.Logger, listener ancla.Register) (Sink, error) {
switch l := listener.(type) {
case *ancla.RegistryV1:
v1 := &WebhookV1{}
v1.Update(c, logger, l.Registration.Config.AlternativeURLs, l.GetId(), l.Registration.FailureURL, l.Registration.Config.ReceiverURL)
return v1
return v1, nil
case *ancla.RegistryV2:
if len(l.Registration.Kafkas) == 0 && len(l.Registration.Webhooks) == 0 {
return nil, fmt.Errorf("either `Kafkas` or `Webhooks` must be used")
} else if len(l.Registration.Kafkas) > 0 && len(l.Registration.Webhooks) > 0 {
return nil, fmt.Errorf("either `Kafkas` or `Webhooks` must be used but not both")
}

if len(l.Registration.Webhooks) > 0 {
var whs WebhookSink
r := &HashRing{}
Expand All @@ -97,33 +100,37 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
deliveryInterval: c.DeliveryInterval,
deliveryRetries: c.DeliveryRetries,
},
client: &http.Client{},
}
if len(wh.ReceiverURLs) == 0 && len(wh.DNSSrvRecord.FQDNs) == 0 {
return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used")
} else if len(wh.ReceiverURLs) > 0 && len(wh.DNSSrvRecord.FQDNs) > 0 {
return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used but not both")
}

if len(wh.ReceiverURLs) > 0 {
urlCount, err := getUrls(wh.ReceiverURLs[1:])
if err != nil {
v2.logger.Error("error recevied parsing urls", zap.Error(err))
return nil, errors.Join(err, fmt.Errorf("error recevied parsing urls"))
}
v2.updateUrls(urlCount, wh.ReceiverURLs[0], wh.ReceiverURLs[1:])
} else {
sortBy := wh.DNSSrvRecord.LoadBalancingScheme
for _, domain := range wh.DNSSrvRecord.FQDNs {
dialer := new(Dialer)
err := dialer.lookup(domain, sortBy)
if err != nil {
v2.logger.Error("error received looking up service records", zap.Error(err))
}
dialer.NewClient()
v2.dialers = append(v2.dialers, dialer)
}
}

transport, err := NewSRVDailer(wh.DNSSrvRecord)
if err != nil {
return nil, errors.Join(err, fmt.Errorf("error recevied parsing urls"))
}

v2.client.Transport = transport
whs.webooks[strconv.Itoa(i)] = v2
if l.Registration.Hash.Field != "" {
r.Add(strconv.Itoa(i))
}
}

return whs
return whs, nil
}

if len(l.Registration.Kafkas) > 0 {
var sink KafkaSink
r := &HashRing{}
Expand All @@ -132,21 +139,22 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
kafka := &Kafka{}
err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) //TODO: quickstart-events need to become variable/configurable
if err != nil {
return nil
return nil, err
}
sink.Kafkas[strconv.Itoa(i)] = kafka
if l.Registration.Hash.Field != "" {
r.Add(strconv.Itoa(i))

}
}

sink.Hash = r
return sink

return sink, nil
}
default:
return nil
}
return nil

return nil, fmt.Errorf("unknow webhook registry type")
}

func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) {
Expand Down Expand Up @@ -752,11 +760,8 @@ func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error {
v2.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))

var resp *http.Response
if len(v2.dialers) != 0 {
for _, d := range v2.dialers {
resp, err = d.client.Do(req)
}

if v2.client != nil {
resp, err = v2.client.Do(req)
} else {
client, _ := retryhttp.NewClient(
// retryhttp.WithHTTPClient(s.clientMiddleware(s.client)),
Expand Down Expand Up @@ -842,44 +847,3 @@ func (v2 *WebhookV2) onAttempt(request *http.Request, event string) retry.OnAtte

}
}

type Dialer struct {
srvs []*net.SRV
client *http.Client
}

type DialContext func(ctx context.Context, network, address string) (net.Conn, error)

func (d *Dialer) CustomDial() DialContext {
return func(ctx context.Context, network, address string) (net.Conn, error) {
return net.Dial(network, address)
}
}

func (d *Dialer) NewClient() {
d.client = &http.Client{
Transport: &http.Transport{
DialContext: d.CustomDial(),
},
}
}

func (d *Dialer) lookup(domain, sortBy string) error {
_, addrs, err := net.LookupSRV("", "", domain)
if err != nil {
return err
}

if sortBy == "weight" {
sort.Slice(addrs, func(i, j int) bool {
return addrs[i].Weight > addrs[j].Weight
})
} else {
sort.Slice(addrs, func(i, j int) bool {
return addrs[i].Priority < addrs[j].Priority
})
}

d.srvs = addrs
return nil
}
6 changes: 5 additions & 1 deletion internal/sink/sinkSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,12 @@ func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {

func (s *Sender) Update(l ancla.Register) (err error) {
s.matcher, err = NewMatcher(l, s.logger)
s.sink = NewSink(s.config, s.logger, l)
sink, err := NewSink(s.config, s.logger, l)
if err != nil {
return err
}

s.sink = sink
s.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(time.Now().Unix()))

s.mutex.Lock()
Expand Down
80 changes: 80 additions & 0 deletions internal/sink/srvDailer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package sink

import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sort"

"github.com/xmidt-org/webhook-schema"
)

func NewSRVDailer(dnsSrvRecord webhook.DNSSrvRecord) (http.RoundTripper, error) {
if len(dnsSrvRecord.FQDNs) == 0 {
return http.DefaultTransport, nil
}

d := SRVDialer{dnsSrvRecord: dnsSrvRecord}

var errs error
for _, fqdn := range d.dnsSrvRecord.FQDNs {
_, addrs, err := net.LookupSRV("", "", fqdn)
if err != nil {
errs = errors.Join(errs,
fmt.Errorf("srv lookup failure: `%s`", fqdn),
err,
)
continue
}

d.srvs = append(d.srvs, addrs...)
}

// TODO: ask wes/john whether 1 or more net.LookupSRV error should trigger an error from NewSRVDailer
if len(d.srvs) == 0 {
return nil, errors.Join(fmt.Errorf("expected atleast 1 srv record from fqdn list `%v`", d.dnsSrvRecord.FQDNs), errs)
}

switch d.dnsSrvRecord.LoadBalancingScheme {
case "weight":
sort.Slice(d.srvs, func(i, j int) bool {
return d.srvs[i].Weight > d.srvs[j].Weight
})
case "priortiy":
sort.Slice(d.srvs, func(i, j int) bool {
return d.srvs[i].Priority < d.srvs[j].Priority
})
default:
return nil, fmt.Errorf("unknwon loadBalancingScheme type: %s", d.dnsSrvRecord.LoadBalancingScheme)
}

return &http.Transport{
DialContext: (&d).DialContext,
}, nil

}

type SRVDialer struct {
srvs []*net.SRV
dnsSrvRecord webhook.DNSSrvRecord
}

func (d *SRVDialer) DialContext(ctx context.Context, _, _ string) (net.Conn, error) {
var errs error
for _, addr := range d.srvs {
host := net.JoinHostPort(addr.Target, string(addr.Port))

Check failure on line 67 in internal/sink/srvDailer.go

View workflow job for this annotation

GitHub Actions / ci / Go Unit Tests

conversion from uint16 to string yields a string of one rune, not a string of digits
conn, err := net.Dial("", host)
if err != nil {
errs = errors.Join(errs,
fmt.Errorf("%v: host `%s` [weight: %s, priortiy: %s] from srv record `%v`",

Check failure on line 71 in internal/sink/srvDailer.go

View workflow job for this annotation

GitHub Actions / ci / Go Unit Tests

fmt.Errorf format %s has arg addr.Weight of wrong type uint16
err, host, addr.Weight, addr.Priority, d.dnsSrvRecord.FQDNs))
continue
}

return conn, nil
}

return nil, errs
}

0 comments on commit b3a7a34

Please sign in to comment.