Skip to content

Commit

Permalink
updated names and moved listener out of the sinkWrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
maurafortino committed Feb 15, 2024
1 parent 519b432 commit 66ee034
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 48 deletions.
54 changes: 41 additions & 13 deletions caduceus_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,52 @@ type CaduceusConfig struct {
AuthHeader []string
NumWorkerThreads int
JobQueueSize int
Sender SenderConfig
Sink SinkConfig
JWTValidators []JWTValidator
AllowInsecureTLS bool
}

type SenderConfig struct {
NumWorkersPerSender int
QueueSizePerSender int
CutOffPeriod time.Duration
Linger time.Duration
ClientTimeout time.Duration
type SinkConfig struct {
// The number of workers to assign to each SinkSender created.
NumWorkersPerSender int

// The queue size to assign to each SinkSender created.
QueueSizePerSender int

// The cut off time to assign to each SinkSender created.
CutOffPeriod time.Duration

// The amount of time to let expired SinkSenders linger before
// shutting them down and cleaning up the resources associated with them.
Linger time.Duration

// Number of delivery retries before giving up
DeliveryRetries int

// Time in between delivery retries
DeliveryInterval time.Duration

// CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message
// has no partner IDs.
CustomPIDs []string

// DisablePartnerIDs dictates whether or not to enforce the partner ID check.
DisablePartnerIDs bool

// ClientTimeout specifies a time limit for requests made by the SinkSender's Client
ClientTimeout time.Duration

//DisableClientHostnameValidation used in HTTP Client creation
DisableClientHostnameValidation bool
ResponseHeaderTimeout time.Duration
IdleConnTimeout time.Duration
DeliveryRetries int
DeliveryInterval time.Duration
CustomPIDs []string
DisablePartnerIDs bool

// ResponseHeaderTimeout specifies the amount of time to wait for a server's response headers after fully
// writing the request
ResponseHeaderTimeout time.Duration

//IdleConnTimeout is the maximum amount of time an idle
// (keep-alive) connection will remain idle before closing
// itself.
IdleConnTimeout time.Duration
}

type RequestHandler interface {
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Config struct {
Servers Servers
ArgusClientTimeout HttpClientTimeout
JWTValidator JWTValidator
Sender SenderConfig
Sink SinkConfig
Service Service
AuthHeader []string
Server string
Expand Down
2 changes: 1 addition & 1 deletion httpClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var (
errNilHistogram = errors.New("histogram cannot be nil")
)

func nopHTTPClient(next Client) Client {
func nopClient(next Client) Client {
return next
}

Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func caduceus(arguments []string, run bool) error {
goschtalt.UnmarshalFunc[sallust.Config]("logging"),
goschtalt.UnmarshalFunc[candlelight.Config]("tracing"),
goschtalt.UnmarshalFunc[touchstone.Config]("prometheus"),
goschtalt.UnmarshalFunc[SenderConfig]("sender"),
goschtalt.UnmarshalFunc[SinkConfig]("sender"),
goschtalt.UnmarshalFunc[Service]("service"),
goschtalt.UnmarshalFunc[[]string]("authHeader"),
goschtalt.UnmarshalFunc[bool]("previousVersionSupport"),
Expand Down
34 changes: 16 additions & 18 deletions sinkSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ type SinkSender struct {
clientMiddleware func(Client) Client
}

func newSinkSender(sw *SinkWrapper) (s Sender, err error) {
func newSinkSender(sw *SinkWrapper, listener ListenerStub) (s Sender, err error) {
if sw.clientMiddleware == nil {
sw.clientMiddleware = nopHTTPClient
sw.clientMiddleware = nopClient
}
if sw.client == nil {
err = errors.New("nil Client")
Expand All @@ -134,21 +134,19 @@ func newSinkSender(sw *SinkWrapper) (s Sender, err error) {
return
}

// decoratedLogger := osf.Logger.With(zap.String("webhook.address", osf.Listener.Webhook.Address))
decoratedLogger := sw.logger.With(zap.String("webhook.address", listener.Webhook.Address))

sinkSender := &SinkSender{
// id: osf.Listener.Webhook.Config.URL,
listener: sw.listener,
client: sw.client,
queueSize: sw.config.QueueSizePerSender,
cutOffPeriod: sw.config.CutOffPeriod,
// deliverUntil: osf.Listener.Webhook.Until,
// logger: decoratedLogger,
client: sw.client,
queueSize: sw.config.QueueSizePerSender,
cutOffPeriod: sw.config.CutOffPeriod,
deliverUntil: listener.Webhook.Until,
logger: decoratedLogger,
deliveryRetries: sw.config.DeliveryRetries,
deliveryInterval: sw.config.DeliveryInterval,
maxWorkers: sw.config.NumWorkersPerSender,
failureMsg: FailureMessage{
Original: sw.listener,
Original: listener,
Text: failureText,
CutOffPeriod: sw.config.CutOffPeriod.String(),
QueueSize: sw.config.QueueSizePerSender,
Expand All @@ -160,7 +158,7 @@ func newSinkSender(sw *SinkWrapper) (s Sender, err error) {
}

// Don't share the secret with others when there is an error.
// caduceusOutboundSender.failureMsg.Original.Webhook.Config.Secret = "XxxxxX"
sinkSender.failureMsg.Original.Webhook.Config.Secret = "XxxxxX"

CreateOutbounderMetrics(sw.metrics, sinkSender)

Expand All @@ -170,9 +168,9 @@ func newSinkSender(sw *SinkWrapper) (s Sender, err error) {

sinkSender.queue.Store(make(chan *wrp.Message, sw.config.QueueSizePerSender))

// if err = caduceusOutboundSender.Update(osf.Listener); nil != err {
// return
// }
if err = sinkSender.Update(listener); nil != err {
return
}

sinkSender.workers = semaphore.New(sinkSender.maxWorkers)
sinkSender.wg.Add(1)
Expand Down Expand Up @@ -207,14 +205,14 @@ func (s *SinkSender) Update(wh ListenerStub) (err error) {
events = append(events, re)
}
if len(events) < 1 {
err = errors.New("events must not be empty.")
err = errors.New("events must not be empty")
return
}

// Create the matcher regex objects
matcher := []*regexp.Regexp{}
for _, item := range wh.Webhook.Matcher.DeviceID {
if ".*" == item {
if item == ".*" {
// Match everything - skip the filtering
matcher = []*regexp.Regexp{}
break
Expand Down Expand Up @@ -263,7 +261,7 @@ func (s *SinkSender) Update(wh ListenerStub) (err error) {
s.matcher = matcher
}

if 0 == urlCount {
if urlCount == 0 {
s.urls = ring.New(1)
s.urls.Value = s.id
} else {
Expand Down
30 changes: 16 additions & 14 deletions sinkWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type SinkWrapperIn struct {
fx.In

Tracing candlelight.Tracing
SenderConfig SenderConfig
SinkConfig SinkConfig
WrapperMetrics SinkWrapperMetrics
SenderMetrics SinkSenderMetrics
Logger *zap.Logger
Expand All @@ -43,24 +43,26 @@ type Wrapper interface {

// Wrapper contains the configuration that will be shared with each outbound sender. It contains no external parameters.
type SinkWrapper struct {
// The amount of time to let expired OutboundSenders linger before
// The amount of time to let expired SinkSenders linger before
// shutting them down and cleaning up the resources associated with them.
linger time.Duration

// The logger implementation to share with OutboundSenders.
// The logger implementation to share with sinkSenders.
logger *zap.Logger

//the configuration needed for eash sinkSender
config SinkConfig

mutex *sync.RWMutex
senders map[string]Sender
eventType *prometheus.CounterVec
queryLatency prometheus.ObserverVec
wg sync.WaitGroup
shutdown chan struct{}
config SenderConfig
metrics SinkSenderMetrics
client Client //should this be a part of wrapper or sender?
listener ListenerStub //should this be a part of wrapper or sender?
clientMiddleware func(Client) Client //should this be a part of wrapper or sender?
client Client //TODO: keeping here for now - but might move to SinkSender in a later PR
clientMiddleware func(Client) Client //TODO: keeping here for now - but might move to SinkSender in a later PR

}

func ProvideWrapper() fx.Option {
Expand All @@ -74,16 +76,16 @@ func ProvideWrapper() fx.Option {

func NewSinkWrapper(in SinkWrapperIn) (sw *SinkWrapper, err error) {
sw = &SinkWrapper{
linger: in.SenderConfig.Linger,
linger: in.SinkConfig.Linger,
logger: in.Logger,
eventType: in.WrapperMetrics.EventType,
queryLatency: in.WrapperMetrics.QueryLatency,
config: in.SenderConfig,
config: in.SinkConfig,
metrics: in.SenderMetrics,
}

if in.SenderConfig.Linger <= 0 {
linger := fmt.Sprintf("linger not positive: %v", in.SenderConfig.Linger)
if in.SinkConfig.Linger <= 0 {
linger := fmt.Sprintf("linger not positive: %v", in.SinkConfig.Linger)
err = errors.New(linger)
sw = nil
return
Expand All @@ -98,7 +100,7 @@ func NewSinkWrapper(in SinkWrapperIn) (sw *SinkWrapper, err error) {
}

// no longer being initialized at start up - needs to be initialized by the creation of the outbound sender
func NewRoundTripper(config SenderConfig, tracing candlelight.Tracing) (tr http.RoundTripper) {
func NewRoundTripper(config SinkConfig, tracing candlelight.Tracing) (tr http.RoundTripper) {
tr = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: config.DisableClientHostnameValidation},
MaxIdleConnsPerHost: config.NumWorkersPerSender,
Expand Down Expand Up @@ -136,14 +138,14 @@ func (sw *SinkWrapper) Update(list []ListenerStub) {
sender, ok := sw.senders[inValue.ID]
if !ok {
// osf.Sender = sw.sender
sw.listener = inValue.Listener
listener := inValue.Listener
metricWrapper, err := newMetricWrapper(time.Now, sw.queryLatency, inValue.ID)

if err != nil {
continue
}
sw.clientMiddleware = metricWrapper.roundTripper
ss, err := newSinkSender(sw)
ss, err := newSinkSender(sw, listener)
if nil == err {
sw.senders[inValue.ID] = ss
}
Expand Down

0 comments on commit 66ee034

Please sign in to comment.