Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Listener interface draft solution 2 #462

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0
package main

import "net/http"
Expand All @@ -7,3 +9,14 @@ import "net/http"
type Client interface {
Do(*http.Request) (*http.Response, error)
}

func nopClient(next Client) Client {
return next
}

// DoerFunc implements Client
type doerFunc func(*http.Request) (*http.Response, error)

func (d doerFunc) Do(req *http.Request) (*http.Response, error) {
return d(req)
}
4 changes: 2 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ type Service struct {
}
type Consul struct {
Client ConsulClient
Registrations []Registration
Registrations []ConsulRegistration
DisableGenerateId bool
}
type ConsulClient struct {
Address string
Scheme string
WaitTime string
}
type Registration struct {
type ConsulRegistration struct {
Id string
Name string
Tags []string
Expand Down
11 changes: 0 additions & 11 deletions httpClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@ var (
errNilHistogram = errors.New("histogram cannot be nil")
)

func nopClient(next Client) Client {
return next
}

// DoerFunc implements HTTPClient
type doerFunc func(*http.Request) (*http.Response, error)

func (d doerFunc) Do(req *http.Request) (*http.Response, error) {
return d(req)
}

type metricWrapper struct {
now func() time.Time
queryLatency prometheus.ObserverVec
Expand Down
187 changes: 185 additions & 2 deletions listenerStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@
package main

import (
"container/ring"
"errors"
"fmt"
"math/rand"
"net/url"
"regexp"
"time"

webhook "github.com/xmidt-org/webhook-schema"
"go.uber.org/zap"
)

//This is a stub for the webhook and kafka listeners. This will be removed once the webhook-schema configuration is approved

type ListenerStub struct {
PartnerIds []string
Webhook webhook.Registration
PartnerIds []string
Registration Registration
}

// Webhook is a substructure with data related to event delivery.
Expand Down Expand Up @@ -139,3 +146,179 @@ type RegistrationV2 struct {
// TODO: list of supported formats
Expires time.Time `json:"expires"`
}

// Deprecated: This structure should only be used for backwards compatibility
// matching. Use RegistrationV2 instead.
// RegistrationV1 is a special struct for unmarshaling a webhook as part of a webhook registration request.
type RegistrationV1 struct {
// Address is the subscription request origin HTTP Address.
Address string `json:"registered_from_address"`

// Config contains data to inform how events are delivered.
Config DeliveryConfig `json:"config"`

// FailureURL is the URL used to notify subscribers when they've been cut off due to event overflow.
// Optional, set to "" to disable notifications.
FailureURL string `json:"failure_url"`

// Events is the list of regular expressions to match an event type against.
Events []string `json:"events"`

// Matcher type contains values to match against the metadata.
Matcher MetadataMatcherConfig `json:"matcher,omitempty"`

// Duration describes how long the subscription lasts once added.
Duration webhook.CustomDuration `json:"duration"`

// Until describes the time this subscription expires.
Until time.Time `json:"until"`
}

// MetadataMatcherConfig is Webhook substructure with config to match event metadata.
type MetadataMatcherConfig struct {
// DeviceID is the list of regular expressions to match device id type against.
DeviceID []string `json:"device_id"`
}

// Deprecated: This substructure should only be used for backwards compatibility
// matching. Use Webhook instead.
// DeliveryConfig is a Webhook substructure with data related to event delivery.
type DeliveryConfig struct {
// URL is the HTTP URL to deliver messages to.
ReceiverURL string `json:"url"`

// ContentType is content type value to set WRP messages to (unless already specified in the WRP).
ContentType string `json:"content_type"`

// Secret is the string value for the SHA1 HMAC.
// (Optional, set to "" to disable behavior).
Secret string `json:"secret,omitempty"`

// AlternativeURLs is a list of explicit URLs that should be round robin through on failure cases to the main URL.
AlternativeURLs []string `json:"alt_urls,omitempty"`
}

type Registration interface {
UpdateSender(*SinkSender) error
GetId() string
}

func (v1 *RegistrationV1) UpdateSender(ss *SinkSender) (err error) {
l := ss.logger.With(zap.String("webhook.address", v1.Address))

// Validate the failure URL, if present
if err = validateFailureURL(v1.FailureURL); err != nil {
return err
}
// Create and validate the event regex objects
// nolint:prealloc
var events []*regexp.Regexp
for _, event := range v1.Events {
var re *regexp.Regexp
if re, err = regexp.Compile(event); err != nil {
return err
}
events = append(events, re)
}
if len(events) < 1 {
err = errors.New("events must not be empty")
return err
}

// Create the matcher regex objects
matcher := []*regexp.Regexp{}
for _, item := range v1.Matcher.DeviceID {
if item == ".*" {
// Match everything - skip the filtering
matcher = []*regexp.Regexp{}
break
}

var re *regexp.Regexp
var err error
if re, err = regexp.Compile(item); nil != err {
err = fmt.Errorf("invalid matcher item: '%s'", item)
return err
}
matcher = append(matcher, re)
}

// Validate the various urls
urlCount := len(v1.Config.AlternativeURLs)
for i := 0; i < urlCount; i++ {
_, err := url.Parse(v1.Config.AlternativeURLs[i])
if err != nil {
l.Error("failed to update url", zap.Any("url", v1.Config.AlternativeURLs[i]), zap.Error(err))
return err
}
}

ss.renewalTimeGauge.Set(float64(time.Now().Unix()))

// write/update obs
ss.mutex.Lock()

ss.id = v1.Config.ReceiverURL
ss.logger = l
ss.deliverUntil = v1.Until
ss.deliverUntilGauge.Set(float64(ss.deliverUntil.Unix()))

ss.events = events
ss.deliveryRetryMaxGauge.Set(float64(ss.deliveryRetries))

// if matcher list is empty set it nil for Queue() logic
ss.matcher = nil
if 0 < len(matcher) {
ss.matcher = matcher
}

if urlCount == 0 {
ss.urls = ring.New(1)
ss.urls.Value = ss.id
} else {
r := ring.New(urlCount)
for i := 0; i < urlCount; i++ {

r.Value = v1.Config.AlternativeURLs[i]
r = r.Next()
}
ss.urls = r
}
// Randomize where we start so all the instances don't synchronize
r := rand.New(rand.NewSource(time.Now().UnixNano()))
offset := r.Intn(ss.urls.Len())
for 0 < offset {
ss.urls = ss.urls.Next()
offset--
}
// Update this here in case we make this configurable later
ss.maxWorkersGauge.Set(float64(ss.maxWorkers))

ss.mutex.Unlock()

return
}

func (v1 *RegistrationV1) GetId() string {
return v1.Config.ReceiverURL
}

func (v2 *RegistrationV2) UpdateSender(ss *SinkSender) (err error) {
// Validate the failure URL, if present
if err = validateFailureURL(v2.FailureURL); err != nil {
return err
}

return
}
func (v2 *RegistrationV2) GetId() string {
return v2.CanonicalName
}

func validateFailureURL(fUrl string) (err error) {
if fUrl == "" {
_, err = url.ParseRequestURI(fUrl)
return
}
return
}
Loading
Loading