Skip to content

Commit

Permalink
Merge pull request #444 from xmidt-org/clean-up
Browse files Browse the repository at this point in the history
Clean up
  • Loading branch information
maurafortino authored Jan 29, 2024
2 parents b1caa05 + 42dddb0 commit d097b39
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 113 deletions.
6 changes: 3 additions & 3 deletions caduceus_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"testing"

"github.com/stretchr/testify/mock"
// nolint:staticcheck
"github.com/xmidt-org/webpa-common/v2/adapter"
"go.uber.org/zap/zaptest"

"github.com/xmidt-org/wrp-go/v3"
)

func TestCaduceusHandler(t *testing.T) {
logger := adapter.DefaultLogger().Logger
logger := zaptest.NewLogger(t)

fakeSenderWrapper := new(mockSenderWrapper)
fakeSenderWrapper.On("Queue", mock.AnythingOfType("*wrp.Message")).Return().Once()
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ require (
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/goschtalt/approx v1.0.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand All @@ -79,7 +78,6 @@ require (
github.com/sagikazarmark/locafero v0.3.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/segmentio/ksuid v1.0.4 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.10.0 // indirect
github.com/spf13/cast v1.5.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,6 @@ github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/z
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/goschtalt/approx v1.0.0 h1:q8DMVEOSgwjFUYsupwhLApMWhfbaxRfWeSKT2uTU214=
github.com/goschtalt/approx v1.0.0/go.mod h1:Mh0VbpeEgO2Qo2PKGrSuz241D/nj9q7OPegJNWzrbIU=
Expand Down Expand Up @@ -1547,7 +1546,6 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/segmentio/ksuid v1.0.2/go.mod h1:BXuJDr2byAiHuQaQtSKoXh1J0YmUDurywOXgB2w+OSU=
github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c=
github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE=
github.com/shirou/gopsutil v0.0.0-20181107111621-48177ef5f880/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
Expand Down
27 changes: 10 additions & 17 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
uuid "github.com/satori/go.uuid"

"github.com/xmidt-org/sallust"
"github.com/xmidt-org/webpa-common/v2/adapter"
"github.com/xmidt-org/wrp-go/v3"
)

Expand All @@ -33,7 +32,6 @@ type ServerHandlerOut struct {

// Below is the struct that will implement our ServeHTTP method
type ServerHandler struct {
log *zap.Logger
caduceusHandler RequestHandler
telemetry *HandlerTelemetry
incomingQueueDepth int64
Expand All @@ -60,23 +58,19 @@ type HandlerTelemetry struct {

func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
eventType := unknownEventType
log := sallust.Get(request.Context())
// find time difference, add to metric after function finishes
defer func(s time.Time) {
sh.recordQueueLatencyToHistogram(s, eventType)
}(sh.now())

logger := sallust.Get(request.Context())
if logger == adapter.DefaultLogger().Logger {
logger = sh.log
}

logger.Info("Receiving incoming request...")
log.Info("Receiving incoming request...")

if len(request.Header["Content-Type"]) != 1 || request.Header["Content-Type"][0] != "application/msgpack" {
//return a 415
response.WriteHeader(http.StatusUnsupportedMediaType)
response.Write([]byte("Invalid Content-Type header(s). Expected application/msgpack. \n"))
logger.Debug("Invalid Content-Type header(s). Expected application/msgpack. \n")
log.Debug("Invalid Content-Type header(s). Expected application/msgpack. \n")
return
}

Expand All @@ -87,7 +81,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
// return a 503
response.WriteHeader(http.StatusServiceUnavailable)
response.Write([]byte("Incoming queue is full.\n"))
logger.Debug("Incoming queue is full.\n")
log.Debug("Incoming queue is full.\n")
return
}

Expand All @@ -97,14 +91,14 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
payload, err := io.ReadAll(request.Body)
if err != nil {
sh.telemetry.errorRequests.Add(1.0)
logger.Error("Unable to retrieve the request body.", zap.Error(err))
log.Error("Unable to retrieve the request body.", zap.Error(err))
response.WriteHeader(http.StatusBadRequest)
return
}

if len(payload) == 0 {
sh.telemetry.emptyRequests.Add(1.0)
logger.Error("Empty payload.")
log.Error("Empty payload.")
response.WriteHeader(http.StatusBadRequest)
response.Write([]byte("Empty payload.\n"))
return
Expand All @@ -120,10 +114,10 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
response.WriteHeader(http.StatusBadRequest)
if err != nil {
response.Write([]byte("Invalid payload format.\n"))
logger.Debug("Invalid payload format.")
log.Debug("Invalid payload format.")
} else {
response.Write([]byte("Invalid MessageType.\n"))
logger.Debug("Invalid MessageType.")
log.Debug("Invalid MessageType.")
}
return
}
Expand All @@ -134,7 +128,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
sh.telemetry.invalidCount.Add(1.0)
response.WriteHeader(http.StatusBadRequest)
response.Write([]byte("Strings must be UTF-8.\n"))
logger.Debug("Strings must be UTF-8.")
log.Debug("Strings must be UTF-8.")
return
}
eventType = msg.FindEventStringSubMatch()
Expand All @@ -145,7 +139,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
response.WriteHeader(http.StatusAccepted)
response.Write([]byte("Request placed on to queue.\n"))

logger.Debug("event passed to senders.", zap.Any("event", msg))
log.Debug("event passed to senders.", zap.Any("event", msg))
}

func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) {
Expand Down Expand Up @@ -208,7 +202,6 @@ func New(senderWrapper *CaduceusSenderWrapper, log *zap.Logger, t *HandlerTeleme
senderWrapper: senderWrapper,
Logger: log,
},
log: log,
telemetry: t,
maxOutstanding: maxOutstanding,
incomingQueueDepth: incomingQueueDepth,
Expand Down
2 changes: 2 additions & 0 deletions listenerStub.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0
package main

import "time"
Expand Down
2 changes: 0 additions & 2 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,3 @@ func ProvideSenderMetrics() fx.Option {
},
)
}


33 changes: 9 additions & 24 deletions outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,19 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/xmidt-org/httpaux/retry"
"go.uber.org/zap"

"github.com/prometheus/client_golang/prometheus"
"github.com/xmidt-org/webpa-common/v2/device"

"github.com/xmidt-org/webpa-common/v2/semaphore"
"github.com/xmidt-org/webpa-common/v2/xhttp"
"github.com/xmidt-org/wrp-go/v3"
"github.com/xmidt-org/wrp-go/v3/wrphttp"
)
Expand Down Expand Up @@ -564,7 +562,7 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri
req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID)

// Add the device id without the trailing service
id, _ := device.ParseID(msg.Source)
id, _ := wrp.ParseDeviceID(msg.Source)
req.Header.Set("X-Webpa-Device-Id", string(id))
req.Header.Set("X-Webpa-Device-Name", string(id))

Expand All @@ -580,33 +578,20 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri
// find the event "short name"
event := msg.FindEventStringSubMatch()

//TODO: do this need to be replaced by the retry repo?
retryOptions := xhttp.RetryOptions{
Logger: obs.logger,
/*TODO: need middleware for:
Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event)
Logger
Update Request
*/
retryConfig := retry.Config{
Retries: obs.deliveryRetries,
Interval: obs.deliveryInterval,
// Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event), //webpa retry does not accept prometheus metrics
// Always retry on failures up to the max count.
ShouldRetry: xhttp.ShouldRetry,
ShouldRetryStatus: xhttp.RetryCodes,
}

// update subsequent requests with the next url in the list upon failure
retryOptions.UpdateRequest = func(request *http.Request) {
urls = urls.Next()
tmp, err := url.Parse(urls.Value.(string))
if err != nil {
obs.logger.Error("failed to update url", zap.String("url", urls.Value.(string)), zap.Error(err))
return
}
request.URL = tmp
}

// Send it
obs.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))

retryer := xhttp.RetryTransactor(retryOptions, obs.sender.Do)
client := obs.clientMiddleware(doerFunc(retryer))
client := retry.New(retryConfig, obs.clientMiddleware(obs.sender))
resp, err := client.Do(req)

code := "failure"
Expand Down
49 changes: 18 additions & 31 deletions primaryHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"net/http"
"os"
Expand All @@ -18,20 +17,16 @@ import (

"github.com/gorilla/mux"
"github.com/justinas/alice"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/viper"
"github.com/xmidt-org/bascule"
"github.com/xmidt-org/bascule/basculechecks"
"github.com/xmidt-org/bascule/basculehelper"
"github.com/xmidt-org/bascule/basculehttp"
"github.com/xmidt-org/clortho"
"github.com/xmidt-org/clortho/clorthometrics"
"github.com/xmidt-org/clortho/clorthozap"
"github.com/xmidt-org/sallust"
"github.com/xmidt-org/touchstone"

// nolint:staticcheck
"github.com/xmidt-org/webpa-common/v2/xmetrics"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -59,8 +54,8 @@ type JWTValidator struct {
Leeway bascule.Leeway
}

func NewPrimaryHandler(l *zap.Logger, v *viper.Viper, registry xmetrics.Registry, sw *ServerHandler, router *mux.Router, prevVersionSupport bool) (*mux.Router, error) {
auth, err := authenticationMiddleware(v, l, registry)
func NewPrimaryHandler(l *zap.Logger, v *viper.Viper, sw *ServerHandler, router *mux.Router, prevVersionSupport bool) (*mux.Router, error) {
auth, err := authenticationMiddleware(v, l)
if err != nil {
// nolint:errorlint
return nil, fmt.Errorf("unable to build authentication middleware: %v", err)
Expand All @@ -79,14 +74,11 @@ func NewPrimaryHandler(l *zap.Logger, v *viper.Viper, registry xmetrics.Registry
}

// authenticationMiddleware configures the authorization requirements for requests to reach the main handler
func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetrics.Registry) (*alice.Chain, error) {
if registry == nil {
return nil, errors.New("nil registry")
}
func authenticationMiddleware(v *viper.Viper, logger *zap.Logger) (*alice.Chain, error) {

basculeMeasures := basculehelper.NewAuthValidationMeasures(registry)
capabilityCheckMeasures := basculehelper.NewAuthCapabilityCheckMeasures(registry)
listener := basculehelper.NewMetricListener(basculeMeasures)
// basculeMeasures := basculehelper.NewAuthValidationMeasures(registry)
// capabilityCheckMeasures := basculehelper.NewAuthCapabilityCheckMeasures(registry)
// listener := basculehelper.NewMetricListener(basculeMeasures)

basicAllowed := make(map[string]string)
basicAuth := v.GetStringSlice("authHeader")
Expand All @@ -107,7 +99,7 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetr

options := []basculehttp.COption{
basculehttp.WithCLogger(getLogger),
basculehttp.WithCErrorResponseFunc(listener.OnErrorResponse),
// basculehttp.WithCErrorResponseFunc(listener.OnErrorResponse),
}
if len(basicAllowed) > 0 {
options = append(options, basculehttp.WithTokenFactory("Basic", basculehttp.BasicTokenFactory(basicAllowed)))
Expand Down Expand Up @@ -141,11 +133,6 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetr
return &alice.Chain{}, emperror.With(err, "failed to create clortho resolver")
}

promReg, ok := registry.(prometheus.Registerer)
if !ok {
return &alice.Chain{}, errors.New("failed to get prometheus registerer")
}

var (
tsConfig touchstone.Config
zConfig sallust.Config
Expand All @@ -154,12 +141,12 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetr
v.UnmarshalKey("touchstone", &tsConfig)
v.UnmarshalKey("zap", &zConfig)
zlogger := zap.Must(zConfig.Build())
tf := touchstone.NewFactory(tsConfig, zlogger, promReg)
// tf := touchstone.NewFactory(tsConfig, zlogger, promReg)
// Instantiate a metric listener for refresher and resolver to share
cml, err := clorthometrics.NewListener(clorthometrics.WithFactory(tf))
if err != nil {
return &alice.Chain{}, emperror.With(err, "failed to create clortho metrics listener")
}
// cml, err := clorthometrics.NewListener(clorthometrics.WithFactory(tf))
// if err != nil {
// return &alice.Chain{}, emperror.With(err, "failed to create clortho metrics listener")
// }

// Instantiate a logging listener for refresher and resolver to share
czl, err := clorthozap.NewListener(
Expand All @@ -169,9 +156,9 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetr
return &alice.Chain{}, emperror.With(err, "failed to create clortho zap logger listener")
}

resolver.AddListener(cml)
// resolver.AddListener(cml)
resolver.AddListener(czl)
ref.AddListener(cml)
// ref.AddListener(cml)
ref.AddListener(czl)
ref.AddListener(kr)
// context.Background() is for the unused `context.Context` argument in refresher.Start
Expand Down Expand Up @@ -225,8 +212,8 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetr
}
m := basculehelper.MetricValidator{
C: basculehelper.CapabilitiesValidator{Checker: c},
Measures: capabilityCheckMeasures,
Endpoints: endpoints,
// Measures: capabilityCheckMeasures,
}
bearerRules = append(bearerRules, m.CreateValidator(capabilityCheck.Type == "enforce"))
}
Expand All @@ -237,11 +224,11 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetr
basculechecks.AllowAll(),
}),
basculehttp.WithRules("Bearer", bearerRules),
basculehttp.WithEErrorResponseFunc(listener.OnErrorResponse),
// basculehttp.WithEErrorResponseFunc(listener.OnErrorResponse),
)

authChain := alice.New(setLogger(logger), authConstructor, authEnforcer, basculehttp.NewListenerDecorator(listener))
authChainLegacy := alice.New(setLogger(logger), authConstructorLegacy, authEnforcer, basculehttp.NewListenerDecorator(listener))
authChain := alice.New(setLogger(logger), authConstructor, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener). commenting for now in case needed later
authChainLegacy := alice.New(setLogger(logger), authConstructorLegacy, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener) commenting for now in case needed later

versionCompatibleAuth := alice.New(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(r http.ResponseWriter, req *http.Request) {
Expand Down
Loading

0 comments on commit d097b39

Please sign in to comment.