From b8e37f1b1fca9f0ef837cee590ba9e00fec840d8 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Wed, 18 Dec 2024 12:05:23 -0500 Subject: [PATCH] updated v2.send to use retryhttp client --- internal/sink/sink.go | 83 +++++++------------ ...{srvRecordDailer.go => srvRecordDialer.go} | 4 +- 2 files changed, 33 insertions(+), 54 deletions(-) rename internal/sink/{srvRecordDailer.go => srvRecordDialer.go} (93%) diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 7e49c60..db236b5 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -100,7 +100,7 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) (Sink, error deliveryInterval: c.DeliveryInterval, deliveryRetries: c.DeliveryRetries, }, - client: &http.Client{}, + client: &http.Client{}, //TODO: do we want expose configuration for this client via config? } if len(wh.ReceiverURLs) == 0 && len(wh.DNSSrvRecord.FQDNs) == 0 { return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used") @@ -116,7 +116,7 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) (Sink, error v2.updateUrls(urlCount, wh.ReceiverURLs[0], wh.ReceiverURLs[1:]) } - transport, err := NewSRVRecordDailer(wh.DNSSrvRecord) + transport, err := NewSRVRecordDialer(wh.DNSSrvRecord) if err != nil { return nil, errors.Join(err, fmt.Errorf("error recevied parsing urls")) } @@ -215,29 +215,6 @@ func getUrls(urls []string) (int, error) { } -func (whs WebhookSink) Send(secret, acceptType string, msg *wrp.Message) error { - var errs error - if len(*whs.Hash) == len(whs.webooks) { - //TODO: flush out the error handling for kafka - if v2, ok := whs.webooks[whs.Hash.Get(GetKey(whs.HashField, msg))]; ok { - err := v2.send(secret, acceptType, msg) - if err != nil { - errs = errors.Join(errs, err) - } - } - } else { - //TODO: discuss with wes and john the default hashing logic - //for now: when no hash is given we will just loop through all the kafkas - for _, v2 := range whs.webooks { - err := v2.send(secret, acceptType, msg) - if err != nil { - errs = errors.Join(errs, err) - } - } - } - return errs -} - // worker is the routine that actually takes the queued messages and delivers // them to the listeners outside webpa func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error { @@ -650,22 +627,6 @@ func AddMessageHeaders(kafkaMsg *sarama.ProducerMessage, m *wrp.Message) { } } -func (v2 *WebhookV2) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) { - //TODO: do we need to return an error if not - we should get rid of the error return - v2.id = id - v2.failureUrl = failureUrl - v2.deliveryInterval = c.DeliveryInterval - v2.deliveryRetries = c.DeliveryRetries - v2.logger = l - - urlCount, err := getUrls(altUrls) - if err != nil { - l.Error("error recevied parsing urls", zap.Error(err)) - } - v2.updateUrls(urlCount, receiverUrl, altUrls) - -} - func (v2 *WebhookV2) updateUrls(urlCount int, url string, urls []string) { v2.mutex.Lock() defer v2.mutex.Unlock() @@ -691,6 +652,29 @@ func (v2 *WebhookV2) updateUrls(urlCount int, url string, urls []string) { } } +func (whs WebhookSink) Send(secret, acceptType string, msg *wrp.Message) error { + var errs error + if len(*whs.Hash) == len(whs.webooks) { + //TODO: flush out the error handling for kafka + if v2, ok := whs.webooks[whs.Hash.Get(GetKey(whs.HashField, msg))]; ok { + err := v2.send(secret, acceptType, msg) + if err != nil { + errs = errors.Join(errs, err) + } + } + } else { + //TODO: discuss with wes and john the default hashing logic + //for now: when no hash is given we will just loop through all the kafkas + for _, v2 := range whs.webooks { + err := v2.send(secret, acceptType, msg) + if err != nil { + errs = errors.Join(errs, err) + } + } + } + return errs +} + // worker is the routine that actually takes the queued messages and delivers // them to the listeners outside webpa func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error { @@ -759,17 +743,12 @@ func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error { // Send it v2.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination)) - var resp *http.Response - if v2.client != nil { - resp, err = v2.client.Do(req) - } else { - client, _ := retryhttp.NewClient( - // retryhttp.WithHTTPClient(s.clientMiddleware(s.client)), - retryhttp.WithRunner(v2.addRunner(req, event)), - retryhttp.WithRequesters(v2.updateRequest(v2.urls)), - ) - resp, err = client.Do(req) - } + client, _ := retryhttp.NewClient( + retryhttp.WithHTTPClient(v2.client), + retryhttp.WithRunner(v2.addRunner(req, event)), + retryhttp.WithRequesters(v2.updateRequest(v2.urls)), + ) + resp, err := client.Do(req) var deliveryCounterLabels []string code := metrics.MessageDroppedCode diff --git a/internal/sink/srvRecordDailer.go b/internal/sink/srvRecordDialer.go similarity index 93% rename from internal/sink/srvRecordDailer.go rename to internal/sink/srvRecordDialer.go index 89f6c86..97098fd 100644 --- a/internal/sink/srvRecordDailer.go +++ b/internal/sink/srvRecordDialer.go @@ -11,7 +11,7 @@ import ( "github.com/xmidt-org/webhook-schema" ) -func NewSRVRecordDailer(dnsSrvRecord webhook.DNSSrvRecord) (http.RoundTripper, error) { +func NewSRVRecordDialer(dnsSrvRecord webhook.DNSSrvRecord) (http.RoundTripper, error) { if len(dnsSrvRecord.FQDNs) == 0 { return http.DefaultTransport, nil } @@ -68,7 +68,7 @@ func (d *SRVRecordDialer) DialContext(ctx context.Context, _, _ string) (net.Con conn, err := net.Dial("", host) if err != nil { errs = errors.Join(errs, - fmt.Errorf("%v: host `%s` [weight: %s, priortiy: %s] from srv record `%v`", + fmt.Errorf("%v: host `%s` [weight: %d, priortiy: %d] from srv record `%v`", err, host, addr.Weight, addr.Priority, d.dnsSrvRecord.FQDNs)) continue }