Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: feedback for @maurafortino 's #587 PR #591

Merged
merged 2 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/xmidt-org/retry v0.0.3
github.com/xmidt-org/sallust v0.2.2
github.com/xmidt-org/touchstone v0.1.5
github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9
github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd
github.com/xmidt-org/webpa-common/v2 v2.2.2
github.com/xmidt-org/wrp-go/v3 v3.2.3
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1712,8 +1712,8 @@ github.com/xmidt-org/touchstone v0.1.5 h1:Afm3P0EzCOWD1ITyVLsEDPVQkSE0t2ZhHyV+kO
github.com/xmidt-org/touchstone v0.1.5/go.mod h1:Dz0fA1eWjm/2WrsdEeaQZMevkmfdYTsAbQfLaTrB8Eo=
github.com/xmidt-org/urlegit v0.1.12 h1:qlwTgELD2ufKKH4vuioG/BWZ3293Cbx1f1viMDMaLV0=
github.com/xmidt-org/urlegit v0.1.12/go.mod h1:wEEFUdBOEK3bQNb5LHLMfGnTtGn8WwEKgFPk8p6lhIM=
github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9 h1:2Q7f7IVLXCckvmDjSSVEvUXcpHX1gfGkF9xg3K/X87c=
github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9/go.mod h1:CskwqDhcXKRHeCvw9qnu/+v4d+gkzzX4WfG7GCtMPFA=
github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd h1:AMGhtnmbuqzJVRW1uyxKRVRu9rEGJcUDReiRE+R835M=
github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd/go.mod h1:CskwqDhcXKRHeCvw9qnu/+v4d+gkzzX4WfG7GCtMPFA=
github.com/xmidt-org/webpa-common v1.1.0/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI=
github.com/xmidt-org/webpa-common v1.3.2/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI=
github.com/xmidt-org/webpa-common v1.10.2-0.20200604164000-f07406b4eb63/go.mod h1:Fmt3wIxBzwJY0KeRHX6RaLZx2xpKTbXCLEA3Xtd6kq8=
Expand Down
106 changes: 35 additions & 71 deletions internal/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,15 @@ package sink
import (
"bytes"
"container/ring"
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/hex"
"errors"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -45,8 +42,8 @@ type WebhookV1 struct {
}

type WebhookV2 struct {
urls *ring.Ring
dialers []*Dialer
urls *ring.Ring
client *http.Client
CommonWebhook
//TODO: need to determine best way to add client and client middleware to WebhooV1
// clientMiddleware func(http.Client) http.Client
Expand Down Expand Up @@ -77,13 +74,19 @@ type Kafka struct {
CommonWebhook
}

func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
func NewSink(c Config, logger *zap.Logger, listener ancla.Register) (Sink, error) {
switch l := listener.(type) {
case *ancla.RegistryV1:
v1 := &WebhookV1{}
v1.Update(c, logger, l.Registration.Config.AlternativeURLs, l.GetId(), l.Registration.FailureURL, l.Registration.Config.ReceiverURL)
return v1
return v1, nil
case *ancla.RegistryV2:
if len(l.Registration.Kafkas) == 0 && len(l.Registration.Webhooks) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to add this to the validation portion of web hook-schema? just thinking about ways to try and clean up the this switch statement

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think.

I suggest leaving it here until we successfully move it cleanly

return nil, fmt.Errorf("either `Kafkas` or `Webhooks` must be used")
} else if len(l.Registration.Kafkas) > 0 && len(l.Registration.Webhooks) > 0 {
return nil, fmt.Errorf("either `Kafkas` or `Webhooks` must be used but not both")
}

if len(l.Registration.Webhooks) > 0 {
var whs WebhookSink
r := &HashRing{}
Expand All @@ -97,33 +100,37 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
deliveryInterval: c.DeliveryInterval,
deliveryRetries: c.DeliveryRetries,
},
client: &http.Client{},
}
if len(wh.ReceiverURLs) == 0 && len(wh.DNSSrvRecord.FQDNs) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same with this?

return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used")
} else if len(wh.ReceiverURLs) > 0 && len(wh.DNSSrvRecord.FQDNs) > 0 {
return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used but not both")
}

if len(wh.ReceiverURLs) > 0 {
urlCount, err := getUrls(wh.ReceiverURLs[1:])
if err != nil {
v2.logger.Error("error recevied parsing urls", zap.Error(err))
return nil, errors.Join(err, fmt.Errorf("error recevied parsing urls"))
}
v2.updateUrls(urlCount, wh.ReceiverURLs[0], wh.ReceiverURLs[1:])
} else {
sortBy := wh.DNSSrvRecord.LoadBalancingScheme
for _, domain := range wh.DNSSrvRecord.FQDNs {
dialer := new(Dialer)
err := dialer.lookup(domain, sortBy)
if err != nil {
v2.logger.Error("error received looking up service records", zap.Error(err))
}
dialer.NewClient()
v2.dialers = append(v2.dialers, dialer)
}
}

transport, err := NewSRVRecordDailer(wh.DNSSrvRecord)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it was already mentioned but just making a note so we remember to change this to NewSRVRecordDialer

if err != nil {
return nil, errors.Join(err, fmt.Errorf("error recevied parsing urls"))
}

v2.client.Transport = transport
whs.webooks[strconv.Itoa(i)] = v2
if l.Registration.Hash.Field != "" {
r.Add(strconv.Itoa(i))
}
}

return whs
return whs, nil
}

if len(l.Registration.Kafkas) > 0 {
var sink KafkaSink
r := &HashRing{}
Expand All @@ -132,21 +139,22 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
kafka := &Kafka{}
err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) //TODO: quickstart-events need to become variable/configurable
if err != nil {
return nil
return nil, err
}
sink.Kafkas[strconv.Itoa(i)] = kafka
if l.Registration.Hash.Field != "" {
r.Add(strconv.Itoa(i))

}
}

sink.Hash = r
return sink

return sink, nil
}
default:
return nil
}
return nil

return nil, fmt.Errorf("unknow webhook registry type")
Copy link
Contributor

@maurafortino maurafortino Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spelling change: unknown

}

func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) {
Expand Down Expand Up @@ -752,11 +760,8 @@ func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error {
v2.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))

var resp *http.Response
if len(v2.dialers) != 0 {
for _, d := range v2.dialers {
resp, err = d.client.Do(req)
}

if v2.client != nil {
resp, err = v2.client.Do(req)
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want this else statement? are we not trying to have retry logic for the custom client that we have?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's exactly what I want to do. i.e.: use the retry client with a given dailer (the dailer may be custom or be default).

Either of us can do that. If you're busy, I can knock that out tomorrow morning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i should be able to work on this

client, _ := retryhttp.NewClient(
// retryhttp.WithHTTPClient(s.clientMiddleware(s.client)),
Expand Down Expand Up @@ -842,44 +847,3 @@ func (v2 *WebhookV2) onAttempt(request *http.Request, event string) retry.OnAtte

}
}

type Dialer struct {
srvs []*net.SRV
client *http.Client
}

type DialContext func(ctx context.Context, network, address string) (net.Conn, error)

func (d *Dialer) CustomDial() DialContext {
return func(ctx context.Context, network, address string) (net.Conn, error) {
return net.Dial(network, address)
}
}

func (d *Dialer) NewClient() {
d.client = &http.Client{
Transport: &http.Transport{
DialContext: d.CustomDial(),
},
}
}

func (d *Dialer) lookup(domain, sortBy string) error {
_, addrs, err := net.LookupSRV("", "", domain)
if err != nil {
return err
}

if sortBy == "weight" {
sort.Slice(addrs, func(i, j int) bool {
return addrs[i].Weight > addrs[j].Weight
})
} else {
sort.Slice(addrs, func(i, j int) bool {
return addrs[i].Priority < addrs[j].Priority
})
}

d.srvs = addrs
return nil
}
6 changes: 5 additions & 1 deletion internal/sink/sinkSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,12 @@ func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {

func (s *Sender) Update(l ancla.Register) (err error) {
s.matcher, err = NewMatcher(l, s.logger)
s.sink = NewSink(s.config, s.logger, l)
sink, err := NewSink(s.config, s.logger, l)
if err != nil {
return err
}

s.sink = sink
s.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(time.Now().Unix()))

s.mutex.Lock()
Expand Down
80 changes: 80 additions & 0 deletions internal/sink/srvRecordDailer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package sink

import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sort"

"github.com/xmidt-org/webhook-schema"
)

func NewSRVRecordDailer(dnsSrvRecord webhook.DNSSrvRecord) (http.RoundTripper, error) {
if len(dnsSrvRecord.FQDNs) == 0 {
return http.DefaultTransport, nil
}

d := SRVRecordDialer{dnsSrvRecord: dnsSrvRecord}

var errs error
for _, fqdn := range d.dnsSrvRecord.FQDNs {
_, addrs, err := net.LookupSRV("", "", fqdn)
if err != nil {
errs = errors.Join(errs,
fmt.Errorf("srv lookup failure: `%s`", fqdn),
err,
)
continue
}

d.srvs = append(d.srvs, addrs...)
}

// TODO: ask wes/john whether 1 or more net.LookupSRV error should trigger an error from NewSRVRecordDailer
if len(d.srvs) == 0 {
return nil, errors.Join(fmt.Errorf("expected atleast 1 srv record from fqdn list `%v`", d.dnsSrvRecord.FQDNs), errs)
}

switch d.dnsSrvRecord.LoadBalancingScheme {
case "weight":
sort.Slice(d.srvs, func(i, j int) bool {
return d.srvs[i].Weight > d.srvs[j].Weight
})
case "priortiy":
sort.Slice(d.srvs, func(i, j int) bool {
return d.srvs[i].Priority < d.srvs[j].Priority
})
default:
return nil, fmt.Errorf("unknwon loadBalancingScheme type: %s", d.dnsSrvRecord.LoadBalancingScheme)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spelling change: unknown

}

return &http.Transport{
DialContext: (&d).DialContext,
}, nil

}

type SRVRecordDialer struct {
srvs []*net.SRV
dnsSrvRecord webhook.DNSSrvRecord
}

func (d *SRVRecordDialer) DialContext(ctx context.Context, _, _ string) (net.Conn, error) {
var errs error
for _, addr := range d.srvs {
host := net.JoinHostPort(addr.Target, string(addr.Port))

Check failure on line 67 in internal/sink/srvRecordDailer.go

View workflow job for this annotation

GitHub Actions / ci / Go Unit Tests

conversion from uint16 to string yields a string of one rune, not a string of digits
conn, err := net.Dial("", host)
if err != nil {
errs = errors.Join(errs,
fmt.Errorf("%v: host `%s` [weight: %s, priortiy: %s] from srv record `%v`",

Check failure on line 71 in internal/sink/srvRecordDailer.go

View workflow job for this annotation

GitHub Actions / ci / Go Unit Tests

fmt.Errorf format %s has arg addr.Weight of wrong type uint16
err, host, addr.Weight, addr.Priority, d.dnsSrvRecord.FQDNs))
continue
}

return conn, nil
}

return nil, errs
}
Loading