From 64a391cb4d8d964d928e732356aa0c130ccc0f3b Mon Sep 17 00:00:00 2001 From: Jack Murdock Date: Thu, 11 Jun 2020 16:19:34 -0700 Subject: [PATCH] fixed consul xresolver, service discovery and improved logging (#490) * fixed consul xresolver and improved logging * adding pr url * fixed consul service discovery to pass QueryOptions --- CHANGELOG.md | 5 +-- service/consul/instancer.go | 13 ++++---- xresolver/consul/consullistener.go | 43 ++++++++++++++----------- xresolver/consul/consullistener_test.go | 12 ++++--- xresolver/xresolver.go | 27 +++++++++++----- xresolver/xresolver_test.go | 5 +-- 6 files changed, 63 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a443b90b..3cfafda5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,11 +5,12 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] - +- fixed `ConsulWatch` in xresolver by storing and watching the correct part of the url [#490](https://github.com/xmidt-org/webpa-common/pull/490) +- fixed consul service discovery to pass QueryOptions [#490](https://github.com/xmidt-org/webpa-common/pull/490) ## [v1.10.1] ### Fixed -- Device metadata didn't return a read-only view of its map claims resulting in data races [483](https://github.com/xmidt-org/webpa-common/pull/483) +- Device metadata didn't return a read-only view of its map claims resulting in data races [#483](https://github.com/xmidt-org/webpa-common/pull/483) ## [v1.10.0] diff --git a/service/consul/instancer.go b/service/consul/instancer.go index ffbeb76b..ddf79436 100644 --- a/service/consul/instancer.go +++ b/service/consul/instancer.go @@ -37,12 +37,13 @@ func NewInstancer(o InstancerOptions) sd.Instancer { } i := &instancer{ - client: o.Client, - logger: log.With(o.Logger, "service", o.Service, "tags", fmt.Sprint(o.Tags), "passingOnly", o.PassingOnly, "datacenter", o.QueryOptions.Datacenter), - service: o.Service, - passingOnly: o.PassingOnly, - stop: make(chan struct{}), - registry: make(map[chan<- sd.Event]bool), + client: o.Client, + logger: log.With(o.Logger, "service", o.Service, "tags", fmt.Sprint(o.Tags), "passingOnly", o.PassingOnly, "datacenter", o.QueryOptions.Datacenter), + service: o.Service, + passingOnly: o.PassingOnly, + queryOptions: o.QueryOptions, + stop: make(chan struct{}), + registry: make(map[chan<- sd.Event]bool), } if len(o.Tags) > 0 { diff --git a/xresolver/consul/consullistener.go b/xresolver/consul/consullistener.go index 03f6ab45..076e4e9f 100644 --- a/xresolver/consul/consullistener.go +++ b/xresolver/consul/consullistener.go @@ -4,9 +4,11 @@ import ( "context" "errors" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/xmidt-org/webpa-common/logging" "github.com/xmidt-org/webpa-common/service/monitor" "github.com/xmidt-org/webpa-common/xresolver" + "net/url" "regexp" ) @@ -14,7 +16,7 @@ var find = regexp.MustCompile("(.*)" + regexp.QuoteMeta("[") + "(.*)" + regexp.Q type Options struct { // Watch is what to url to match with the consul service - // exp. { "beta.google.com" : "caduceus" } + // exp. { "http://beta.google.com:8080/notify" : "caduceus" } Watch map[string]string `json:"watch"` Logger log.Logger `json:"-"` @@ -23,8 +25,6 @@ type Options struct { type ConsulWatcher struct { logger log.Logger - config Options - watch map[string]string balancers map[string]*xresolver.RoundRobin } @@ -35,10 +35,7 @@ func NewConsulWatcher(o Options) *ConsulWatcher { } watcher := &ConsulWatcher{ - logger: logging.Debug(o.Logger), - - config: o, - + logger: log.WithPrefix(o.Logger, "component", "consulWatcher"), balancers: make(map[string]*xresolver.RoundRobin), watch: make(map[string]string), } @@ -53,7 +50,7 @@ func NewConsulWatcher(o Options) *ConsulWatcher { } func (watcher *ConsulWatcher) MonitorEvent(e monitor.Event) { - logging.Debug(watcher.logger, logging.MessageKey(), "received update route event", "event", e) + log.WithPrefix(watcher.logger, level.Key(), level.DebugValue()).Log(logging.MessageKey(), "received update route event", "event", e) // update balancers str := find.FindStringSubmatch(e.Key) @@ -63,24 +60,31 @@ func (watcher *ConsulWatcher) MonitorEvent(e monitor.Event) { service := str[1] if rr, found := watcher.balancers[service]; found { - routes := make([]xresolver.Route, len(e.Instances)) - for index, instance := range e.Instances { + routes := make([]xresolver.Route, 0) + for _, instance := range e.Instances { // find records route, err := xresolver.CreateRoute(instance) if err != nil { - logging.Error(watcher.logger, logging.MessageKey(), "failed to create route", logging.MessageKey(), err, "instance", instance) + log.WithPrefix(watcher.logger, level.Key(), level.ErrorValue()).Log(logging.MessageKey(), "failed to create route", logging.MessageKey(), err, "instance", instance) continue } - routes[index] = route + routes = append(routes, route) } rr.Update(routes) - logging.Info(watcher.logger, logging.MessageKey(), "updating routes", "service", service, "new-routes", routes) + log.WithPrefix(watcher.logger, level.Key(), level.InfoValue()).Log(logging.MessageKey(), "updating routes", "service", service, "new-routes", routes) } } -func (watcher *ConsulWatcher) WatchService(url string, service string) { - if _, found := watcher.watch[url]; !found { - watcher.watch[url] = service +func (watcher *ConsulWatcher) WatchService(watchURL string, service string) { + parsedURL, err := url.Parse(watchURL) + if err != nil { + log.WithPrefix(watcher.logger, level.Key(), level.ErrorValue()).Log("failed to parse url", "url", watchURL, "service", service) + return + } + log.WithPrefix(watcher.logger, level.Key(), level.InfoValue()).Log(logging.MessageKey(), "Watching Service", "url", watchURL, "service", service, "host", parsedURL.Hostname()) + + if _, found := watcher.watch[parsedURL.Hostname()]; !found { + watcher.watch[parsedURL.Hostname()] = service if _, found := watcher.balancers[service]; !found { watcher.balancers[service] = xresolver.NewRoundRobinBalancer() } @@ -88,10 +92,11 @@ func (watcher *ConsulWatcher) WatchService(url string, service string) { } func (watcher *ConsulWatcher) LookupRoutes(ctx context.Context, host string) ([]xresolver.Route, error) { - if _, found := watcher.config.Watch[host]; !found { + if _, found := watcher.watch[host]; !found { + log.WithPrefix(watcher.logger, level.Key(), level.ErrorValue()).Log("watch not found ", "host", host) return []xresolver.Route{}, errors.New(host + " is not part of the consul listener") } - records, err := watcher.balancers[watcher.config.Watch[host]].Get() - logging.Debug(watcher.logger, logging.MessageKey(), "looking up routes", "routes", records, logging.ErrorKey(), err) + records, err := watcher.balancers[watcher.watch[host]].Get() + log.WithPrefix(watcher.logger, level.Key(), level.DebugValue()).Log(logging.MessageKey(), "looking up routes", "routes", records, logging.ErrorKey(), err) return records, err } diff --git a/xresolver/consul/consullistener_test.go b/xresolver/consul/consullistener_test.go index 59111768..7e458084 100644 --- a/xresolver/consul/consullistener_test.go +++ b/xresolver/consul/consullistener_test.go @@ -3,6 +3,7 @@ package consul import ( "fmt" "github.com/stretchr/testify/assert" + "github.com/xmidt-org/webpa-common/logging" "github.com/xmidt-org/webpa-common/service/monitor" "github.com/xmidt-org/webpa-common/xresolver" "io/ioutil" @@ -19,8 +20,9 @@ func TestConsulWatcher(t *testing.T) { customport := "8080" service := "custom" expectedBody := "Hello World\n" + fallBackURL := "http://" + net.JoinHostPort(customhost, customport) - //customInstance := "custom.host-A.com" + // customInstance := "custom.host-A.com" serverA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "a"+expectedBody) @@ -33,7 +35,7 @@ func TestConsulWatcher(t *testing.T) { defer serverB.Close() watcher := NewConsulWatcher(Options{ - Watch: map[string]string{customhost: service}, + Watch: map[string]string{fallBackURL: service}, }) // note: MonitorEvent is Listen interface in the monitor package @@ -44,13 +46,13 @@ func TestConsulWatcher(t *testing.T) { client := &http.Client{ Transport: &http.Transport{ - DialContext: xresolver.NewResolver(xresolver.DefaultDialer, watcher).DialContext, + DialContext: xresolver.NewResolver(xresolver.DefaultDialer, logging.NewTestLogger(nil, t), watcher).DialContext, // note: DisableKeepAlives is required so when we do the request again we don't reuse the same connection. DisableKeepAlives: true, }, } - req, err := http.NewRequest("GET", "http://"+net.JoinHostPort(customhost, customport), nil) + req, err := http.NewRequest("GET", fallBackURL, nil) assert.NoError(err) res, err := client.Do(req) @@ -64,7 +66,7 @@ func TestConsulWatcher(t *testing.T) { assert.Equal("a"+expectedBody, string(body)) } - req, err = http.NewRequest("GET", "http://"+net.JoinHostPort(customhost, customport), nil) + req, err = http.NewRequest("GET", fallBackURL, nil) assert.NoError(err) res, err = client.Do(req) diff --git a/xresolver/xresolver.go b/xresolver/xresolver.go index 578fac1d..7d0f5ad4 100644 --- a/xresolver/xresolver.go +++ b/xresolver/xresolver.go @@ -3,6 +3,10 @@ package xresolver import ( "context" "errors" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/xmidt-org/webpa-common/logging" + "net" "strconv" "sync" @@ -16,12 +20,17 @@ type resolver struct { resolvers map[Lookup]bool lock sync.RWMutex dialer net.Dialer + logger log.Logger } -func NewResolver(dialer net.Dialer, lookups ...Lookup) Resolver { +func NewResolver(dialer net.Dialer, logger log.Logger, lookups ...Lookup) Resolver { + if logger == nil { + logger = logging.DefaultLogger() + } r := &resolver{ resolvers: make(map[Lookup]bool), dialer: dialer, + logger: log.WithPrefix(logger, "component", "xresolver"), } for _, lookup := range lookups { @@ -70,7 +79,7 @@ func (resolve *resolver) getRoutes(ctx context.Context, host string) []Route { return routes } -func (resolve *resolver) DialContext(ctx context.Context, network, addr string) (con net.Conn, err error) { +func (resolve *resolver) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { host, port, err := net.SplitHostPort(addr) if err != nil { return nil, err @@ -84,25 +93,27 @@ func (resolve *resolver) DialContext(ctx context.Context, network, addr string) routes := resolve.getRoutes(ctx, host) // generate Conn or err from records - con, err = resolve.createConnection(routes, network, port) + con, route, err := resolve.createConnection(routes, network, port) if err == nil { - return + log.WithPrefix(resolve.logger, level.Key(), level.DebugValue()).Log(logging.MessageKey(), "successfully created connection using xresolver", "new-route", route.String(), "addr", addr) + return con, err } + log.WithPrefix(resolve.logger, level.Key(), level.DebugValue()).Log(logging.MessageKey(), "failed to create connection with other routes using original address", "addr", addr) // if no connection, create using the default dialer return resolve.dialer.DialContext(ctx, network, addr) } -func (resolve *resolver) createConnection(routes []Route, network, port string) (con net.Conn, err error) { +func (resolve *resolver) createConnection(routes []Route, network, port string) (net.Conn, Route, error) { for _, route := range routes { portUsed := port if route.Port != 0 { portUsed = strconv.Itoa(route.Port) } - con, err = resolve.dialer.Dial(network, net.JoinHostPort(route.Host, portUsed)) + con, err := resolve.dialer.Dial(network, net.JoinHostPort(route.Host, portUsed)) if err == nil { - return + return con, route, err } } - return nil, errors.New("failed to create connection from routes") + return nil, Route{}, errors.New("failed to create connection from routes") } diff --git a/xresolver/xresolver_test.go b/xresolver/xresolver_test.go index bb096ad0..20de1a74 100644 --- a/xresolver/xresolver_test.go +++ b/xresolver/xresolver_test.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/xmidt-org/webpa-common/logging" "io/ioutil" "net/http" "net/http/httptest" @@ -21,7 +22,7 @@ func TestClient(t *testing.T) { client := &http.Client{ Transport: &http.Transport{ - DialContext: NewResolver(DefaultDialer).DialContext, + DialContext: NewResolver(DefaultDialer, logging.NewTestLogger(nil, t)).DialContext, }, } @@ -62,7 +63,7 @@ func TestClientWithResolver(t *testing.T) { fakeLookUp := new(mockLookUp) fakeLookUp.On("LookupRoutes", mock.Anything, customhost).Return([]Route{route}, nil) - r := NewResolver(DefaultDialer, fakeLookUp) + r := NewResolver(DefaultDialer, logging.NewTestLogger(nil, t), fakeLookUp) client := &http.Client{ Transport: &http.Transport{