From 236aa862e603e18093fb0c66ffcfdd106a9e595d Mon Sep 17 00:00:00 2001 From: jgheewala Date: Tue, 5 Oct 2021 18:40:21 -0400 Subject: [PATCH] add support to set up watcher via update function If a user of ClientConfigChangerSink doesn't want auth watcher, then currently there is no way to do so. Hence, introduced a new API SetupSinkClientChanger which will accept init ClientConfigChangerSink with the authToken that is provided. If the users later on wants to update auth token it can be done using API AuthUpdate. Above changes give freedom to caller for not setting auth watcher which in some cases might not be required at all --- clientcfg/client.go | 63 ++++++++++++++++++++++++++++++---------- clientcfg/client_test.go | 27 +++++++++++++++++ 2 files changed, 75 insertions(+), 15 deletions(-) diff --git a/clientcfg/client.go b/clientcfg/client.go index 7a0dcf0..3cd1656 100644 --- a/clientcfg/client.go +++ b/clientcfg/client.go @@ -66,6 +66,31 @@ type ClientConfigChangerSink struct { urlParse func(string) (url *url.URL, err error) } +// clientCfgChangerSinkInitHelper is a common init for ClientConfigChangerSink +// This api accepts "authToken func()" which if not set, will fall back to default behavior of setting up watcher for +// auth token, otherwise it will get the auth token to be used and do not set a watcher +func clientCfgChangerSinkInitHelper(httpSink *sfxclient.HTTPSink, conf *ClientConfig, authToken func() string, logger log.Logger) *ClientConfigChangerSink { + ret := &ClientConfigChangerSink{ + Destination: httpSink, + urlParse: url.Parse, + logger: logger, + } + + ret.endpointWatch(conf.Endpoint, "") + ret.disableCompressionWatch(conf.DisableCompression, false) + conf.Endpoint.Watch(ret.endpointWatch) + conf.DisableCompression.Watch(ret.disableCompressionWatch) + if authToken != nil { + ret.mu.Lock() + ret.Destination.AuthToken = authToken() + ret.mu.Unlock() + } else { + ret.authTokenWatch(conf.AuthToken, "") + conf.AuthToken.Watch(ret.authTokenWatch) + } + return ret +} + // WatchSinkChanges returns a new ClientConfigChangerSink that wraps a sink with auth/endpoint changes from distconf func WatchSinkChanges(sink sfxclient.Sink, conf *ClientConfig, logger log.Logger) sfxclient.Sink { httpSink, ok := sink.(*sfxclient.HTTPSink) @@ -75,20 +100,24 @@ func WatchSinkChanges(sink sfxclient.Sink, conf *ClientConfig, logger log.Logger return WatchHTTPSinkChange(httpSink, conf, logger) } -// WatchHTTPSinkChange returns anew ClientConfigChangerSink that takes an http sink, instead of a regular sinc +// WatchHTTPSinkChange returns anew ClientConfigChangerSink that takes an http sink, instead of a regular sink func WatchHTTPSinkChange(httpSink *sfxclient.HTTPSink, conf *ClientConfig, logger log.Logger) *ClientConfigChangerSink { - ret := &ClientConfigChangerSink{ - Destination: httpSink, - urlParse: url.Parse, - logger: logger, + return clientCfgChangerSinkInitHelper(httpSink, conf, nil, logger) +} + +// SetupSinkClientChanger returns a new ClientConfigChangerSink that wraps a sink with endpoint changes from distconf +// This API should be used if you do not want to have distconf auth watcher +func SetupSinkClientChanger(sink sfxclient.Sink, conf *ClientConfig, authToken func() string, logger log.Logger) sfxclient.Sink { + if httpSink, ok := sink.(*sfxclient.HTTPSink); ok { + ret := clientCfgChangerSinkInitHelper(httpSink, conf, authToken, logger) + return ret } - ret.authTokenWatch(conf.AuthToken, "") - ret.endpointWatch(conf.Endpoint, "") - ret.disableCompressionWatch(conf.DisableCompression, false) - conf.Endpoint.Watch(ret.endpointWatch) - conf.AuthToken.Watch(ret.authTokenWatch) - conf.DisableCompression.Watch(ret.disableCompressionWatch) - return ret + return sink +} + +// AuthUpdate will return a func which can be used for updating auth token if default watcher is not used for token update +func (s *ClientConfigChangerSink) AuthUpdate() func(string) { + return s.updateToken } // AddDatapoints forwards the call to Destination @@ -119,13 +148,17 @@ func (s *ClientConfigChangerSink) disableCompressionWatch(newValue *distconf.Boo s.mu.Unlock() } -func (s *ClientConfigChangerSink) authTokenWatch(str *distconf.Str, oldValue string) { - s.logger.Log("auth watch") +func (s *ClientConfigChangerSink) updateToken(newToken string) { s.mu.Lock() - s.Destination.AuthToken = str.Get() + s.Destination.AuthToken = newToken s.mu.Unlock() } +func (s *ClientConfigChangerSink) authTokenWatch(str *distconf.Str, _ string) { + s.logger.Log("auth watch") + s.updateToken(str.Get()) +} + // endpointWatch returns a distconf watch that sets the correct ingest endpoint for a signalfx // client func (s *ClientConfigChangerSink) endpointWatch(str *distconf.Str, oldValue string) { diff --git a/clientcfg/client_test.go b/clientcfg/client_test.go index 0c104fa..e97bfa4 100644 --- a/clientcfg/client_test.go +++ b/clientcfg/client_test.go @@ -83,3 +83,30 @@ func TestClient(t *testing.T) { }) }) } + +func TestSetupSinkClientChanger(t *testing.T) { + mem := distconf.Mem() + dconf := distconf.New([]distconf.Reader{mem}) + conf := &ClientConfig{} + logger := log.Discard + conf.Load(dconf) + authToken := func() string { + return "test-token" + } + newToken := "new-token" + Convey("http sink without auth watcher but with auth update func should work", t, func() { + httpSink := sfxclient.NewHTTPSink() + sink, ok := SetupSinkClientChanger(httpSink, conf, authToken, logger).(*ClientConfigChangerSink) + So(ok, ShouldBeTrue) + So(sink, ShouldNotBeNil) + authUpdaterFunc := sink.AuthUpdate() + So(authUpdaterFunc, ShouldNotBeNil) + authUpdaterFunc(newToken) + sink.mu.Lock() + So(sink.Destination.AuthToken, ShouldEqual, newToken) + sink.mu.Unlock() + + basicSink := dptest.NewBasicSink() + So(SetupSinkClientChanger(basicSink, conf, nil, logger), ShouldEqual, basicSink) + }) +}