Skip to content

Commit

Permalink
package-reorg
Browse files Browse the repository at this point in the history
  • Loading branch information
maurafortino committed Apr 11, 2024
1 parent 548aaad commit 828166a
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 41 deletions.
40 changes: 20 additions & 20 deletions internal/handler/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ type TelemetryIn struct {
IncomingQueueLatency prometheus.ObserverVec `name:"incoming_queue_latency_histogram_seconds"`
}
type Telemetry struct {
errorRequests prometheus.Counter
emptyRequests prometheus.Counter
invalidCount prometheus.Counter
incomingQueueDepthMetric prometheus.Gauge
modifiedWRPCount *prometheus.CounterVec
incomingQueueLatency prometheus.ObserverVec
ErrorRequests prometheus.Counter
EmptyRequests prometheus.Counter
InvalidCount prometheus.Counter
IncomingQueueDepthMetric prometheus.Gauge
ModifiedWRPCount *prometheus.CounterVec
IncomingQueueLatency prometheus.ObserverVec
}

func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
Expand Down Expand Up @@ -90,19 +90,19 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
return
}

sh.telemetry.incomingQueueDepthMetric.Add(1.0)
defer sh.telemetry.incomingQueueDepthMetric.Add(-1.0)
sh.telemetry.IncomingQueueDepthMetric.Add(1.0)
defer sh.telemetry.IncomingQueueDepthMetric.Add(-1.0)

payload, err := io.ReadAll(request.Body)
if err != nil {
sh.telemetry.errorRequests.Add(1.0)
sh.telemetry.ErrorRequests.Add(1.0)
logger.Error("Unable to retrieve the request body.", zap.Error(err))
response.WriteHeader(http.StatusBadRequest)
return
}

if len(payload) == 0 {
sh.telemetry.emptyRequests.Add(1.0)
sh.telemetry.EmptyRequests.Add(1.0)
logger.Error("Empty payload.")
response.WriteHeader(http.StatusBadRequest)
response.Write([]byte("Empty payload.\n"))
Expand All @@ -115,7 +115,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
err = decoder.Decode(msg)
if err != nil || msg.MessageType() != 4 {
// return a 400
sh.telemetry.invalidCount.Add(1.0)
sh.telemetry.InvalidCount.Add(1.0)
response.WriteHeader(http.StatusBadRequest)
if err != nil {
response.Write([]byte("Invalid payload format.\n"))
Expand All @@ -130,7 +130,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
err = wrp.UTF8(msg)
if err != nil {
// return a 400
sh.telemetry.invalidCount.Add(1.0)
sh.telemetry.InvalidCount.Add(1.0)
response.WriteHeader(http.StatusBadRequest)
response.Write([]byte("Strings must be UTF-8.\n"))
logger.Debug("Strings must be UTF-8.")
Expand All @@ -154,7 +154,7 @@ func (sh *ServerHandler) handleRequest(msg *wrp.Message) {

func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) {
endTime := sh.now()
sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds())
sh.telemetry.IncomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(endTime.Sub(startTime).Seconds())
}

func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message {
Expand All @@ -179,7 +179,7 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message {
}

if reason != "" {
sh.telemetry.modifiedWRPCount.With(prometheus.Labels{"reason": reason}).Add(1.0)
sh.telemetry.ModifiedWRPCount.With(prometheus.Labels{"reason": reason}).Add(1.0)
}

return msg
Expand All @@ -189,12 +189,12 @@ func Provide() fx.Option {
return fx.Provide(
func(in TelemetryIn) *Telemetry {
return &Telemetry{
errorRequests: in.ErrorRequests,
emptyRequests: in.EmptyRequests,
invalidCount: in.InvalidCount,
incomingQueueDepthMetric: in.IncomingQueueDepthMetric,
modifiedWRPCount: in.ModifiedWRPCount,
incomingQueueLatency: in.IncomingQueueLatency,
ErrorRequests: in.ErrorRequests,
EmptyRequests: in.EmptyRequests,
InvalidCount: in.InvalidCount,
IncomingQueueDepthMetric: in.IncomingQueueDepthMetric,
ModifiedWRPCount: in.ModifiedWRPCount,
IncomingQueueLatency: in.IncomingQueueLatency,
}
},
func(in ServerHandlerIn) (ServerHandlerOut, error) {
Expand Down
20 changes: 11 additions & 9 deletions internal/handler/http_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// // SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC
// // SPDX-License-Identifier: Apache-2.0
package handler
package handler_test

import (
"bytes"
Expand All @@ -13,6 +13,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/xmidt-org/caduceus/internal/handler"
"github.com/xmidt-org/caduceus/internal/metrics"
"github.com/xmidt-org/caduceus/internal/mocks"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -112,7 +113,7 @@ func TestServerHandler(t *testing.T) {
fakeLatency := date2.Sub(date1)
fakeHist.On("With", histogramFunctionCall).Return().Once()
fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
fakeTel := &mocks.Telemetry{
fakeTel := &handler.Telemetry{
ErrorRequests: fakeErrorRequests,
EmptyRequests: fakeEmptyRequests,
InvalidCount: fakeInvalidCount,
Expand Down Expand Up @@ -174,12 +175,12 @@ func TestServerHandlerFixWrp(t *testing.T) {
fakeLatency := date2.Sub(date1)
fakeHist.On("With", histogramFunctionCall).Return().Once()
fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
fakeTel := &mocks.Telemetry{
fakeTel := &handler.Telemetry{
ErrorRequests: fakeErrorRequests,
EmptyRequests: fakeEmptyRequests,
InvalidCount: fakeInvalidCount,
ModifiedWRPCount: fakeModifiedWRPCount,
IncomingQueueDepthMetric: fakeQueueDepth,
ModifiedWRPCount: fakeModifiedWRPCount,
IncomingQueueLatency: fakeHist,
}
fakeHandler.SinkWrapper = new(mocks.Wrapper)
Expand Down Expand Up @@ -223,10 +224,11 @@ func TestServerHandlerFull(t *testing.T) {
fakeLatency := date2.Sub(date1)
fakeHist.On("With", histogramFunctionCall).Return().Once()
fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
fakeTel := &mocks.Telemetry{
fakeTel := &handler.Telemetry{
IncomingQueueDepthMetric: fakeQueueDepth,
IncomingQueueLatency: fakeHist,
}

fakeHandler.SinkWrapper = new(mocks.Wrapper)
fakeHandler.Logger = logger
fakeHandler.Telemetry = fakeTel
Expand Down Expand Up @@ -277,7 +279,7 @@ func TestServerEmptyPayload(t *testing.T) {
fakeLatency := date2.Sub(date1)
fakeHist.On("With", histogramFunctionCall).Return().Once()
fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
fakeTel := &mocks.Telemetry{
fakeTel := &handler.Telemetry{
EmptyRequests: fakeEmptyRequests,
IncomingQueueDepthMetric: fakeQueueDepth,
IncomingQueueLatency: fakeHist,
Expand Down Expand Up @@ -333,7 +335,7 @@ func TestServerUnableToReadBody(t *testing.T) {
fakeLatency := date2.Sub(date1)
fakeHist.On("With", histogramFunctionCall).Return().Once()
fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
fakeTel := &mocks.Telemetry{
fakeTel := &handler.Telemetry{
ErrorRequests: fakeErrorRequests,
IncomingQueueDepthMetric: fakeQueueDepth,
IncomingQueueLatency: fakeHist,
Expand Down Expand Up @@ -389,7 +391,7 @@ func TestServerInvalidBody(t *testing.T) {
fakeLatency := date2.Sub(date1)
fakeHist.On("With", histogramFunctionCall).Return().Once()
fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once()
fakeTel := &mocks.Telemetry{
fakeTel := &handler.Telemetry{
InvalidCount: fakeInvalidCount,
IncomingQueueDepthMetric: fakeQueueDepth,
IncomingQueueLatency: fakeHist,
Expand Down Expand Up @@ -430,7 +432,7 @@ func TestHandlerUnsupportedMediaType(t *testing.T) {
fakeHandler := new(mocks.Handler)

fakeQueueDepth := new(mocks.Gauge)
fakeTel := &mocks.Telemetry{
fakeTel := &handler.Telemetry{
IncomingQueueDepthMetric: fakeQueueDepth,
}

Expand Down
14 changes: 2 additions & 12 deletions internal/mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/mock"
"github.com/xmidt-org/caduceus/internal/handler"
"github.com/xmidt-org/caduceus/internal/sink"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
Expand All @@ -20,7 +21,7 @@ type Handler struct {

SinkWrapper sink.Wrapper
Logger *zap.Logger
Telemetry *Telemetry
Telemetry *handler.Telemetry
IncomingQueueDepth int64
MaxOutstanding int64
Now func() time.Time
Expand Down Expand Up @@ -79,14 +80,3 @@ func (m *Counter) With(labelValues ...string) prometheus.Counter {
args := m.Called(labelValues)
return args.Get(0).(prometheus.Counter)
}

type Telemetry struct {
mock.Mock

ErrorRequests prometheus.Counter
EmptyRequests prometheus.Counter
InvalidCount prometheus.Counter
IncomingQueueDepthMetric prometheus.Gauge
ModifiedWRPCount *prometheus.CounterVec
IncomingQueueLatency prometheus.ObserverVec
}

0 comments on commit 828166a

Please sign in to comment.