added logic for the v2 webhooks including srv lookup and hashing #587

Open
wants to merge 6 commits into base: denopink/feat/rewrite
2 changes: 1 addition & 1 deletion go.mod
@@ -21,7 +21,7 @@ require (
github.com/xmidt-org/retry v0.0.3
github.com/xmidt-org/sallust v0.2.2
github.com/xmidt-org/touchstone v0.1.5
github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9
github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd
github.com/xmidt-org/webpa-common/v2 v2.2.2
github.com/xmidt-org/wrp-go/v3 v3.2.3
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1
4 changes: 2 additions & 2 deletions go.sum
@@ -1712,8 +1712,8 @@ github.com/xmidt-org/touchstone v0.1.5 h1:Afm3P0EzCOWD1ITyVLsEDPVQkSE0t2ZhHyV+kO
github.com/xmidt-org/touchstone v0.1.5/go.mod h1:Dz0fA1eWjm/2WrsdEeaQZMevkmfdYTsAbQfLaTrB8Eo=
github.com/xmidt-org/urlegit v0.1.12 h1:qlwTgELD2ufKKH4vuioG/BWZ3293Cbx1f1viMDMaLV0=
github.com/xmidt-org/urlegit v0.1.12/go.mod h1:wEEFUdBOEK3bQNb5LHLMfGnTtGn8WwEKgFPk8p6lhIM=
github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9 h1:2Q7f7IVLXCckvmDjSSVEvUXcpHX1gfGkF9xg3K/X87c=
github.com/xmidt-org/webhook-schema v0.1.1-0.20240718124820-b8c1ba1f19a9/go.mod h1:CskwqDhcXKRHeCvw9qnu/+v4d+gkzzX4WfG7GCtMPFA=
github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd h1:AMGhtnmbuqzJVRW1uyxKRVRu9rEGJcUDReiRE+R835M=
github.com/xmidt-org/webhook-schema v0.1.1-0.20241216191431-5b48a48d53cd/go.mod h1:CskwqDhcXKRHeCvw9qnu/+v4d+gkzzX4WfG7GCtMPFA=
github.com/xmidt-org/webpa-common v1.1.0/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI=
github.com/xmidt-org/webpa-common v1.3.2/go.mod h1:oCpKzOC+9h2vYHVzAU/06tDTQuBN4RZz+rhgIXptpOI=
github.com/xmidt-org/webpa-common v1.10.2-0.20200604164000-f07406b4eb63/go.mod h1:Fmt3wIxBzwJY0KeRHX6RaLZx2xpKTbXCLEA3Xtd6kq8=
297 changes: 269 additions & 28 deletions internal/sink/sink.go
@@ -37,11 +37,22 @@ type WebhookV1 struct {
urls *ring.Ring
CommonWebhook
//TODO: need to determine best way to add client and client middleware to WebhookV1
// client http.Client
// client http.Client
// clientMiddleware func(http.Client) http.Client
}

type Webhooks []*WebhookV1
type WebhookV2 struct {
urls *ring.Ring
client *http.Client
CommonWebhook
//TODO: need to determine best way to add client and client middleware to WebhookV2
// clientMiddleware func(http.Client) http.Client
}
type WebhookSink struct {
webhooks map[string]*WebhookV2
Hash *HashRing
HashField string
}
type CommonWebhook struct {
id string
failureUrl string
@@ -63,45 +74,87 @@ type Kafka struct {
CommonWebhook
}

func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
func NewSink(c Config, logger *zap.Logger, listener ancla.Register) (Sink, error) {
switch l := listener.(type) {
case *ancla.RegistryV1:
v1 := &WebhookV1{}
v1.Update(c, logger, l.Registration.Config.AlternativeURLs, l.GetId(), l.Registration.FailureURL, l.Registration.Config.ReceiverURL)
return v1
return v1, nil
case *ancla.RegistryV2:
if len(l.Registration.Kafkas) == 0 && len(l.Registration.Webhooks) == 0 {
return nil, fmt.Errorf("either `Kafkas` or `Webhooks` must be used")
} else if len(l.Registration.Kafkas) > 0 && len(l.Registration.Webhooks) > 0 {
return nil, fmt.Errorf("either `Kafkas` or `Webhooks` must be used but not both")
}

if len(l.Registration.Webhooks) > 0 {
var whs Webhooks
for _, wh := range l.Registration.Webhooks {
v1 := &WebhookV1{}
v1.Update(c, logger, wh.ReceiverURLs[1:], l.GetId(), l.Registration.FailureURL, wh.ReceiverURLs[0])
whs = append(whs, v1)
var whs WebhookSink
whs.webhooks = make(map[string]*WebhookV2)
r := &HashRing{}
whs.HashField = l.Registration.Hash.Field
for i, wh := range l.Registration.Webhooks {
v2 := &WebhookV2{
CommonWebhook: CommonWebhook{
id: l.GetId(),
logger: logger,
failureUrl: l.Registration.FailureURL,
deliveryInterval: c.DeliveryInterval,
deliveryRetries: c.DeliveryRetries,
},
client: &http.Client{}, //TODO: do we want to expose configuration for this client via config?
}
if len(wh.ReceiverURLs) == 0 && len(wh.DNSSrvRecord.FQDNs) == 0 {
return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used")
} else if len(wh.ReceiverURLs) > 0 && len(wh.DNSSrvRecord.FQDNs) > 0 {
return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used but not both")
}

if len(wh.ReceiverURLs) > 0 {
urlCount, err := getUrls(wh.ReceiverURLs[1:])
if err != nil {
return nil, errors.Join(err, fmt.Errorf("error parsing receiver urls"))
}
v2.updateUrls(urlCount, wh.ReceiverURLs[0], wh.ReceiverURLs[1:])
} else {
transport, err := NewSRVRecordDialer(wh.DNSSrvRecord)
if err != nil {
return nil, errors.Join(err, fmt.Errorf("error creating SRV record dialer"))
}
v2.client.Transport = transport
}
whs.webhooks[strconv.Itoa(i)] = v2
if l.Registration.Hash.Field != "" {
r.Add(strconv.Itoa(i))
}
}
return whs

whs.Hash = r

return whs, nil
}

if len(l.Registration.Kafkas) > 0 {
var sink KafkaSink
r := &HashRing{}
sink.HashField = l.Registration.Hash.Field
for i, k := range l.Registration.Kafkas {
kafka := &Kafka{}
err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger)
err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger) //TODO: quickstart-events need to become variable/configurable
if err != nil {
return nil
return nil, err
}
sink.Kafkas[strconv.Itoa(i)] = kafka
if l.Registration.Hash.Field != "" {
r.Add(strconv.Itoa(i))

}
}

sink.Hash = r
return sink

return sink, nil
}
default:
return nil
}
return nil

return nil, fmt.Errorf("unknown webhook registry type")
}
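Note for reviewers: `NewSRVRecordDialer` is referenced above but defined outside this diff. As a rough sketch only (names, fields, and fallback behavior here are assumptions, not this PR's implementation), an SRV-backed transport along these lines resolves the configured FQDNs with net.LookupSRV and dials one of the returned targets:

package sink

import (
    "context"
    "fmt"
    "net"
    "net/http"
)

// srvRecordDialerSketch is a hypothetical stand-in for this package's real
// SRV dialer; everything here is illustrative.
type srvRecordDialerSketch struct {
    srvs []*net.SRV
}

// newSRVRecordDialerSketch resolves each FQDN's SRV records up front and
// returns an http.RoundTripper whose dialer connects to those records.
func newSRVRecordDialerSketch(fqdns []string) (http.RoundTripper, error) {
    d := &srvRecordDialerSketch{}
    for _, fqdn := range fqdns {
        _, addrs, err := net.LookupSRV("", "", fqdn)
        if err != nil {
            return nil, fmt.Errorf("srv lookup failed for %q: %w", fqdn, err)
        }
        d.srvs = append(d.srvs, addrs...)
    }
    if len(d.srvs) == 0 {
        return nil, fmt.Errorf("no srv records resolved")
    }
    return &http.Transport{DialContext: d.dial}, nil
}

// dial ignores the address the request asked for and connects to the first
// resolved SRV target instead; a real dialer would honor priority/weight
// and fail over across targets.
func (d *srvRecordDialerSketch) dial(ctx context.Context, network, _ string) (net.Conn, error) {
    var nd net.Dialer
    target := d.srvs[0]
    return nd.DialContext(ctx, network, fmt.Sprintf("%s:%d", target.Target, target.Port))
}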

func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) {
@@ -162,18 +215,6 @@ func getUrls(urls []string) (int, error) {

}

func (whs Webhooks) Send(secret, acceptType string, msg *wrp.Message) error {
var errs error

for _, wh := range whs {
err := wh.Send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
return errs
}

// worker is the routine that actually takes the queued messages and delivers
// them to the listeners outside webpa
func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
@@ -585,3 +626,203 @@ func AddMessageHeaders(kafkaMsg *sarama.ProducerMessage, m *wrp.Message) {
})
}
}

func (v2 *WebhookV2) updateUrls(urlCount int, url string, urls []string) {
v2.mutex.Lock()
defer v2.mutex.Unlock()

if urlCount == 0 {
v2.urls = ring.New(1)
v2.urls.Value = url
} else {
ring := ring.New(urlCount)
for i := 0; i < urlCount; i++ {
ring.Value = urls[i]
ring = ring.Next()
}
v2.urls = ring
}

// Randomize where we start so all the instances don't synchronize
rand := rand.New(rand.NewSource(time.Now().UnixNano()))
offset := rand.Intn(v2.urls.Len())
for 0 < offset {
v2.urls = v2.urls.Next()
offset--
}
}

func (whs WebhookSink) Send(secret, acceptType string, msg *wrp.Message) error {
var errs error
if len(*whs.Hash) == len(whs.webhooks) {
//TODO: flesh out the error handling for webhooks
if v2, ok := whs.webhooks[whs.Hash.Get(GetKey(whs.HashField, msg))]; ok {
err := v2.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
} else {
//TODO: discuss with wes and john the default hashing logic
//for now: when no hash is given we will just loop through all the webhooks
for _, v2 := range whs.webhooks {
err := v2.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
}
return errs
}
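Note for reviewers: `HashRing`, its `Add`/`Get` methods, and `GetKey` (assumed to pull the configured hash field, e.g. the device id, out of the `wrp.Message`) live outside this diff. A minimal consistent-hash ring with that shape could look like the sketch below; it is illustrative only and not this PR's implementation:

package sink

import (
    "crypto/md5"
    "encoding/binary"
    "sort"
)

// ringEntrySketch / hashRingSketch are hypothetical stand-ins for the real
// HashRing: a hash-sorted list of node names probed with binary search.
type ringEntrySketch struct {
    sum  uint64
    node string
}

type hashRingSketch []ringEntrySketch

func hashOf(s string) uint64 {
    h := md5.Sum([]byte(s))
    return binary.BigEndian.Uint64(h[:8])
}

// Add inserts a node (here, a webhook's index as a string) and keeps the
// ring sorted by hash value.
func (r *hashRingSketch) Add(node string) {
    *r = append(*r, ringEntrySketch{sum: hashOf(node), node: node})
    sort.Slice(*r, func(i, j int) bool { return (*r)[i].sum < (*r)[j].sum })
}

// Get hashes the key (e.g. the message field named by HashField) and returns
// the first node at or after that hash, wrapping around the ring, so a given
// key always maps to the same webhook.
func (r hashRingSketch) Get(key string) string {
    if len(r) == 0 {
        return ""
    }
    sum := hashOf(key)
    i := sort.Search(len(r), func(i int) bool { return r[i].sum >= sum })
    if i == len(r) {
        i = 0
    }
    return r[i].node
}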

// send takes a queued message and delivers it to the listener outside webpa
func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error {
defer func() {
if r := recover(); nil != r {
// s.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
v2.logger.Error("goroutine send() panicked", zap.String("id", v2.id), zap.Any("panic", r))
}
// s.workers.Release()
// s.currentWorkersGauge.Add(-1.0)
}()

//TODO: is there a reason we are setting it up like this?
payload := msg.Payload
body := payload
var payloadReader *bytes.Reader

// Use the internal content type unless the accept type is wrp
contentType := msg.ContentType
switch acceptType {
case "wrp", wrp.MimeTypeMsgpack, wrp.MimeTypeWrp:
// WTS - We should pass the original, raw WRP event instead of
// re-encoding it.
contentType = wrp.MimeTypeMsgpack
buffer := bytes.NewBuffer([]byte{})
encoder := wrp.NewEncoder(buffer, wrp.Msgpack)
encoder.Encode(msg)
body = buffer.Bytes()
}
payloadReader = bytes.NewReader(body)

req, err := http.NewRequest("POST", v2.urls.Value.(string), payloadReader)
if err != nil {
// Report drop
// s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "invalid_config"}).Add(1.0)
v2.logger.Error("Invalid URL", zap.String(metrics.UrlLabel, v2.urls.Value.(string)), zap.String("id", v2.id), zap.Error(err))
return err
}

req.Header.Set("Content-Type", contentType)

// Add x-Midt-* headers
wrphttp.AddMessageHeaders(req.Header, msg)

// Provide the old headers for now
req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:"))
req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID)

// Add the device id without the trailing service
id, _ := wrp.ParseDeviceID(msg.Source)
req.Header.Set("X-Webpa-Device-Id", string(id))
req.Header.Set("X-Webpa-Device-Name", string(id))

// Apply the secret

if secret != "" {
s := hmac.New(sha1.New, []byte(secret))
s.Write(body)
sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(s.Sum(nil)))
req.Header.Set("X-Webpa-Signature", sig)
}

// find the event "short name"
event := msg.FindEventStringSubMatch()

// Send it
v2.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))

client, _ := retryhttp.NewClient(
retryhttp.WithHTTPClient(v2.client),
retryhttp.WithRunner(v2.addRunner(req, event)),
retryhttp.WithRequesters(v2.updateRequest(v2.urls)),
)
resp, err := client.Do(req)

var deliveryCounterLabels []string
code := metrics.MessageDroppedCode
reason := metrics.NoErrReason
logger := v2.logger
if err != nil {
// Report failure
//TODO: add SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) to webhook metrics and remove from sink sender?
// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).Add(1.0)
reason = metrics.GetDoErrReason(err)
if resp != nil {
code = strconv.Itoa(resp.StatusCode)
}

logger = v2.logger.With(zap.String(metrics.ReasonLabel, reason), zap.Error(err))
deliveryCounterLabels = []string{metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason, metrics.CodeLabel, code, metrics.EventLabel, event}
fmt.Print(deliveryCounterLabels)
// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).With(metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason).Add(1)
logger.Error("Dropped Network Error", zap.Error(err))
return err
} else {
// Report Result
code = strconv.Itoa(resp.StatusCode)

// read until the response is complete before closing to allow
// connection reuse
if resp.Body != nil {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}

deliveryCounterLabels = []string{metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason, metrics.CodeLabel, code, metrics.EventLabel, event}
}
fmt.Print(deliveryCounterLabels)

//TODO: do we add deliveryCounter to webhook metrics and remove from sink sender?
// v1.deliveryCounter.With(prometheus.Labels{deliveryCounterLabels}).Add(1.0)
logger.Debug("event sent-ish", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination), zap.String(metrics.CodeLabel, code), zap.String(metrics.UrlLabel, req.URL.String()))
return nil
}
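Note for reviewers: the `X-Webpa-Signature` header set above is the same HMAC-SHA1 scheme v1 receivers already expect: the hex digest of HMAC(secret, body), prefixed with "sha1=". Purely as an illustration (not part of this PR), a receiver could verify it like so:

package receiver

import (
    "crypto/hmac"
    "crypto/sha1"
    "encoding/hex"
    "fmt"
    "io"
    "net/http"
)

// verifySignature recomputes the HMAC-SHA1 of the request body with the
// shared secret and compares it, in constant time, to X-Webpa-Signature.
func verifySignature(r *http.Request, secret []byte) (bool, error) {
    body, err := io.ReadAll(r.Body)
    if err != nil {
        return false, err
    }
    mac := hmac.New(sha1.New, secret)
    mac.Write(body)
    want := fmt.Sprintf("sha1=%s", hex.EncodeToString(mac.Sum(nil)))
    got := r.Header.Get("X-Webpa-Signature")
    return hmac.Equal([]byte(got), []byte(want)), nil
}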

func (v2 *WebhookV2) addRunner(request *http.Request, event string) retry.Runner[*http.Response] {
runner, _ := retry.NewRunner[*http.Response](
retry.WithPolicyFactory[*http.Response](retry.Config{
Interval: v2.deliveryInterval,
MaxRetries: v2.deliveryRetries,
}),
retry.WithOnAttempt[*http.Response](v2.onAttempt(request, event)),
)
return runner
}

func (v2 *WebhookV2) updateRequest(urls *ring.Ring) func(*http.Request) *http.Request {
return func(request *http.Request) *http.Request {
urls = urls.Next()
tmp, err := url.Parse(urls.Value.(string))
if err != nil {
v2.logger.Error("failed to update url", zap.String(metrics.UrlLabel, urls.Value.(string)), zap.Error(err))
//TODO: do we add SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) metric to webhook and remove from sink sender?
// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).With(metrics.UrlLabel, request.URL.String(), metrics.ReasonLabel, metrics.UpdateRequestURLFailedReason).Add(1)
}
request.URL = tmp
return request
}
}

func (v2 *WebhookV2) onAttempt(request *http.Request, event string) retry.OnAttempt[*http.Response] {

return func(attempt retry.Attempt[*http.Response]) {
if attempt.Retries > 0 {
fmt.Print(event)
// s.DeliveryRetryCounter.With(prometheus.Labels{UrlLabel: v1.id, EventLabel: event}).Add(1.0)
v2.logger.Debug("retrying HTTP transaction", zap.String(metrics.UrlLabel, request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode))
}

}
}
6 changes: 5 additions & 1 deletion internal/sink/sinkSender.go
@@ -135,8 +135,12 @@ func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {

func (s *Sender) Update(l ancla.Register) (err error) {
s.matcher, err = NewMatcher(l, s.logger)
s.sink = NewSink(s.config, s.logger, l)
sink, err := NewSink(s.config, s.logger, l)
if err != nil {
return err
}

s.sink = sink
s.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(time.Now().Unix()))

s.mutex.Lock()