-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: feedback for @maurafortino 's #587 PR #591
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,18 +5,15 @@ package sink | |
import ( | ||
"bytes" | ||
"container/ring" | ||
"context" | ||
"crypto/hmac" | ||
"crypto/sha1" | ||
"encoding/hex" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"math/rand" | ||
"net" | ||
"net/http" | ||
"net/url" | ||
"sort" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
|
@@ -45,8 +42,8 @@ type WebhookV1 struct { | |
} | ||
|
||
type WebhookV2 struct { | ||
urls *ring.Ring | ||
dialers []*Dialer | ||
urls *ring.Ring | ||
client *http.Client | ||
CommonWebhook | ||
//TODO: need to determine best way to add client and client middleware to WebhooV1 | ||
// clientMiddleware func(http.Client) http.Client | ||
|
@@ -77,13 +74,19 @@ type Kafka struct { | |
CommonWebhook | ||
} | ||
|
||
func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { | ||
func NewSink(c Config, logger *zap.Logger, listener ancla.Register) (Sink, error) { | ||
switch l := listener.(type) { | ||
case *ancla.RegistryV1: | ||
v1 := &WebhookV1{} | ||
v1.Update(c, logger, l.Registration.Config.AlternativeURLs, l.GetId(), l.Registration.FailureURL, l.Registration.Config.ReceiverURL) | ||
return v1 | ||
return v1, nil | ||
case *ancla.RegistryV2: | ||
if len(l.Registration.Kafkas) == 0 && len(l.Registration.Webhooks) == 0 { | ||
return nil, fmt.Errorf("either `Kafkas` or `Webhooks` must be used") | ||
} else if len(l.Registration.Kafkas) > 0 && len(l.Registration.Webhooks) > 0 { | ||
return nil, fmt.Errorf("either `Kafkas` or `Webhooks` must be used but not both") | ||
} | ||
|
||
if len(l.Registration.Webhooks) > 0 { | ||
var whs WebhookSink | ||
r := &HashRing{} | ||
|
@@ -97,33 +100,37 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { | |
deliveryInterval: c.DeliveryInterval, | ||
deliveryRetries: c.DeliveryRetries, | ||
}, | ||
client: &http.Client{}, | ||
} | ||
if len(wh.ReceiverURLs) == 0 && len(wh.DNSSrvRecord.FQDNs) == 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same with this? |
||
return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used") | ||
} else if len(wh.ReceiverURLs) > 0 && len(wh.DNSSrvRecord.FQDNs) > 0 { | ||
return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used but not both") | ||
} | ||
|
||
if len(wh.ReceiverURLs) > 0 { | ||
urlCount, err := getUrls(wh.ReceiverURLs[1:]) | ||
if err != nil { | ||
v2.logger.Error("error recevied parsing urls", zap.Error(err)) | ||
return nil, errors.Join(err, fmt.Errorf("error recevied parsing urls")) | ||
} | ||
v2.updateUrls(urlCount, wh.ReceiverURLs[0], wh.ReceiverURLs[1:]) | ||
} else { | ||
sortBy := wh.DNSSrvRecord.LoadBalancingScheme | ||
for _, domain := range wh.DNSSrvRecord.FQDNs { | ||
dialer := new(Dialer) | ||
err := dialer.lookup(domain, sortBy) | ||
if err != nil { | ||
v2.logger.Error("error received looking up service records", zap.Error(err)) | ||
} | ||
dialer.NewClient() | ||
v2.dialers = append(v2.dialers, dialer) | ||
} | ||
} | ||
|
||
transport, err := NewSRVRecordDailer(wh.DNSSrvRecord) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know it was already mentioned but just making a note so we remember to change this to NewSRVRecordDialer |
||
if err != nil { | ||
return nil, errors.Join(err, fmt.Errorf("error recevied parsing urls")) | ||
} | ||
|
||
v2.client.Transport = transport | ||
whs.webooks[strconv.Itoa(i)] = v2 | ||
if l.Registration.Hash.Field != "" { | ||
r.Add(strconv.Itoa(i)) | ||
} | ||
} | ||
|
||
return whs | ||
return whs, nil | ||
} | ||
|
||
if len(l.Registration.Kafkas) > 0 { | ||
var sink KafkaSink | ||
r := &HashRing{} | ||
|
@@ -132,21 +139,22 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { | |
kafka := &Kafka{} | ||
err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) //TODO: quickstart-events need to become variable/configurable | ||
if err != nil { | ||
return nil | ||
return nil, err | ||
} | ||
sink.Kafkas[strconv.Itoa(i)] = kafka | ||
if l.Registration.Hash.Field != "" { | ||
r.Add(strconv.Itoa(i)) | ||
|
||
} | ||
} | ||
|
||
sink.Hash = r | ||
return sink | ||
|
||
return sink, nil | ||
} | ||
default: | ||
return nil | ||
} | ||
return nil | ||
|
||
return nil, fmt.Errorf("unknown webhook registry type") | ||
} | ||
|
||
func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) { | ||
|
@@ -752,11 +760,8 @@ func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error { | |
v2.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) | ||
|
||
var resp *http.Response | ||
if len(v2.dialers) != 0 { | ||
for _, d := range v2.dialers { | ||
resp, err = d.client.Do(req) | ||
} | ||
|
||
if v2.client != nil { | ||
resp, err = v2.client.Do(req) | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we want this else statement? are we not trying to have retry logic for the custom client that we have? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, that's exactly what I want to do. i.e.: use the retry client with a given dailer (the dailer may be custom or be default). Either of us can do that. If you're busy, I can knock that out tomorrow morning. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i should be able to work on this |
||
client, _ := retryhttp.NewClient( | ||
// retryhttp.WithHTTPClient(s.clientMiddleware(s.client)), | ||
|
@@ -842,44 +847,3 @@ func (v2 *WebhookV2) onAttempt(request *http.Request, event string) retry.OnAtte | |
|
||
} | ||
} | ||
|
||
type Dialer struct { | ||
srvs []*net.SRV | ||
client *http.Client | ||
} | ||
|
||
type DialContext func(ctx context.Context, network, address string) (net.Conn, error) | ||
|
||
func (d *Dialer) CustomDial() DialContext { | ||
return func(ctx context.Context, network, address string) (net.Conn, error) { | ||
return net.Dial(network, address) | ||
} | ||
} | ||
|
||
func (d *Dialer) NewClient() { | ||
d.client = &http.Client{ | ||
Transport: &http.Transport{ | ||
DialContext: d.CustomDial(), | ||
}, | ||
} | ||
} | ||
|
||
func (d *Dialer) lookup(domain, sortBy string) error { | ||
_, addrs, err := net.LookupSRV("", "", domain) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if sortBy == "weight" { | ||
sort.Slice(addrs, func(i, j int) bool { | ||
return addrs[i].Weight > addrs[j].Weight | ||
}) | ||
} else { | ||
sort.Slice(addrs, func(i, j int) bool { | ||
return addrs[i].Priority < addrs[j].Priority | ||
}) | ||
} | ||
|
||
d.srvs = addrs | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package sink | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"net" | ||
"net/http" | ||
"sort" | ||
|
||
"github.com/xmidt-org/webhook-schema" | ||
) | ||
|
||
func NewSRVRecordDailer(dnsSrvRecord webhook.DNSSrvRecord) (http.RoundTripper, error) { | ||
if len(dnsSrvRecord.FQDNs) == 0 { | ||
return http.DefaultTransport, nil | ||
} | ||
|
||
d := SRVRecordDialer{dnsSrvRecord: dnsSrvRecord} | ||
|
||
var errs error | ||
for _, fqdn := range d.dnsSrvRecord.FQDNs { | ||
_, addrs, err := net.LookupSRV("", "", fqdn) | ||
if err != nil { | ||
errs = errors.Join(errs, | ||
fmt.Errorf("srv lookup failure: `%s`", fqdn), | ||
err, | ||
) | ||
continue | ||
} | ||
|
||
d.srvs = append(d.srvs, addrs...) | ||
} | ||
|
||
// TODO: ask wes/john whether 1 or more net.LookupSRV error should trigger an error from NewSRVRecordDailer | ||
if len(d.srvs) == 0 { | ||
return nil, errors.Join(fmt.Errorf("expected atleast 1 srv record from fqdn list `%v`", d.dnsSrvRecord.FQDNs), errs) | ||
} | ||
|
||
switch d.dnsSrvRecord.LoadBalancingScheme { | ||
case "weight": | ||
sort.Slice(d.srvs, func(i, j int) bool { | ||
return d.srvs[i].Weight > d.srvs[j].Weight | ||
}) | ||
case "priortiy": | ||
sort.Slice(d.srvs, func(i, j int) bool { | ||
return d.srvs[i].Priority < d.srvs[j].Priority | ||
}) | ||
default: | ||
return nil, fmt.Errorf("unknown loadBalancingScheme type: %s", d.dnsSrvRecord.LoadBalancingScheme) | ||
} | ||
|
||
return &http.Transport{ | ||
DialContext: (&d).DialContext, | ||
}, nil | ||
|
||
} | ||
|
||
type SRVRecordDialer struct { | ||
srvs []*net.SRV | ||
dnsSrvRecord webhook.DNSSrvRecord | ||
} | ||
|
||
func (d *SRVRecordDialer) DialContext(ctx context.Context, _, _ string) (net.Conn, error) { | ||
var errs error | ||
for _, addr := range d.srvs { | ||
host := net.JoinHostPort(addr.Target, string(addr.Port)) | ||
conn, err := net.Dial("", host) | ||
if err != nil { | ||
errs = errors.Join(errs, | ||
fmt.Errorf("%v: host `%s` [weight: %s, priortiy: %s] from srv record `%v`", | ||
err, host, addr.Weight, addr.Priority, d.dnsSrvRecord.FQDNs)) | ||
continue | ||
} | ||
|
||
return conn, nil | ||
} | ||
|
||
return nil, errs | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it make sense to add this to the validation portion of web hook-schema? just thinking about ways to try and clean up the this switch statement
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think.
I suggest leaving it here until we successfully move it cleanly