Skip to content

Commit

Permalink
fixed consul xresolver, service discovery and improved logging (#490)
Browse files Browse the repository at this point in the history
* fixed consul xresolver and improved logging

* adding pr url

* fixed consul service discovery to pass QueryOptions
  • Loading branch information
kcajmagic authored Jun 11, 2020
1 parent f07406b commit 64a391c
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 42 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

- fixed `ConsulWatch` in xresolver by storing and watching the correct part of the url [#490](https://github.com/xmidt-org/webpa-common/pull/490)
- fixed consul service discovery to pass QueryOptions [#490](https://github.com/xmidt-org/webpa-common/pull/490)

## [v1.10.1]
### Fixed
- Device metadata didn't return a read-only view of its map claims resulting in data races [483](https://github.com/xmidt-org/webpa-common/pull/483)
- Device metadata didn't return a read-only view of its map claims resulting in data races [#483](https://github.com/xmidt-org/webpa-common/pull/483)


## [v1.10.0]
Expand Down
13 changes: 7 additions & 6 deletions service/consul/instancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ func NewInstancer(o InstancerOptions) sd.Instancer {
}

i := &instancer{
client: o.Client,
logger: log.With(o.Logger, "service", o.Service, "tags", fmt.Sprint(o.Tags), "passingOnly", o.PassingOnly, "datacenter", o.QueryOptions.Datacenter),
service: o.Service,
passingOnly: o.PassingOnly,
stop: make(chan struct{}),
registry: make(map[chan<- sd.Event]bool),
client: o.Client,
logger: log.With(o.Logger, "service", o.Service, "tags", fmt.Sprint(o.Tags), "passingOnly", o.PassingOnly, "datacenter", o.QueryOptions.Datacenter),
service: o.Service,
passingOnly: o.PassingOnly,
queryOptions: o.QueryOptions,
stop: make(chan struct{}),
registry: make(map[chan<- sd.Event]bool),
}

if len(o.Tags) > 0 {
Expand Down
43 changes: 24 additions & 19 deletions xresolver/consul/consullistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ import (
"context"
"errors"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/webpa-common/service/monitor"
"github.com/xmidt-org/webpa-common/xresolver"
"net/url"
"regexp"
)

var find = regexp.MustCompile("(.*)" + regexp.QuoteMeta("[") + "(.*)" + regexp.QuoteMeta("]") + regexp.QuoteMeta("{") + "(.*)" + regexp.QuoteMeta("}"))

type Options struct {
// Watch is what to url to match with the consul service
// exp. { "beta.google.com" : "caduceus" }
// exp. { "http://beta.google.com:8080/notify" : "caduceus" }
Watch map[string]string `json:"watch"`

Logger log.Logger `json:"-"`
Expand All @@ -23,8 +25,6 @@ type Options struct {
type ConsulWatcher struct {
logger log.Logger

config Options

watch map[string]string
balancers map[string]*xresolver.RoundRobin
}
Expand All @@ -35,10 +35,7 @@ func NewConsulWatcher(o Options) *ConsulWatcher {
}

watcher := &ConsulWatcher{
logger: logging.Debug(o.Logger),

config: o,

logger: log.WithPrefix(o.Logger, "component", "consulWatcher"),
balancers: make(map[string]*xresolver.RoundRobin),
watch: make(map[string]string),
}
Expand All @@ -53,7 +50,7 @@ func NewConsulWatcher(o Options) *ConsulWatcher {
}

func (watcher *ConsulWatcher) MonitorEvent(e monitor.Event) {
logging.Debug(watcher.logger, logging.MessageKey(), "received update route event", "event", e)
log.WithPrefix(watcher.logger, level.Key(), level.DebugValue()).Log(logging.MessageKey(), "received update route event", "event", e)

// update balancers
str := find.FindStringSubmatch(e.Key)
Expand All @@ -63,35 +60,43 @@ func (watcher *ConsulWatcher) MonitorEvent(e monitor.Event) {

service := str[1]
if rr, found := watcher.balancers[service]; found {
routes := make([]xresolver.Route, len(e.Instances))
for index, instance := range e.Instances {
routes := make([]xresolver.Route, 0)
for _, instance := range e.Instances {
// find records
route, err := xresolver.CreateRoute(instance)
if err != nil {
logging.Error(watcher.logger, logging.MessageKey(), "failed to create route", logging.MessageKey(), err, "instance", instance)
log.WithPrefix(watcher.logger, level.Key(), level.ErrorValue()).Log(logging.MessageKey(), "failed to create route", logging.MessageKey(), err, "instance", instance)
continue
}
routes[index] = route
routes = append(routes, route)
}
rr.Update(routes)
logging.Info(watcher.logger, logging.MessageKey(), "updating routes", "service", service, "new-routes", routes)
log.WithPrefix(watcher.logger, level.Key(), level.InfoValue()).Log(logging.MessageKey(), "updating routes", "service", service, "new-routes", routes)
}
}

func (watcher *ConsulWatcher) WatchService(url string, service string) {
if _, found := watcher.watch[url]; !found {
watcher.watch[url] = service
func (watcher *ConsulWatcher) WatchService(watchURL string, service string) {
parsedURL, err := url.Parse(watchURL)
if err != nil {
log.WithPrefix(watcher.logger, level.Key(), level.ErrorValue()).Log("failed to parse url", "url", watchURL, "service", service)
return
}
log.WithPrefix(watcher.logger, level.Key(), level.InfoValue()).Log(logging.MessageKey(), "Watching Service", "url", watchURL, "service", service, "host", parsedURL.Hostname())

if _, found := watcher.watch[parsedURL.Hostname()]; !found {
watcher.watch[parsedURL.Hostname()] = service
if _, found := watcher.balancers[service]; !found {
watcher.balancers[service] = xresolver.NewRoundRobinBalancer()
}
}
}

func (watcher *ConsulWatcher) LookupRoutes(ctx context.Context, host string) ([]xresolver.Route, error) {
if _, found := watcher.config.Watch[host]; !found {
if _, found := watcher.watch[host]; !found {
log.WithPrefix(watcher.logger, level.Key(), level.ErrorValue()).Log("watch not found ", "host", host)
return []xresolver.Route{}, errors.New(host + " is not part of the consul listener")
}
records, err := watcher.balancers[watcher.config.Watch[host]].Get()
logging.Debug(watcher.logger, logging.MessageKey(), "looking up routes", "routes", records, logging.ErrorKey(), err)
records, err := watcher.balancers[watcher.watch[host]].Get()
log.WithPrefix(watcher.logger, level.Key(), level.DebugValue()).Log(logging.MessageKey(), "looking up routes", "routes", records, logging.ErrorKey(), err)
return records, err
}
12 changes: 7 additions & 5 deletions xresolver/consul/consullistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consul
import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/webpa-common/service/monitor"
"github.com/xmidt-org/webpa-common/xresolver"
"io/ioutil"
Expand All @@ -19,8 +20,9 @@ func TestConsulWatcher(t *testing.T) {
customport := "8080"
service := "custom"
expectedBody := "Hello World\n"
fallBackURL := "http://" + net.JoinHostPort(customhost, customport)

//customInstance := "custom.host-A.com"
// customInstance := "custom.host-A.com"

serverA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "a"+expectedBody)
Expand All @@ -33,7 +35,7 @@ func TestConsulWatcher(t *testing.T) {
defer serverB.Close()

watcher := NewConsulWatcher(Options{
Watch: map[string]string{customhost: service},
Watch: map[string]string{fallBackURL: service},
})

// note: MonitorEvent is Listen interface in the monitor package
Expand All @@ -44,13 +46,13 @@ func TestConsulWatcher(t *testing.T) {

client := &http.Client{
Transport: &http.Transport{
DialContext: xresolver.NewResolver(xresolver.DefaultDialer, watcher).DialContext,
DialContext: xresolver.NewResolver(xresolver.DefaultDialer, logging.NewTestLogger(nil, t), watcher).DialContext,
// note: DisableKeepAlives is required so when we do the request again we don't reuse the same connection.
DisableKeepAlives: true,
},
}

req, err := http.NewRequest("GET", "http://"+net.JoinHostPort(customhost, customport), nil)
req, err := http.NewRequest("GET", fallBackURL, nil)
assert.NoError(err)

res, err := client.Do(req)
Expand All @@ -64,7 +66,7 @@ func TestConsulWatcher(t *testing.T) {
assert.Equal("a"+expectedBody, string(body))
}

req, err = http.NewRequest("GET", "http://"+net.JoinHostPort(customhost, customport), nil)
req, err = http.NewRequest("GET", fallBackURL, nil)
assert.NoError(err)

res, err = client.Do(req)
Expand Down
27 changes: 19 additions & 8 deletions xresolver/xresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package xresolver
import (
"context"
"errors"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/xmidt-org/webpa-common/logging"

"net"
"strconv"
"sync"
Expand All @@ -16,12 +20,17 @@ type resolver struct {
resolvers map[Lookup]bool
lock sync.RWMutex
dialer net.Dialer
logger log.Logger
}

func NewResolver(dialer net.Dialer, lookups ...Lookup) Resolver {
func NewResolver(dialer net.Dialer, logger log.Logger, lookups ...Lookup) Resolver {
if logger == nil {
logger = logging.DefaultLogger()
}
r := &resolver{
resolvers: make(map[Lookup]bool),
dialer: dialer,
logger: log.WithPrefix(logger, "component", "xresolver"),
}

for _, lookup := range lookups {
Expand Down Expand Up @@ -70,7 +79,7 @@ func (resolve *resolver) getRoutes(ctx context.Context, host string) []Route {
return routes
}

func (resolve *resolver) DialContext(ctx context.Context, network, addr string) (con net.Conn, err error) {
func (resolve *resolver) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
Expand All @@ -84,25 +93,27 @@ func (resolve *resolver) DialContext(ctx context.Context, network, addr string)
routes := resolve.getRoutes(ctx, host)

// generate Conn or err from records
con, err = resolve.createConnection(routes, network, port)
con, route, err := resolve.createConnection(routes, network, port)
if err == nil {
return
log.WithPrefix(resolve.logger, level.Key(), level.DebugValue()).Log(logging.MessageKey(), "successfully created connection using xresolver", "new-route", route.String(), "addr", addr)
return con, err
}

log.WithPrefix(resolve.logger, level.Key(), level.DebugValue()).Log(logging.MessageKey(), "failed to create connection with other routes using original address", "addr", addr)
// if no connection, create using the default dialer
return resolve.dialer.DialContext(ctx, network, addr)
}

func (resolve *resolver) createConnection(routes []Route, network, port string) (con net.Conn, err error) {
func (resolve *resolver) createConnection(routes []Route, network, port string) (net.Conn, Route, error) {
for _, route := range routes {
portUsed := port
if route.Port != 0 {
portUsed = strconv.Itoa(route.Port)
}
con, err = resolve.dialer.Dial(network, net.JoinHostPort(route.Host, portUsed))
con, err := resolve.dialer.Dial(network, net.JoinHostPort(route.Host, portUsed))
if err == nil {
return
return con, route, err
}
}
return nil, errors.New("failed to create connection from routes")
return nil, Route{}, errors.New("failed to create connection from routes")
}
5 changes: 3 additions & 2 deletions xresolver/xresolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/xmidt-org/webpa-common/logging"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand All @@ -21,7 +22,7 @@ func TestClient(t *testing.T) {

client := &http.Client{
Transport: &http.Transport{
DialContext: NewResolver(DefaultDialer).DialContext,
DialContext: NewResolver(DefaultDialer, logging.NewTestLogger(nil, t)).DialContext,
},
}

Expand Down Expand Up @@ -62,7 +63,7 @@ func TestClientWithResolver(t *testing.T) {

fakeLookUp := new(mockLookUp)
fakeLookUp.On("LookupRoutes", mock.Anything, customhost).Return([]Route{route}, nil)
r := NewResolver(DefaultDialer, fakeLookUp)
r := NewResolver(DefaultDialer, logging.NewTestLogger(nil, t), fakeLookUp)

client := &http.Client{
Transport: &http.Transport{
Expand Down

0 comments on commit 64a391c

Please sign in to comment.