Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Senderwrapper #436

Merged
merged 7 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions caduceus_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (

"go.uber.org/zap"

"github.com/go-kit/kit/metrics"

"github.com/xmidt-org/wrp-go/v3"
)

Expand Down Expand Up @@ -38,12 +36,6 @@ type SenderConfig struct {
DisablePartnerIDs bool
}

type CaduceusMetricsRegistry interface {
NewCounter(name string) metrics.Counter
NewGauge(name string) metrics.Gauge
NewHistogram(name string, buckets int) metrics.Histogram
}

type RequestHandler interface {
HandleRequest(workerID int, msg *wrp.Message)
}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
emperror.dev/emperror v0.33.0
github.com/alecthomas/kong v0.8.1
github.com/go-chi/chi/v5 v5.0.10
github.com/go-kit/kit v0.13.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

github.com/gorilla/mux v1.8.1
github.com/goschtalt/goschtalt v0.22.1
github.com/goschtalt/yaml-decoder v0.0.1
Expand All @@ -28,6 +27,7 @@ require (
github.com/xmidt-org/webpa-common/v2 v2.2.2
github.com/xmidt-org/wrp-go/v3 v3.2.3
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1
go.uber.org/fx v1.20.1
go.uber.org/zap v1.26.0
gopkg.in/dealancer/validate.v2 v2.1.0
Expand All @@ -44,6 +44,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-kit/kit v0.13.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1772,6 +1772,8 @@ go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.19.0/go.mod h1:7RDsakVbjb124lYDEjKuHTuzdqf04hLMEvPv/ufmqMs=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.38.0/go.mod h1:w6xNm+kC506KNs5cknSHal6dtdRnc4uema0uN9GSQc0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0/go.mod h1:pcQ3MM3SWvrA71U4GDqv9UFDJ3HQsW7y5ZO3tDTlUdI=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo=
go.opentelemetry.io/contrib/propagators v0.19.0/go.mod h1:4QOdZClXISU5S43xZxk5tYaWcpb+lehqfKtE6PK6msE=
go.opentelemetry.io/otel v0.19.0/go.mod h1:j9bF567N9EfomkSidSfmMwIwIBuP37AMAIzVW85OxSg=
go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI=
Expand Down
31 changes: 17 additions & 14 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@

type ServerHandlerIn struct {
fx.In
Logger *zap.Logger
Telemetry *HandlerTelemetry
CaduceusSenderWrapper *CaduceusSenderWrapper
Logger *zap.Logger
Telemetry *HandlerTelemetry
}

type ServerHandlerOut struct {
Expand All @@ -32,8 +33,8 @@

// Below is the struct that will implement our ServeHTTP method
type ServerHandler struct {
log *zap.Logger
// caduceusHandler RequestHandler
log *zap.Logger
caduceusHandler RequestHandler
telemetry *HandlerTelemetry
incomingQueueDepth int64
maxOutstanding int64
Expand Down Expand Up @@ -138,7 +139,7 @@
}
eventType = msg.FindEventStringSubMatch()

// sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg))
sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg))

Check warning on line 142 in http.go

View check run for this annotation

Codecov / codecov/patch

http.go#L142

Added line #L142 was not covered by tests

// return a 202
response.WriteHeader(http.StatusAccepted)
Expand Down Expand Up @@ -180,8 +181,8 @@
return msg
}

var HandlerModule = fx.Module("server",
fx.Provide(
func ProvideHandler() fx.Option {
return fx.Provide(

Check warning on line 185 in http.go

View check run for this annotation

Codecov / codecov/patch

http.go#L184-L185

Added lines #L184 - L185 were not covered by tests
func(in HandlerTelemetryIn) *HandlerTelemetry {
return &HandlerTelemetry{
errorRequests: in.ErrorRequests,
Expand All @@ -191,20 +192,22 @@
modifiedWRPCount: in.ModifiedWRPCount,
incomingQueueLatency: in.IncomingQueueLatency,
}
}),
fx.Provide(
},

Check warning on line 195 in http.go

View check run for this annotation

Codecov / codecov/patch

http.go#L195

Added line #L195 was not covered by tests
func(in ServerHandlerIn) (ServerHandlerOut, error) {
//Hard coding maxOutstanding and incomingQueueDepth for now
handler, err := New(in.Logger, in.Telemetry, 0.0, 0.0)
handler, err := New(in.CaduceusSenderWrapper, in.Logger, in.Telemetry, 0.0, 0.0)

Check warning on line 198 in http.go

View check run for this annotation

Codecov / codecov/patch

http.go#L198

Added line #L198 was not covered by tests
return ServerHandlerOut{
Handler: handler,
}, err
},
),
)

func New(log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) {
)
}
func New(senderWrapper *CaduceusSenderWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) {

Check warning on line 205 in http.go

View check run for this annotation

Codecov / codecov/patch

http.go#L205

Added line #L205 was not covered by tests
return &ServerHandler{
caduceusHandler: &CaduceusHandler{
senderWrapper: senderWrapper,
Logger: log,
},

Check warning on line 210 in http.go

View check run for this annotation

Codecov / codecov/patch

http.go#L207-L210

Added lines #L207 - L210 were not covered by tests
log: log,
telemetry: t,
maxOutstanding: maxOutstanding,
Expand Down
12 changes: 7 additions & 5 deletions httpClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"strconv"
"time"

"github.com/go-kit/kit/metrics"
"github.com/prometheus/client_golang/prometheus"
)

var (
Expand All @@ -33,19 +33,21 @@

type metricWrapper struct {
now func() time.Time
queryLatency metrics.Histogram
queryLatency prometheus.HistogramVec
id string
}

func newMetricWrapper(now func() time.Time, queryLatency metrics.Histogram) (*metricWrapper, error) {
func newMetricWrapper(now func() time.Time, queryLatency prometheus.HistogramVec, id string) (*metricWrapper, error) {

Check warning on line 40 in httpClient.go

View check run for this annotation

Codecov / codecov/patch

httpClient.go#L40

Added line #L40 was not covered by tests
if now == nil {
now = time.Now
}
if queryLatency == nil {
if queryLatency.MetricVec == nil {

Check warning on line 44 in httpClient.go

View check run for this annotation

Codecov / codecov/patch

httpClient.go#L44

Added line #L44 was not covered by tests
return nil, errNilHistogram
}
return &metricWrapper{
now: now,
queryLatency: queryLatency,
id: id,

Check warning on line 50 in httpClient.go

View check run for this annotation

Codecov / codecov/patch

httpClient.go#L50

Added line #L50 was not covered by tests
}, nil
}

Expand All @@ -62,7 +64,7 @@

// find time difference, add to metric
var latency = endTime.Sub(startTime)
m.queryLatency.With("code", code).Observe(latency.Seconds())
m.queryLatency.With(prometheus.Labels{UrlLabel: m.id, CodeLabel: code}).Observe(latency.Seconds())

Check warning on line 67 in httpClient.go

View check run for this annotation

Codecov / codecov/patch

httpClient.go#L67

Added line #L67 was not covered by tests

return resp, err
})
Expand Down
56 changes: 56 additions & 0 deletions listenerStub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package main

import "time"

//This is a stub for the ancla listener. This will be removed once we can add ancla back into caduceus

type ListenerStub struct {
PartnerIds []string
Webhook Webhook
}

type Webhook struct {
// Address is the subscription request origin HTTP Address.
Address string `json:"registered_from_address"`

// Config contains data to inform how events are delivered.
Config DeliveryConfig `json:"config"`

// FailureURL is the URL used to notify subscribers when they've been cut off due to event overflow.
// Optional, set to "" to disable notifications.
FailureURL string `json:"failure_url"`

// Events is the list of regular expressions to match an event type against.
Events []string `json:"events"`

// Matcher type contains values to match against the metadata.
Matcher MetadataMatcherConfig `json:"matcher,omitempty"`

// Duration describes how long the subscription lasts once added.
Duration time.Duration `json:"duration"`

// Until describes the time this subscription expires.
Until time.Time `json:"until"`
}

// DeliveryConfig is a Webhook substructure with data related to event delivery.
type DeliveryConfig struct {
// URL is the HTTP URL to deliver messages to.
URL string `json:"url"`

// ContentType is content type value to set WRP messages to (unless already specified in the WRP).
ContentType string `json:"content_type"`

// Secret is the string value for the SHA1 HMAC.
// (Optional, set to "" to disable behavior).
Secret string `json:"secret,omitempty"`

// AlternativeURLs is a list of explicit URLs that should be round robin through on failure cases to the main URL.
AlternativeURLs []string `json:"alt_urls,omitempty"`
}

// MetadataMatcherConfig is Webhook substructure with config to match event metadata.
type MetadataMatcherConfig struct {
// DeviceID is the list of regular expressions to match device id type against.
DeviceID []string `json:"device_id"`
}
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ func caduceus(arguments []string, run bool) error {
arrangehttp.ProvideServer("servers.primary"),
arrangehttp.ProvideServer("servers.alternate"),

HandlerModule,
ProvideHandler(),
ProvideSenderWrapper(),
touchstone.Provide(),
touchhttp.Provide(),
ProvideMetrics(),
Expand Down
69 changes: 45 additions & 24 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package main

import (
"github.com/go-kit/kit/metrics"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/fx"

Expand Down Expand Up @@ -49,29 +48,22 @@
CodeLabel = "code"
)

func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSender) {
c.deliveryCounter = m.NewCounter(DeliveryCounter)
c.deliveryRetryCounter = m.NewCounter(DeliveryRetryCounter)
c.deliveryRetryMaxGauge = m.NewGauge(DeliveryRetryMaxGauge).With("url", c.id)
c.cutOffCounter = m.NewCounter(SlowConsumerCounter).With("url", c.id)
c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full")
c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired")
c.droppedExpiredBeforeQueueCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing")

c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off")
c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config")
c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError)
c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id)
c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id)
c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With("url", c.id)
c.deliverUntilGauge = m.NewGauge(ConsumerDeliverUntilGauge).With("url", c.id)
c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id)
c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id)
c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id)
}

func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram {
return m.NewHistogram(QueryDurationHistogram, 11)
type SenderMetricsIn struct {
fx.In
QueryLatency prometheus.HistogramVec `name:"query_duration_histogram_seconds"`
EventType prometheus.CounterVec `name:"incoming_event_type_count"`
DeliveryCounter prometheus.CounterVec `name:"delivery_count"`
DeliveryRetryCounter prometheus.CounterVec `name:"DeliveryRetryCounter"`
DeliveryRetryMaxGauge prometheus.GaugeVec `name:"delivery_retry_max"`
CutOffCounter prometheus.CounterVec `name:"slow_consumer_cut_off_count"`
SlowConsumerDroppedMsgCounter prometheus.CounterVec `name:"slow_consumer_dropped_message_count"`
DropsDueToPanic prometheus.CounterVec `name:"drops_due_to_panic"`
ConsumerDeliverUntilGauge prometheus.GaugeVec `name:"consumer_deliver_until"`
ConsumerDropUntilGauge prometheus.GaugeVec `name:"consumer_drop_until"`
ConsumerDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers"`
ConsumerMaxDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers_max"`
OutgoingQueueDepth prometheus.GaugeVec `name:"outgoing_queue_depths"`
ConsumerRenewalTimeGauge prometheus.GaugeVec `name:"consumer_renewal_time"`
}

// TODO: do these need to be annonated/broken into groups based on where the metrics are being used/called
Expand Down Expand Up @@ -161,3 +153,32 @@
}, EventLabel),
)
}

func ProvideSenderMetrics() fx.Option {
return fx.Provide(
func(in SenderMetricsIn) (SenderWrapperMetrics, OutboundSenderMetrics) {
outbounderMetrics := OutboundSenderMetrics{
DeliveryCounter: in.DeliveryCounter,
DeliveryRetryCounter: in.DeliveryRetryCounter,
DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge,
CutOffCounter: in.CutOffCounter,
SlowConsumerDroppedMsgCounter: in.SlowConsumerDroppedMsgCounter,
DropsDueToPanic: in.DropsDueToPanic,
ConsumerDeliverUntilGauge: in.ConsumerDeliverUntilGauge,
ConsumerDropUntilGauge: in.ConsumerDropUntilGauge,
ConsumerDeliveryWorkersGauge: in.ConsumerDeliveryWorkersGauge,
ConsumerMaxDeliveryWorkersGauge: in.ConsumerMaxDeliveryWorkersGauge,
OutgoingQueueDepth: in.OutgoingQueueDepth,
ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge,
}
wrapperMetrics := SenderWrapperMetrics{
QueryLatency: in.QueryLatency,
EventType: in.EventType,
}

return wrapperMetrics, outbounderMetrics
},

Check warning on line 180 in metrics.go

View check run for this annotation

Codecov / codecov/patch

metrics.go#L157-L180

Added lines #L157 - L180 were not covered by tests
)
}


Loading
Loading