Skip to content

Commit

Permalink
Merge pull request #19 from Comcast/feature/webhookIntegration
Browse files Browse the repository at this point in the history
Feature/webhook integration
  • Loading branch information
schmidtw authored Jun 12, 2017
2 parents 91e1ad3 + bf18203 commit 7b20a31
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 202 deletions.
32 changes: 30 additions & 2 deletions src/caduceus/caduceus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import (
"crypto/tls"
"fmt"
"github.com/Comcast/webpa-common/concurrent"
"github.com/Comcast/webpa-common/handler"
"github.com/Comcast/webpa-common/secure"
"github.com/Comcast/webpa-common/secure/handler"
"github.com/Comcast/webpa-common/server"
"github.com/Comcast/webpa-common/webhook"
"github.com/gorilla/mux"
"github.com/justinas/alice"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"net/http"
"net/url"
"os"
"os/signal"
"time"
Expand Down Expand Up @@ -55,7 +57,7 @@ func caduceus(arguments []string) int {
Duration: caduceusConfig.ProfilerDuration,
QueueSize: caduceusConfig.ProfilerQueueSize,
}

tr := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
timeout := time.Duration(caduceusConfig.SenderClientTimeout) * time.Second

Expand Down Expand Up @@ -116,6 +118,32 @@ func caduceus(arguments []string) int {
mux.Handle("/api/v1/run", caduceusHandler.Then(serverWrapper))
mux.Handle("/api/v1/profile", caduceusHandler.Then(profileWrapper))



webhookFactory, err := webhook.NewFactory(v)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating new webhook factory: %s\n", err)
return 1
}

_, webhookHandler := webhookFactory.NewListAndHandler()
webhookRegistry := webhook.NewRegistry(nil, webhookFactory.PublishMessage)
webhookFactory.SetList( webhookRegistry )

// register webhook end points for api
mux.Handle("/api/v1/hook", caduceusHandler.ThenFunc(webhookRegistry.UpdateRegistry))
mux.Handle("/api/v1/hooks", caduceusHandler.ThenFunc(webhookRegistry.GetRegistry))

selfURL := &url.URL{
Scheme: "https",
Host: v.GetString("fqdn") + v.GetString("primary.address"),
}

webhookFactory.Initialize(mux, selfURL, webhookHandler, logger)
webhookFactory.PrepareAndStart()



caduceusHealth := &CaduceusHealth{}
var runnable concurrent.Runnable

Expand Down
55 changes: 23 additions & 32 deletions src/caduceus/outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type CaduceusOutboundSender struct {
client *http.Client
secret []byte
events []*regexp.Regexp
matcher map[string][]*regexp.Regexp
matcher []*regexp.Regexp
queueSize int
queue chan outboundRequest
profiler ServerProfiler
Expand All @@ -101,7 +101,7 @@ type CaduceusOutboundSender struct {

// New creates a new OutboundSender object from the factory, or returns an error.
func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) {
if _, err = url.ParseRequestURI(osf.Listener.URL); nil != err {
if _, err = url.ParseRequestURI(osf.Listener.Config.URL); nil != err {
return
}

Expand Down Expand Up @@ -136,8 +136,8 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) {
},
}

if "" != osf.Listener.Secret {
caduceusOutboundSender.secret = []byte(osf.Listener.Secret)
if "" != osf.Listener.Config.Secret {
caduceusOutboundSender.secret = []byte(osf.Listener.Config.Secret)
}

if "" != osf.Listener.FailureURL {
Expand Down Expand Up @@ -168,32 +168,21 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) {
err = errors.New("Events must not be empty.")
return
}

// Create the matcher regex objects
if nil != osf.Listener.Matchers {
caduceusOutboundSender.matcher = make(map[string][]*regexp.Regexp)
for key, value := range osf.Listener.Matchers {
var list []*regexp.Regexp
for _, item := range value {
if ".*" == item {
// Match everything - skip the filtering
caduceusOutboundSender.matcher = nil
break
}
var re *regexp.Regexp
if re, err = regexp.Compile(item); nil != err {
err = fmt.Errorf("Invalid matcher item: Matcher['%s'] = '%s'", value, item)
return
}
list = append(list, re)
}

if nil == caduceusOutboundSender.matcher {
break
}

caduceusOutboundSender.matcher[key] = list
for _, item := range osf.Listener.Matcher.DeviceId {
if ".*" == item {
// Match everything - skip the filtering
caduceusOutboundSender.matcher = nil
break
}

var re *regexp.Regexp
if re, err = regexp.Compile(item); nil != err {
err = fmt.Errorf("Invalid matcher item: '%s'", item)
return
}
caduceusOutboundSender.matcher = append(caduceusOutboundSender.matcher, re)
}

caduceusOutboundSender.wg.Add(osf.NumWorkers)
Expand Down Expand Up @@ -261,7 +250,7 @@ func (obs *CaduceusOutboundSender) QueueJSON(req CaduceusRequest,
if eventRegex.MatchString(eventType) {
matchDevice := (nil == obs.matcher)
if nil != obs.matcher {
for _, deviceRegex := range obs.matcher["device_id"] {
for _, deviceRegex := range obs.matcher {
if deviceRegex.MatchString(deviceID) {
matchDevice = true
break
Expand Down Expand Up @@ -305,13 +294,14 @@ func (obs *CaduceusOutboundSender) QueueWrp(req CaduceusRequest, metaData map[st
if eventRegex.MatchString(eventType) {
matchDevice := (nil == obs.matcher)
if nil != obs.matcher {
for _, deviceRegex := range obs.matcher["device_id"] {
for _, deviceRegex := range obs.matcher {
if deviceRegex.MatchString(deviceID) {
matchDevice = true
break
}
}
}
/*
// if the device id matches then we want to look through all the metadata
// and make sure that the obs metadata matches the metadata provided
if matchDevice {
Expand All @@ -333,6 +323,7 @@ func (obs *CaduceusOutboundSender) QueueWrp(req CaduceusRequest, metaData map[st
}
}
}
*/
if matchDevice {
if len(obs.queue) < obs.queueSize {
outboundReq := outboundRequest{req: req,
Expand Down Expand Up @@ -375,10 +366,10 @@ func (obs *CaduceusOutboundSender) worker(id int) {
now := time.Now()
if now.Before(deliverUntil) && now.After(dropUntil) {
payload := bytes.NewReader(work.req.Payload)
req, err := http.NewRequest("POST", obs.listener.URL, payload)
req, err := http.NewRequest("POST", obs.listener.Config.URL, payload)
if nil != err {
// Report drop
obs.logger.Error("http.NewRequest(\"POST\", '%s', payload) failed: %s", obs.listener.URL, err)
obs.logger.Error("http.NewRequest(\"POST\", '%s', payload) failed: %s", obs.listener.Config.URL, err)
} else {
req.Header.Set("Content-Type", work.contentType)
req.Header.Set("X-Webpa-Event", work.event)
Expand Down
Loading

0 comments on commit 7b20a31

Please sign in to comment.