From 4138bf5ff398164da72edd7efa86b056d2af9719 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Wed, 10 Apr 2024 11:03:04 -0400 Subject: [PATCH 1/3] updated package structure to include internal package --- main.go => caduceus.go | 26 ++-- main_test.go => caduceus_test.go | 6 +- cmd/main.go | 29 +++++ config.go | 17 ++- client.go => internal/client/client.go | 4 +- .../client/httpClient.go | 11 +- .../client/httpClient_test.go | 2 +- internal/handler/caduceus_type.go | 24 ++++ .../handler/caduceus_type_test.go | 10 +- http.go => internal/handler/http.go | 19 +-- http_test.go => internal/handler/http_test.go | 116 +++++++++--------- .../handler/primaryHandler.go | 21 +--- .../logging/basculeLogging.go | 7 +- .../logging/basculeLogging_test.go | 2 +- metrics.go => internal/metrics/metrics.go | 12 +- .../metrics/metrics_test.go | 2 +- mocks_test.go => internal/mocks/mocks.go | 24 ++-- .../sink/listenerStub.go | 2 +- matcher.go => internal/sink/matcher.go | 2 +- sink.go => internal/sink/sink.go | 27 ++-- .../sink/sinkConfig.go | 26 +--- sinkSender.go => internal/sink/sinkSender.go | 54 ++++---- .../sink/sinkSender_test.go | 2 +- .../sink/sinkWrapper.go | 21 ++-- .../sink/sinkWrapper_test.go | 2 +- logger.go | 2 +- routes.go | 20 ++- 27 files changed, 274 insertions(+), 216 deletions(-) rename main.go => caduceus.go (91%) rename main_test.go => caduceus_test.go (96%) create mode 100644 cmd/main.go rename client.go => internal/client/client.go (91%) rename httpClient.go => internal/client/httpClient.go (74%) rename httpClient_test.go => internal/client/httpClient_test.go (99%) create mode 100644 internal/handler/caduceus_type.go rename caduceus_type_test.go => internal/handler/caduceus_type_test.go (82%) rename http.go => internal/handler/http.go (93%) rename http_test.go => internal/handler/http_test.go (87%) rename primaryHandler.go => internal/handler/primaryHandler.go (90%) rename basculeLogging.go => internal/logging/basculeLogging.go (91%) rename basculeLogging_test.go => internal/logging/basculeLogging_test.go (98%) rename metrics.go => internal/metrics/metrics.go (96%) rename metrics_test.go => internal/metrics/metrics_test.go (98%) rename mocks_test.go => internal/mocks/mocks.go (63%) rename listenerStub.go => internal/sink/listenerStub.go (99%) rename matcher.go => internal/sink/matcher.go (99%) rename sink.go => internal/sink/sink.go (83%) rename caduceus_type.go => internal/sink/sinkConfig.go (73%) rename sinkSender.go => internal/sink/sinkSender.go (85%) rename sinkSender_test.go => internal/sink/sinkSender_test.go (99%) rename sinkWrapper.go => internal/sink/sinkWrapper.go (90%) rename sinkWrapper_test.go => internal/sink/sinkWrapper_test.go (99%) diff --git a/main.go b/caduceus.go similarity index 91% rename from main.go rename to caduceus.go index f8f08ed8..b2ad2219 100644 --- a/main.go +++ b/caduceus.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package caduceus import ( "fmt" @@ -14,6 +14,10 @@ import ( _ "github.com/goschtalt/yaml-decoder" _ "github.com/goschtalt/yaml-encoder" "github.com/xmidt-org/arrange/arrangehttp" + "github.com/xmidt-org/caduceus/internal/handler" + "github.com/xmidt-org/caduceus/internal/metrics" + "github.com/xmidt-org/caduceus/internal/sink" + "github.com/xmidt-org/candlelight" "github.com/xmidt-org/sallust" "github.com/xmidt-org/touchstone" @@ -47,7 +51,7 @@ type CLI struct { // Provides a named type so it's a bit easier to flow through & use in fx. type cliArgs []string -func caduceus(arguments []string, run bool) error { +func Caduceus(arguments []string, run bool) error { var ( gscfg *goschtalt.Config @@ -74,7 +78,7 @@ func caduceus(arguments []string, run bool) error { goschtalt.UnmarshalFunc[sallust.Config]("logging"), goschtalt.UnmarshalFunc[candlelight.Config]("tracing"), goschtalt.UnmarshalFunc[touchstone.Config]("prometheus"), - goschtalt.UnmarshalFunc[SinkConfig]("sender"), + goschtalt.UnmarshalFunc[sink.SinkConfig]("sender"), goschtalt.UnmarshalFunc[Service]("service"), goschtalt.UnmarshalFunc[[]string]("authHeader"), goschtalt.UnmarshalFunc[bool]("previousVersionSupport"), @@ -124,10 +128,10 @@ func caduceus(arguments []string, run bool) error { candlelight.New, ), - providePprofEndpoint(), - provideMetricEndpoint(), - provideHealthCheck(), - provideCoreEndpoints(), + ProvidePprofEndpoint(), + ProvideMetricEndpoint(), + ProvideHealthCheck(), + ProvideCoreEndpoints(), arrangehttp.ProvideServer("servers.health"), arrangehttp.ProvideServer("servers.metrics"), @@ -135,11 +139,11 @@ func caduceus(arguments []string, run bool) error { arrangehttp.ProvideServer("servers.primary"), arrangehttp.ProvideServer("servers.alternate"), - ProvideHandler(), - ProvideWrapper(), + handler.ProvideHandler(), + sink.ProvideWrapper(), touchstone.Provide(), touchhttp.Provide(), - ProvideMetrics(), + metrics.ProvideMetrics(), // ancla.ProvideMetrics(), //TODO: need to add back in once we fix the ancla/argus dependency issue ) @@ -219,7 +223,7 @@ func main() { } }() - err := caduceus(os.Args[1:], true) + err := Caduceus(os.Args[1:], true) if err == nil { return diff --git a/main_test.go b/caduceus_test.go similarity index 96% rename from main_test.go rename to caduceus_test.go index 2e138e31..3d61631b 100644 --- a/main_test.go +++ b/caduceus_test.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package caduceus import ( "testing" @@ -86,10 +86,10 @@ func Test_caduceus(t *testing.T) { if tc.panic { assert.Panics(func() { - _ = caduceus(tc.args, false) + _ = Caduceus(tc.args, false) }) } else { - err := caduceus(tc.args, false) + err := Caduceus(tc.args, false) assert.ErrorIs(err, tc.expectedErr) } diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 00000000..47ea8c3e --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,29 @@ +// SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: LicenseRef-COMCAST + +package main + +import ( + "fmt" + "os" + "runtime/debug" + + "github.com/xmidt-org/caduceus" +) + +func main() { + defer func() { + if r := recover(); r != nil { + fmt.Println("stacktrace from panic: \n" + string(debug.Stack())) + } + }() + + err := caduceus.Caduceus(os.Args[1:], true) + + if err == nil { + return + } + + fmt.Fprintln(os.Stderr, err) + os.Exit(-1) +} diff --git a/config.go b/config.go index 97d46c57..6401f108 100644 --- a/config.go +++ b/config.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package caduceus import ( "fmt" @@ -11,7 +11,10 @@ import ( "github.com/goschtalt/goschtalt" "github.com/xmidt-org/arrange/arrangehttp" "github.com/xmidt-org/arrange/arrangepprof" + "github.com/xmidt-org/bascule" + "github.com/xmidt-org/caduceus/internal/sink" "github.com/xmidt-org/candlelight" + "github.com/xmidt-org/clortho" "github.com/xmidt-org/sallust" "github.com/xmidt-org/touchstone" "gopkg.in/dealancer/validate.v2" @@ -26,7 +29,7 @@ type Config struct { Servers Servers ArgusClientTimeout HttpClientTimeout JWTValidator JWTValidator - Sink SinkConfig + Sink sink.SinkConfig Service Service AuthHeader []string Server string @@ -114,6 +117,16 @@ type MetricsOption struct { Subsystem string } +// JWTValidator provides a convenient way to define jwt validator through config files +type JWTValidator struct { + // Config is used to create the clortho Resolver & Refresher for JWT verification keys + Config clortho.Config `json:"config"` + + // Leeway is used to set the amount of time buffer should be given to JWT + // time values, such as nbf + Leeway bascule.Leeway +} + // Collect and process the configuration files and env vars and // produce a configuration object. func provideConfig(cli *CLI) (*goschtalt.Config, error) { diff --git a/client.go b/internal/client/client.go similarity index 91% rename from client.go rename to internal/client/client.go index c6a6b6be..ac9b03df 100644 --- a/client.go +++ b/internal/client/client.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package client import "net/http" @@ -10,7 +10,7 @@ type Client interface { Do(*http.Request) (*http.Response, error) } -func nopClient(next Client) Client { +func NopClient(next Client) Client { return next } diff --git a/httpClient.go b/internal/client/httpClient.go similarity index 74% rename from httpClient.go rename to internal/client/httpClient.go index 0f6bccb2..43d02d6b 100644 --- a/httpClient.go +++ b/internal/client/httpClient.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package client import ( "errors" @@ -10,6 +10,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/xmidt-org/caduceus/internal/metrics" ) var ( @@ -22,7 +23,7 @@ type metricWrapper struct { id string } -func newMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, id string) (*metricWrapper, error) { +func NewMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, id string) (*metricWrapper, error) { if now == nil { now = time.Now } @@ -36,12 +37,12 @@ func newMetricWrapper(now func() time.Time, queryLatency prometheus.ObserverVec, }, nil } -func (m *metricWrapper) roundTripper(next Client) Client { +func (m *metricWrapper) RoundTripper(next Client) Client { return doerFunc(func(req *http.Request) (*http.Response, error) { startTime := m.now() resp, err := next.Do(req) endTime := m.now() - code := networkError + code := metrics.NetworkError if err == nil { code = strconv.Itoa(resp.StatusCode) @@ -49,7 +50,7 @@ func (m *metricWrapper) roundTripper(next Client) Client { // find time difference, add to metric var latency = endTime.Sub(startTime) - m.queryLatency.With(prometheus.Labels{UrlLabel: m.id, CodeLabel: code}).Observe(latency.Seconds()) + m.queryLatency.With(prometheus.Labels{metrics.UrlLabel: m.id, metrics.CodeLabel: code}).Observe(latency.Seconds()) return resp, err }) diff --git a/httpClient_test.go b/internal/client/httpClient_test.go similarity index 99% rename from httpClient_test.go rename to internal/client/httpClient_test.go index 9802023c..ce8e9b8e 100644 --- a/httpClient_test.go +++ b/internal/client/httpClient_test.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package client // import ( // "errors" diff --git a/internal/handler/caduceus_type.go b/internal/handler/caduceus_type.go new file mode 100644 index 00000000..e570f6ea --- /dev/null +++ b/internal/handler/caduceus_type.go @@ -0,0 +1,24 @@ +// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 +package handler + +import ( + "go.uber.org/zap" + + "github.com/xmidt-org/caduceus/internal/sink" + "github.com/xmidt-org/wrp-go/v3" +) + +type RequestHandler interface { + HandleRequest(workerID int, msg *wrp.Message) +} + +type CaduceusHandler struct { + wrapper sink.Wrapper + Logger *zap.Logger +} + +func (ch *CaduceusHandler) HandleRequest(workerID int, msg *wrp.Message) { + ch.Logger.Info("Worker received a request, now passing to sender", zap.Int("workerId", workerID)) + ch.wrapper.Queue(msg) +} diff --git a/caduceus_type_test.go b/internal/handler/caduceus_type_test.go similarity index 82% rename from caduceus_type_test.go rename to internal/handler/caduceus_type_test.go index 73ea159a..8f6f7511 100644 --- a/caduceus_type_test.go +++ b/internal/handler/caduceus_type_test.go @@ -1,25 +1,25 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package handler import ( "testing" "github.com/stretchr/testify/mock" - "go.uber.org/zap/zaptest" - + "github.com/xmidt-org/caduceus/internal/mocks" "github.com/xmidt-org/wrp-go/v3" + "go.uber.org/zap/zaptest" ) func TestCaduceusHandler(t *testing.T) { logger := zaptest.NewLogger(t) - fakeSenderWrapper := new(mockSenderWrapper) + fakeSenderWrapper := new(mocks.MockSinkWrapper) fakeSenderWrapper.On("Queue", mock.AnythingOfType("*wrp.Message")).Return().Once() testHandler := CaduceusHandler{ wrapper: fakeSenderWrapper, - Logger: logger, + Logger: logger, } t.Run("TestHandleRequest", func(t *testing.T) { diff --git a/http.go b/internal/handler/http.go similarity index 93% rename from http.go rename to internal/handler/http.go index 3712b9af..43969345 100644 --- a/http.go +++ b/internal/handler/http.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package handler import ( "io" @@ -13,14 +13,15 @@ import ( "github.com/prometheus/client_golang/prometheus" uuid "github.com/satori/go.uuid" - + "github.com/xmidt-org/caduceus/internal/metrics" + "github.com/xmidt-org/caduceus/internal/sink" "github.com/xmidt-org/sallust" "github.com/xmidt-org/wrp-go/v3" ) type ServerHandlerIn struct { fx.In - SinkWrapper *SinkWrapper + SinkWrapper *sink.SinkWrapper Logger *zap.Logger Telemetry *HandlerTelemetry } @@ -57,7 +58,7 @@ type HandlerTelemetry struct { } func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { - eventType := unknownEventType + eventType := metrics.UnknownEventType logger := sallust.Get(request.Context()) // find time difference, add to metric after function finishes defer func(s time.Time) { @@ -155,16 +156,16 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { // use the one the source specified. if msg.ContentType == "" { msg.ContentType = wrp.MimeTypeJson - reason = emptyContentTypeReason + reason = metrics.EmptyContentTypeReason } // Ensure there is a transaction id even if we make one up if msg.TransactionUUID == "" { msg.TransactionUUID = uuid.NewV4().String() if reason == "" { - reason = emptyUUIDReason + reason = metrics.EmptyUUIDReason } else { - reason = bothEmptyReason + reason = metrics.BothEmptyReason } } @@ -196,11 +197,11 @@ func ProvideHandler() fx.Option { }, ) } -func New(sw *SinkWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) { +func New(sw *sink.SinkWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) { return &ServerHandler{ caduceusHandler: &CaduceusHandler{ wrapper: sw, - Logger: log, + Logger: log, }, telemetry: t, maxOutstanding: maxOutstanding, diff --git a/http_test.go b/internal/handler/http_test.go similarity index 87% rename from http_test.go rename to internal/handler/http_test.go index ac91c1c2..8703d012 100644 --- a/http_test.go +++ b/internal/handler/http_test.go @@ -1,6 +1,6 @@ // // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // // SPDX-License-Identifier: Apache-2.0 -package main +package handler import ( "bytes" @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "go.uber.org/zap/zaptest" + "github.com/xmidt-org/caduceus/internal/metrics" "github.com/xmidt-org/wrp-go/v3" ) @@ -79,7 +79,7 @@ func TestServerHandler(t *testing.T) { expectedResponse: http.StatusBadRequest, request: exampleRequest(1), throwStatusBadRequest: true, - expectedEventType: unknownEventType, + expectedEventType: metrics.UnknownEventType, startTime: date1, endTime: date2, }, @@ -88,7 +88,6 @@ func TestServerHandler(t *testing.T) { for _, tc := range tcs { assert := assert.New(t) - logger := zaptest.NewLogger(t) fakeHandler := new(mockHandler) if !tc.throwStatusBadRequest { fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), @@ -110,17 +109,18 @@ func TestServerHandler(t *testing.T) { fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, + fakeTel := &HandlerTelemetry{ errorRequests: fakeErrorRequests, emptyRequests: fakeEmptyRequests, invalidCount: fakeInvalidCount, incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, incomingQueueLatency: fakeHist, - now: fakeTime, + } + serverWrapper := &ServerHandler{ + caduceusHandler: fakeHandler, + telemetry: fakeTel, + maxOutstanding: 1, + now: fakeTime, } t.Run(tc.desc, func(t *testing.T) { w := httptest.NewRecorder() @@ -145,7 +145,6 @@ func TestServerHandlerFixWrp(t *testing.T) { assert := assert.New(t) - logger := zaptest.NewLogger(t) fakeHandler := new(mockHandler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).Return().Once() @@ -162,7 +161,7 @@ func TestServerHandlerFixWrp(t *testing.T) { fakeIncomingContentTypeCount.On("Add", 1.0).Return() fakeModifiedWRPCount := new(mockCounter) - fakeModifiedWRPCount.On("With", []string{"reason", bothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() + fakeModifiedWRPCount.On("With", []string{"reason", metrics.BothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() fakeModifiedWRPCount.On("Add", 1.0).Return().Once() fakeHist := new(mockHistogram) @@ -170,18 +169,19 @@ func TestServerHandlerFixWrp(t *testing.T) { fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, + fakeTel := &HandlerTelemetry{ errorRequests: fakeErrorRequests, emptyRequests: fakeEmptyRequests, invalidCount: fakeInvalidCount, modifiedWRPCount: fakeModifiedWRPCount, incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, incomingQueueLatency: fakeHist, - now: mockTime(date1, date2), + } + serverWrapper := &ServerHandler{ + caduceusHandler: fakeHandler, + telemetry: fakeTel, + maxOutstanding: 1, + now: mockTime(date1, date2), } t.Run("TestServeHTTPHappyPath", func(t *testing.T) { @@ -206,7 +206,6 @@ func TestServerHandlerFull(t *testing.T) { assert := assert.New(t) - logger := zaptest.NewLogger(t) fakeHandler := new(mockHandler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) @@ -215,18 +214,19 @@ func TestServerHandlerFull(t *testing.T) { fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", unknownEventType} + histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, + fakeTel := &HandlerTelemetry{ incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, incomingQueueLatency: fakeHist, - now: mockTime(date1, date2), + } + serverWrapper := &ServerHandler{ + caduceusHandler: fakeHandler, + telemetry: fakeTel, + maxOutstanding: 1, + now: mockTime(date1, date2), } t.Run("TestServeHTTPTooMany", func(t *testing.T) { @@ -259,7 +259,6 @@ func TestServerEmptyPayload(t *testing.T) { req := httptest.NewRequest("POST", "localhost:8080", r) req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - logger := zaptest.NewLogger(t) fakeHandler := new(mockHandler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) @@ -270,19 +269,20 @@ func TestServerEmptyPayload(t *testing.T) { fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", unknownEventType} + histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, + fakeTel := &HandlerTelemetry{ emptyRequests: fakeEmptyRequests, incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, incomingQueueLatency: fakeHist, - now: mockTime(date1, date2), + } + serverWrapper := &ServerHandler{ + caduceusHandler: fakeHandler, + telemetry: fakeTel, + maxOutstanding: 1, + now: mockTime(date1, date2), } t.Run("TestServeHTTPTooMany", func(t *testing.T) { @@ -314,7 +314,6 @@ func TestServerUnableToReadBody(t *testing.T) { req := httptest.NewRequest("POST", "localhost:8080", r) req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - logger := zaptest.NewLogger(t) fakeHandler := new(mockHandler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() @@ -325,19 +324,20 @@ func TestServerUnableToReadBody(t *testing.T) { fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", unknownEventType} + histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, + fakeTel := &HandlerTelemetry{ errorRequests: fakeErrorRequests, incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, incomingQueueLatency: fakeHist, - now: mockTime(date1, date2), + } + serverWrapper := &ServerHandler{ + caduceusHandler: fakeHandler, + telemetry: fakeTel, + maxOutstanding: 1, + now: mockTime(date1, date2), } t.Run("TestServeHTTPTooMany", func(t *testing.T) { @@ -368,7 +368,6 @@ func TestServerInvalidBody(t *testing.T) { req := httptest.NewRequest("POST", "localhost:8080", r) req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - logger := zaptest.NewLogger(t) fakeHandler := new(mockHandler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() @@ -380,19 +379,20 @@ func TestServerInvalidBody(t *testing.T) { fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", unknownEventType} + histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, + fakeTel := &HandlerTelemetry{ invalidCount: fakeInvalidCount, incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, incomingQueueLatency: fakeHist, - now: mockTime(date1, date2), + } + serverWrapper := &ServerHandler{ + caduceusHandler: fakeHandler, + telemetry: fakeTel, + maxOutstanding: 1, + now: mockTime(date1, date2), } t.Run("TestServeHTTPTooMany", func(t *testing.T) { @@ -415,21 +415,21 @@ func TestHandlerUnsupportedMediaType(t *testing.T) { date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - histogramFunctionCall := []string{"event", unknownEventType} + histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) assert := assert.New(t) - logger := zaptest.NewLogger(t) fakeHandler := new(mockHandler) fakeQueueDepth := new(mockGauge) - - serverWrapper := &ServerHandler{ - log: logger, - caduceusHandler: fakeHandler, + fakeTel := &HandlerTelemetry{ incomingQueueDepthMetric: fakeQueueDepth, - maxOutstanding: 1, + } + serverWrapper := &ServerHandler{ + caduceusHandler: fakeHandler, + telemetry: fakeTel, + maxOutstanding: 1, } testCases := []struct { name string @@ -449,7 +449,7 @@ func TestHandlerUnsupportedMediaType(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { fakeHist := new(mockHistogram) - serverWrapper.incomingQueueLatency = fakeHist + serverWrapper.telemetry.incomingQueueLatency = fakeHist serverWrapper.now = mockTime(date1, date2) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() diff --git a/primaryHandler.go b/internal/handler/primaryHandler.go similarity index 90% rename from primaryHandler.go rename to internal/handler/primaryHandler.go index 3c68c5bf..0de55d61 100644 --- a/primaryHandler.go +++ b/internal/handler/primaryHandler.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package handler import ( "bytes" @@ -22,6 +22,7 @@ import ( "github.com/xmidt-org/bascule/basculechecks" "github.com/xmidt-org/bascule/basculehelper" "github.com/xmidt-org/bascule/basculehttp" + "github.com/xmidt-org/caduceus/internal/logging" "github.com/xmidt-org/clortho" "github.com/xmidt-org/clortho/clorthozap" "github.com/xmidt-org/sallust" @@ -44,16 +45,6 @@ type CapabilityConfig struct { EndpointBuckets []string } -// JWTValidator provides a convenient way to define jwt validator through config files -type JWTValidator struct { - // Config is used to create the clortho Resolver & Refresher for JWT verification keys - Config clortho.Config `json:"config"` - - // Leeway is used to set the amount of time buffer should be given to JWT - // time values, such as nbf - Leeway bascule.Leeway -} - func NewPrimaryHandler(l *zap.Logger, v *viper.Viper, sw *ServerHandler, router *mux.Router, prevVersionSupport bool) (*mux.Router, error) { auth, err := authenticationMiddleware(v, l) if err != nil { @@ -98,7 +89,7 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger) (*alice.Chain, logger.Debug("Created list of allowed basic auths", zap.Any("allowed", basicAllowed), zap.Any("config", basicAuth)) options := []basculehttp.COption{ - basculehttp.WithCLogger(getLogger), + basculehttp.WithCLogger(logging.GetLogger), // basculehttp.WithCErrorResponseFunc(listener.OnErrorResponse), } if len(basicAllowed) > 0 { @@ -219,7 +210,7 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger) (*alice.Chain, } authEnforcer := basculehttp.NewEnforcer( - basculehttp.WithELogger(getLogger), + basculehttp.WithELogger(logging.GetLogger), basculehttp.WithRules("Basic", bascule.Validators{ basculechecks.AllowAll(), }), @@ -227,8 +218,8 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger) (*alice.Chain, // basculehttp.WithEErrorResponseFunc(listener.OnErrorResponse), ) - authChain := alice.New(setLogger(logger), authConstructor, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener). commenting for now in case needed later - authChainLegacy := alice.New(setLogger(logger), authConstructorLegacy, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener) commenting for now in case needed later + authChain := alice.New(logging.SetLogger(logger), authConstructor, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener). commenting for now in case needed later + authChainLegacy := alice.New(logging.SetLogger(logger), authConstructorLegacy, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener) commenting for now in case needed later versionCompatibleAuth := alice.New(func(next http.Handler) http.Handler { return http.HandlerFunc(func(r http.ResponseWriter, req *http.Request) { diff --git a/basculeLogging.go b/internal/logging/basculeLogging.go similarity index 91% rename from basculeLogging.go rename to internal/logging/basculeLogging.go index d21dc17e..7a78d171 100644 --- a/basculeLogging.go +++ b/internal/logging/basculeLogging.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package logging import ( "context" @@ -12,6 +12,7 @@ import ( "github.com/xmidt-org/candlelight" "github.com/xmidt-org/sallust" + "go.uber.org/zap" ) @@ -27,7 +28,7 @@ func sanitizeHeaders(headers http.Header) (filtered http.Header) { return } -func setLogger(logger *zap.Logger) func(delegate http.Handler) http.Handler { +func SetLogger(logger *zap.Logger) func(delegate http.Handler) http.Handler { return func(delegate http.Handler) http.Handler { return http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { @@ -40,7 +41,7 @@ func setLogger(logger *zap.Logger) func(delegate http.Handler) http.Handler { } } -func getLogger(ctx context.Context) *zap.Logger { +func GetLogger(ctx context.Context) *zap.Logger { logger := sallust.Get(ctx).With(zap.Time("ts", time.Now().UTC()), zap.Any("caller", zap.WithCaller(true))) return logger } diff --git a/basculeLogging_test.go b/internal/logging/basculeLogging_test.go similarity index 98% rename from basculeLogging_test.go rename to internal/logging/basculeLogging_test.go index 98c2583f..6f6ce657 100644 --- a/basculeLogging_test.go +++ b/internal/logging/basculeLogging_test.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package logging import ( "net/http" diff --git a/metrics.go b/internal/metrics/metrics.go similarity index 96% rename from metrics.go rename to internal/metrics/metrics.go index b8e5d8ed..c0719e2f 100644 --- a/metrics.go +++ b/internal/metrics/metrics.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package metrics import ( "github.com/prometheus/client_golang/prometheus" @@ -33,11 +33,11 @@ const ( ) const ( - emptyContentTypeReason = "empty_content_type" - emptyUUIDReason = "empty_uuid" - bothEmptyReason = "empty_uuid_and_content_type" - networkError = "network_err" - unknownEventType = "unknown" + EmptyContentTypeReason = "empty_content_type" + EmptyUUIDReason = "empty_uuid" + BothEmptyReason = "empty_uuid_and_content_type" + NetworkError = "network_err" + UnknownEventType = "unknown" ) const ( diff --git a/metrics_test.go b/internal/metrics/metrics_test.go similarity index 98% rename from metrics_test.go rename to internal/metrics/metrics_test.go index 8092f9f0..32da4ee3 100644 --- a/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package metrics // Using Caduceus's test suite: // diff --git a/mocks_test.go b/internal/mocks/mocks.go similarity index 63% rename from mocks_test.go rename to internal/mocks/mocks.go index 5a2c19b8..23ee1a03 100644 --- a/mocks_test.go +++ b/internal/mocks/mocks.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package mocks import ( "time" @@ -12,33 +12,33 @@ import ( ) // mockHandler only needs to mock the `HandleRequest` method -type mockHandler struct { +type MockHandler struct { mock.Mock } -func (m *mockHandler) HandleRequest(workerID int, msg *wrp.Message) { +func (m *MockHandler) HandleRequest(workerID int, msg *wrp.Message) { m.Called(workerID, msg) } // mockSenderWrapper needs to mock things that the `SenderWrapper` does -type mockSenderWrapper struct { +type MockSinkWrapper struct { mock.Mock } -// func (m *mockSenderWrapper) Update(list []ancla.InternalWebhook) { +// func (m *MockSinkWrapper) Update(list []ancla.InternalWebhook) { // m.Called(list) // } -func (m *mockSenderWrapper) Queue(msg *wrp.Message) { +func (m *MockSinkWrapper) Queue(msg *wrp.Message) { m.Called(msg) } -func (m *mockSenderWrapper) Shutdown(gentle bool) { +func (m *MockSinkWrapper) Shutdown(gentle bool) { m.Called(gentle) } // mockTime provides two mock time values -func mockTime(one, two time.Time) func() time.Time { +func MockTime(one, two time.Time) func() time.Time { var called bool return func() time.Time { if called { @@ -49,18 +49,18 @@ func mockTime(one, two time.Time) func() time.Time { } } -type mockCounter struct { +type MockCounter struct { mock.Mock } -func (m *mockCounter) Add(delta float64) { +func (m *MockCounter) Add(delta float64) { m.Called(delta) } -func (m *mockCounter) Inc (){ +func (m *MockCounter) Inc() { m.Called(1) } -func (m *mockCounter) With(labelValues ...string) prometheus.Counter { +func (m *MockCounter) With(labelValues ...string) prometheus.Counter { for _, v := range labelValues { if !utf8.ValidString(v) { panic("not UTF-8") diff --git a/listenerStub.go b/internal/sink/listenerStub.go similarity index 99% rename from listenerStub.go rename to internal/sink/listenerStub.go index bd7cb723..ff2cdffa 100644 --- a/listenerStub.go +++ b/internal/sink/listenerStub.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package sink import ( "time" diff --git a/matcher.go b/internal/sink/matcher.go similarity index 99% rename from matcher.go rename to internal/sink/matcher.go index 59fbf098..e453092c 100644 --- a/matcher.go +++ b/internal/sink/matcher.go @@ -1,4 +1,4 @@ -package main +package sink import ( "container/ring" diff --git a/sink.go b/internal/sink/sink.go similarity index 83% rename from sink.go rename to internal/sink/sink.go index 081afe2d..b5e408e3 100644 --- a/sink.go +++ b/internal/sink/sink.go @@ -1,4 +1,4 @@ -package main +package sink import ( "bytes" @@ -14,6 +14,8 @@ import ( "strings" "time" + "github.com/xmidt-org/caduceus/internal/client" + "github.com/xmidt-org/caduceus/internal/metrics" "github.com/xmidt-org/retry" "github.com/xmidt-org/retry/retryhttp" "github.com/xmidt-org/wrp-go/v3" @@ -21,34 +23,37 @@ import ( "go.uber.org/zap" ) -type SinkI interface { +type Sink interface { Update(Listener) error Send(*ring.Ring, string, string, *wrp.Message) error } -type SinkWebhookV1 struct { +type WebhookV1 struct { id string deliveryInterval time.Duration deliveryRetries int logger *zap.Logger + //TODO: need to figure out best way to add client and middleware to webhook + client http.Client + clientMiddleware func(client.Client) client.Client } -func NewSinkWebhookV1(s *SinkSender) { - v1 := &SinkWebhookV1{ +func NewWebhookV1(s *SinkSender) { + v1 := &WebhookV1{ id: s.id, deliveryInterval: s.deliveryInterval, deliveryRetries: s.deliveryRetries, } s.sink = v1 } -func (v1 *SinkWebhookV1) Update(l Listener) (err error) { +func (v1 *WebhookV1) Update(l Listener) (err error) { v1.id = l.GetId() return nil } // worker is the routine that actually takes the queued messages and delivers // them to the listeners outside webpa -func (v1 *SinkWebhookV1) Send(urls *ring.Ring, secret, acceptType string, msg *wrp.Message) error { +func (v1 *WebhookV1) Send(urls *ring.Ring, secret, acceptType string, msg *wrp.Message) error { defer func() { if r := recover(); nil != r { // s.droppedPanic.Add(1.0) @@ -142,7 +147,7 @@ func (v1 *SinkWebhookV1) Send(urls *ring.Ring, secret, acceptType string, msg *w return nil } -func (v1 *SinkWebhookV1) addRunner(request *http.Request, event string) retry.Runner[*http.Response] { +func (v1 *WebhookV1) addRunner(request *http.Request, event string) retry.Runner[*http.Response] { runner, _ := retry.NewRunner[*http.Response]( retry.WithPolicyFactory[*http.Response](retry.Config{ Interval: v1.deliveryInterval, @@ -153,19 +158,19 @@ func (v1 *SinkWebhookV1) addRunner(request *http.Request, event string) retry.Ru return runner } -func (v1 *SinkWebhookV1) updateRequest(urls *ring.Ring) func(*http.Request) *http.Request { +func (v1 *WebhookV1) updateRequest(urls *ring.Ring) func(*http.Request) *http.Request { return func(request *http.Request) *http.Request { urls = urls.Next() tmp, err := url.Parse(urls.Value.(string)) if err != nil { - v1.logger.Error("failed to update url", zap.String(UrlLabel, urls.Value.(string)), zap.Error(err)) + v1.logger.Error("failed to update url", zap.String(metrics.UrlLabel, urls.Value.(string)), zap.Error(err)) } request.URL = tmp return request } } -func (v1 *SinkWebhookV1) onAttempt(request *http.Request, event string) retry.OnAttempt[*http.Response] { +func (v1 *WebhookV1) onAttempt(request *http.Request, event string) retry.OnAttempt[*http.Response] { return func(attempt retry.Attempt[*http.Response]) { if attempt.Retries > 0 { diff --git a/caduceus_type.go b/internal/sink/sinkConfig.go similarity index 73% rename from caduceus_type.go rename to internal/sink/sinkConfig.go index e2806df5..3d905b9f 100644 --- a/caduceus_type.go +++ b/internal/sink/sinkConfig.go @@ -1,14 +1,6 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 -package main +package sink -import ( - "time" - - "go.uber.org/zap" - - "github.com/xmidt-org/wrp-go/v3" -) +import "time" // Below is the struct we're using to contain the data from a provided config file // TODO: Try to figure out how to make bucket ranges configurable @@ -54,17 +46,3 @@ type SinkConfig struct { // itself. IdleConnTimeout time.Duration } - -type RequestHandler interface { - HandleRequest(workerID int, msg *wrp.Message) -} - -type CaduceusHandler struct { - wrapper Wrapper - *zap.Logger -} - -func (ch *CaduceusHandler) HandleRequest(workerID int, msg *wrp.Message) { - ch.Logger.Info("Worker received a request, now passing to sender", zap.Int("workerId", workerID)) - ch.wrapper.Queue(msg) -} diff --git a/sinkSender.go b/internal/sink/sinkSender.go similarity index 85% rename from sinkSender.go rename to internal/sink/sinkSender.go index b635954b..c8d4c52e 100644 --- a/sinkSender.go +++ b/internal/sink/sinkSender.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package sink import ( "bytes" @@ -20,6 +20,8 @@ import ( "go.uber.org/zap" "github.com/prometheus/client_golang/prometheus" + "github.com/xmidt-org/caduceus/internal/client" + "github.com/xmidt-org/caduceus/internal/metrics" "github.com/xmidt-org/webpa-common/v2/semaphore" "github.com/xmidt-org/wrp-go/v3" ) @@ -63,12 +65,11 @@ type SinkSender struct { wg sync.WaitGroup workers semaphore.Interface logger *zap.Logger - sink SinkI - client http.Client - clientMiddleware func(Client) Client - failureMessage FailureMessage - listener Listener - matcher Matcher + sink Sink + // failureMessage is sent during a queue overflow. + failureMessage FailureMessage + listener Listener + matcher Matcher SinkMetrics } @@ -95,7 +96,7 @@ type SinkMetrics struct { func NewSinkSender(sw *SinkWrapper, l Listener) (sender *SinkSender, err error) { if sw.clientMiddleware == nil { - sw.clientMiddleware = nopClient + sw.clientMiddleware = client.NopClient } if sw.client == nil { err = errors.New("nil Client") @@ -132,7 +133,6 @@ func NewSinkSender(sw *SinkWrapper, l Listener) (sender *SinkSender, err error) }, customPIDs: sw.config.CustomPIDs, disablePartnerIDs: sw.config.DisablePartnerIDs, - clientMiddleware: sw.clientMiddleware, } sender.CreateMetrics(sw.metrics) @@ -163,7 +163,7 @@ func (s *SinkSender) Update(l Listener) (err error) { return } s.matcher = m - NewSinkWebhookV1(s) + NewWebhookV1(s) default: err = fmt.Errorf("invalid listner") } @@ -445,22 +445,22 @@ Loop: } } -func (s *SinkSender) CreateMetrics(metrics Metrics) { - s.deliveryRetryCounter = metrics.DeliveryRetryCounter - s.deliveryRetryMaxGauge = metrics.DeliveryRetryMaxGauge.With(prometheus.Labels{UrlLabel: s.id}) - s.cutOffCounter = metrics.CutOffCounter.With(prometheus.Labels{UrlLabel: s.id}) - s.droppedQueueFullCounter = metrics.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: "queue_full"}) - s.droppedExpiredCounter = metrics.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: "expired"}) - s.droppedExpiredBeforeQueueCounter = metrics.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: "expired_before_queueing"}) - s.droppedCutoffCounter = metrics.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: "cut_off"}) - s.droppedInvalidConfig = metrics.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: "invalid_config"}) - s.droppedNetworkErrCounter = metrics.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: networkError}) - s.droppedPanic = metrics.DropsDueToPanic.With(prometheus.Labels{UrlLabel: s.id}) - s.queueDepthGauge = metrics.OutgoingQueueDepth.With(prometheus.Labels{UrlLabel: s.id}) - s.renewalTimeGauge = metrics.ConsumerRenewalTimeGauge.With(prometheus.Labels{UrlLabel: s.id}) - s.deliverUntilGauge = metrics.ConsumerDeliverUntilGauge.With(prometheus.Labels{UrlLabel: s.id}) - s.dropUntilGauge = metrics.ConsumerDropUntilGauge.With(prometheus.Labels{UrlLabel: s.id}) - s.currentWorkersGauge = metrics.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{UrlLabel: s.id}) - s.maxWorkersGauge = metrics.ConsumerMaxDeliveryWorkersGauge.With(prometheus.Labels{UrlLabel: s.id}) +func (s *SinkSender) CreateMetrics(m metrics.Metrics) { + s.deliveryRetryCounter = m.DeliveryRetryCounter + s.deliveryRetryMaxGauge = m.DeliveryRetryMaxGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.cutOffCounter = m.CutOffCounter.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.droppedQueueFullCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "queue_full"}) + s.droppedExpiredCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired"}) + s.droppedExpiredBeforeQueueCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired_before_queueing"}) + s.droppedCutoffCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"}) + s.droppedInvalidConfig = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "invalid_config"}) + s.droppedNetworkErrCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) + s.droppedPanic = m.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.queueDepthGauge = m.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.renewalTimeGauge = m.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.deliverUntilGauge = m.ConsumerDeliverUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.dropUntilGauge = m.ConsumerDropUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.currentWorkersGauge = m.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) + s.maxWorkersGauge = m.ConsumerMaxDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) } diff --git a/sinkSender_test.go b/internal/sink/sinkSender_test.go similarity index 99% rename from sinkSender_test.go rename to internal/sink/sinkSender_test.go index a45176c0..4bb9704d 100644 --- a/sinkSender_test.go +++ b/internal/sink/sinkSender_test.go @@ -1,6 +1,6 @@ // // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // // SPDX-License-Identifier: Apache-2.0 -package main +package sink // import ( // "bytes" diff --git a/sinkWrapper.go b/internal/sink/sinkWrapper.go similarity index 90% rename from sinkWrapper.go rename to internal/sink/sinkWrapper.go index 991cd6a9..d1b4fc06 100644 --- a/sinkWrapper.go +++ b/internal/sink/sinkWrapper.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package sink import ( "crypto/tls" @@ -11,6 +11,9 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/xmidt-org/caduceus/internal/client" + "github.com/xmidt-org/caduceus/internal/metrics" + "github.com/xmidt-org/candlelight" "github.com/xmidt-org/wrp-go/v3" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" @@ -24,7 +27,7 @@ type SinkWrapperIn struct { Tracing candlelight.Tracing SinkConfig SinkConfig - Metrics Metrics + Metrics metrics.Metrics EventType *prometheus.CounterVec Logger *zap.Logger } @@ -53,16 +56,16 @@ type SinkWrapper struct { eventType *prometheus.CounterVec wg sync.WaitGroup shutdown chan struct{} - metrics Metrics - client Client //TODO: keeping here for now - but might move to SinkSender in a later PR - clientMiddleware func(Client) Client //TODO: keeping here for now - but might move to SinkSender in a later PR + metrics metrics.Metrics + client client.Client //TODO: keeping here for now - but might move to SinkSender in a later PR + clientMiddleware func(client.Client) client.Client //TODO: keeping here for now - but might move to SinkSender in a later PR } func ProvideWrapper() fx.Option { return fx.Provide( - func(in MetricsIn) Metrics { - senderMetrics := Metrics{ + func(in metrics.MetricsIn) metrics.Metrics { + senderMetrics := metrics.Metrics{ DeliveryCounter: in.DeliveryCounter, DeliveryRetryCounter: in.DeliveryRetryCounter, DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge, @@ -152,14 +155,14 @@ func (sw *SinkWrapper) Update(list []Listener) { var err error listener := inValue.Listener - metricWrapper, err := newMetricWrapper(time.Now, sw.metrics.QueryLatency, inValue.ID) + metricWrapper, err := client.NewMetricWrapper(time.Now, sw.metrics.QueryLatency, inValue.ID) if err != nil { continue } ss, err = NewSinkSender(sw, listener) - sw.clientMiddleware = metricWrapper.roundTripper + sw.clientMiddleware = metricWrapper.RoundTripper // { // ss, err = newSinkSender(sw, r1) diff --git a/sinkWrapper_test.go b/internal/sink/sinkWrapper_test.go similarity index 99% rename from sinkWrapper_test.go rename to internal/sink/sinkWrapper_test.go index 3b588744..5bceb2cf 100644 --- a/sinkWrapper_test.go +++ b/internal/sink/sinkWrapper_test.go @@ -1,6 +1,6 @@ // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package sink import ( "net/http" diff --git a/logger.go b/logger.go index d58804d7..6150c376 100644 --- a/logger.go +++ b/logger.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package caduceus import ( "github.com/xmidt-org/sallust" diff --git a/routes.go b/routes.go index 03369b81..cac5d23e 100644 --- a/routes.go +++ b/routes.go @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC // SPDX-License-Identifier: Apache-2.0 -package main +package caduceus import ( "fmt" @@ -10,6 +10,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/xmidt-org/arrange/arrangehttp" "github.com/xmidt-org/arrange/arrangepprof" + "github.com/xmidt-org/caduceus/internal/handler" "github.com/xmidt-org/candlelight" "github.com/xmidt-org/httpaux" "github.com/xmidt-org/httpaux/recovery" @@ -19,11 +20,18 @@ import ( "go.uber.org/fx" ) +const ( + apiVersion = "v4" + prevAPIVersion = "v3" + apiBase = "api/" + apiVersion + apiBaseDualVersion = "api/{version:" + apiVersion + "|" + prevAPIVersion + "}" +) + type RoutesIn struct { fx.In PrimaryMetrics touchhttp.ServerInstrumenter `name:"servers.primary.metrics"` AlternateMetrics touchhttp.ServerInstrumenter `name:"servers.alternate.metrics"` - Handler *ServerHandler + Handler *handler.ServerHandler Tracing candlelight.Tracing PreviousVersionSupport bool } @@ -35,7 +43,7 @@ type RoutesOut struct { } // The name should be 'primary' or 'alternate'. -func provideCoreEndpoints() fx.Option { +func ProvideCoreEndpoints() fx.Option { return fx.Provide( fx.Annotated{ Name: "servers.primary.metrics", @@ -88,7 +96,7 @@ func provideCoreOption(server string, in RoutesIn) arrangehttp.Option[http.Serve } -func provideHealthCheck() fx.Option { +func ProvideHealthCheck() fx.Option { return fx.Provide( fx.Annotated{ Name: "servers.health.metrics", @@ -114,7 +122,7 @@ func provideHealthCheck() fx.Option { ) } -func provideMetricEndpoint() fx.Option { +func ProvideMetricEndpoint() fx.Option { return fx.Provide( fx.Annotate( func(metrics touchhttp.Handler, path MetricsPath) arrangehttp.Option[http.Server] { @@ -131,7 +139,7 @@ func provideMetricEndpoint() fx.Option { ) } -func providePprofEndpoint() fx.Option { +func ProvidePprofEndpoint() fx.Option { return fx.Provide( fx.Annotate( func(pathPrefix PprofPathPrefix) arrangehttp.Option[http.Server] { From 548aaadd63e1c222c5ab267edbdcf44d850d623a Mon Sep 17 00:00:00 2001 From: maura fortino Date: Thu, 11 Apr 2024 11:22:06 -0400 Subject: [PATCH 2/3] made PR review edits --- caduceus.go | 34 +--- config.go | 2 +- internal/handler/caduceus_type.go | 24 --- internal/handler/caduceus_type_test.go | 30 --- internal/handler/http.go | 37 ++-- internal/handler/http_test.go | 241 +++++++++++++------------ internal/metrics/metrics.go | 2 +- internal/metrics/metrics_test.go | 2 +- internal/mocks/mocks.go | 43 +++-- internal/sink/matcher.go | 2 + internal/sink/sink.go | 4 +- internal/sink/sinkConfig.go | 4 +- internal/sink/sinkSender.go | 70 +++---- internal/sink/sinkSender_test.go | 4 +- internal/sink/sinkWrapper.go | 110 +++++------ routes.go | 8 +- 16 files changed, 294 insertions(+), 323 deletions(-) delete mode 100644 internal/handler/caduceus_type.go delete mode 100644 internal/handler/caduceus_type_test.go diff --git a/caduceus.go b/caduceus.go index b2ad2219..bdaa5431 100644 --- a/caduceus.go +++ b/caduceus.go @@ -5,7 +5,6 @@ package caduceus import ( "fmt" "os" - "runtime/debug" "github.com/alecthomas/kong" "github.com/goschtalt/goschtalt" @@ -78,7 +77,7 @@ func Caduceus(arguments []string, run bool) error { goschtalt.UnmarshalFunc[sallust.Config]("logging"), goschtalt.UnmarshalFunc[candlelight.Config]("tracing"), goschtalt.UnmarshalFunc[touchstone.Config]("prometheus"), - goschtalt.UnmarshalFunc[sink.SinkConfig]("sender"), + goschtalt.UnmarshalFunc[sink.Config]("sender"), goschtalt.UnmarshalFunc[Service]("service"), goschtalt.UnmarshalFunc[[]string]("authHeader"), goschtalt.UnmarshalFunc[bool]("previousVersionSupport"), @@ -128,10 +127,10 @@ func Caduceus(arguments []string, run bool) error { candlelight.New, ), - ProvidePprofEndpoint(), - ProvideMetricEndpoint(), - ProvideHealthCheck(), - ProvideCoreEndpoints(), + providePprofEndpoint(), + provideMetricEndpoint(), + provideHealthCheck(), + provideCoreEndpoints(), arrangehttp.ProvideServer("servers.health"), arrangehttp.ProvideServer("servers.metrics"), @@ -139,11 +138,11 @@ func Caduceus(arguments []string, run bool) error { arrangehttp.ProvideServer("servers.primary"), arrangehttp.ProvideServer("servers.alternate"), - handler.ProvideHandler(), - sink.ProvideWrapper(), + handler.Provide(), + sink.Provide(), touchstone.Provide(), touchhttp.Provide(), - metrics.ProvideMetrics(), + metrics.Provide(), // ancla.ProvideMetrics(), //TODO: need to add back in once we fix the ancla/argus dependency issue ) @@ -215,20 +214,3 @@ func provideCLIWithOpts(args cliArgs, testOpts bool) (*CLI, error) { return &cli, nil } - -func main() { - defer func() { - if r := recover(); r != nil { - fmt.Println("stacktrace from panic: \n" + string(debug.Stack())) - } - }() - - err := Caduceus(os.Args[1:], true) - - if err == nil { - return - } - - fmt.Fprintln(os.Stderr, err) - os.Exit(-1) -} diff --git a/config.go b/config.go index 6401f108..91ca4429 100644 --- a/config.go +++ b/config.go @@ -29,7 +29,7 @@ type Config struct { Servers Servers ArgusClientTimeout HttpClientTimeout JWTValidator JWTValidator - Sink sink.SinkConfig + SinkConfig sink.Config Service Service AuthHeader []string Server string diff --git a/internal/handler/caduceus_type.go b/internal/handler/caduceus_type.go deleted file mode 100644 index e570f6ea..00000000 --- a/internal/handler/caduceus_type.go +++ /dev/null @@ -1,24 +0,0 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 -package handler - -import ( - "go.uber.org/zap" - - "github.com/xmidt-org/caduceus/internal/sink" - "github.com/xmidt-org/wrp-go/v3" -) - -type RequestHandler interface { - HandleRequest(workerID int, msg *wrp.Message) -} - -type CaduceusHandler struct { - wrapper sink.Wrapper - Logger *zap.Logger -} - -func (ch *CaduceusHandler) HandleRequest(workerID int, msg *wrp.Message) { - ch.Logger.Info("Worker received a request, now passing to sender", zap.Int("workerId", workerID)) - ch.wrapper.Queue(msg) -} diff --git a/internal/handler/caduceus_type_test.go b/internal/handler/caduceus_type_test.go deleted file mode 100644 index 8f6f7511..00000000 --- a/internal/handler/caduceus_type_test.go +++ /dev/null @@ -1,30 +0,0 @@ -// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// SPDX-License-Identifier: Apache-2.0 -package handler - -import ( - "testing" - - "github.com/stretchr/testify/mock" - "github.com/xmidt-org/caduceus/internal/mocks" - "github.com/xmidt-org/wrp-go/v3" - "go.uber.org/zap/zaptest" -) - -func TestCaduceusHandler(t *testing.T) { - logger := zaptest.NewLogger(t) - - fakeSenderWrapper := new(mocks.MockSinkWrapper) - fakeSenderWrapper.On("Queue", mock.AnythingOfType("*wrp.Message")).Return().Once() - - testHandler := CaduceusHandler{ - wrapper: fakeSenderWrapper, - Logger: logger, - } - - t.Run("TestHandleRequest", func(t *testing.T) { - testHandler.HandleRequest(0, &wrp.Message{}) - - fakeSenderWrapper.AssertExpectations(t) - }) -} diff --git a/internal/handler/http.go b/internal/handler/http.go index 43969345..a071464f 100644 --- a/internal/handler/http.go +++ b/internal/handler/http.go @@ -19,11 +19,14 @@ import ( "github.com/xmidt-org/wrp-go/v3" ) +type Handler interface { + ServeHTTP(http.ResponseWriter, *http.Request) +} type ServerHandlerIn struct { fx.In - SinkWrapper *sink.SinkWrapper + SinkWrapper sink.Wrapper Logger *zap.Logger - Telemetry *HandlerTelemetry + Telemetry *Telemetry } type ServerHandlerOut struct { @@ -33,13 +36,14 @@ type ServerHandlerOut struct { // Below is the struct that will implement our ServeHTTP method type ServerHandler struct { - caduceusHandler RequestHandler - telemetry *HandlerTelemetry + sinkWrapper sink.Wrapper + logger *zap.Logger + telemetry *Telemetry incomingQueueDepth int64 maxOutstanding int64 now func() time.Time } -type HandlerTelemetryIn struct { +type TelemetryIn struct { fx.In ErrorRequests prometheus.Counter `name:"error_request_body_count"` EmptyRequests prometheus.Counter `name:"empty_request_body_count"` @@ -48,7 +52,7 @@ type HandlerTelemetryIn struct { ModifiedWRPCount *prometheus.CounterVec `name:"modified_wrp_count"` IncomingQueueLatency prometheus.ObserverVec `name:"incoming_queue_latency_histogram_seconds"` } -type HandlerTelemetry struct { +type Telemetry struct { errorRequests prometheus.Counter emptyRequests prometheus.Counter invalidCount prometheus.Counter @@ -134,7 +138,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R } eventType = msg.FindEventStringSubMatch() - sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg)) + sh.handleRequest(sh.fixWrp(msg)) // return a 202 response.WriteHeader(http.StatusAccepted) @@ -143,6 +147,11 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R logger.Debug("event passed to senders.", zap.Any("event", msg)) } +func (sh *ServerHandler) handleRequest(msg *wrp.Message) { + sh.logger.Info("Worker received a request, now passing to sender") + sh.sinkWrapper.Queue(msg) +} + func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) { endTime := sh.now() sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds()) @@ -176,10 +185,10 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { return msg } -func ProvideHandler() fx.Option { +func Provide() fx.Option { return fx.Provide( - func(in HandlerTelemetryIn) *HandlerTelemetry { - return &HandlerTelemetry{ + func(in TelemetryIn) *Telemetry { + return &Telemetry{ errorRequests: in.ErrorRequests, emptyRequests: in.EmptyRequests, invalidCount: in.InvalidCount, @@ -197,12 +206,10 @@ func ProvideHandler() fx.Option { }, ) } -func New(sw *sink.SinkWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) { +func New(w sink.Wrapper, logger *zap.Logger, t *Telemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) { return &ServerHandler{ - caduceusHandler: &CaduceusHandler{ - wrapper: sw, - Logger: log, - }, + sinkWrapper: w, + logger: logger, telemetry: t, maxOutstanding: maxOutstanding, incomingQueueDepth: incomingQueueDepth, diff --git a/internal/handler/http_test.go b/internal/handler/http_test.go index 8703d012..d29e0cf1 100644 --- a/internal/handler/http_test.go +++ b/internal/handler/http_test.go @@ -14,6 +14,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/xmidt-org/caduceus/internal/metrics" + "github.com/xmidt-org/caduceus/internal/mocks" + "go.uber.org/zap/zaptest" "github.com/xmidt-org/wrp-go/v3" ) @@ -87,45 +89,47 @@ func TestServerHandler(t *testing.T) { for _, tc := range tcs { assert := assert.New(t) + logger := zaptest.NewLogger(t) - fakeHandler := new(mockHandler) + fakeHandler := new(mocks.Handler) if !tc.throwStatusBadRequest { fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).Return().Times(1) } - fakeEmptyRequests := new(mockCounter) - fakeErrorRequests := new(mockCounter) - fakeInvalidCount := new(mockCounter) - fakeQueueDepth := new(mockGauge) + fakeEmptyRequests := new(mocks.Counter) + fakeErrorRequests := new(mocks.Counter) + fakeInvalidCount := new(mocks.Counter) + fakeQueueDepth := new(mocks.Gauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2) if tc.throwStatusBadRequest { fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() } - fakeTime := mockTime(tc.startTime, tc.endTime) - fakeHist := new(mockHistogram) + fakeTime := mocks.Time(tc.startTime, tc.endTime) + fakeHist := new(mocks.Histogram) histogramFunctionCall := []string{"event", tc.expectedEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - fakeTel := &HandlerTelemetry{ - errorRequests: fakeErrorRequests, - emptyRequests: fakeEmptyRequests, - invalidCount: fakeInvalidCount, - incomingQueueDepthMetric: fakeQueueDepth, - incomingQueueLatency: fakeHist, - } - serverWrapper := &ServerHandler{ - caduceusHandler: fakeHandler, - telemetry: fakeTel, - maxOutstanding: 1, - now: fakeTime, + fakeTel := &mocks.Telemetry{ + ErrorRequests: fakeErrorRequests, + EmptyRequests: fakeEmptyRequests, + InvalidCount: fakeInvalidCount, + IncomingQueueDepthMetric: fakeQueueDepth, + IncomingQueueLatency: fakeHist, } + + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = fakeTime + t.Run(tc.desc, func(t *testing.T) { w := httptest.NewRecorder() - serverWrapper.ServeHTTP(w, tc.request) + fakeHandler.ServeHTTP(w, tc.request) resp := w.Result() assert.Equal(tc.expectedResponse, resp.StatusCode) @@ -144,50 +148,50 @@ func TestServerHandlerFixWrp(t *testing.T) { date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) assert := assert.New(t) + logger := zaptest.NewLogger(t) - fakeHandler := new(mockHandler) + fakeHandler := new(mocks.Handler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).Return().Once() - fakeEmptyRequests := new(mockCounter) - fakeErrorRequests := new(mockCounter) - fakeInvalidCount := new(mockCounter) - fakeQueueDepth := new(mockGauge) + fakeEmptyRequests := new(mocks.Counter) + fakeErrorRequests := new(mocks.Counter) + fakeInvalidCount := new(mocks.Counter) + fakeQueueDepth := new(mocks.Gauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2) - fakeIncomingContentTypeCount := new(mockCounter) + fakeIncomingContentTypeCount := new(mocks.Counter) fakeIncomingContentTypeCount.On("With", []string{"content_type", wrp.MimeTypeMsgpack}).Return(fakeIncomingContentTypeCount) fakeIncomingContentTypeCount.On("With", []string{"content_type", ""}).Return(fakeIncomingContentTypeCount) fakeIncomingContentTypeCount.On("Add", 1.0).Return() - fakeModifiedWRPCount := new(mockCounter) + fakeModifiedWRPCount := new(mocks.Counter) fakeModifiedWRPCount.On("With", []string{"reason", metrics.BothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() fakeModifiedWRPCount.On("Add", 1.0).Return().Once() - fakeHist := new(mockHistogram) + fakeHist := new(mocks.Histogram) histogramFunctionCall := []string{"event", "bob"} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - fakeTel := &HandlerTelemetry{ - errorRequests: fakeErrorRequests, - emptyRequests: fakeEmptyRequests, - invalidCount: fakeInvalidCount, - modifiedWRPCount: fakeModifiedWRPCount, - incomingQueueDepthMetric: fakeQueueDepth, - incomingQueueLatency: fakeHist, - } - serverWrapper := &ServerHandler{ - caduceusHandler: fakeHandler, - telemetry: fakeTel, - maxOutstanding: 1, - now: mockTime(date1, date2), + fakeTel := &mocks.Telemetry{ + ErrorRequests: fakeErrorRequests, + EmptyRequests: fakeEmptyRequests, + InvalidCount: fakeInvalidCount, + ModifiedWRPCount: fakeModifiedWRPCount, + IncomingQueueDepthMetric: fakeQueueDepth, + IncomingQueueLatency: fakeHist, } + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = mocks.Time(date1, date2) t.Run("TestServeHTTPHappyPath", func(t *testing.T) { w := httptest.NewRecorder() - serverWrapper.ServeHTTP(w, exampleRequest(4, "", "")) + fakeHandler.ServeHTTP(w, exampleRequest(4, "", "")) resp := w.Result() assert.Equal(http.StatusAccepted, resp.StatusCode) @@ -205,38 +209,38 @@ func TestServerHandlerFull(t *testing.T) { date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) assert := assert.New(t) + logger := zaptest.NewLogger(t) - fakeHandler := new(mockHandler) + fakeHandler := new(mocks.Handler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) - fakeQueueDepth := new(mockGauge) + fakeQueueDepth := new(mocks.Gauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - fakeHist := new(mockHistogram) + fakeHist := new(mocks.Histogram) histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - fakeTel := &HandlerTelemetry{ - incomingQueueDepthMetric: fakeQueueDepth, - incomingQueueLatency: fakeHist, - } - serverWrapper := &ServerHandler{ - caduceusHandler: fakeHandler, - telemetry: fakeTel, - maxOutstanding: 1, - now: mockTime(date1, date2), + fakeTel := &mocks.Telemetry{ + IncomingQueueDepthMetric: fakeQueueDepth, + IncomingQueueLatency: fakeHist, } + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = mocks.Time(date1, date2) t.Run("TestServeHTTPTooMany", func(t *testing.T) { w := httptest.NewRecorder() /* Act like we have 1 in flight */ - serverWrapper.incomingQueueDepth = 1 + fakeHandler.IncomingQueueDepth = 1 /* Make the call that goes over the limit */ - serverWrapper.ServeHTTP(w, exampleRequest(4)) + fakeHandler.ServeHTTP(w, exampleRequest(4)) resp := w.Result() assert.Equal(http.StatusServiceUnavailable, resp.StatusCode) @@ -253,43 +257,43 @@ func TestServerEmptyPayload(t *testing.T) { date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) assert := assert.New(t) + logger := zaptest.NewLogger(t) var buffer bytes.Buffer r := bytes.NewReader(buffer.Bytes()) req := httptest.NewRequest("POST", "localhost:8080", r) req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - fakeHandler := new(mockHandler) - fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), - mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) + fakeHandler := new(mocks.Handler) + fakeHandler.On("handleRequest", mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Times(2) - fakeEmptyRequests := new(mockCounter) + fakeEmptyRequests := new(mocks.Counter) fakeEmptyRequests.On("Add", mock.AnythingOfType("float64")).Return().Once() - fakeQueueDepth := new(mockGauge) + fakeQueueDepth := new(mocks.Gauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - fakeHist := new(mockHistogram) + fakeHist := new(mocks.Histogram) histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - fakeTel := &HandlerTelemetry{ - emptyRequests: fakeEmptyRequests, - incomingQueueDepthMetric: fakeQueueDepth, - incomingQueueLatency: fakeHist, - } - serverWrapper := &ServerHandler{ - caduceusHandler: fakeHandler, - telemetry: fakeTel, - maxOutstanding: 1, - now: mockTime(date1, date2), + fakeTel := &mocks.Telemetry{ + EmptyRequests: fakeEmptyRequests, + IncomingQueueDepthMetric: fakeQueueDepth, + IncomingQueueLatency: fakeHist, } + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = mocks.Time(date1, date2) + t.Run("TestServeHTTPTooMany", func(t *testing.T) { w := httptest.NewRecorder() /* Make the call that goes over the limit */ - serverWrapper.ServeHTTP(w, req) + fakeHandler.ServeHTTP(w, req) resp := w.Result() assert.Equal(http.StatusBadRequest, resp.StatusCode) @@ -306,6 +310,7 @@ func TestServerUnableToReadBody(t *testing.T) { date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) assert := assert.New(t) + logger := zaptest.NewLogger(t) var buffer bytes.Buffer r := iotest.TimeoutReader(bytes.NewReader(buffer.Bytes())) @@ -314,37 +319,37 @@ func TestServerUnableToReadBody(t *testing.T) { req := httptest.NewRequest("POST", "localhost:8080", r) req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - fakeHandler := new(mockHandler) + fakeHandler := new(mocks.Handler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() - fakeErrorRequests := new(mockCounter) + fakeErrorRequests := new(mocks.Counter) fakeErrorRequests.On("Add", mock.AnythingOfType("float64")).Return().Once() - fakeQueueDepth := new(mockGauge) + fakeQueueDepth := new(mocks.Gauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - fakeHist := new(mockHistogram) + fakeHist := new(mocks.Histogram) histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - fakeTel := &HandlerTelemetry{ - errorRequests: fakeErrorRequests, - incomingQueueDepthMetric: fakeQueueDepth, - incomingQueueLatency: fakeHist, - } - serverWrapper := &ServerHandler{ - caduceusHandler: fakeHandler, - telemetry: fakeTel, - maxOutstanding: 1, - now: mockTime(date1, date2), + fakeTel := &mocks.Telemetry{ + ErrorRequests: fakeErrorRequests, + IncomingQueueDepthMetric: fakeQueueDepth, + IncomingQueueLatency: fakeHist, } + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = mocks.Time(date1, date2) + t.Run("TestServeHTTPTooMany", func(t *testing.T) { w := httptest.NewRecorder() /* Make the call that goes over the limit */ - serverWrapper.ServeHTTP(w, req) + fakeHandler.ServeHTTP(w, req) resp := w.Result() assert.Equal(http.StatusBadRequest, resp.StatusCode) @@ -361,6 +366,7 @@ func TestServerInvalidBody(t *testing.T) { date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) assert := assert.New(t) + logger := zaptest.NewLogger(t) r := bytes.NewReader([]byte("Invalid payload.")) @@ -368,38 +374,38 @@ func TestServerInvalidBody(t *testing.T) { req := httptest.NewRequest("POST", "localhost:8080", r) req.Header.Set("Content-Type", wrp.MimeTypeMsgpack) - fakeHandler := new(mockHandler) + fakeHandler := new(mocks.Handler) fakeHandler.On("HandleRequest", mock.AnythingOfType("int"), mock.AnythingOfType("*wrp.Message")).WaitUntil(time.After(time.Second)).Once() - fakeQueueDepth := new(mockGauge) + fakeQueueDepth := new(mocks.Gauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) - fakeInvalidCount := new(mockCounter) + fakeInvalidCount := new(mocks.Counter) fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() - fakeHist := new(mockHistogram) + fakeHist := new(mocks.Histogram) histogramFunctionCall := []string{"event", metrics.UnknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - fakeTel := &HandlerTelemetry{ - invalidCount: fakeInvalidCount, - incomingQueueDepthMetric: fakeQueueDepth, - incomingQueueLatency: fakeHist, - } - serverWrapper := &ServerHandler{ - caduceusHandler: fakeHandler, - telemetry: fakeTel, - maxOutstanding: 1, - now: mockTime(date1, date2), + fakeTel := &mocks.Telemetry{ + InvalidCount: fakeInvalidCount, + IncomingQueueDepthMetric: fakeQueueDepth, + IncomingQueueLatency: fakeHist, } + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = mocks.Time(date1, date2) + t.Run("TestServeHTTPTooMany", func(t *testing.T) { w := httptest.NewRecorder() /* Make the call that goes over the limit */ - serverWrapper.ServeHTTP(w, req) + fakeHandler.ServeHTTP(w, req) resp := w.Result() assert.Equal(http.StatusBadRequest, resp.StatusCode) @@ -419,18 +425,21 @@ func TestHandlerUnsupportedMediaType(t *testing.T) { fakeLatency := date2.Sub(date1) assert := assert.New(t) + logger := zaptest.NewLogger(t) - fakeHandler := new(mockHandler) + fakeHandler := new(mocks.Handler) - fakeQueueDepth := new(mockGauge) - fakeTel := &HandlerTelemetry{ - incomingQueueDepthMetric: fakeQueueDepth, - } - serverWrapper := &ServerHandler{ - caduceusHandler: fakeHandler, - telemetry: fakeTel, - maxOutstanding: 1, + fakeQueueDepth := new(mocks.Gauge) + fakeTel := &mocks.Telemetry{ + IncomingQueueDepthMetric: fakeQueueDepth, } + + fakeHandler.SinkWrapper = new(mocks.Wrapper) + fakeHandler.Logger = logger + fakeHandler.Telemetry = fakeTel + fakeHandler.MaxOutstanding = 1 + fakeHandler.Now = mocks.Time(date1, date2) + testCases := []struct { name string headers []string @@ -448,9 +457,9 @@ func TestHandlerUnsupportedMediaType(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - fakeHist := new(mockHistogram) - serverWrapper.telemetry.incomingQueueLatency = fakeHist - serverWrapper.now = mockTime(date1, date2) + fakeHist := new(mocks.Histogram) + fakeHandler.Telemetry.IncomingQueueLatency = fakeHist + fakeHandler.Now = mocks.Time(date1, date2) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -460,7 +469,7 @@ func TestHandlerUnsupportedMediaType(t *testing.T) { for _, h := range testCase.headers { req.Header.Add("Content-Type", h) } - serverWrapper.ServeHTTP(w, req) + fakeHandler.ServeHTTP(w, req) resp := w.Result() assert.Equal(http.StatusUnsupportedMediaType, resp.StatusCode) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index c0719e2f..353c415e 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -84,7 +84,7 @@ type Metrics struct { } // TODO: do these need to be annonated/broken into groups based on where the metrics are being used/called -func ProvideMetrics() fx.Option { +func Provide() fx.Option { return fx.Options( touchstone.Gauge(prometheus.GaugeOpts{ Name: IncomingQueueDepth, diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index 32da4ee3..6dd46e9a 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -28,7 +28,7 @@ import ( func TestMetrics(t *testing.T) { assert := assert.New(t) - m := ProvideMetrics() + m := Provide() assert.NotNil(m) } diff --git a/internal/mocks/mocks.go b/internal/mocks/mocks.go index 23ee1a03..54a825b2 100644 --- a/internal/mocks/mocks.go +++ b/internal/mocks/mocks.go @@ -3,25 +3,35 @@ package mocks import ( + "net/http" "time" "unicode/utf8" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/mock" + "github.com/xmidt-org/caduceus/internal/sink" "github.com/xmidt-org/wrp-go/v3" + "go.uber.org/zap" ) // mockHandler only needs to mock the `HandleRequest` method -type MockHandler struct { +type Handler struct { mock.Mock + + SinkWrapper sink.Wrapper + Logger *zap.Logger + Telemetry *Telemetry + IncomingQueueDepth int64 + MaxOutstanding int64 + Now func() time.Time } -func (m *MockHandler) HandleRequest(workerID int, msg *wrp.Message) { - m.Called(workerID, msg) +func (m *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { + m.Called(r) } // mockSenderWrapper needs to mock things that the `SenderWrapper` does -type MockSinkWrapper struct { +type Wrapper struct { mock.Mock } @@ -29,16 +39,16 @@ type MockSinkWrapper struct { // m.Called(list) // } -func (m *MockSinkWrapper) Queue(msg *wrp.Message) { +func (m *Wrapper) Queue(msg *wrp.Message) { m.Called(msg) } -func (m *MockSinkWrapper) Shutdown(gentle bool) { +func (m *Wrapper) Shutdown(gentle bool) { m.Called(gentle) } // mockTime provides two mock time values -func MockTime(one, two time.Time) func() time.Time { +func Time(one, two time.Time) func() time.Time { var called bool return func() time.Time { if called { @@ -49,18 +59,18 @@ func MockTime(one, two time.Time) func() time.Time { } } -type MockCounter struct { +type Counter struct { mock.Mock } -func (m *MockCounter) Add(delta float64) { +func (m *Counter) Add(delta float64) { m.Called(delta) } -func (m *MockCounter) Inc() { +func (m *Counter) Inc() { m.Called(1) } -func (m *MockCounter) With(labelValues ...string) prometheus.Counter { +func (m *Counter) With(labelValues ...string) prometheus.Counter { for _, v := range labelValues { if !utf8.ValidString(v) { panic("not UTF-8") @@ -69,3 +79,14 @@ func (m *MockCounter) With(labelValues ...string) prometheus.Counter { args := m.Called(labelValues) return args.Get(0).(prometheus.Counter) } + +type Telemetry struct { + mock.Mock + + ErrorRequests prometheus.Counter + EmptyRequests prometheus.Counter + InvalidCount prometheus.Counter + IncomingQueueDepthMetric prometheus.Gauge + ModifiedWRPCount *prometheus.CounterVec + IncomingQueueLatency prometheus.ObserverVec +} diff --git a/internal/sink/matcher.go b/internal/sink/matcher.go index e453092c..15d15133 100644 --- a/internal/sink/matcher.go +++ b/internal/sink/matcher.go @@ -1,3 +1,5 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 package sink import ( diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 63e11eff..6f9128b9 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -1,3 +1,5 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 package sink import ( @@ -37,7 +39,7 @@ type WebhookV1 struct { clientMiddleware func(http.Client) http.Client } -func NewWebhookV1(s *SinkSender) { +func NewWebhookV1(s *sender) { v1 := &WebhookV1{ id: s.id, deliveryInterval: s.deliveryInterval, diff --git a/internal/sink/sinkConfig.go b/internal/sink/sinkConfig.go index 3d905b9f..6c009a0b 100644 --- a/internal/sink/sinkConfig.go +++ b/internal/sink/sinkConfig.go @@ -1,10 +1,12 @@ +// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 package sink import "time" // Below is the struct we're using to contain the data from a provided config file // TODO: Try to figure out how to make bucket ranges configurable -type SinkConfig struct { +type Config struct { // The number of workers to assign to each SinkSender created. NumWorkersPerSender int diff --git a/internal/sink/sinkSender.go b/internal/sink/sinkSender.go index c8d4c52e..e7e91668 100644 --- a/internal/sink/sinkSender.go +++ b/internal/sink/sinkSender.go @@ -49,7 +49,7 @@ type Sender interface { RetiredSince() time.Time Queue(*wrp.Message) } -type SinkSender struct { +type sender struct { id string queueSize int deliveryRetries int @@ -93,69 +93,69 @@ type SinkMetrics struct { currentWorkersGauge prometheus.Gauge } -func NewSinkSender(sw *SinkWrapper, l Listener) (sender *SinkSender, err error) { +func NewSender(w *wrapper, l Listener) (s *sender, err error) { - if sw.clientMiddleware == nil { - sw.clientMiddleware = client.NopClient + if w.clientMiddleware == nil { + w.clientMiddleware = client.NopClient } - if sw.client == nil { + if w.client == nil { err = errors.New("nil Client") return } - if sw.config.CutOffPeriod.Nanoseconds() == 0 { + if w.config.CutOffPeriod.Nanoseconds() == 0 { err = errors.New("invalid CutOffPeriod") return } - if sw.logger == nil { + if w.logger == nil { err = errors.New("logger required") return } id := l.GetId() - sender = &SinkSender{ + s = &sender{ id: id, listener: l, - queueSize: sw.config.QueueSizePerSender, + queueSize: w.config.QueueSizePerSender, deliverUntil: l.GetUntil(), // dropUntil: where is this being set in old caduceus?, - cutOffPeriod: sw.config.CutOffPeriod, - deliveryRetries: sw.config.DeliveryRetries, - deliveryInterval: sw.config.DeliveryInterval, - maxWorkers: sw.config.NumWorkersPerSender, + cutOffPeriod: w.config.CutOffPeriod, + deliveryRetries: w.config.DeliveryRetries, + deliveryInterval: w.config.DeliveryInterval, + maxWorkers: w.config.NumWorkersPerSender, failureMessage: FailureMessage{ Original: l, Text: failureText, - CutOffPeriod: sw.config.CutOffPeriod.String(), - QueueSize: sw.config.QueueSizePerSender, - Workers: sw.config.NumWorkersPerSender, + CutOffPeriod: w.config.CutOffPeriod.String(), + QueueSize: w.config.QueueSizePerSender, + Workers: w.config.NumWorkersPerSender, }, - customPIDs: sw.config.CustomPIDs, - disablePartnerIDs: sw.config.DisablePartnerIDs, + customPIDs: w.config.CustomPIDs, + disablePartnerIDs: w.config.DisablePartnerIDs, } - sender.CreateMetrics(sw.metrics) - sender.queueDepthGauge.Set(0) - sender.currentWorkersGauge.Set(0) + s.CreateMetrics(w.metrics) + s.queueDepthGauge.Set(0) + s.currentWorkersGauge.Set(0) //TODO: need to figure out how to set this up // Don't share the secret with others when there is an error. // sinkSender.failureMsg.Original.Webhook.Config.Secret = "XxxxxX" - sender.queue.Store(make(chan *wrp.Message, sw.config.QueueSizePerSender)) + s.queue.Store(make(chan *wrp.Message, w.config.QueueSizePerSender)) - if err = sender.Update(l); nil != err { + if err = s.Update(l); nil != err { return } - sender.workers = semaphore.New(sender.maxWorkers) - sender.wg.Add(1) - go sender.dispatcher() + s.workers = semaphore.New(s.maxWorkers) + s.wg.Add(1) + go s.dispatcher() return } -func (s *SinkSender) Update(l Listener) (err error) { +func (s *sender) Update(l Listener) (err error) { switch v := l.(type) { case *ListenerV1: m := &MatcherV1{} @@ -191,7 +191,7 @@ func (s *SinkSender) Update(l Listener) (err error) { // of messages to deliver. The request is checked to see if it matches the // criteria before being accepted or silently dropped. // TODO: can pass in message along with webhook information -func (s *SinkSender) Queue(msg *wrp.Message) { +func (s *sender) Queue(msg *wrp.Message) { s.mutex.RLock() deliverUntil := s.deliverUntil dropUntil := s.dropUntil @@ -233,7 +233,7 @@ func (s *SinkSender) Queue(msg *wrp.Message) { // Shutdown causes the CaduceusOutboundSender to stop its activities either gently or // abruptly based on the gentle parameter. If gentle is false, all queued // messages will be dropped without an attempt to send made. -func (s *SinkSender) Shutdown(gentle bool) { +func (s *sender) Shutdown(gentle bool) { if !gentle { // need to close the channel we're going to replace, in case it doesn't // have any events in it. @@ -252,7 +252,7 @@ func (s *SinkSender) Shutdown(gentle bool) { // RetiredSince returns the time the CaduceusOutboundSender retired (which could be in // the future). -func (s *SinkSender) RetiredSince() time.Time { +func (s *sender) RetiredSince() time.Time { s.mutex.RLock() deliverUntil := s.deliverUntil s.mutex.RUnlock() @@ -270,7 +270,7 @@ func overlaps(sl1 []string, sl2 []string) bool { return false } -func (s *SinkSender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool { +func (s *sender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool { if !now.After(dropUntil) { // client was cut off s.droppedCutoffCounter.Add(1.0) @@ -290,7 +290,7 @@ func (s *SinkSender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) b // a fresh one, counting any current messages in the queue as dropped. // It should never close a queue, as a queue not referenced anywhere will be // cleaned up by the garbage collector without needing to be closed. -func (s *SinkSender) Empty(droppedCounter prometheus.Counter) { +func (s *sender) Empty(droppedCounter prometheus.Counter) { droppedMsgs := s.queue.Load().(chan *wrp.Message) s.queue.Store(make(chan *wrp.Message, s.queueSize)) droppedCounter.Add(float64(len(droppedMsgs))) @@ -300,7 +300,7 @@ func (s *SinkSender) Empty(droppedCounter prometheus.Counter) { // queueOverflow handles the logic of what to do when a queue overflows: // cutting off the webhook for a time and sending a cut off notification // to the failure URL. -func (s *SinkSender) queueOverflow() { +func (s *sender) queueOverflow() { s.mutex.Lock() if time.Now().Before(s.dropUntil) { s.mutex.Unlock() @@ -375,7 +375,7 @@ func (s *SinkSender) queueOverflow() { } } -func (s *SinkSender) dispatcher() { +func (s *sender) dispatcher() { defer s.wg.Done() var ( msg *wrp.Message @@ -445,7 +445,7 @@ Loop: } } -func (s *SinkSender) CreateMetrics(m metrics.Metrics) { +func (s *sender) CreateMetrics(m metrics.Metrics) { s.deliveryRetryCounter = m.DeliveryRetryCounter s.deliveryRetryMaxGauge = m.DeliveryRetryMaxGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}) s.cutOffCounter = m.CutOffCounter.With(prometheus.Labels{metrics.UrlLabel: s.id}) diff --git a/internal/sink/sinkSender_test.go b/internal/sink/sinkSender_test.go index 4bb9704d..7dd524f2 100644 --- a/internal/sink/sinkSender_test.go +++ b/internal/sink/sinkSender_test.go @@ -1,5 +1,5 @@ -// // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC -// // SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC +// SPDX-License-Identifier: Apache-2.0 package sink // import ( diff --git a/internal/sink/sinkWrapper.go b/internal/sink/sinkWrapper.go index d1b4fc06..37fd1419 100644 --- a/internal/sink/sinkWrapper.go +++ b/internal/sink/sinkWrapper.go @@ -22,14 +22,14 @@ import ( ) // WrapperIn configures the Wrapper for creation -type SinkWrapperIn struct { +type WrapperIn struct { fx.In - Tracing candlelight.Tracing - SinkConfig SinkConfig - Metrics metrics.Metrics - EventType *prometheus.CounterVec - Logger *zap.Logger + Tracing candlelight.Tracing + Config Config + Metrics metrics.Metrics + EventType *prometheus.CounterVec + Logger *zap.Logger } // SinkWrapper interface is needed for unit testing. @@ -40,7 +40,7 @@ type Wrapper interface { } // Wrapper contains the configuration that will be shared with each outbound sender. It contains no external parameters. -type SinkWrapper struct { +type wrapper struct { // The amount of time to let expired SinkSenders linger before // shutting them down and cleaning up the resources associated with them. linger time.Duration @@ -49,7 +49,7 @@ type SinkWrapper struct { logger *zap.Logger //the configuration needed for eash sinkSender - config SinkConfig + config Config mutex *sync.RWMutex senders map[string]Sender @@ -62,7 +62,7 @@ type SinkWrapper struct { } -func ProvideWrapper() fx.Option { +func Provide() fx.Option { return fx.Provide( func(in metrics.MetricsIn) metrics.Metrics { senderMetrics := metrics.Metrics{ @@ -82,39 +82,39 @@ func ProvideWrapper() fx.Option { } return senderMetrics }, - func(in SinkWrapperIn) (*SinkWrapper, error) { - csw, err := NewSinkWrapper(in) - return csw, err + func(in WrapperIn) (Wrapper, error) { + w, err := NewWrapper(in) + return w, err }, ) } -func NewSinkWrapper(in SinkWrapperIn) (sw *SinkWrapper, err error) { - sw = &SinkWrapper{ - linger: in.SinkConfig.Linger, +func NewWrapper(in WrapperIn) (wr Wrapper, err error) { + w := &wrapper{ + linger: in.Config.Linger, logger: in.Logger, eventType: in.EventType, - config: in.SinkConfig, + config: in.Config, metrics: in.Metrics, } - if in.SinkConfig.Linger <= 0 { - linger := fmt.Sprintf("linger not positive: %v", in.SinkConfig.Linger) + if in.Config.Linger <= 0 { + linger := fmt.Sprintf("linger not positive: %v", in.Config.Linger) err = errors.New(linger) - sw = nil + w = nil return } - sw.senders = make(map[string]Sender) - sw.shutdown = make(chan struct{}) + w.senders = make(map[string]Sender) + w.shutdown = make(chan struct{}) - sw.wg.Add(1) - go undertaker(sw) + w.wg.Add(1) + go undertaker(w) return } // no longer being initialized at start up - needs to be initialized by the creation of the outbound sender -func NewRoundTripper(config SinkConfig, tracing candlelight.Tracing) (tr http.RoundTripper) { +func NewRoundTripper(config Config, tracing candlelight.Tracing) (tr http.RoundTripper) { tr = &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: config.DisableClientHostnameValidation}, MaxIdleConnsPerHost: config.NumWorkersPerSender, @@ -133,7 +133,7 @@ func NewRoundTripper(config SinkConfig, tracing candlelight.Tracing) (tr http.Ro // Update is called when we get changes to our webhook listeners with either // additions, or updates. This code takes care of building new OutboundSenders // and maintaining the existing OutboundSenders. -func (sw *SinkWrapper) Update(list []Listener) { +func (w *wrapper) Update(list []Listener) { ids := make([]struct { Listener Listener @@ -145,31 +145,31 @@ func (sw *SinkWrapper) Update(list []Listener) { ids[i].ID = v.GetId() } - sw.mutex.Lock() - defer sw.mutex.Unlock() + w.mutex.Lock() + defer w.mutex.Unlock() for _, inValue := range ids { - sender, ok := sw.senders[inValue.ID] + sender, ok := w.senders[inValue.ID] if !ok { var ss Sender var err error listener := inValue.Listener - metricWrapper, err := client.NewMetricWrapper(time.Now, sw.metrics.QueryLatency, inValue.ID) + metricWrapper, err := client.NewMetricWrapper(time.Now, w.metrics.QueryLatency, inValue.ID) if err != nil { continue } - ss, err = NewSinkSender(sw, listener) - sw.clientMiddleware = metricWrapper.RoundTripper + ss, err = NewSender(w, listener) + w.clientMiddleware = metricWrapper.RoundTripper // { // ss, err = newSinkSender(sw, r1) // } if err == nil { - sw.senders[inValue.ID] = ss + w.senders[inValue.ID] = ss } continue } @@ -180,13 +180,13 @@ func (sw *SinkWrapper) Update(list []Listener) { // Queue is used to send all the possible outbound senders a request. This // function performs the fan-out and filtering to multiple possible endpoints. -func (sw *SinkWrapper) Queue(msg *wrp.Message) { - sw.mutex.RLock() - defer sw.mutex.RUnlock() +func (w *wrapper) Queue(msg *wrp.Message) { + w.mutex.RLock() + defer w.mutex.RUnlock() - sw.eventType.With(prometheus.Labels{"event": msg.FindEventStringSubMatch()}).Add(1) + w.eventType.With(prometheus.Labels{"event": msg.FindEventStringSubMatch()}).Add(1) - for _, v := range sw.senders { + for _, v := range w.senders { v.Queue(msg) } } @@ -194,57 +194,57 @@ func (sw *SinkWrapper) Queue(msg *wrp.Message) { // Shutdown closes down the delivery mechanisms and cleans up the underlying // OutboundSenders either gently (waiting for delivery queues to empty) or not // (dropping enqueued messages) -func (sw *SinkWrapper) Shutdown(gentle bool) { - sw.mutex.Lock() - defer sw.mutex.Unlock() - for k, v := range sw.senders { +func (w *wrapper) Shutdown(gentle bool) { + w.mutex.Lock() + defer w.mutex.Unlock() + for k, v := range w.senders { v.Shutdown(gentle) - delete(sw.senders, k) + delete(w.senders, k) } - close(sw.shutdown) + close(w.shutdown) } // undertaker looks at the OutboundSenders periodically and prunes the ones // that have been retired for too long, freeing up resources. -func undertaker(sw *SinkWrapper) { - defer sw.wg.Done() +func undertaker(w *wrapper) { + defer w.wg.Done() // Collecting unused OutboundSenders isn't a huge priority, so do it // slowly. - ticker := time.NewTicker(2 * sw.linger) + ticker := time.NewTicker(2 * w.linger) for { select { case <-ticker.C: - threshold := time.Now().Add(-1 * sw.linger) + threshold := time.Now().Add(-1 * w.linger) // Actually shutting these down could take longer then we // want to lock the mutex, so just remove them from the active // list & shut them down afterwards. - deadList := createDeadlist(sw, threshold) + deadList := createDeadlist(w, threshold) // Shut them down for _, v := range deadList { v.Shutdown(false) } - case <-sw.shutdown: + case <-w.shutdown: ticker.Stop() return } } } -func createDeadlist(sw *SinkWrapper, threshold time.Time) map[string]Sender { - if sw == nil || threshold.IsZero() { +func createDeadlist(w *wrapper, threshold time.Time) map[string]Sender { + if w == nil || threshold.IsZero() { return nil } deadList := make(map[string]Sender) - sw.mutex.Lock() - defer sw.mutex.Unlock() - for k, v := range sw.senders { + w.mutex.Lock() + defer w.mutex.Unlock() + for k, v := range w.senders { retired := v.RetiredSince() if threshold.After(retired) { deadList[k] = v - delete(sw.senders, k) + delete(w.senders, k) } } return deadList diff --git a/routes.go b/routes.go index cac5d23e..1159d7ab 100644 --- a/routes.go +++ b/routes.go @@ -43,7 +43,7 @@ type RoutesOut struct { } // The name should be 'primary' or 'alternate'. -func ProvideCoreEndpoints() fx.Option { +func provideCoreEndpoints() fx.Option { return fx.Provide( fx.Annotated{ Name: "servers.primary.metrics", @@ -96,7 +96,7 @@ func provideCoreOption(server string, in RoutesIn) arrangehttp.Option[http.Serve } -func ProvideHealthCheck() fx.Option { +func provideHealthCheck() fx.Option { return fx.Provide( fx.Annotated{ Name: "servers.health.metrics", @@ -122,7 +122,7 @@ func ProvideHealthCheck() fx.Option { ) } -func ProvideMetricEndpoint() fx.Option { +func provideMetricEndpoint() fx.Option { return fx.Provide( fx.Annotate( func(metrics touchhttp.Handler, path MetricsPath) arrangehttp.Option[http.Server] { @@ -139,7 +139,7 @@ func ProvideMetricEndpoint() fx.Option { ) } -func ProvidePprofEndpoint() fx.Option { +func providePprofEndpoint() fx.Option { return fx.Provide( fx.Annotate( func(pathPrefix PprofPathPrefix) arrangehttp.Option[http.Server] { From 828166a0a57a0fb6b230400bff5f795efa079db8 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Thu, 11 Apr 2024 15:13:00 -0400 Subject: [PATCH 3/3] package-reorg --- internal/handler/http.go | 40 +++++++++++++++++------------------ internal/handler/http_test.go | 20 ++++++++++-------- internal/mocks/mocks.go | 14 ++---------- 3 files changed, 33 insertions(+), 41 deletions(-) diff --git a/internal/handler/http.go b/internal/handler/http.go index a071464f..13621c9e 100644 --- a/internal/handler/http.go +++ b/internal/handler/http.go @@ -53,12 +53,12 @@ type TelemetryIn struct { IncomingQueueLatency prometheus.ObserverVec `name:"incoming_queue_latency_histogram_seconds"` } type Telemetry struct { - errorRequests prometheus.Counter - emptyRequests prometheus.Counter - invalidCount prometheus.Counter - incomingQueueDepthMetric prometheus.Gauge - modifiedWRPCount *prometheus.CounterVec - incomingQueueLatency prometheus.ObserverVec + ErrorRequests prometheus.Counter + EmptyRequests prometheus.Counter + InvalidCount prometheus.Counter + IncomingQueueDepthMetric prometheus.Gauge + ModifiedWRPCount *prometheus.CounterVec + IncomingQueueLatency prometheus.ObserverVec } func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { @@ -90,19 +90,19 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R return } - sh.telemetry.incomingQueueDepthMetric.Add(1.0) - defer sh.telemetry.incomingQueueDepthMetric.Add(-1.0) + sh.telemetry.IncomingQueueDepthMetric.Add(1.0) + defer sh.telemetry.IncomingQueueDepthMetric.Add(-1.0) payload, err := io.ReadAll(request.Body) if err != nil { - sh.telemetry.errorRequests.Add(1.0) + sh.telemetry.ErrorRequests.Add(1.0) logger.Error("Unable to retrieve the request body.", zap.Error(err)) response.WriteHeader(http.StatusBadRequest) return } if len(payload) == 0 { - sh.telemetry.emptyRequests.Add(1.0) + sh.telemetry.EmptyRequests.Add(1.0) logger.Error("Empty payload.") response.WriteHeader(http.StatusBadRequest) response.Write([]byte("Empty payload.\n")) @@ -115,7 +115,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R err = decoder.Decode(msg) if err != nil || msg.MessageType() != 4 { // return a 400 - sh.telemetry.invalidCount.Add(1.0) + sh.telemetry.InvalidCount.Add(1.0) response.WriteHeader(http.StatusBadRequest) if err != nil { response.Write([]byte("Invalid payload format.\n")) @@ -130,7 +130,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R err = wrp.UTF8(msg) if err != nil { // return a 400 - sh.telemetry.invalidCount.Add(1.0) + sh.telemetry.InvalidCount.Add(1.0) response.WriteHeader(http.StatusBadRequest) response.Write([]byte("Strings must be UTF-8.\n")) logger.Debug("Strings must be UTF-8.") @@ -154,7 +154,7 @@ func (sh *ServerHandler) handleRequest(msg *wrp.Message) { func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) { endTime := sh.now() - sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds()) + sh.telemetry.IncomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds()) } func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { @@ -179,7 +179,7 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { } if reason != "" { - sh.telemetry.modifiedWRPCount.With(prometheus.Labels{"reason": reason}).Add(1.0) + sh.telemetry.ModifiedWRPCount.With(prometheus.Labels{"reason": reason}).Add(1.0) } return msg @@ -189,12 +189,12 @@ func Provide() fx.Option { return fx.Provide( func(in TelemetryIn) *Telemetry { return &Telemetry{ - errorRequests: in.ErrorRequests, - emptyRequests: in.EmptyRequests, - invalidCount: in.InvalidCount, - incomingQueueDepthMetric: in.IncomingQueueDepthMetric, - modifiedWRPCount: in.ModifiedWRPCount, - incomingQueueLatency: in.IncomingQueueLatency, + ErrorRequests: in.ErrorRequests, + EmptyRequests: in.EmptyRequests, + InvalidCount: in.InvalidCount, + IncomingQueueDepthMetric: in.IncomingQueueDepthMetric, + ModifiedWRPCount: in.ModifiedWRPCount, + IncomingQueueLatency: in.IncomingQueueLatency, } }, func(in ServerHandlerIn) (ServerHandlerOut, error) { diff --git a/internal/handler/http_test.go b/internal/handler/http_test.go index d29e0cf1..901db7ee 100644 --- a/internal/handler/http_test.go +++ b/internal/handler/http_test.go @@ -1,6 +1,6 @@ // // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC // // SPDX-License-Identifier: Apache-2.0 -package handler +package handler_test import ( "bytes" @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/xmidt-org/caduceus/internal/handler" "github.com/xmidt-org/caduceus/internal/metrics" "github.com/xmidt-org/caduceus/internal/mocks" "go.uber.org/zap/zaptest" @@ -112,7 +113,7 @@ func TestServerHandler(t *testing.T) { fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - fakeTel := &mocks.Telemetry{ + fakeTel := &handler.Telemetry{ ErrorRequests: fakeErrorRequests, EmptyRequests: fakeEmptyRequests, InvalidCount: fakeInvalidCount, @@ -174,12 +175,12 @@ func TestServerHandlerFixWrp(t *testing.T) { fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - fakeTel := &mocks.Telemetry{ + fakeTel := &handler.Telemetry{ ErrorRequests: fakeErrorRequests, EmptyRequests: fakeEmptyRequests, InvalidCount: fakeInvalidCount, - ModifiedWRPCount: fakeModifiedWRPCount, IncomingQueueDepthMetric: fakeQueueDepth, + ModifiedWRPCount: fakeModifiedWRPCount, IncomingQueueLatency: fakeHist, } fakeHandler.SinkWrapper = new(mocks.Wrapper) @@ -223,10 +224,11 @@ func TestServerHandlerFull(t *testing.T) { fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - fakeTel := &mocks.Telemetry{ + fakeTel := &handler.Telemetry{ IncomingQueueDepthMetric: fakeQueueDepth, IncomingQueueLatency: fakeHist, } + fakeHandler.SinkWrapper = new(mocks.Wrapper) fakeHandler.Logger = logger fakeHandler.Telemetry = fakeTel @@ -277,7 +279,7 @@ func TestServerEmptyPayload(t *testing.T) { fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - fakeTel := &mocks.Telemetry{ + fakeTel := &handler.Telemetry{ EmptyRequests: fakeEmptyRequests, IncomingQueueDepthMetric: fakeQueueDepth, IncomingQueueLatency: fakeHist, @@ -333,7 +335,7 @@ func TestServerUnableToReadBody(t *testing.T) { fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - fakeTel := &mocks.Telemetry{ + fakeTel := &handler.Telemetry{ ErrorRequests: fakeErrorRequests, IncomingQueueDepthMetric: fakeQueueDepth, IncomingQueueLatency: fakeHist, @@ -389,7 +391,7 @@ func TestServerInvalidBody(t *testing.T) { fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() - fakeTel := &mocks.Telemetry{ + fakeTel := &handler.Telemetry{ InvalidCount: fakeInvalidCount, IncomingQueueDepthMetric: fakeQueueDepth, IncomingQueueLatency: fakeHist, @@ -430,7 +432,7 @@ func TestHandlerUnsupportedMediaType(t *testing.T) { fakeHandler := new(mocks.Handler) fakeQueueDepth := new(mocks.Gauge) - fakeTel := &mocks.Telemetry{ + fakeTel := &handler.Telemetry{ IncomingQueueDepthMetric: fakeQueueDepth, } diff --git a/internal/mocks/mocks.go b/internal/mocks/mocks.go index 54a825b2..7d777e38 100644 --- a/internal/mocks/mocks.go +++ b/internal/mocks/mocks.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/mock" + "github.com/xmidt-org/caduceus/internal/handler" "github.com/xmidt-org/caduceus/internal/sink" "github.com/xmidt-org/wrp-go/v3" "go.uber.org/zap" @@ -20,7 +21,7 @@ type Handler struct { SinkWrapper sink.Wrapper Logger *zap.Logger - Telemetry *Telemetry + Telemetry *handler.Telemetry IncomingQueueDepth int64 MaxOutstanding int64 Now func() time.Time @@ -79,14 +80,3 @@ func (m *Counter) With(labelValues ...string) prometheus.Counter { args := m.Called(labelValues) return args.Get(0).(prometheus.Counter) } - -type Telemetry struct { - mock.Mock - - ErrorRequests prometheus.Counter - EmptyRequests prometheus.Counter - InvalidCount prometheus.Counter - IncomingQueueDepthMetric prometheus.Gauge - ModifiedWRPCount *prometheus.CounterVec - IncomingQueueLatency prometheus.ObserverVec -}