From b474ee91964383a0205e53c261f952be959a20be Mon Sep 17 00:00:00 2001 From: Alexey Lavrenuke Date: Fri, 15 Jan 2016 18:00:14 +0300 Subject: [PATCH] bind results channel method in Gun interface, more tests --- config/config_test.go | 3 ++- engine/user.go | 4 ++-- gun/gun.go | 3 ++- gun/http/http.go | 20 +++++++++------- gun/http/http_test.go | 14 ++++++----- gun/log.go | 28 +++++++--------------- gun/spdy/spdy.go | 56 ++++++++++++++++++++++++------------------- gun/spdy/spdy_test.go | 39 ++++++++++++++++++++++++------ 8 files changed, 99 insertions(+), 68 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index 5743271ff..94545ca64 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -33,7 +33,8 @@ func TestGlobalConfig(t *testing.T) { gc := &Gun{ GunType: "spdy", Parameters: map[string]interface{}{ - "Target": "localhost:3000", + "Target": "localhost:3000", + "PingPeriod": "5", }, } globalConfig := &Global{ diff --git a/engine/user.go b/engine/user.go index 83e467a85..8736659b3 100644 --- a/engine/user.go +++ b/engine/user.go @@ -29,7 +29,7 @@ func (u *User) Run(ctx context.Context) error { }() control := u.Limiter.Control() source := u.Ammunition.Source() - sink := u.Results.Sink() + u.Gun.BindResultsTo(u.Results.Sink()) loop: for { select { @@ -40,7 +40,7 @@ loop: } _, more = <-control if more { - u.Gun.Shoot(ctx, ammo, sink) + u.Gun.Shoot(ctx, ammo) u.Ammunition.Release(ammo) } else { log.Println("Limiter ended.") diff --git a/gun/gun.go b/gun/gun.go index d66ea56b2..85f048b09 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -7,5 +7,6 @@ import ( ) type Gun interface { - Shoot(context.Context, ammo.Ammo, chan<- *aggregate.Sample) error + Shoot(context.Context, ammo.Ammo) error + BindResultsTo(chan<- *aggregate.Sample) } diff --git a/gun/http/http.go b/gun/http/http.go index 438a0c79d..8b3b42143 100644 --- a/gun/http/http.go +++ b/gun/http/http.go @@ -28,23 +28,27 @@ const ( ) type HttpGun struct { - target string - ssl bool - client *http.Client + target string + ssl bool + client *http.Client + results chan<- *aggregate.Sample +} + +func (hg *HttpGun) BindResultsTo(results chan<- *aggregate.Sample) { + hg.results = results } // Shoot to target, this method is not thread safe -func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, - results chan<- *aggregate.Sample) error { +func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo) error { if hg.client == nil { - hg.Connect(results) + hg.Connect() } start := time.Now() ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST") defer func() { ss.RT = int(time.Since(start).Seconds() * 1e6) - results <- ss + hg.results <- ss }() // now send the request to obtain a http response ha, ok := a.(*ammo.Http) @@ -99,7 +103,7 @@ func (hg *HttpGun) Close() { hg.client = nil } -func (hg *HttpGun) Connect(results chan<- *aggregate.Sample) { +func (hg *HttpGun) Connect() { hg.Close() config := tls.Config{ InsecureSkipVerify: true, diff --git a/gun/http/http_test.go b/gun/http/http_test.go index 5a498118e..8ebb5ceac 100644 --- a/gun/http/http_test.go +++ b/gun/http/http_test.go @@ -33,8 +33,9 @@ func TestHttpGunWithSsl(t *testing.T) { defer ts.Close() gun := &HttpGun{ - target: ts.Listener.Addr().String(), - ssl: true, + target: ts.Listener.Addr().String(), + ssl: true, + results: result, } promise := utils.Promise(func() error { defer close(result) @@ -48,7 +49,7 @@ func TestHttpGunWithSsl(t *testing.T) { "Host": "example.org", "User-Agent": "Pandora/0.0.1", }, - }, result) + }) }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) @@ -92,8 +93,9 @@ func TestHttpGunWithHttp(t *testing.T) { defer ts.Close() gun := &HttpGun{ - target: ts.Listener.Addr().String(), - ssl: false, + target: ts.Listener.Addr().String(), + ssl: false, + results: result, } promise := utils.Promise(func() error { defer close(result) @@ -107,7 +109,7 @@ func TestHttpGunWithHttp(t *testing.T) { "Host": "example.org", "User-Agent": "Pandora/0.0.1", }, - }, result) + }) }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) diff --git a/gun/log.go b/gun/log.go index c6bb44ec7..b05444366 100644 --- a/gun/log.go +++ b/gun/log.go @@ -1,9 +1,8 @@ package gun import ( - "fmt" "log" - "strconv" + "time" "golang.org/x/net/context" @@ -12,27 +11,18 @@ import ( "github.com/yandex/pandora/config" ) -type LogGun struct{} - -func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error { - log.Println("Log message: ", a.(*ammo.Log).Message) - results <- &aggregate.Sample{} - return nil +type LogGun struct { + results chan<- *aggregate.Sample } -type DummySample struct { - value int +func (l *LogGun) BindResultsTo(results chan<- *aggregate.Sample) { + l.results = results } -func (ds *DummySample) String() string { - return fmt.Sprintf("My value is %d", ds.value) -} - -func (ds *DummySample) AppendToPhout(dst []byte) []byte { - dst = append(dst, "My value is "...) - dst = strconv.AppendInt(dst, int64(ds.value), 10) - dst = append(dst, '\n') - return dst +func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo) error { + log.Println("Log message: ", a.(*ammo.Log).Message) + l.results <- aggregate.AcquireSample(float64(time.Now().UnixNano())/1e9, "REQUEST") + return nil } func NewLogGunFromConfig(c *config.Gun) (g Gun, err error) { diff --git a/gun/spdy/spdy.go b/gun/spdy/spdy.go index fb1607542..d78cb31c1 100644 --- a/gun/spdy/spdy.go +++ b/gun/spdy/spdy.go @@ -20,14 +20,19 @@ import ( ) type SpdyGun struct { - pingPeriod time.Duration - target string - client *spdy.Client + target string + client *spdy.Client + results chan<- *aggregate.Sample + closed bool } -func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error { +func (sg *SpdyGun) BindResultsTo(results chan<- *aggregate.Sample) { + sg.results = results +} + +func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo) error { if sg.client == nil { - if err := sg.Connect(results); err != nil { + if err := sg.Connect(); err != nil { return err } } @@ -36,7 +41,7 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggre ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST") defer func() { ss.RT = int(time.Since(start).Seconds() * 1e6) - results <- ss + sg.results <- ss }() // now send the request to obtain a http response ha, ok := a.(*ammo.Http) @@ -79,18 +84,19 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggre } func (sg *SpdyGun) Close() { + sg.closed = true if sg.client != nil { sg.client.Close() } } -func (sg *SpdyGun) Connect(results chan<- *aggregate.Sample) error { +func (sg *SpdyGun) Connect() error { // FIXME: rewrite connection logic, it isn't thread safe right now. start := time.Now() ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "CONNECT") defer func() { ss.RT = int(time.Since(start).Seconds() * 1e6) - results <- ss + sg.results <- ss }() config := tls.Config{ InsecureSkipVerify: true, @@ -118,7 +124,7 @@ func (sg *SpdyGun) Connect(results chan<- *aggregate.Sample) error { return nil } -func (sg *SpdyGun) Ping(results chan<- *aggregate.Sample) { +func (sg *SpdyGun) Ping() { if sg.client == nil { return } @@ -140,9 +146,22 @@ func (sg *SpdyGun) Ping(results chan<- *aggregate.Sample) { ss.Err = err ss.ProtoCode = 500 } - results <- ss + sg.results <- ss if err != nil { - sg.Connect(results) + sg.Connect() + } +} + +func (sg *SpdyGun) startAutoPing(pingPeriod time.Duration) { + if pingPeriod > 0 { + go func() { + for range time.NewTicker(pingPeriod).C { + if sg.closed { + return + } + sg.Ping() + } + }() } } @@ -171,23 +190,12 @@ func New(c *config.Gun) (gun.Gun, error) { switch t := target.(type) { case string: g = &SpdyGun{ - pingPeriod: pingPeriod, - target: target.(string), + target: target.(string), } default: return nil, fmt.Errorf("Target is of the wrong type."+ " Expected 'string' got '%T'", t) } - // TODO: implement this logic somewhere - // if pingPeriod > 0 { - // go func() { - // for range time.NewTicker(pingPeriod).C { - // if g.closed { - // return - // } - // g.Ping(results) - // } - // }() - // } + g.(*SpdyGun).startAutoPing(pingPeriod) return g, nil } diff --git a/gun/spdy/spdy_test.go b/gun/spdy/spdy_test.go index 7ba67bad0..c33482aa8 100644 --- a/gun/spdy/spdy_test.go +++ b/gun/spdy/spdy_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/yandex/pandora/aggregate" "github.com/yandex/pandora/ammo" + "github.com/yandex/pandora/config" "github.com/yandex/pandora/utils" "golang.org/x/net/context" ) @@ -24,8 +25,8 @@ func TestSpdyGun(t *testing.T) { result := make(chan *aggregate.Sample) gun := &SpdyGun{ - target: "localhost:3000", - pingPeriod: time.Second * 5, + target: "localhost:3000", + results: result, } promise := utils.Promise(func() error { defer gun.Close() @@ -40,7 +41,7 @@ func TestSpdyGun(t *testing.T) { "Host": "example.org", "User-Agent": "Pandora/0.0.1", }, - }, result) + }) }) results := aggregate.Drain(ctx, result) @@ -76,16 +77,16 @@ func TestSpdyConnectPing(t *testing.T) { result := make(chan *aggregate.Sample) gun := &SpdyGun{ - target: "localhost:3000", - pingPeriod: time.Second * 5, + target: "localhost:3000", + results: result, } promise := utils.Promise(func() error { defer gun.Close() defer close(result) - if err := gun.Connect(result); err != nil { + if err := gun.Connect(); err != nil { return err } - gun.Ping(result) + gun.Ping() return nil }) @@ -112,6 +113,30 @@ func TestSpdyConnectPing(t *testing.T) { } +func TestNewSpdyGun(t *testing.T) { + spdyConfig := &config.Gun{ + GunType: "spdy", + Parameters: map[string]interface{}{ + "Target": "localhost:3000", + "PingPeriod": 5.0, + }, + } + g, err := New(spdyConfig) + assert.NoError(t, err) + _, ok := g.(*SpdyGun) + assert.Equal(t, true, ok) + + failSpdyConfig := &config.Gun{ + GunType: "spdy", + Parameters: map[string]interface{}{ + "Target": "localhost:3000", + "PingPeriod": "not-a-number", + }, + } + _, err = New(failSpdyConfig) + assert.Error(t, err) +} + func runSpdyTestServer() { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain")