Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retryhttp client #592

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 31 additions & 52 deletions internal/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) (Sink, error
deliveryInterval: c.DeliveryInterval,
deliveryRetries: c.DeliveryRetries,
},
client: &http.Client{},
client: &http.Client{}, //TODO: do we want expose configuration for this client via config?
}
if len(wh.ReceiverURLs) == 0 && len(wh.DNSSrvRecord.FQDNs) == 0 {
return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used")
Expand All @@ -116,7 +116,7 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) (Sink, error
v2.updateUrls(urlCount, wh.ReceiverURLs[0], wh.ReceiverURLs[1:])
}

transport, err := NewSRVRecordDailer(wh.DNSSrvRecord)
transport, err := NewSRVRecordDialer(wh.DNSSrvRecord)
if err != nil {
return nil, errors.Join(err, fmt.Errorf("error recevied parsing urls"))
}
Expand Down Expand Up @@ -215,29 +215,6 @@ func getUrls(urls []string) (int, error) {

}

func (whs WebhookSink) Send(secret, acceptType string, msg *wrp.Message) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved this down toward the v2 logic so that they are all together

var errs error
if len(*whs.Hash) == len(whs.webooks) {
//TODO: flush out the error handling for kafka
if v2, ok := whs.webooks[whs.Hash.Get(GetKey(whs.HashField, msg))]; ok {
err := v2.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
} else {
//TODO: discuss with wes and john the default hashing logic
//for now: when no hash is given we will just loop through all the kafkas
for _, v2 := range whs.webooks {
err := v2.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
}
return errs
}

// worker is the routine that actually takes the queued messages and delivers
// them to the listeners outside webpa
func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
Expand Down Expand Up @@ -650,22 +627,6 @@ func AddMessageHeaders(kafkaMsg *sarama.ProducerMessage, m *wrp.Message) {
}
}

func (v2 *WebhookV2) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

function wasn't being used

//TODO: do we need to return an error if not - we should get rid of the error return
v2.id = id
v2.failureUrl = failureUrl
v2.deliveryInterval = c.DeliveryInterval
v2.deliveryRetries = c.DeliveryRetries
v2.logger = l

urlCount, err := getUrls(altUrls)
if err != nil {
l.Error("error recevied parsing urls", zap.Error(err))
}
v2.updateUrls(urlCount, receiverUrl, altUrls)

}

func (v2 *WebhookV2) updateUrls(urlCount int, url string, urls []string) {
v2.mutex.Lock()
defer v2.mutex.Unlock()
Expand All @@ -691,6 +652,29 @@ func (v2 *WebhookV2) updateUrls(urlCount int, url string, urls []string) {
}
}

func (whs WebhookSink) Send(secret, acceptType string, msg *wrp.Message) error {
var errs error
if len(*whs.Hash) == len(whs.webooks) {
//TODO: flush out the error handling for kafka
if v2, ok := whs.webooks[whs.Hash.Get(GetKey(whs.HashField, msg))]; ok {
err := v2.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
} else {
//TODO: discuss with wes and john the default hashing logic
//for now: when no hash is given we will just loop through all the kafkas
for _, v2 := range whs.webooks {
err := v2.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
}
return errs
}

// worker is the routine that actually takes the queued messages and delivers
// them to the listeners outside webpa
func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error {
Expand Down Expand Up @@ -759,17 +743,12 @@ func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error {
// Send it
v2.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))

var resp *http.Response
if v2.client != nil {
resp, err = v2.client.Do(req)
} else {
client, _ := retryhttp.NewClient(
// retryhttp.WithHTTPClient(s.clientMiddleware(s.client)),
retryhttp.WithRunner(v2.addRunner(req, event)),
retryhttp.WithRequesters(v2.updateRequest(v2.urls)),
)
resp, err = client.Do(req)
}
client, _ := retryhttp.NewClient(
retryhttp.WithHTTPClient(v2.client),
retryhttp.WithRunner(v2.addRunner(req, event)),
retryhttp.WithRequesters(v2.updateRequest(v2.urls)),
)
resp, err := client.Do(req)

var deliveryCounterLabels []string
code := metrics.MessageDroppedCode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"github.com/xmidt-org/webhook-schema"
)

func NewSRVRecordDailer(dnsSrvRecord webhook.DNSSrvRecord) (http.RoundTripper, error) {
func NewSRVRecordDialer(dnsSrvRecord webhook.DNSSrvRecord) (http.RoundTripper, error) {
if len(dnsSrvRecord.FQDNs) == 0 {
return http.DefaultTransport, nil
}
Expand Down Expand Up @@ -64,11 +64,11 @@
func (d *SRVRecordDialer) DialContext(ctx context.Context, _, _ string) (net.Conn, error) {
var errs error
for _, addr := range d.srvs {
host := net.JoinHostPort(addr.Target, string(addr.Port))

Check failure on line 67 in internal/sink/srvRecordDialer.go

View workflow job for this annotation

GitHub Actions / ci / Go Unit Tests

conversion from uint16 to string yields a string of one rune, not a string of digits
conn, err := net.Dial("", host)
if err != nil {
errs = errors.Join(errs,
fmt.Errorf("%v: host `%s` [weight: %s, priortiy: %s] from srv record `%v`",
fmt.Errorf("%v: host `%s` [weight: %d, priortiy: %d] from srv record `%v`",
err, host, addr.Weight, addr.Priority, d.dnsSrvRecord.FQDNs))
continue
}
Expand Down
Loading