Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Senderwrapper #436

Merged
merged 7 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/xmidt-org/webpa-common/v2 v2.2.2
github.com/xmidt-org/wrp-go/v3 v3.2.3
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1
go.uber.org/fx v1.20.1
go.uber.org/zap v1.26.0
gopkg.in/dealancer/validate.v2 v2.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1772,6 +1772,8 @@ go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.19.0/go.mod h1:7RDsakVbjb124lYDEjKuHTuzdqf04hLMEvPv/ufmqMs=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.38.0/go.mod h1:w6xNm+kC506KNs5cknSHal6dtdRnc4uema0uN9GSQc0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0/go.mod h1:pcQ3MM3SWvrA71U4GDqv9UFDJ3HQsW7y5ZO3tDTlUdI=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo=
go.opentelemetry.io/contrib/propagators v0.19.0/go.mod h1:4QOdZClXISU5S43xZxk5tYaWcpb+lehqfKtE6PK6msE=
go.opentelemetry.io/otel v0.19.0/go.mod h1:j9bF567N9EfomkSidSfmMwIwIBuP37AMAIzVW85OxSg=
go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI=
Expand Down
17 changes: 11 additions & 6 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@

type ServerHandlerIn struct {
fx.In
Logger *zap.Logger
Telemetry *HandlerTelemetry
CaduceusSenderWrapper *CaduceusSenderWrapper
Logger *zap.Logger
Telemetry *HandlerTelemetry
}

type ServerHandlerOut struct {
Expand All @@ -32,8 +33,8 @@

// Below is the struct that will implement our ServeHTTP method
type ServerHandler struct {
log *zap.Logger
// caduceusHandler RequestHandler
log *zap.Logger
caduceusHandler RequestHandler
telemetry *HandlerTelemetry
incomingQueueDepth int64
maxOutstanding int64
Expand Down Expand Up @@ -195,16 +196,20 @@
fx.Provide(
func(in ServerHandlerIn) (ServerHandlerOut, error) {
//Hard coding maxOutstanding and incomingQueueDepth for now
handler, err := New(in.Logger, in.Telemetry, 0.0, 0.0)
handler, err := New(in.CaduceusSenderWrapper, in.Logger, in.Telemetry, 0.0, 0.0)

Check warning on line 199 in http.go

View check run for this annotation

Codecov / codecov/patch

http.go#L199

Added line #L199 was not covered by tests
return ServerHandlerOut{
Handler: handler,
}, err
},
),
)

func New(log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) {
func New(senderWrapper *CaduceusSenderWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) {

Check warning on line 207 in http.go

View check run for this annotation

Codecov / codecov/patch

http.go#L207

Added line #L207 was not covered by tests
return &ServerHandler{
caduceusHandler: &CaduceusHandler{
senderWrapper: senderWrapper,
Logger: log,
},

Check warning on line 212 in http.go

View check run for this annotation

Codecov / codecov/patch

http.go#L209-L212

Added lines #L209 - L212 were not covered by tests
log: log,
telemetry: t,
maxOutstanding: maxOutstanding,
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func caduceus(arguments []string, run bool) error {
arrangehttp.ProvideServer("servers.alternate"),

HandlerModule,
SenderWrapperModule,
touchstone.Provide(),
touchhttp.Provide(),
ProvideMetrics(),
Expand Down
193 changes: 119 additions & 74 deletions senderWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,118 +3,163 @@
package main

import (
"crypto/tls"
"errors"
"net/http"
"sync"
"time"

"github.com/go-kit/kit/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/xmidt-org/candlelight"
"github.com/xmidt-org/wrp-go/v3"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.uber.org/fx"
"go.uber.org/zap"
)

// SenderWrapperFactory configures the CaduceusSenderWrapper for creation
type SenderWrapperFactory struct {
type CaduceusSenderWrapperIn struct {
fx.In

Tracing candlelight.Tracing
SenderConfig SenderConfig
Metrics SenderMetrics
Logger *zap.Logger
}

type CaduceusSenderWrapperOut struct {
fx.Out
CaduceusSenderWrapper *CaduceusSenderWrapper
}
type SenderMetricsIn struct {
fx.In
QueryLatency prometheus.HistogramVec `name:"query_duration_histogram_seconds"`
EventType prometheus.CounterVec `name:"incoming_event_type_count"`
}

type SenderMetrics struct {
QueryLatency prometheus.HistogramVec
EventType prometheus.CounterVec
}
type SenderWrapper interface {
// Update([]ancla.InternalWebhook)
Queue(*wrp.Message)
Shutdown(bool)
}

// CaduceusSenderWrapper contains no external parameters.
type CaduceusSenderWrapper struct {
// The http client Do() function to share with OutboundSenders.
sender httpClient
// The number of workers to assign to each OutboundSender created.
NumWorkersPerSender int
numWorkersPerSender int

// The queue size to assign to each OutboundSender created.
QueueSizePerSender int

// The cut off time to assign to each OutboundSender created.
CutOffPeriod time.Duration
queueSizePerSender int

// Number of delivery retries before giving up
DeliveryRetries int
deliveryRetries int

// Time in between delivery retries
DeliveryInterval time.Duration
deliveryInterval time.Duration

// The cut off time to assign to each OutboundSender created.
cutOffPeriod time.Duration

// The amount of time to let expired OutboundSenders linger before
// shutting them down and cleaning up the resources associated with them.
Linger time.Duration

// Metrics registry.
MetricsRegistry CaduceusMetricsRegistry

// The metrics counter for dropped messages due to invalid payloads
DroppedMsgCounter metrics.Counter

EventType metrics.Counter
linger time.Duration

// The logger implementation to share with OutboundSenders.
Logger *zap.Logger
logger *zap.Logger

// The http client Do() function to share with OutboundSenders.
Sender httpClient
mutex *sync.RWMutex
senders map[string]OutboundSender
eventType prometheus.CounterVec
queryLatency prometheus.HistogramVec
wg sync.WaitGroup
shutdown chan struct{}

// CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message
// has no partner IDs.
CustomPIDs []string
customPIDs []string

// DisablePartnerIDs dictates whether or not to enforce the partner ID check.
DisablePartnerIDs bool
}

type SenderWrapper interface {
// Update([]ancla.InternalWebhook)
Queue(*wrp.Message)
Shutdown(bool)
disablePartnerIDs bool
}

// CaduceusSenderWrapper contains no external parameters.
type CaduceusSenderWrapper struct {
sender httpClient
numWorkersPerSender int
queueSizePerSender int
deliveryRetries int
deliveryInterval time.Duration
cutOffPeriod time.Duration
linger time.Duration
logger *zap.Logger
mutex sync.RWMutex
senders map[string]OutboundSender
metricsRegistry CaduceusMetricsRegistry
eventType metrics.Counter
queryLatency metrics.Histogram
wg sync.WaitGroup
shutdown chan struct{}
customPIDs []string
disablePartnerIDs bool
}
var SenderWrapperModule = fx.Module("caduceusSenderWrapper",
fx.Provide(
func(in SenderMetricsIn) SenderMetrics {
return SenderMetrics{
QueryLatency: in.QueryLatency,
EventType: in.EventType,
}
},

Check warning on line 98 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L93-L98

Added lines #L93 - L98 were not covered by tests
),
fx.Provide(
func(in CaduceusSenderWrapperIn) http.RoundTripper {
return NewRoundTripper(in.SenderConfig, in.Tracing)
},

Check warning on line 103 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L101-L103

Added lines #L101 - L103 were not covered by tests
),
fx.Provide(
func(tr http.RoundTripper, in CaduceusSenderWrapperIn) (CaduceusSenderWrapperOut, error) {
csw, err := NewSenderWrapper(tr, in)
return CaduceusSenderWrapperOut{
CaduceusSenderWrapper: csw,
}, err
},

Check warning on line 111 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L106-L111

Added lines #L106 - L111 were not covered by tests
),
)

// New produces a new SenderWrapper implemented by CaduceusSenderWrapper
// based on the factory configuration.
func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) {
caduceusSenderWrapper := &CaduceusSenderWrapper{
sender: swf.Sender,
numWorkersPerSender: swf.NumWorkersPerSender,
queueSizePerSender: swf.QueueSizePerSender,
deliveryRetries: swf.DeliveryRetries,
deliveryInterval: swf.DeliveryInterval,
cutOffPeriod: swf.CutOffPeriod,
linger: swf.Linger,
logger: swf.Logger,
metricsRegistry: swf.MetricsRegistry,
customPIDs: swf.CustomPIDs,
disablePartnerIDs: swf.DisablePartnerIDs,
// New produces a new CaduceusSenderWrapper
// based on the SenderConfig
func NewSenderWrapper(tr http.RoundTripper, in CaduceusSenderWrapperIn) (csw *CaduceusSenderWrapper, err error) {
csw = &CaduceusSenderWrapper{
numWorkersPerSender: in.SenderConfig.NumWorkersPerSender,
queueSizePerSender: in.SenderConfig.QueueSizePerSender,
deliveryRetries: in.SenderConfig.DeliveryRetries,
deliveryInterval: in.SenderConfig.DeliveryInterval,
cutOffPeriod: in.SenderConfig.CutOffPeriod,
linger: in.SenderConfig.Linger,
logger: in.Logger,
customPIDs: in.SenderConfig.CustomPIDs,
disablePartnerIDs: in.SenderConfig.DisablePartnerIDs,
eventType: in.Metrics.EventType,
queryLatency: in.Metrics.QueryLatency,

Check warning on line 129 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L117-L129

Added lines #L117 - L129 were not covered by tests
}
csw.sender = doerFunc((&http.Client{
Transport: tr,
Timeout: in.SenderConfig.ClientTimeout,
}).Do)

Check warning on line 134 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L131-L134

Added lines #L131 - L134 were not covered by tests

if swf.Linger <= 0 {
if in.SenderConfig.Linger <= 0 {

Check warning on line 136 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L136

Added line #L136 was not covered by tests
err = errors.New("Linger must be positive.")
sw = nil
csw = nil

Check warning on line 138 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L138

Added line #L138 was not covered by tests
return
}

caduceusSenderWrapper.queryLatency = NewMetricWrapperMeasures(swf.MetricsRegistry)
caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(IncomingEventTypeCounter)
csw.senders = make(map[string]OutboundSender)
csw.shutdown = make(chan struct{})

Check warning on line 143 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L142-L143

Added lines #L142 - L143 were not covered by tests

caduceusSenderWrapper.senders = make(map[string]OutboundSender)
caduceusSenderWrapper.shutdown = make(chan struct{})
csw.wg.Add(1)
go undertaker(csw)

Check warning on line 146 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L145-L146

Added lines #L145 - L146 were not covered by tests

caduceusSenderWrapper.wg.Add(1)
go undertaker(caduceusSenderWrapper)
return

Check warning on line 148 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L148

Added line #L148 was not covered by tests
}

func NewRoundTripper(config SenderConfig, tracing candlelight.Tracing) (tr http.RoundTripper) {
tr = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: config.DisableClientHostnameValidation},
MaxIdleConnsPerHost: config.NumWorkersPerSender,
ResponseHeaderTimeout: config.ResponseHeaderTimeout,
IdleConnTimeout: config.IdleConnTimeout,
}

Check warning on line 157 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L151-L157

Added lines #L151 - L157 were not covered by tests

sw = caduceusSenderWrapper
tr = otelhttp.NewTransport(tr,
otelhttp.WithPropagators(tracing.Propagator()),
otelhttp.WithTracerProvider(tracing.TracerProvider()),
)

Check warning on line 162 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L159-L162

Added lines #L159 - L162 were not covered by tests
return
}

Expand Down Expand Up @@ -177,7 +222,7 @@
sw.mutex.RLock()
defer sw.mutex.RUnlock()

sw.eventType.With("event", msg.FindEventStringSubMatch()).Add(1)
sw.eventType.With(prometheus.Labels{"event": msg.FindEventStringSubMatch()}).Add(1)

Check warning on line 225 in senderWrapper.go

View check run for this annotation

Codecov / codecov/patch

senderWrapper.go#L225

Added line #L225 was not covered by tests

for _, v := range sw.senders {
v.Queue(msg)
Expand Down
Loading
Loading