Skip to content

Commit

Permalink
Feature/start stop (#128)
Browse files Browse the repository at this point in the history
* be consistent with receiver type

* make listener restartable and thread safe

* first attempt at atomic value approach

* 3-state CAS approach

* single shutdown channel should be ok

* Add concurrent and serial tests

* dead code clean up

* fix up assertions for serial case

* Update error messages

* updates test related to new type

* more consistent name

* Add more tests

* Export non-fatal listener errors

* test for value after listening is done

* Move entries to unreleased and add udpate
  • Loading branch information
joe94 authored Feb 16, 2021
1 parent 3118676 commit 9e05361
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 34 deletions.
7 changes: 5 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- Make client listener thread-safe and friendlier to uber/fx hooks. [#128](https://github.com/xmidt-org/argus/pull/128)
### Fixed
- Bug in which the userInputValidation config section was required even when it should've been optional. [#129](https://github.com/xmidt-org/argus/pull/129)
- Fix logging to use `xlog` instead of deprecated `webpa-common/logging` package. [#132](https://github.com/xmidt-org/argus/pull/132)
- Fix ListenerFunc interface. [#133](https://github.com/xmidt-org/argus/pull/133)


## [v0.3.10]
### Changed
Expand All @@ -21,9 +26,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Refactor client code and add unit tests around item CRUD operations [#119](https://github.com/xmidt-org/argus/pull/119)

### Fixed
- Fix logging to use `xlog` instead of deprecated `webpa-common/logging` package. [#132](https://github.com/xmidt-org/argus/pull/132)
- Fix behavior in which the owner of an existing item was overwritten in super user mode. [#116](https://github.com/xmidt-org/argus/pull/116)
- Fix ListenerFunc interface. [#133](https://github.com/xmidt-org/argus/pull/133)

### Added
- Item ID is validated to have the format of a SHA256 message hex digest. [#106](https://github.com/xmidt-org/argus/pull/106)
Expand Down
96 changes: 69 additions & 27 deletions chrysom/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"io/ioutil"
"net/http"
"strings"
"sync/atomic"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -61,6 +62,9 @@ var (

ErrFailedAuthentication = errors.New("failed to authentication with argus")
ErrBadRequest = errors.New("argus rejected the request as invalid")

ErrListenerNotStopped = errors.New("listener is either running or starting")
ErrListenerNotRunning = errors.New("listener is either stopped or stopping")
)

var (
Expand Down Expand Up @@ -137,12 +141,22 @@ type Client struct {
observer *listenerConfig
}

// listening states
const (
stopped int32 = iota
running
transitioning
)

type listenerConfig struct {
listener Listener
ticker *time.Ticker
pollCount metrics.Counter
bucket string
owner string
listener Listener
ticker *time.Ticker
pullInterval time.Duration
pollCount metrics.Counter
bucket string
owner string
shutdown chan struct{}
state int32
}

func NewClient(config ClientConfig) (*Client, error) {
Expand All @@ -159,7 +173,7 @@ func NewClient(config ClientConfig) (*Client, error) {
client: config.HTTPClient,
auth: tokenAcquirer,
logger: config.Logger,
observer: createObserver(config.Logger, config),
observer: newObserver(config.Logger, config),
storeBaseURL: config.Address + storeAPIPath,
}

Expand All @@ -179,16 +193,18 @@ func translateNonSuccessStatusCode(code int) error {
}
}

func createObserver(logger log.Logger, config ClientConfig) *listenerConfig {
func newObserver(logger log.Logger, config ClientConfig) *listenerConfig {
if config.Listener == nil {
return nil
}
return &listenerConfig{
listener: config.Listener,
ticker: time.NewTicker(config.PullInterval),
pollCount: config.MetricsProvider.NewCounter(PollCounter),
bucket: config.Bucket,
owner: config.Owner,
listener: config.Listener,
ticker: time.NewTicker(config.PullInterval),
pullInterval: config.PullInterval,
pollCount: config.MetricsProvider.NewCounter(PollCounter),
bucket: config.Bucket,
owner: config.Owner,
shutdown: make(chan struct{}),
}
}

Expand Down Expand Up @@ -228,7 +244,7 @@ func buildTokenAcquirer(auth *Auth) (acquire.Acquirer, error) {
return &acquire.DefaultAcquirer{}, nil
}

func (c Client) sendRequest(owner, method, url string, body io.Reader) (response, error) {
func (c *Client) sendRequest(owner, method, url string, body io.Reader) (response, error) {
r, err := http.NewRequest(method, url, body)
if err != nil {
return response{}, fmt.Errorf(errWrappedFmt, errNewRequestFailure, err.Error())
Expand Down Expand Up @@ -344,38 +360,64 @@ func (c *Client) RemoveItem(id, bucket, owner string) (model.Item, error) {
return item, nil
}

// Start begins listening for updates on an interval given that client configuration
// is setup correctly. If a listener process is already in progress, calling Start()
// is a NoOp. If you want to restart the current listener process, call Stop() first.
func (c *Client) Start(ctx context.Context) error {
if c.observer == nil {
if c.observer == nil || c.observer.listener == nil {
level.Warn(c.logger).Log(xlog.MessageKey(), "No listener was setup to receive updates.")
return nil
}

if c.observer.ticker == nil {
level.Error(c.logger).Log(xlog.MessageKey(), "Observer ticker is nil")
return ErrUndefinedIntervalTicker
}

if !atomic.CompareAndSwapInt32(&c.observer.state, stopped, transitioning) {
level.Error(c.logger).Log(xlog.MessageKey(), "Start called when a listener was not in stopped state", "err", ErrListenerNotStopped)
return ErrListenerNotStopped
}

c.observer.ticker.Reset(c.observer.pullInterval)
go func() {
observer := c.observer
for range observer.ticker.C {
outcome := SuccessOutcome
items, err := c.GetItems(observer.bucket, observer.owner)
if err == nil {
observer.listener.Update(items)
} else {
outcome = FailureOutcome
level.Error(c.logger).Log(xlog.MessageKey(), "Failed to get items for listeners", xlog.ErrorKey(), err)
for {
select {
case <-c.observer.shutdown:
return
case <-c.observer.ticker.C:
outcome := SuccessOutcome
items, err := c.GetItems(c.observer.bucket, c.observer.owner)
if err == nil {
c.observer.listener.Update(items)
} else {
outcome = FailureOutcome
level.Error(c.logger).Log(xlog.MessageKey(), "Failed to get items for listeners", xlog.ErrorKey(), err)
}
c.observer.pollCount.With(OutcomeLabel, outcome).Add(1)
}
observer.pollCount.With(OutcomeLabel, outcome).Add(1)
}
}()

atomic.SwapInt32(&c.observer.state, running)
return nil
}

// Stop requests the current listener process to stop and waits for its goroutine to complete.
// Calling Stop() when a listener is not running (or while one is getting stopped) returns an
// error.
func (c *Client) Stop(ctx context.Context) error {
if c.observer != nil && c.observer.ticker != nil {
c.observer.ticker.Stop()
if c.observer == nil || c.observer.ticker == nil {
return nil
}

if !atomic.CompareAndSwapInt32(&c.observer.state, running, transitioning) {
level.Error(c.logger).Log(xlog.MessageKey(), "Stop called when a listener was not in running state", "err", ErrListenerNotStopped)
return ErrListenerNotRunning
}

c.observer.ticker.Stop()
c.observer.shutdown <- struct{}{}
atomic.SwapInt32(&c.observer.state, stopped)
return nil
}

Expand Down
146 changes: 146 additions & 0 deletions chrysom/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package chrysom

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"

Expand All @@ -18,6 +20,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/xmidt-org/argus/model"
"github.com/xmidt-org/argus/store"
"github.com/xmidt-org/themis/xlog"
)

const failingURL = "nowhere://"
Expand Down Expand Up @@ -613,6 +616,149 @@ func TestTranslateStatusCode(t *testing.T) {
})
}
}

func TestListenerStartStopPairsParallel(t *testing.T) {
require := require.New(t)
client, close := newStartStopClient(true)
defer close()

t.Run("ParallelGroup", func(t *testing.T) {
for i := 0; i < 20; i++ {
testNumber := i
t.Run(strconv.Itoa(testNumber), func(t *testing.T) {
t.Parallel()
assert := assert.New(t)
fmt.Printf("%d: Start\n", testNumber)
errStart := client.Start(context.Background())
if errStart != nil {
assert.Equal(ErrListenerNotStopped, errStart)
}
time.Sleep(time.Millisecond * 400)
errStop := client.Stop(context.Background())
if errStop != nil {
assert.Equal(ErrListenerNotRunning, errStop)
}
fmt.Printf("%d: Done\n", testNumber)
})
}
})

require.Equal(stopped, client.observer.state)
}

func TestListenerStartStopPairsSerial(t *testing.T) {
require := require.New(t)
client, close := newStartStopClient(true)
defer close()

for i := 0; i < 5; i++ {
testNumber := i
t.Run(strconv.Itoa(testNumber), func(t *testing.T) {
assert := assert.New(t)
fmt.Printf("%d: Start\n", testNumber)
assert.Nil(client.Start(context.Background()))
assert.Nil(client.Stop(context.Background()))
fmt.Printf("%d: Done\n", testNumber)
})
}
require.Equal(stopped, client.observer.state)
}

func TestListenerEdgeCases(t *testing.T) {
t.Run("NoListener", func(t *testing.T) {
client, stopServer := newStartStopClient(false)
defer stopServer()
assert := assert.New(t)
assert.Nil(client.Start(context.Background()))
assert.Nil(client.Stop(context.Background()))
})

t.Run("NilTicker", func(t *testing.T) {
assert := assert.New(t)
client, stopServer := newStartStopClient(true)
defer stopServer()
client.observer.ticker = nil
assert.Equal(ErrUndefinedIntervalTicker, client.Start(context.Background()))
})

t.Run("PartialUpdateFailures", func(t *testing.T) {
assert := assert.New(t)
tester := &getItemsStartStopTester{}
client, stopServer := tester.newSpecialStartStopClient()
defer stopServer()

assert.Nil(client.Start(context.Background()))

time.Sleep(time.Millisecond * 500)
assert.Nil(client.Stop(context.Background()))
assert.Len(tester.items, 1)
})
}

func newStartStopClient(includeListener bool) (*Client, func()) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.Write(getItemsValidPayload())
}))
config := ClientConfig{
Address: server.URL,
HTTPClient: server.Client(),
MetricsProvider: provider.NewDiscardProvider(),
PullInterval: time.Millisecond * 200,
Bucket: "parallel-test-bucket",
Logger: xlog.Default(),
}
if includeListener {
config.Listener = ListenerFunc((func(_ Items) {
fmt.Println("Doing amazing work for 100ms")
time.Sleep(time.Millisecond * 100)
}))
}

client, err := NewClient(config)
if err != nil {
panic(err)
}

return client, server.Close
}

type getItemsStartStopTester struct {
items Items
}

func (g *getItemsStartStopTester) newSpecialStartStopClient() (*Client, func()) {
succeed := true
succeedFirstTimeOnlyServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
if succeed {
rw.Write(getItemsValidPayload())
succeed = false
} else {
rw.WriteHeader(http.StatusInternalServerError)
}
}))

config := ClientConfig{
Address: succeedFirstTimeOnlyServer.URL,
HTTPClient: succeedFirstTimeOnlyServer.Client(),
MetricsProvider: provider.NewDiscardProvider(),
PullInterval: time.Millisecond * 200,
Bucket: "parallel-test-bucket",
Logger: xlog.Default(),
Listener: ListenerFunc((func(items Items) {
fmt.Println("Capturing all items")
g.items = append(g.items, items...)
})),
}

client, err := NewClient(config)

if err != nil {
panic(err)
}

return client, succeedFirstTimeOnlyServer.Close
}

func failAcquirer() (string, error) {
return "", errors.New("always fail")
}
Expand Down
5 changes: 0 additions & 5 deletions chrysom/metrics.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package chrysom

import (
"github.com/go-kit/kit/metrics"
"github.com/xmidt-org/webpa-common/xmetrics"
)

Expand Down Expand Up @@ -32,7 +31,3 @@ func Metrics() []xmetrics.Metric {
},
}
}

type measures struct {
pollCount metrics.Counter
}

0 comments on commit 9e05361

Please sign in to comment.