From 9e053615e221fd4eaa3438ed46a94f28b14f96d5 Mon Sep 17 00:00:00 2001 From: Joel Unzain Date: Mon, 15 Feb 2021 17:56:47 -0800 Subject: [PATCH] Feature/start stop (#128) * be consistent with receiver type * make listener restartable and thread safe * first attempt at atomic value approach * 3-state CAS approach * single shutdown channel should be ok * Add concurrent and serial tests * dead code clean up * fix up assertions for serial case * Update error messages * updates test related to new type * more consistent name * Add more tests * Export non-fatal listener errors * test for value after listening is done * Move entries to unreleased and add udpate --- CHANGELOG.md | 7 +- chrysom/client.go | 96 +++++++++++++++++++-------- chrysom/client_test.go | 146 +++++++++++++++++++++++++++++++++++++++++ chrysom/metrics.go | 5 -- 4 files changed, 220 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35f20ba0..23de46b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,13 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Changed +- Make client listener thread-safe and friendlier to uber/fx hooks. [#128](https://github.com/xmidt-org/argus/pull/128) ### Fixed - Bug in which the userInputValidation config section was required even when it should've been optional. [#129](https://github.com/xmidt-org/argus/pull/129) +- Fix logging to use `xlog` instead of deprecated `webpa-common/logging` package. [#132](https://github.com/xmidt-org/argus/pull/132) +- Fix ListenerFunc interface. [#133](https://github.com/xmidt-org/argus/pull/133) + ## [v0.3.10] ### Changed @@ -21,9 +26,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Refactor client code and add unit tests around item CRUD operations [#119](https://github.com/xmidt-org/argus/pull/119) ### Fixed -- Fix logging to use `xlog` instead of deprecated `webpa-common/logging` package. [#132](https://github.com/xmidt-org/argus/pull/132) - Fix behavior in which the owner of an existing item was overwritten in super user mode. [#116](https://github.com/xmidt-org/argus/pull/116) -- Fix ListenerFunc interface. [#133](https://github.com/xmidt-org/argus/pull/133) ### Added - Item ID is validated to have the format of a SHA256 message hex digest. [#106](https://github.com/xmidt-org/argus/pull/106) diff --git a/chrysom/client.go b/chrysom/client.go index 7c6e83a8..b0935c15 100644 --- a/chrysom/client.go +++ b/chrysom/client.go @@ -27,6 +27,7 @@ import ( "io/ioutil" "net/http" "strings" + "sync/atomic" "time" "github.com/go-kit/kit/log" @@ -61,6 +62,9 @@ var ( ErrFailedAuthentication = errors.New("failed to authentication with argus") ErrBadRequest = errors.New("argus rejected the request as invalid") + + ErrListenerNotStopped = errors.New("listener is either running or starting") + ErrListenerNotRunning = errors.New("listener is either stopped or stopping") ) var ( @@ -137,12 +141,22 @@ type Client struct { observer *listenerConfig } +// listening states +const ( + stopped int32 = iota + running + transitioning +) + type listenerConfig struct { - listener Listener - ticker *time.Ticker - pollCount metrics.Counter - bucket string - owner string + listener Listener + ticker *time.Ticker + pullInterval time.Duration + pollCount metrics.Counter + bucket string + owner string + shutdown chan struct{} + state int32 } func NewClient(config ClientConfig) (*Client, error) { @@ -159,7 +173,7 @@ func NewClient(config ClientConfig) (*Client, error) { client: config.HTTPClient, auth: tokenAcquirer, logger: config.Logger, - observer: createObserver(config.Logger, config), + observer: newObserver(config.Logger, config), storeBaseURL: config.Address + storeAPIPath, } @@ -179,16 +193,18 @@ func translateNonSuccessStatusCode(code int) error { } } -func createObserver(logger log.Logger, config ClientConfig) *listenerConfig { +func newObserver(logger log.Logger, config ClientConfig) *listenerConfig { if config.Listener == nil { return nil } return &listenerConfig{ - listener: config.Listener, - ticker: time.NewTicker(config.PullInterval), - pollCount: config.MetricsProvider.NewCounter(PollCounter), - bucket: config.Bucket, - owner: config.Owner, + listener: config.Listener, + ticker: time.NewTicker(config.PullInterval), + pullInterval: config.PullInterval, + pollCount: config.MetricsProvider.NewCounter(PollCounter), + bucket: config.Bucket, + owner: config.Owner, + shutdown: make(chan struct{}), } } @@ -228,7 +244,7 @@ func buildTokenAcquirer(auth *Auth) (acquire.Acquirer, error) { return &acquire.DefaultAcquirer{}, nil } -func (c Client) sendRequest(owner, method, url string, body io.Reader) (response, error) { +func (c *Client) sendRequest(owner, method, url string, body io.Reader) (response, error) { r, err := http.NewRequest(method, url, body) if err != nil { return response{}, fmt.Errorf(errWrappedFmt, errNewRequestFailure, err.Error()) @@ -344,38 +360,64 @@ func (c *Client) RemoveItem(id, bucket, owner string) (model.Item, error) { return item, nil } +// Start begins listening for updates on an interval given that client configuration +// is setup correctly. If a listener process is already in progress, calling Start() +// is a NoOp. If you want to restart the current listener process, call Stop() first. func (c *Client) Start(ctx context.Context) error { - if c.observer == nil { + if c.observer == nil || c.observer.listener == nil { level.Warn(c.logger).Log(xlog.MessageKey(), "No listener was setup to receive updates.") return nil } - if c.observer.ticker == nil { + level.Error(c.logger).Log(xlog.MessageKey(), "Observer ticker is nil") return ErrUndefinedIntervalTicker } + if !atomic.CompareAndSwapInt32(&c.observer.state, stopped, transitioning) { + level.Error(c.logger).Log(xlog.MessageKey(), "Start called when a listener was not in stopped state", "err", ErrListenerNotStopped) + return ErrListenerNotStopped + } + + c.observer.ticker.Reset(c.observer.pullInterval) go func() { - observer := c.observer - for range observer.ticker.C { - outcome := SuccessOutcome - items, err := c.GetItems(observer.bucket, observer.owner) - if err == nil { - observer.listener.Update(items) - } else { - outcome = FailureOutcome - level.Error(c.logger).Log(xlog.MessageKey(), "Failed to get items for listeners", xlog.ErrorKey(), err) + for { + select { + case <-c.observer.shutdown: + return + case <-c.observer.ticker.C: + outcome := SuccessOutcome + items, err := c.GetItems(c.observer.bucket, c.observer.owner) + if err == nil { + c.observer.listener.Update(items) + } else { + outcome = FailureOutcome + level.Error(c.logger).Log(xlog.MessageKey(), "Failed to get items for listeners", xlog.ErrorKey(), err) + } + c.observer.pollCount.With(OutcomeLabel, outcome).Add(1) } - observer.pollCount.With(OutcomeLabel, outcome).Add(1) } }() + atomic.SwapInt32(&c.observer.state, running) return nil } +// Stop requests the current listener process to stop and waits for its goroutine to complete. +// Calling Stop() when a listener is not running (or while one is getting stopped) returns an +// error. func (c *Client) Stop(ctx context.Context) error { - if c.observer != nil && c.observer.ticker != nil { - c.observer.ticker.Stop() + if c.observer == nil || c.observer.ticker == nil { + return nil } + + if !atomic.CompareAndSwapInt32(&c.observer.state, running, transitioning) { + level.Error(c.logger).Log(xlog.MessageKey(), "Stop called when a listener was not in running state", "err", ErrListenerNotStopped) + return ErrListenerNotRunning + } + + c.observer.ticker.Stop() + c.observer.shutdown <- struct{}{} + atomic.SwapInt32(&c.observer.state, stopped) return nil } diff --git a/chrysom/client_test.go b/chrysom/client_test.go index 5f399494..22523fd8 100644 --- a/chrysom/client_test.go +++ b/chrysom/client_test.go @@ -2,12 +2,14 @@ package chrysom import ( "bytes" + "context" "encoding/json" "errors" "fmt" "io/ioutil" "net/http" "net/http/httptest" + "strconv" "testing" "time" @@ -18,6 +20,7 @@ import ( "github.com/stretchr/testify/require" "github.com/xmidt-org/argus/model" "github.com/xmidt-org/argus/store" + "github.com/xmidt-org/themis/xlog" ) const failingURL = "nowhere://" @@ -613,6 +616,149 @@ func TestTranslateStatusCode(t *testing.T) { }) } } + +func TestListenerStartStopPairsParallel(t *testing.T) { + require := require.New(t) + client, close := newStartStopClient(true) + defer close() + + t.Run("ParallelGroup", func(t *testing.T) { + for i := 0; i < 20; i++ { + testNumber := i + t.Run(strconv.Itoa(testNumber), func(t *testing.T) { + t.Parallel() + assert := assert.New(t) + fmt.Printf("%d: Start\n", testNumber) + errStart := client.Start(context.Background()) + if errStart != nil { + assert.Equal(ErrListenerNotStopped, errStart) + } + time.Sleep(time.Millisecond * 400) + errStop := client.Stop(context.Background()) + if errStop != nil { + assert.Equal(ErrListenerNotRunning, errStop) + } + fmt.Printf("%d: Done\n", testNumber) + }) + } + }) + + require.Equal(stopped, client.observer.state) +} + +func TestListenerStartStopPairsSerial(t *testing.T) { + require := require.New(t) + client, close := newStartStopClient(true) + defer close() + + for i := 0; i < 5; i++ { + testNumber := i + t.Run(strconv.Itoa(testNumber), func(t *testing.T) { + assert := assert.New(t) + fmt.Printf("%d: Start\n", testNumber) + assert.Nil(client.Start(context.Background())) + assert.Nil(client.Stop(context.Background())) + fmt.Printf("%d: Done\n", testNumber) + }) + } + require.Equal(stopped, client.observer.state) +} + +func TestListenerEdgeCases(t *testing.T) { + t.Run("NoListener", func(t *testing.T) { + client, stopServer := newStartStopClient(false) + defer stopServer() + assert := assert.New(t) + assert.Nil(client.Start(context.Background())) + assert.Nil(client.Stop(context.Background())) + }) + + t.Run("NilTicker", func(t *testing.T) { + assert := assert.New(t) + client, stopServer := newStartStopClient(true) + defer stopServer() + client.observer.ticker = nil + assert.Equal(ErrUndefinedIntervalTicker, client.Start(context.Background())) + }) + + t.Run("PartialUpdateFailures", func(t *testing.T) { + assert := assert.New(t) + tester := &getItemsStartStopTester{} + client, stopServer := tester.newSpecialStartStopClient() + defer stopServer() + + assert.Nil(client.Start(context.Background())) + + time.Sleep(time.Millisecond * 500) + assert.Nil(client.Stop(context.Background())) + assert.Len(tester.items, 1) + }) +} + +func newStartStopClient(includeListener bool) (*Client, func()) { + server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + rw.Write(getItemsValidPayload()) + })) + config := ClientConfig{ + Address: server.URL, + HTTPClient: server.Client(), + MetricsProvider: provider.NewDiscardProvider(), + PullInterval: time.Millisecond * 200, + Bucket: "parallel-test-bucket", + Logger: xlog.Default(), + } + if includeListener { + config.Listener = ListenerFunc((func(_ Items) { + fmt.Println("Doing amazing work for 100ms") + time.Sleep(time.Millisecond * 100) + })) + } + + client, err := NewClient(config) + if err != nil { + panic(err) + } + + return client, server.Close +} + +type getItemsStartStopTester struct { + items Items +} + +func (g *getItemsStartStopTester) newSpecialStartStopClient() (*Client, func()) { + succeed := true + succeedFirstTimeOnlyServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + if succeed { + rw.Write(getItemsValidPayload()) + succeed = false + } else { + rw.WriteHeader(http.StatusInternalServerError) + } + })) + + config := ClientConfig{ + Address: succeedFirstTimeOnlyServer.URL, + HTTPClient: succeedFirstTimeOnlyServer.Client(), + MetricsProvider: provider.NewDiscardProvider(), + PullInterval: time.Millisecond * 200, + Bucket: "parallel-test-bucket", + Logger: xlog.Default(), + Listener: ListenerFunc((func(items Items) { + fmt.Println("Capturing all items") + g.items = append(g.items, items...) + })), + } + + client, err := NewClient(config) + + if err != nil { + panic(err) + } + + return client, succeedFirstTimeOnlyServer.Close +} + func failAcquirer() (string, error) { return "", errors.New("always fail") } diff --git a/chrysom/metrics.go b/chrysom/metrics.go index 599d8658..0ef0af05 100644 --- a/chrysom/metrics.go +++ b/chrysom/metrics.go @@ -1,7 +1,6 @@ package chrysom import ( - "github.com/go-kit/kit/metrics" "github.com/xmidt-org/webpa-common/xmetrics" ) @@ -32,7 +31,3 @@ func Metrics() []xmetrics.Metric { }, } } - -type measures struct { - pollCount metrics.Counter -}