Skip to content

Commit

Permalink
Merge pull request #227 from signalfx/add_support_to_not_set_auth_wat…
Browse files Browse the repository at this point in the history
…cher

add support to set up watcher via update function
  • Loading branch information
jgheewala authored Oct 6, 2021
2 parents 602c94b + 236aa86 commit 58a037b
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 15 deletions.
63 changes: 48 additions & 15 deletions clientcfg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,31 @@ type ClientConfigChangerSink struct {
urlParse func(string) (url *url.URL, err error)
}

// clientCfgChangerSinkInitHelper is a common init for ClientConfigChangerSink
// This api accepts "authToken func()" which if not set, will fall back to default behavior of setting up watcher for
// auth token, otherwise it will get the auth token to be used and do not set a watcher
func clientCfgChangerSinkInitHelper(httpSink *sfxclient.HTTPSink, conf *ClientConfig, authToken func() string, logger log.Logger) *ClientConfigChangerSink {
ret := &ClientConfigChangerSink{
Destination: httpSink,
urlParse: url.Parse,
logger: logger,
}

ret.endpointWatch(conf.Endpoint, "")
ret.disableCompressionWatch(conf.DisableCompression, false)
conf.Endpoint.Watch(ret.endpointWatch)
conf.DisableCompression.Watch(ret.disableCompressionWatch)
if authToken != nil {
ret.mu.Lock()
ret.Destination.AuthToken = authToken()
ret.mu.Unlock()
} else {
ret.authTokenWatch(conf.AuthToken, "")
conf.AuthToken.Watch(ret.authTokenWatch)
}
return ret
}

// WatchSinkChanges returns a new ClientConfigChangerSink that wraps a sink with auth/endpoint changes from distconf
func WatchSinkChanges(sink sfxclient.Sink, conf *ClientConfig, logger log.Logger) sfxclient.Sink {
httpSink, ok := sink.(*sfxclient.HTTPSink)
Expand All @@ -75,20 +100,24 @@ func WatchSinkChanges(sink sfxclient.Sink, conf *ClientConfig, logger log.Logger
return WatchHTTPSinkChange(httpSink, conf, logger)
}

// WatchHTTPSinkChange returns anew ClientConfigChangerSink that takes an http sink, instead of a regular sinc
// WatchHTTPSinkChange returns anew ClientConfigChangerSink that takes an http sink, instead of a regular sink
func WatchHTTPSinkChange(httpSink *sfxclient.HTTPSink, conf *ClientConfig, logger log.Logger) *ClientConfigChangerSink {
ret := &ClientConfigChangerSink{
Destination: httpSink,
urlParse: url.Parse,
logger: logger,
return clientCfgChangerSinkInitHelper(httpSink, conf, nil, logger)
}

// SetupSinkClientChanger returns a new ClientConfigChangerSink that wraps a sink with endpoint changes from distconf
// This API should be used if you do not want to have distconf auth watcher
func SetupSinkClientChanger(sink sfxclient.Sink, conf *ClientConfig, authToken func() string, logger log.Logger) sfxclient.Sink {
if httpSink, ok := sink.(*sfxclient.HTTPSink); ok {
ret := clientCfgChangerSinkInitHelper(httpSink, conf, authToken, logger)
return ret
}
ret.authTokenWatch(conf.AuthToken, "")
ret.endpointWatch(conf.Endpoint, "")
ret.disableCompressionWatch(conf.DisableCompression, false)
conf.Endpoint.Watch(ret.endpointWatch)
conf.AuthToken.Watch(ret.authTokenWatch)
conf.DisableCompression.Watch(ret.disableCompressionWatch)
return ret
return sink
}

// AuthUpdate will return a func which can be used for updating auth token if default watcher is not used for token update
func (s *ClientConfigChangerSink) AuthUpdate() func(string) {
return s.updateToken
}

// AddDatapoints forwards the call to Destination
Expand Down Expand Up @@ -119,13 +148,17 @@ func (s *ClientConfigChangerSink) disableCompressionWatch(newValue *distconf.Boo
s.mu.Unlock()
}

func (s *ClientConfigChangerSink) authTokenWatch(str *distconf.Str, oldValue string) {
s.logger.Log("auth watch")
func (s *ClientConfigChangerSink) updateToken(newToken string) {
s.mu.Lock()
s.Destination.AuthToken = str.Get()
s.Destination.AuthToken = newToken
s.mu.Unlock()
}

func (s *ClientConfigChangerSink) authTokenWatch(str *distconf.Str, _ string) {
s.logger.Log("auth watch")
s.updateToken(str.Get())
}

// endpointWatch returns a distconf watch that sets the correct ingest endpoint for a signalfx
// client
func (s *ClientConfigChangerSink) endpointWatch(str *distconf.Str, oldValue string) {
Expand Down
27 changes: 27 additions & 0 deletions clientcfg/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,30 @@ func TestClient(t *testing.T) {
})
})
}

func TestSetupSinkClientChanger(t *testing.T) {
mem := distconf.Mem()
dconf := distconf.New([]distconf.Reader{mem})
conf := &ClientConfig{}
logger := log.Discard
conf.Load(dconf)
authToken := func() string {
return "test-token"
}
newToken := "new-token"
Convey("http sink without auth watcher but with auth update func should work", t, func() {
httpSink := sfxclient.NewHTTPSink()
sink, ok := SetupSinkClientChanger(httpSink, conf, authToken, logger).(*ClientConfigChangerSink)
So(ok, ShouldBeTrue)
So(sink, ShouldNotBeNil)
authUpdaterFunc := sink.AuthUpdate()
So(authUpdaterFunc, ShouldNotBeNil)
authUpdaterFunc(newToken)
sink.mu.Lock()
So(sink.Destination.AuthToken, ShouldEqual, newToken)
sink.mu.Unlock()

basicSink := dptest.NewBasicSink()
So(SetupSinkClientChanger(basicSink, conf, nil, logger), ShouldEqual, basicSink)
})
}

0 comments on commit 58a037b

Please sign in to comment.