Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More configurable options for parsing device ID from event #130

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions eventParser/eventParser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package eventparser

import (
"errors"
"time"

db "github.com/xmidt-org/codex-db"
"github.com/xmidt-org/wrp-go/v3"
wrpparser "github.com/xmidt-org/wrp-listener/wrpParser"
)

const (
defaultTTL = time.Duration(5) * time.Minute
)

type EventParser struct {
classifier wrpparser.Classifier
finders map[db.EventType]labelResults
}

type labelResults struct {
storePayload bool
ttl time.Duration
finder wrpparser.DeviceFinder
}

type Result struct {
Label db.EventType
DeviceID string
StorePayload bool
TTL time.Duration
}

func (p *EventParser) Parse(msg *wrp.Message) (*Result, error) {
eventType := db.Default

if label, ok := p.classifier.Label(msg); ok {
eventType = db.ParseEventType(label)
}

// get the finder for the event type, and if that doesn't work, get the
// default finder and results.
lr, ok := p.finders[eventType]
if !ok {
eventType := db.Default
lr, ok = p.finders[eventType]
if !ok {
return nil, errors.New("this is a serious problem")
}
}

id, err := lr.finder.FindDeviceID(msg)
if err != nil {
return nil, errors.New("some error")
}
return &Result{
Label: eventType,
DeviceID: id,
StorePayload: lr.storePayload,
TTL: lr.ttl,
}, nil

}

// Option is a function used to configure the StrParser.
type Option func(*EventParser)

// WithDeviceFinder adds a DeviceFinder that the StrParser will use to find the
// device id of a wrp message. Each DeviceFinder is associated with a label.
// If the label already has a DeviceFinder associated with it, it will be
// replaced by the new one (as long as the DeviceFinder is not nil).
func WithDeviceFinder(label string, finder wrpparser.DeviceFinder, storePayload bool, ttl time.Duration) Option {
return func(parser *EventParser) {
if finder != nil {
lr := labelResults{
finder: finder,
storePayload: storePayload,
ttl: ttl,
}
if lr.ttl == 0 {
lr.ttl = defaultTTL
}
parser.finders[db.ParseEventType(label)] = lr
}
}
}

func New(classifier wrpparser.Classifier, options ...Option) (*EventParser, error) {
if classifier == nil {
return nil, errors.New("some error")
}

p := &EventParser{
classifier: classifier,
finders: map[db.EventType]labelResults{},
}

for _, o := range options {
o(p)
}

// set up a default if it doesn't exist
if _, ok := p.finders[db.Default]; !ok {
defaultFinder := wrpparser.FieldFinder{Field: wrpparser.Source}
p.finders[db.Default] = labelResults{
finder: &defaultFinder,
storePayload: false,
ttl: defaultTTL,
}
}

return p, nil
}
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ require (
github.com/spf13/pflag v1.0.3
github.com/spf13/viper v1.6.1
github.com/stretchr/testify v1.4.0
github.com/xmidt-org/bascule v0.5.0
github.com/xmidt-org/bascule v0.8.0
github.com/xmidt-org/codex-db v0.7.0
github.com/xmidt-org/voynicrypto v0.1.1
github.com/xmidt-org/webpa-common v1.5.0
github.com/xmidt-org/wrp-go/v2 v2.0.1
github.com/xmidt-org/wrp-listener v0.1.1
github.com/xmidt-org/webpa-common v1.10.5
github.com/xmidt-org/wrp-go/v3 v3.0.1
github.com/xmidt-org/wrp-listener v0.1.3-0.20200726012340-6fbea840e72b
)
278 changes: 272 additions & 6 deletions go.sum

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/xmidt-org/svalinn/requestParser"
"github.com/xmidt-org/voynicrypto"
"github.com/xmidt-org/webpa-common/basculechecks"
"github.com/xmidt-org/webpa-common/basculemetrics"
"github.com/xmidt-org/webpa-common/concurrent"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/webpa-common/server"
Expand Down Expand Up @@ -149,7 +150,7 @@ func svalinn(arguments []string) {

var (
f, v = pflag.NewFlagSet(applicationName, pflag.ContinueOnError), viper.New()
logger, metricsRegistry, codex, err = server.Initialize(applicationName, arguments, f, v, cassandra.Metrics, dbretry.Metrics, requestParser.Metrics, batchInserter.Metrics, basculechecks.Metrics, Metrics)
logger, metricsRegistry, codex, err = server.Initialize(applicationName, arguments, f, v, cassandra.Metrics, dbretry.Metrics, requestParser.Metrics, batchInserter.Metrics, basculechecks.Metrics, basculemetrics.Metrics, webhookClient.Metrics, Metrics)
)

if parseErr, done := printVersion(f, arguments); done {
Expand Down Expand Up @@ -177,12 +178,12 @@ func svalinn(arguments []string) {

secretGetter := secretGetter.NewConstantSecret(config.Webhook.Request.Config.Secret)

var m *basculechecks.JWTValidationMeasures
var m *basculemetrics.AuthValidationMeasures

if metricsRegistry != nil {
m = basculechecks.NewJWTValidationMeasures(metricsRegistry)
m = basculemetrics.NewAuthValidationMeasures(metricsRegistry)
}
listener := basculechecks.NewMetricListener(m)
listener := basculemetrics.NewMetricListener(m)

svalinnHandler := alice.New()

Expand Down Expand Up @@ -240,7 +241,7 @@ func svalinn(arguments []string) {
logging.Error(logger, emperror.Context(err)...).Log(logging.MessageKey(), "Failed to create basic registerer", logging.ErrorKey(), err.Error())
//TODO: we shouldn't continue trying to set the webhook registerer up if we fail
}
periodicRegisterer := webhookClient.NewPeriodicRegisterer(registerer, config.Webhook.RegistrationInterval, logger)
periodicRegisterer := webhookClient.NewPeriodicRegisterer(registerer, config.Webhook.RegistrationInterval, logger, metricsRegistry)

s.registerer = periodicRegisterer
periodicRegisterer.Start()
Expand Down
2 changes: 1 addition & 1 deletion primaryHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/xmidt-org/svalinn/requestParser"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/wrp-go/v2"
"github.com/xmidt-org/wrp-go/v3"
)

type parser interface {
Expand Down
2 changes: 1 addition & 1 deletion primaryHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/stretchr/testify/mock"

"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/wrp-go/v2"
"github.com/xmidt-org/wrp-go/v3"
)

func TestHandleWebhook(t *testing.T) {
Expand Down
Loading