Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up #444

Merged
merged 7 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions caduceus_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"testing"

"github.com/stretchr/testify/mock"
// nolint:staticcheck
"github.com/xmidt-org/webpa-common/v2/adapter"
"go.uber.org/zap/zaptest"

"github.com/xmidt-org/wrp-go/v3"
)

func TestCaduceusHandler(t *testing.T) {
logger := adapter.DefaultLogger().Logger
logger := zaptest.NewLogger(t)

fakeSenderWrapper := new(mockSenderWrapper)
fakeSenderWrapper.On("Queue", mock.AnythingOfType("*wrp.Message")).Return().Once()
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ require (
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/goschtalt/approx v1.0.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand All @@ -79,7 +78,6 @@ require (
github.com/sagikazarmark/locafero v0.3.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/segmentio/ksuid v1.0.4 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.10.0 // indirect
github.com/spf13/cast v1.5.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,6 @@ github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/z
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/goschtalt/approx v1.0.0 h1:q8DMVEOSgwjFUYsupwhLApMWhfbaxRfWeSKT2uTU214=
github.com/goschtalt/approx v1.0.0/go.mod h1:Mh0VbpeEgO2Qo2PKGrSuz241D/nj9q7OPegJNWzrbIU=
Expand Down Expand Up @@ -1547,7 +1546,6 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/segmentio/ksuid v1.0.2/go.mod h1:BXuJDr2byAiHuQaQtSKoXh1J0YmUDurywOXgB2w+OSU=
github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c=
github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE=
github.com/shirou/gopsutil v0.0.0-20181107111621-48177ef5f880/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
Expand Down
27 changes: 10 additions & 17 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
uuid "github.com/satori/go.uuid"

"github.com/xmidt-org/sallust"
"github.com/xmidt-org/webpa-common/v2/adapter"
"github.com/xmidt-org/wrp-go/v3"
)

Expand All @@ -33,7 +32,6 @@ type ServerHandlerOut struct {

// Below is the struct that will implement our ServeHTTP method
type ServerHandler struct {
log *zap.Logger
caduceusHandler RequestHandler
telemetry *HandlerTelemetry
incomingQueueDepth int64
Expand All @@ -60,23 +58,19 @@ type HandlerTelemetry struct {

func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
eventType := unknownEventType
log := sallust.Get(request.Context())
// find time difference, add to metric after function finishes
defer func(s time.Time) {
sh.recordQueueLatencyToHistogram(s, eventType)
}(sh.now())

logger := sallust.Get(request.Context())
if logger == adapter.DefaultLogger().Logger {
logger = sh.log
}

logger.Info("Receiving incoming request...")
log.Info("Receiving incoming request...")

if len(request.Header["Content-Type"]) != 1 || request.Header["Content-Type"][0] != "application/msgpack" {
//return a 415
response.WriteHeader(http.StatusUnsupportedMediaType)
response.Write([]byte("Invalid Content-Type header(s). Expected application/msgpack. \n"))
logger.Debug("Invalid Content-Type header(s). Expected application/msgpack. \n")
log.Debug("Invalid Content-Type header(s). Expected application/msgpack. \n")
return
}

Expand All @@ -87,7 +81,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
// return a 503
response.WriteHeader(http.StatusServiceUnavailable)
response.Write([]byte("Incoming queue is full.\n"))
logger.Debug("Incoming queue is full.\n")
log.Debug("Incoming queue is full.\n")
return
}

Expand All @@ -97,14 +91,14 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
payload, err := io.ReadAll(request.Body)
if err != nil {
sh.telemetry.errorRequests.Add(1.0)
logger.Error("Unable to retrieve the request body.", zap.Error(err))
log.Error("Unable to retrieve the request body.", zap.Error(err))
response.WriteHeader(http.StatusBadRequest)
return
}

if len(payload) == 0 {
sh.telemetry.emptyRequests.Add(1.0)
logger.Error("Empty payload.")
log.Error("Empty payload.")
response.WriteHeader(http.StatusBadRequest)
response.Write([]byte("Empty payload.\n"))
return
Expand All @@ -120,10 +114,10 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
response.WriteHeader(http.StatusBadRequest)
if err != nil {
response.Write([]byte("Invalid payload format.\n"))
logger.Debug("Invalid payload format.")
log.Debug("Invalid payload format.")
} else {
response.Write([]byte("Invalid MessageType.\n"))
logger.Debug("Invalid MessageType.")
log.Debug("Invalid MessageType.")
}
return
}
Expand All @@ -134,7 +128,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
sh.telemetry.invalidCount.Add(1.0)
response.WriteHeader(http.StatusBadRequest)
response.Write([]byte("Strings must be UTF-8.\n"))
logger.Debug("Strings must be UTF-8.")
log.Debug("Strings must be UTF-8.")
return
}
eventType = msg.FindEventStringSubMatch()
Expand All @@ -145,7 +139,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
response.WriteHeader(http.StatusAccepted)
response.Write([]byte("Request placed on to queue.\n"))

logger.Debug("event passed to senders.", zap.Any("event", msg))
log.Debug("event passed to senders.", zap.Any("event", msg))
}

func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) {
Expand Down Expand Up @@ -208,7 +202,6 @@ func New(senderWrapper *CaduceusSenderWrapper, log *zap.Logger, t *HandlerTeleme
senderWrapper: senderWrapper,
Logger: log,
},
log: log,
telemetry: t,
maxOutstanding: maxOutstanding,
incomingQueueDepth: incomingQueueDepth,
Expand Down
2 changes: 2 additions & 0 deletions listenerStub.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// SPDX-FileCopyrightText: 2023 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0
package main

import "time"
Expand Down
2 changes: 0 additions & 2 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,3 @@ func ProvideSenderMetrics() fx.Option {
},
)
}


33 changes: 9 additions & 24 deletions outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,19 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/xmidt-org/httpaux/retry"
"go.uber.org/zap"

"github.com/prometheus/client_golang/prometheus"
"github.com/xmidt-org/webpa-common/v2/device"

"github.com/xmidt-org/webpa-common/v2/semaphore"
"github.com/xmidt-org/webpa-common/v2/xhttp"
"github.com/xmidt-org/wrp-go/v3"
"github.com/xmidt-org/wrp-go/v3/wrphttp"
)
Expand Down Expand Up @@ -564,7 +562,7 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri
req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID)

// Add the device id without the trailing service
id, _ := device.ParseID(msg.Source)
id, _ := wrp.ParseDeviceID(msg.Source)
req.Header.Set("X-Webpa-Device-Id", string(id))
req.Header.Set("X-Webpa-Device-Name", string(id))

Expand All @@ -580,33 +578,20 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri
// find the event "short name"
event := msg.FindEventStringSubMatch()

//TODO: do this need to be replaced by the retry repo?
retryOptions := xhttp.RetryOptions{
Logger: obs.logger,
/*TODO: need middleware for:
Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event)
Logger
Update Request
*/
Comment on lines +581 to +585
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this is important to us

retryConfig := retry.Config{
Retries: obs.deliveryRetries,
Interval: obs.deliveryInterval,
// Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event), //webpa retry does not accept prometheus metrics
// Always retry on failures up to the max count.
ShouldRetry: xhttp.ShouldRetry,
ShouldRetryStatus: xhttp.RetryCodes,
}

// update subsequent requests with the next url in the list upon failure
retryOptions.UpdateRequest = func(request *http.Request) {
urls = urls.Next()
tmp, err := url.Parse(urls.Value.(string))
if err != nil {
obs.logger.Error("failed to update url", zap.String("url", urls.Value.(string)), zap.Error(err))
return
}
request.URL = tmp
}

// Send it
obs.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))

retryer := xhttp.RetryTransactor(retryOptions, obs.sender.Do)
client := obs.clientMiddleware(doerFunc(retryer))
client := retry.New(retryConfig, obs.clientMiddleware(obs.sender))
resp, err := client.Do(req)

code := "failure"
Expand Down
49 changes: 18 additions & 31 deletions primaryHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"net/http"
"os"
Expand All @@ -18,20 +17,16 @@ import (

"github.com/gorilla/mux"
"github.com/justinas/alice"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/viper"
"github.com/xmidt-org/bascule"
"github.com/xmidt-org/bascule/basculechecks"
"github.com/xmidt-org/bascule/basculehelper"
"github.com/xmidt-org/bascule/basculehttp"
"github.com/xmidt-org/clortho"
"github.com/xmidt-org/clortho/clorthometrics"
"github.com/xmidt-org/clortho/clorthozap"
"github.com/xmidt-org/sallust"
"github.com/xmidt-org/touchstone"

// nolint:staticcheck
"github.com/xmidt-org/webpa-common/v2/xmetrics"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -59,8 +54,8 @@ type JWTValidator struct {
Leeway bascule.Leeway
}

func NewPrimaryHandler(l *zap.Logger, v *viper.Viper, registry xmetrics.Registry, sw *ServerHandler, router *mux.Router, prevVersionSupport bool) (*mux.Router, error) {
auth, err := authenticationMiddleware(v, l, registry)
func NewPrimaryHandler(l *zap.Logger, v *viper.Viper, sw *ServerHandler, router *mux.Router, prevVersionSupport bool) (*mux.Router, error) {
auth, err := authenticationMiddleware(v, l)
if err != nil {
// nolint:errorlint
return nil, fmt.Errorf("unable to build authentication middleware: %v", err)
Expand All @@ -79,14 +74,11 @@ func NewPrimaryHandler(l *zap.Logger, v *viper.Viper, registry xmetrics.Registry
}

// authenticationMiddleware configures the authorization requirements for requests to reach the main handler
func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetrics.Registry) (*alice.Chain, error) {
if registry == nil {
return nil, errors.New("nil registry")
}
func authenticationMiddleware(v *viper.Viper, logger *zap.Logger) (*alice.Chain, error) {

basculeMeasures := basculehelper.NewAuthValidationMeasures(registry)
capabilityCheckMeasures := basculehelper.NewAuthCapabilityCheckMeasures(registry)
listener := basculehelper.NewMetricListener(basculeMeasures)
// basculeMeasures := basculehelper.NewAuthValidationMeasures(registry)
// capabilityCheckMeasures := basculehelper.NewAuthCapabilityCheckMeasures(registry)
// listener := basculehelper.NewMetricListener(basculeMeasures)

basicAllowed := make(map[string]string)
denopink marked this conversation as resolved.
Show resolved Hide resolved
basicAuth := v.GetStringSlice("authHeader")
Expand All @@ -107,7 +99,7 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetr

options := []basculehttp.COption{
basculehttp.WithCLogger(getLogger),
basculehttp.WithCErrorResponseFunc(listener.OnErrorResponse),
// basculehttp.WithCErrorResponseFunc(listener.OnErrorResponse),
}
if len(basicAllowed) > 0 {
options = append(options, basculehttp.WithTokenFactory("Basic", basculehttp.BasicTokenFactory(basicAllowed)))
Expand Down Expand Up @@ -141,11 +133,6 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetr
return &alice.Chain{}, emperror.With(err, "failed to create clortho resolver")
}

promReg, ok := registry.(prometheus.Registerer)
if !ok {
return &alice.Chain{}, errors.New("failed to get prometheus registerer")
}

var (
tsConfig touchstone.Config
zConfig sallust.Config
Expand All @@ -154,12 +141,12 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetr
v.UnmarshalKey("touchstone", &tsConfig)
v.UnmarshalKey("zap", &zConfig)
zlogger := zap.Must(zConfig.Build())
tf := touchstone.NewFactory(tsConfig, zlogger, promReg)
// tf := touchstone.NewFactory(tsConfig, zlogger, promReg)
// Instantiate a metric listener for refresher and resolver to share
cml, err := clorthometrics.NewListener(clorthometrics.WithFactory(tf))
if err != nil {
return &alice.Chain{}, emperror.With(err, "failed to create clortho metrics listener")
}
// cml, err := clorthometrics.NewListener(clorthometrics.WithFactory(tf))
// if err != nil {
// return &alice.Chain{}, emperror.With(err, "failed to create clortho metrics listener")
// }

// Instantiate a logging listener for refresher and resolver to share
czl, err := clorthozap.NewListener(
Expand All @@ -169,9 +156,9 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetr
return &alice.Chain{}, emperror.With(err, "failed to create clortho zap logger listener")
}

resolver.AddListener(cml)
// resolver.AddListener(cml)
resolver.AddListener(czl)
ref.AddListener(cml)
// ref.AddListener(cml)
ref.AddListener(czl)
ref.AddListener(kr)
// context.Background() is for the unused `context.Context` argument in refresher.Start
Expand Down Expand Up @@ -225,8 +212,8 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetr
}
m := basculehelper.MetricValidator{
C: basculehelper.CapabilitiesValidator{Checker: c},
Measures: capabilityCheckMeasures,
Endpoints: endpoints,
// Measures: capabilityCheckMeasures,
}
bearerRules = append(bearerRules, m.CreateValidator(capabilityCheck.Type == "enforce"))
}
Expand All @@ -237,11 +224,11 @@ func authenticationMiddleware(v *viper.Viper, logger *zap.Logger, registry xmetr
basculechecks.AllowAll(),
}),
basculehttp.WithRules("Bearer", bearerRules),
basculehttp.WithEErrorResponseFunc(listener.OnErrorResponse),
// basculehttp.WithEErrorResponseFunc(listener.OnErrorResponse),
)

authChain := alice.New(setLogger(logger), authConstructor, authEnforcer, basculehttp.NewListenerDecorator(listener))
authChainLegacy := alice.New(setLogger(logger), authConstructorLegacy, authEnforcer, basculehttp.NewListenerDecorator(listener))
authChain := alice.New(setLogger(logger), authConstructor, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener). commenting for now in case needed later
authChainLegacy := alice.New(setLogger(logger), authConstructorLegacy, authEnforcer) //removing: basculehttp.NewListenerDecorator(listener) commenting for now in case needed later

versionCompatibleAuth := alice.New(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(r http.ResponseWriter, req *http.Request) {
Expand Down
Loading
Loading