Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add webhooks to scytale #97

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions deploy/packaging/scytale_spruce.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,30 @@ capabilityConfig:
thirdPiece: "p3"
acceptAllMethod: "all"

########################################
# Webhook Related Configuration
########################################

# cacheConfig configures the in memory storage of the webhooks. aka how long webhooks should live etc.
cacheConfig:
# ttl is how long a webhook registration will last.
# (Optional) defaults to 5m
ttl: 5m

# checkInterval is how long the internal crapper will check if the inMemory webhook should be removed.
# (Optional) defaults to 1m
checkInterval: 10s

# Yggdrasil Config for storing the webhook information
argusConfig:
bucket: "webhooks-caduceus"
address: (( grab $ARGUS_HOST || "argus:6600" ))
pullInterval: "2s"
filters:
stage: "dev"
auth:
basic: (( grab $AUTH_HEADER || "dXNlcjpwYXNz" ))

########################################
# Service Discovery Configuration
########################################
Expand Down
15 changes: 6 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,16 @@ go 1.12
require (
github.com/aws/aws-sdk-go v1.23.11 // indirect
github.com/c9s/goprocinfo v0.0.0-20190309065803-0b2ad9ac246b // indirect
github.com/go-kit/kit v0.8.0
github.com/go-kit/kit v0.9.0
github.com/goph/emperror v0.17.3-0.20190703203600-60a8d9faa17b
github.com/gorilla/mux v1.7.3
github.com/influxdata/influxdb v1.7.7 // indirect
github.com/justinas/alice v1.2.0
github.com/prometheus/client_golang v1.0.0 // indirect
github.com/samuel/go-zookeeper v0.0.0-20190810000440-0ceca61e4d75 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/pflag v1.0.3
github.com/spf13/viper v1.6.1
github.com/stretchr/testify v1.3.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.6.2
github.com/stretchr/testify v1.5.1
github.com/xmidt-org/argus v0.1.2-0.20200508175044-8486ca622384
github.com/xmidt-org/bascule v0.8.0
github.com/xmidt-org/webpa-common v1.6.2
github.com/xmidt-org/webpa-common v1.10.0
github.com/xmidt-org/wrp-go/v2 v2.0.1
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 // indirect
)
147 changes: 123 additions & 24 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/xmidt-org/webpa-common/server"
"github.com/xmidt-org/webpa-common/service"
"github.com/xmidt-org/webpa-common/service/servicecfg"
"github.com/xmidt-org/webpa-common/webhook"
"github.com/xmidt-org/webpa-common/webhook/aws"
)

Expand All @@ -57,6 +56,7 @@ type CapabilityConfig struct {
Type string
Prefix string
AcceptAllMethod string
EndpointBuckets []string
}

// scytale is the driver function for Scytale. It performs everything main() would do,
Expand All @@ -70,7 +70,7 @@ func scytale(arguments []string) int {
f = pflag.NewFlagSet(applicationName, pflag.ContinueOnError)
v = viper.New()

logger, metricsRegistry, webPA, err = server.Initialize(applicationName, arguments, f, v, webhook.Metrics, aws.Metrics, basculechecks.Metrics, basculemetrics.Metrics, Metrics)
logger, metricsRegistry, webPA, err = server.Initialize(applicationName, arguments, f, v, aws.Metrics, basculechecks.Metrics, basculemetrics.Metrics, Metrics)
)

if parseErr, done := printVersion(f, arguments); done {
Expand Down
23 changes: 18 additions & 5 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package main

import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/provider"
"github.com/xmidt-org/webpa-common/xmetrics"
)

//Names for our metrics
// Metric names
const (
ReceivedWRPMessageCount = "received_wrp_message_total"
WebhookListSizeGauge = "webhook_list_size_value"
)

// labels
Expand All @@ -34,20 +36,31 @@ const (
JWTPIDInvalid = "jwt_pid_invalid"
)

//Metrics returns the metrics relevant to this package
// Metrics returns the metrics relevant to this package
func Metrics() []xmetrics.Metric {
return []xmetrics.Metric{
xmetrics.Metric{
{
Name: ReceivedWRPMessageCount,
Type: xmetrics.CounterType,
Help: "Number of WRP Messages successfully decoded and ready for fanout.",
LabelNames: []string{OutcomeLabel, ClientIDLabel, ReasonLabel},
},
{
Name: WebhookListSizeGauge,
Help: "Amount of current listeners",
Type: "gauge",
},
}
}

//NewReceivedWRPCounter initializes a counter to keep track of
//scytale users which do not populate the partnerIDs field in their WRP messages
// NewReceivedWRPCounter initializes a counter to keep track of
// scytale users which do not populate the partnerIDs field in their WRP messages
func NewReceivedWRPCounter(r xmetrics.Registry) metrics.Counter {
return r.NewCounter(ReceivedWRPMessageCount)
}

// NewWebhookListSizeGauge initializes a gauge representing the size of the list
// of currently registered webhook listeners
func NewWebhookListSizeGauge(p provider.Provider) metrics.Gauge {
return p.NewGauge(WebhookListSizeGauge)
}
57 changes: 55 additions & 2 deletions primaryHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"net/http"
"regexp"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/gorilla/mux"
"github.com/justinas/alice"
"github.com/spf13/viper"
"github.com/xmidt-org/argus/webhookclient"
"github.com/xmidt-org/bascule"
"github.com/xmidt-org/bascule/basculehttp"
"github.com/xmidt-org/webpa-common/basculechecks"
Expand All @@ -40,6 +42,7 @@ import (
"github.com/xmidt-org/webpa-common/logging/logginghttp"
"github.com/xmidt-org/webpa-common/service"
"github.com/xmidt-org/webpa-common/service/monitor"
"github.com/xmidt-org/webpa-common/webhook"
"github.com/xmidt-org/webpa-common/xhttp"
"github.com/xmidt-org/webpa-common/xhttp/fanout"
"github.com/xmidt-org/webpa-common/xmetrics"
Expand Down Expand Up @@ -136,9 +139,18 @@ func authChain(v *viper.Viper, logger log.Logger, registry xmetrics.Registry) (a
var capabilityCheck CapabilityConfig
v.UnmarshalKey("capabilityCheck", &capabilityCheck)
if capabilityCheck.Type == "enforce" || capabilityCheck.Type == "monitor" {
checker, err := basculechecks.NewCapabilityChecker(capabilityCheckMeasures, capabilityCheck.Prefix, capabilityCheck.AcceptAllMethod)
var endpoints []*regexp.Regexp
for _, e := range capabilityCheck.EndpointBuckets {
r, err := regexp.Compile(e)
if err != nil {
logging.Error(logger).Log(logging.MessageKey(), "failed to compile regular expression", "regex", e, logging.ErrorKey(), err.Error())
continue
}
endpoints = append(endpoints, r)
}
checker, err := basculechecks.NewCapabilityChecker(capabilityCheckMeasures, capabilityCheck.Prefix, capabilityCheck.AcceptAllMethod, endpoints)
if err != nil {
return alice.Chain{}, emperror.With(err, "failed to create capability check")
return alice.New(), emperror.With(err, "failed to create capability check")
}
bearerRules = append(bearerRules, checker.CreateBasculeCheck(capabilityCheck.Type == "enforce"))
}
Expand Down Expand Up @@ -314,6 +326,38 @@ func NewPrimaryHandler(logger log.Logger, v *viper.Viper, registry xmetrics.Regi
sendSubrouter.Headers("Content-Type", wrp.JSON.ContentType()).
Handler(authChain.Then(sendWRPHandler))

webhookListSize := NewWebhookListSizeGauge(registry)

var updatewebhookListSize webhookclient.ListenerFunc = func(hooks []webhook.W) {
webhookListSize.Set(float64(len(hooks)))
}

webhookConfig, err := readWebhookConfig(v)
if err != nil {
logging.Error(logger).Log(logging.ErrorKey(), err, logging.MessageKey(), "could not read webhook config successfully")
return nil, err
}

webhookRegistry, err := NewRegistry(RegistryConfig{
Logger: logger,
CacheConfig: webhookConfig.CacheConfig,
ArgusConfig: webhookConfig.ArgusConfig,
}, updatewebhookListSize)

if err != nil {
return nil, err
}

router.Handle(
fmt.Sprintf("%s/%s/hook", baseURI, version),
authChain.ThenFunc(webhookRegistry.UpdateRegistry),
).Methods(http.MethodPost)

router.Handle(
fmt.Sprintf("%s/%s/hooks", baseURI, version),
authChain.ThenFunc(webhookRegistry.GetRegistry),
).Methods(http.MethodGet)

router.Handle(
fmt.Sprintf("%s/%s/device/{deviceID}/stat", baseURI, version),
authChain.Extend(fanoutChain).Then(
Expand All @@ -337,3 +381,12 @@ func NewPrimaryHandler(logger log.Logger, v *viper.Viper, registry xmetrics.Regi

return router, nil
}

func readWebhookConfig(v *viper.Viper) (*webhookConfig, error) {
config := new(webhookConfig)
err := v.UnmarshalKey("webhook", config)
if err != nil {
return nil, err
}
return config, nil
}
25 changes: 25 additions & 0 deletions scytale.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,31 @@ jwtValidator:
# WRPCheck:
# type: "enforce"


########################################
# Webhook Related Configuration
########################################
webhook:
# cacheConfig configures the in memory storage of the webhooks. aka how long webhooks should live etc.
cacheConfig:
# ttl is how long a webhook registration will last.
# (Optional) defaults to 5m
ttl: 5m

# checkInterval is how long the internal crapper will check if the inMemory webhook should be removed.
# (Optional) defaults to 1m
checkInterval: 10s

# Yggdrasil Config for storing the webhook information
argusConfig:
bucket: "webhooks-caduceus"
address: "http://argus:6600"
pullInterval: "2s"
filters:
stage: "dev"
auth:
basic: ""

########################################
# Service Discovery Configuration
########################################
Expand Down
6 changes: 6 additions & 0 deletions scytale_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package main

import (
"github.com/xmidt-org/argus/webhookclient"
"github.com/xmidt-org/bascule"
"github.com/xmidt-org/bascule/key"
)
Expand All @@ -31,3 +32,8 @@ type JWTValidator struct {
// time values, such as nbf
Leeway bascule.Leeway
}

type webhookConfig struct {
CacheConfig webhookclient.CacheConfig
ArgusConfig webhookclient.ArgusConfig
}
111 changes: 111 additions & 0 deletions webhookHandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/go-kit/kit/log"
"github.com/xmidt-org/argus/webhookclient"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/webpa-common/webhook"
)

type Registry struct {
hookStore *webhookclient.Cache
config RegistryConfig
}

type RegistryConfig struct {
Logger log.Logger
CacheConfig webhookclient.CacheConfig
ArgusConfig webhookclient.ArgusConfig
}

func NewRegistry(config RegistryConfig, listener webhookclient.Listener) (*Registry, error) {
yggdrasilStore, err := webhookclient.CreateArgusStore(config.ArgusConfig, webhookclient.WithLogger(config.Logger))
if err != nil {
return nil, err
}
hookStorage := webhookclient.CreateCacheStore(config.CacheConfig, webhookclient.WithLogger(config.Logger), webhookclient.WithStorage(yggdrasilStore))
yggdrasilStore.SetListener(hookStorage)

if listener != nil {
hookStorage.SetListener(listener)
}

return &Registry{
config: config,
hookStore: hookStorage,
}, nil
}

// jsonResponse is an internal convenience function to write a json response
func jsonResponse(rw http.ResponseWriter, code int, msg string) {
rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(code)
rw.Write([]byte(fmt.Sprintf(`{"message":"%s"}`, msg)))
}

// get is an api call to return all the registered listeners
func (r *Registry) GetRegistry(rw http.ResponseWriter, req *http.Request) {
logging.Info(r.config.Logger).Log(logging.MessageKey(), "get registry")
items, err := r.hookStore.GetWebhook()
if err != nil {
jsonResponse(rw, http.StatusInternalServerError, err.Error())
}
data := []struct {
URL string `json:"url"`
ContentType string `json:"content_type"`
FailureURL string `json:"failure_url"`
Events []string `json:"events"`
Matcher struct {
DeviceId []string `json:"device_id"`
} `json:"matcher,omitempty"`
Until time.Time `json:"until"`
LastRegistration string `json:"registered_from_address"`
}{}
for _, hook := range items {
data = append(data, struct {
URL string `json:"url"`
ContentType string `json:"content_type"`
FailureURL string `json:"failure_url"`
Events []string `json:"events"`
Matcher struct {
DeviceId []string `json:"device_id"`
} `json:"matcher,omitempty"`
Until time.Time `json:"until"`

LastRegistration string `json:"registered_from_address"`
}{URL: hook.Config.URL, ContentType: hook.Config.ContentType, FailureURL: hook.FailureURL, Events: hook.Events, Matcher: hook.Matcher, Until: hook.Until, LastRegistration: hook.Address})
}

if msg, err := json.Marshal(data); err != nil {
jsonResponse(rw, http.StatusInternalServerError, err.Error())
} else {
rw.Header().Set("Content-Type", "application/json")
rw.Write(msg)
}
}

// update is an api call to processes a listenener registration for adding and updating
func (r *Registry) UpdateRegistry(rw http.ResponseWriter, req *http.Request) {
payload, err := ioutil.ReadAll(req.Body)
req.Body.Close()

w, err := webhook.NewW(payload, req.RemoteAddr)
if err != nil {
jsonResponse(rw, http.StatusBadRequest, err.Error())
return
}

err = r.hookStore.Push(*w)
if err != nil {
jsonResponse(rw, http.StatusInternalServerError, err.Error())
return
}

jsonResponse(rw, http.StatusOK, "Success")
}