Skip to content

Commit

Permalink
updated the caduceus outbound sender
Browse files Browse the repository at this point in the history
  • Loading branch information
maurafortino committed Jan 23, 2024
1 parent dfaf61d commit e19e6f6
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 205 deletions.
8 changes: 0 additions & 8 deletions caduceus_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (

"go.uber.org/zap"

"github.com/go-kit/kit/metrics"

"github.com/xmidt-org/wrp-go/v3"
)

Expand Down Expand Up @@ -38,12 +36,6 @@ type SenderConfig struct {
DisablePartnerIDs bool
}

type CaduceusMetricsRegistry interface {
NewCounter(name string) metrics.Counter
NewGauge(name string) metrics.Gauge
NewHistogram(name string, buckets int) metrics.Histogram
}

type RequestHandler interface {
HandleRequest(workerID int, msg *wrp.Message)
}
Expand Down
2 changes: 1 addition & 1 deletion http.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
}
eventType = msg.FindEventStringSubMatch()

// sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg))
sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg))

// return a 202
response.WriteHeader(http.StatusAccepted)
Expand Down
12 changes: 7 additions & 5 deletions httpClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strconv"
"time"

"github.com/go-kit/kit/metrics"
"github.com/prometheus/client_golang/prometheus"
)

var (
Expand All @@ -33,19 +33,21 @@ func (d doerFunc) Do(req *http.Request) (*http.Response, error) {

type metricWrapper struct {
now func() time.Time
queryLatency metrics.Histogram
queryLatency prometheus.HistogramVec
id string
}

func newMetricWrapper(now func() time.Time, queryLatency metrics.Histogram) (*metricWrapper, error) {
func newMetricWrapper(now func() time.Time, queryLatency prometheus.HistogramVec, id string) (*metricWrapper, error) {
if now == nil {
now = time.Now
}
if queryLatency == nil {
if queryLatency.MetricVec == nil {
return nil, errNilHistogram
}
return &metricWrapper{
now: now,
queryLatency: queryLatency,
id: id,
}, nil
}

Expand All @@ -62,7 +64,7 @@ func (m *metricWrapper) roundTripper(next httpClient) httpClient {

// find time difference, add to metric
var latency = endTime.Sub(startTime)
m.queryLatency.With("code", code).Observe(latency.Seconds())
m.queryLatency.With(prometheus.Labels{UrlLabel: m.id, CodeLabel: code}).Observe(latency.Seconds())

return resp, err
})
Expand Down
56 changes: 56 additions & 0 deletions listenerStub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package main

import "time"

//This is a stub for the ancla listener. This will be removed once we can add ancla back into caduceus

type ListenerStub struct {
PartnerIds []string
Webhook Webhook
}

type Webhook struct {
// Address is the subscription request origin HTTP Address.
Address string `json:"registered_from_address"`

// Config contains data to inform how events are delivered.
Config DeliveryConfig `json:"config"`

// FailureURL is the URL used to notify subscribers when they've been cut off due to event overflow.
// Optional, set to "" to disable notifications.
FailureURL string `json:"failure_url"`

// Events is the list of regular expressions to match an event type against.
Events []string `json:"events"`

// Matcher type contains values to match against the metadata.
Matcher MetadataMatcherConfig `json:"matcher,omitempty"`

// Duration describes how long the subscription lasts once added.
Duration time.Duration `json:"duration"`

// Until describes the time this subscription expires.
Until time.Time `json:"until"`
}

// DeliveryConfig is a Webhook substructure with data related to event delivery.
type DeliveryConfig struct {
// URL is the HTTP URL to deliver messages to.
URL string `json:"url"`

// ContentType is content type value to set WRP messages to (unless already specified in the WRP).
ContentType string `json:"content_type"`

// Secret is the string value for the SHA1 HMAC.
// (Optional, set to "" to disable behavior).
Secret string `json:"secret,omitempty"`

// AlternativeURLs is a list of explicit URLs that should be round robin through on failure cases to the main URL.
AlternativeURLs []string `json:"alt_urls,omitempty"`
}

// MetadataMatcherConfig is Webhook substructure with config to match event metadata.
type MetadataMatcherConfig struct {
// DeviceID is the list of regular expressions to match device id type against.
DeviceID []string `json:"device_id"`
}
69 changes: 45 additions & 24 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package main

import (
"github.com/go-kit/kit/metrics"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/fx"

Expand Down Expand Up @@ -49,29 +48,22 @@ const (
CodeLabel = "code"
)

func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSender) {
c.deliveryCounter = m.NewCounter(DeliveryCounter)
c.deliveryRetryCounter = m.NewCounter(DeliveryRetryCounter)
c.deliveryRetryMaxGauge = m.NewGauge(DeliveryRetryMaxGauge).With("url", c.id)
c.cutOffCounter = m.NewCounter(SlowConsumerCounter).With("url", c.id)
c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full")
c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired")
c.droppedExpiredBeforeQueueCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing")

c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off")
c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config")
c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError)
c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id)
c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id)
c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With("url", c.id)
c.deliverUntilGauge = m.NewGauge(ConsumerDeliverUntilGauge).With("url", c.id)
c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id)
c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id)
c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id)
}

func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram {
return m.NewHistogram(QueryDurationHistogram, 11)
type SenderMetricsIn struct {
fx.In
QueryLatency prometheus.HistogramVec `name:"query_duration_histogram_seconds"`
EventType prometheus.CounterVec `name:"incoming_event_type_count"`
DeliveryCounter prometheus.CounterVec `name:"delivery_count"`
DeliveryRetryCounter prometheus.CounterVec `name:"DeliveryRetryCounter"`
DeliveryRetryMaxGauge prometheus.GaugeVec `name:"delivery_retry_max"`
CutOffCounter prometheus.CounterVec `name:"slow_consumer_cut_off_count"`
SlowConsumerDroppedMsgCounter prometheus.CounterVec `name:"slow_consumer_dropped_message_count"`
DropsDueToPanic prometheus.CounterVec `name:"drops_due_to_panic"`
ConsumerDeliverUntilGauge prometheus.GaugeVec `name:"consumer_deliver_until"`
ConsumerDropUntilGauge prometheus.GaugeVec `name:"consumer_drop_until"`
ConsumerDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers"`
ConsumerMaxDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers_max"`
OutgoingQueueDepth prometheus.GaugeVec `name:"outgoing_queue_depths"`
ConsumerRenewalTimeGauge prometheus.GaugeVec `name:"consumer_renewal_time"`
}

// TODO: do these need to be annonated/broken into groups based on where the metrics are being used/called
Expand Down Expand Up @@ -161,3 +153,32 @@ func ProvideMetrics() fx.Option {
}, EventLabel),
)
}

func ProvideSenderMetrics() fx.Option {
return fx.Provide(
func(in SenderMetricsIn) (SenderWrapperMetrics, OutboundSenderMetrics) {
outbounderMetrics := OutboundSenderMetrics{
DeliveryCounter: in.DeliveryCounter,
DeliveryRetryCounter: in.DeliveryRetryCounter,
DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge,
CutOffCounter: in.CutOffCounter,
SlowConsumerDroppedMsgCounter: in.SlowConsumerDroppedMsgCounter,
DropsDueToPanic: in.DropsDueToPanic,
ConsumerDeliverUntilGauge: in.ConsumerDeliverUntilGauge,
ConsumerDropUntilGauge: in.ConsumerDropUntilGauge,
ConsumerDeliveryWorkersGauge: in.ConsumerDeliveryWorkersGauge,
ConsumerMaxDeliveryWorkersGauge: in.ConsumerMaxDeliveryWorkersGauge,
OutgoingQueueDepth: in.OutgoingQueueDepth,
ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge,
}
wrapperMetrics := SenderWrapperMetrics{
QueryLatency: in.QueryLatency,
EventType: in.EventType,
}

return wrapperMetrics, outbounderMetrics
},
)
}


Loading

0 comments on commit e19e6f6

Please sign in to comment.