From 66ee034590493922c6276a3004aa9f92401add20 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Thu, 15 Feb 2024 16:18:15 -0500 Subject: [PATCH] updated names and moved listener out of the sinkWrapper --- caduceus_type.go | 54 ++++++++++++++++++++++++++++++++++++------------ config.go | 2 +- httpClient.go | 2 +- main.go | 2 +- sinkSender.go | 34 ++++++++++++++---------------- sinkWrapper.go | 30 ++++++++++++++------------- 6 files changed, 76 insertions(+), 48 deletions(-) diff --git a/caduceus_type.go b/caduceus_type.go index 8575dfa1..3ccfcdcb 100644 --- a/caduceus_type.go +++ b/caduceus_type.go @@ -16,24 +16,52 @@ type CaduceusConfig struct { AuthHeader []string NumWorkerThreads int JobQueueSize int - Sender SenderConfig + Sink SinkConfig JWTValidators []JWTValidator AllowInsecureTLS bool } -type SenderConfig struct { - NumWorkersPerSender int - QueueSizePerSender int - CutOffPeriod time.Duration - Linger time.Duration - ClientTimeout time.Duration +type SinkConfig struct { + // The number of workers to assign to each SinkSender created. + NumWorkersPerSender int + + // The queue size to assign to each SinkSender created. + QueueSizePerSender int + + // The cut off time to assign to each SinkSender created. + CutOffPeriod time.Duration + + // The amount of time to let expired SinkSenders linger before + // shutting them down and cleaning up the resources associated with them. + Linger time.Duration + + // Number of delivery retries before giving up + DeliveryRetries int + + // Time in between delivery retries + DeliveryInterval time.Duration + + // CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message + // has no partner IDs. + CustomPIDs []string + + // DisablePartnerIDs dictates whether or not to enforce the partner ID check. 
+ DisablePartnerIDs bool + + // ClientTimeout specifies a time limit for requests made by the SinkSender's Client + ClientTimeout time.Duration + + // DisableClientHostnameValidation used in HTTP Client creation DisableClientHostnameValidation bool - ResponseHeaderTimeout time.Duration - IdleConnTimeout time.Duration - DeliveryRetries int - DeliveryInterval time.Duration - CustomPIDs []string - DisablePartnerIDs bool + + // ResponseHeaderTimeout specifies the amount of time to wait for a server's response headers after fully + // writing the request + ResponseHeaderTimeout time.Duration + + // IdleConnTimeout is the maximum amount of time an idle + // (keep-alive) connection will remain idle before closing + // itself. + IdleConnTimeout time.Duration } type RequestHandler interface { diff --git a/config.go index 31fba38d..182bacc3 100644 --- a/config.go +++ b/config.go @@ -26,7 +26,7 @@ type Config struct { Servers Servers ArgusClientTimeout HttpClientTimeout JWTValidator JWTValidator - Sender SenderConfig + Sink SinkConfig Service Service AuthHeader []string Server string diff --git a/httpClient.go index 51057378..c420108c 100644 --- a/httpClient.go +++ b/httpClient.go @@ -16,7 +16,7 @@ var ( errNilHistogram = errors.New("histogram cannot be nil") ) -func nopHTTPClient(next Client) Client { +func nopClient(next Client) Client { return next } diff --git a/main.go index 9a73d3e0..58416a99 100644 --- a/main.go +++ b/main.go @@ -74,7 +74,7 @@ func caduceus(arguments []string, run bool) error { goschtalt.UnmarshalFunc[sallust.Config]("logging"), goschtalt.UnmarshalFunc[candlelight.Config]("tracing"), goschtalt.UnmarshalFunc[touchstone.Config]("prometheus"), - goschtalt.UnmarshalFunc[SenderConfig]("sender"), + goschtalt.UnmarshalFunc[SinkConfig]("sender"), goschtalt.UnmarshalFunc[Service]("service"), goschtalt.UnmarshalFunc[[]string]("authHeader"), goschtalt.UnmarshalFunc[bool]("previousVersionSupport"), diff --git a/sinkSender.go
b/sinkSender.go index 57153d29..f10350e2 100644 --- a/sinkSender.go +++ b/sinkSender.go @@ -115,9 +115,9 @@ type SinkSender struct { clientMiddleware func(Client) Client } -func newSinkSender(sw *SinkWrapper) (s Sender, err error) { +func newSinkSender(sw *SinkWrapper, listener ListenerStub) (s Sender, err error) { if sw.clientMiddleware == nil { - sw.clientMiddleware = nopHTTPClient + sw.clientMiddleware = nopClient } if sw.client == nil { err = errors.New("nil Client") @@ -134,21 +134,19 @@ func newSinkSender(sw *SinkWrapper) (s Sender, err error) { return } - // decoratedLogger := osf.Logger.With(zap.String("webhook.address", osf.Listener.Webhook.Address)) + decoratedLogger := sw.logger.With(zap.String("webhook.address", listener.Webhook.Address)) sinkSender := &SinkSender{ - // id: osf.Listener.Webhook.Config.URL, - listener: sw.listener, - client: sw.client, - queueSize: sw.config.QueueSizePerSender, - cutOffPeriod: sw.config.CutOffPeriod, - // deliverUntil: osf.Listener.Webhook.Until, - // logger: decoratedLogger, + client: sw.client, + queueSize: sw.config.QueueSizePerSender, + cutOffPeriod: sw.config.CutOffPeriod, + deliverUntil: listener.Webhook.Until, + logger: decoratedLogger, deliveryRetries: sw.config.DeliveryRetries, deliveryInterval: sw.config.DeliveryInterval, maxWorkers: sw.config.NumWorkersPerSender, failureMsg: FailureMessage{ - Original: sw.listener, + Original: listener, Text: failureText, CutOffPeriod: sw.config.CutOffPeriod.String(), QueueSize: sw.config.QueueSizePerSender, @@ -160,7 +158,7 @@ func newSinkSender(sw *SinkWrapper) (s Sender, err error) { } // Don't share the secret with others when there is an error. 
- // caduceusOutboundSender.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" + sinkSender.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" CreateOutbounderMetrics(sw.metrics, sinkSender) @@ -170,9 +168,9 @@ func newSinkSender(sw *SinkWrapper) (s Sender, err error) { sinkSender.queue.Store(make(chan *wrp.Message, sw.config.QueueSizePerSender)) - // if err = caduceusOutboundSender.Update(osf.Listener); nil != err { - // return - // } + if err = sinkSender.Update(listener); nil != err { + return + } sinkSender.workers = semaphore.New(sinkSender.maxWorkers) sinkSender.wg.Add(1) @@ -207,14 +205,14 @@ func (s *SinkSender) Update(wh ListenerStub) (err error) { events = append(events, re) } if len(events) < 1 { - err = errors.New("events must not be empty.") + err = errors.New("events must not be empty") return } // Create the matcher regex objects matcher := []*regexp.Regexp{} for _, item := range wh.Webhook.Matcher.DeviceID { - if ".*" == item { + if item == ".*" { // Match everything - skip the filtering matcher = []*regexp.Regexp{} break @@ -263,7 +261,7 @@ func (s *SinkSender) Update(wh ListenerStub) (err error) { s.matcher = matcher } - if 0 == urlCount { + if urlCount == 0 { s.urls = ring.New(1) s.urls.Value = s.id } else { diff --git a/sinkWrapper.go b/sinkWrapper.go index dd4899aa..d0aaa6e1 100644 --- a/sinkWrapper.go +++ b/sinkWrapper.go @@ -23,7 +23,7 @@ type SinkWrapperIn struct { fx.In Tracing candlelight.Tracing - SenderConfig SenderConfig + SinkConfig SinkConfig WrapperMetrics SinkWrapperMetrics SenderMetrics SinkSenderMetrics Logger *zap.Logger @@ -43,24 +43,26 @@ type Wrapper interface { // Wrapper contains the configuration that will be shared with each outbound sender. It contains no external parameters. type SinkWrapper struct { - // The amount of time to let expired OutboundSenders linger before + // The amount of time to let expired SinkSenders linger before // shutting them down and cleaning up the resources associated with them. 
linger time.Duration - // The logger implementation to share with OutboundSenders. + // The logger implementation to share with sinkSenders. logger *zap.Logger + // The configuration needed for each SinkSender + config SinkConfig + mutex *sync.RWMutex senders map[string]Sender eventType *prometheus.CounterVec queryLatency prometheus.ObserverVec wg sync.WaitGroup shutdown chan struct{} - config SenderConfig metrics SinkSenderMetrics - client Client //should this be a part of wrapper or sender? - listener ListenerStub //should this be a part of wrapper or sender? - clientMiddleware func(Client) Client //should this be a part of wrapper or sender? + client Client // TODO: keeping here for now - but might move to SinkSender in a later PR + clientMiddleware func(Client) Client // TODO: keeping here for now - but might move to SinkSender in a later PR + } func ProvideWrapper() fx.Option { @@ -74,16 +76,16 @@ func NewSinkWrapper(in SinkWrapperIn) (sw *SinkWrapper, err error) { sw = &SinkWrapper{ - linger: in.SenderConfig.Linger, + linger: in.SinkConfig.Linger, logger: in.Logger, eventType: in.WrapperMetrics.EventType, queryLatency: in.WrapperMetrics.QueryLatency, - config: in.SenderConfig, + config: in.SinkConfig, metrics: in.SenderMetrics, } - if in.SenderConfig.Linger <= 0 { - linger := fmt.Sprintf("linger not positive: %v", in.SenderConfig.Linger) + if in.SinkConfig.Linger <= 0 { + linger := fmt.Sprintf("linger not positive: %v", in.SinkConfig.Linger) err = errors.New(linger) sw = nil return @@ -98,7 +100,7 @@ func NewSinkWrapper(in SinkWrapperIn) (sw *SinkWrapper, err error) { } // no longer being initialized at start up - needs to be initialized by the creation of the outbound sender -func NewRoundTripper(config SenderConfig, tracing candlelight.Tracing) (tr http.RoundTripper) { +func NewRoundTripper(config SinkConfig, tracing candlelight.Tracing) (tr http.RoundTripper) { tr = &http.Transport{ TLSClientConfig:
&tls.Config{InsecureSkipVerify: config.DisableClientHostnameValidation}, MaxIdleConnsPerHost: config.NumWorkersPerSender, @@ -136,14 +138,14 @@ func (sw *SinkWrapper) Update(list []ListenerStub) { sender, ok := sw.senders[inValue.ID] if !ok { // osf.Sender = sw.sender - sw.listener = inValue.Listener + listener := inValue.Listener metricWrapper, err := newMetricWrapper(time.Now, sw.queryLatency, inValue.ID) if err != nil { continue } sw.clientMiddleware = metricWrapper.roundTripper - ss, err := newSinkSender(sw) + ss, err := newSinkSender(sw, listener) if nil == err { sw.senders[inValue.ID] = ss }