From e516f262bf1be4bd286dd35c88f815015797a288 Mon Sep 17 00:00:00 2001 From: Alexey Lavrenuke Date: Mon, 11 Jan 2016 17:26:16 +0300 Subject: [PATCH 01/11] interface instead of a function --- ammo/ammo.go | 6 ++++-- ammo/dummy.go | 12 +++++++++--- ammo/http.go | 12 +++++++++--- ammo/http_test.go | 5 +++-- 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/ammo/ammo.go b/ammo/ammo.go index 152c32c12..b7b2c801f 100644 --- a/ammo/ammo.go +++ b/ammo/ammo.go @@ -14,7 +14,9 @@ type BaseProvider struct { type Ammo interface{} -type Decoder func([]byte) (Ammo, error) +type Decoder interface { + Decode([]byte) (Ammo, error) +} func NewBaseProvider(source <-chan Ammo, decoder Decoder) *BaseProvider { return &BaseProvider{ @@ -28,7 +30,7 @@ func (ap *BaseProvider) Source() <-chan Ammo { } func (ap *BaseProvider) Decode(src []byte) (Ammo, error) { - return ap.decoder(src) + return ap.decoder.Decode(src) } // Drain reads all ammos from ammo.Provider. Useful for tests. diff --git a/ammo/dummy.go b/ammo/dummy.go index e39df0b53..40a610a3b 100644 --- a/ammo/dummy.go +++ b/ammo/dummy.go @@ -13,13 +13,19 @@ type Log struct { Message string } -// LogJSONDecode implements ammo.Decoder interface -func LogJSONDecode(jsonDoc []byte) (Ammo, error) { +// LogJSONDecoder implements ammo.Decoder interface +type LogJSONDecoder struct{} + +func (*LogJSONDecoder) Decode(jsonDoc []byte) (Ammo, error) { a := &Log{} err := json.Unmarshal(jsonDoc, a) return a, err } +func NewLogJSONDecoder() Decoder { + return &LogJSONDecoder{} +} + type LogProvider struct { *BaseProvider @@ -52,7 +58,7 @@ func NewLogAmmoProvider(c *config.AmmoProvider) (Provider, error) { sink: ammoCh, BaseProvider: NewBaseProvider( ammoCh, - LogJSONDecode, + NewLogJSONDecoder(), ), } return ap, nil diff --git a/ammo/http.go b/ammo/http.go index 23531b173..6f028abfb 100644 --- a/ammo/http.go +++ b/ammo/http.go @@ -33,13 +33,19 @@ func (h *Http) Request() (*http.Request, error) { return req, err } -// HttpJSONDecode implements ammo.Decoder interface -func HttpJSONDecode(jsonDoc []byte) (Ammo, error) { +// HttpJSONDecoder implements ammo.Decoder interface +type HttpJSONDecoder struct{} + +func (*HttpJSONDecoder) Decode(jsonDoc []byte) (Ammo, error) { a := &Http{} err := a.UnmarshalJSON(jsonDoc) return a, err } +func NewHttpJSONDecoder() Decoder { + return &HttpJSONDecoder{} +} + // ffjson: skip type HttpProvider struct { *BaseProvider @@ -100,7 +106,7 @@ func NewHttpProvider(c *config.AmmoProvider) (Provider, error) { sink: ammoCh, BaseProvider: NewBaseProvider( ammoCh, - HttpJSONDecode, + NewHttpJSONDecoder(), ), } return ap, nil diff --git a/ammo/http_test.go b/ammo/http_test.go index 14e106ed8..49f6b00b9 100644 --- a/ammo/http_test.go +++ b/ammo/http_test.go @@ -51,7 +51,7 @@ func TestHttpProvider(t *testing.T) { sink: ammoCh, BaseProvider: NewBaseProvider( ammoCh, - HttpJSONDecode, + NewHttpJSONDecoder(), ), } promise := utils.PromiseCtx(providerCtx, provider.Start) @@ -82,6 +82,7 @@ var result Ammo func BenchmarkJsonDecoder(b *testing.B) { f, err := os.Open(httpTestFilename) + decoder := NewHttpJSONDecoder() if err != nil { b.Fatal(err) } @@ -93,7 +94,7 @@ func BenchmarkJsonDecoder(b *testing.B) { } var a Ammo for n := 0; n < b.N; n++ { - a, _ = HttpJSONDecode(jsonDoc) + a, _ = decoder.Decode(jsonDoc) } result = a } From 4bd308aa764d39be5dc75b0e0c2ea93b46c85043 Mon Sep 17 00:00:00 2001 From: Alexey Lavrenuke Date: Mon, 11 Jan 2016 17:53:28 +0300 Subject: [PATCH 02/11] benchmark pooled ammo provider --- ammo/http.go | 23 +++++++++++++++++++---- ammo/http_test.go | 4 ++-- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/ammo/http.go b/ammo/http.go index 6f028abfb..bee26098f 100644 --- a/ammo/http.go +++ b/ammo/http.go @@ -8,6 +8,7 @@ import ( "log" "net/http" "os" + "sync" "github.com/yandex/pandora/config" "golang.org/x/net/context" @@ -34,16 +35,30 @@ func (h *Http) Request() (*http.Request, error) { } // HttpJSONDecoder implements ammo.Decoder interface -type HttpJSONDecoder struct{} +type HttpJSONDecoder struct{ + pool sync.Pool +} -func (*HttpJSONDecoder) Decode(jsonDoc []byte) (Ammo, error) { - a := &Http{} +func (d *HttpJSONDecoder) Decode(jsonDoc []byte) (Ammo, error) { + a := d.pool.Get().(*Http) err := a.UnmarshalJSON(jsonDoc) return a, err } +// be polite and return unused Ammo to the pool +// be shure that you return Http because we don't make any checks here +func (d *HttpJSONDecoder) Release(a Ammo) { + d.pool.Put(a) +} + func NewHttpJSONDecoder() Decoder { - return &HttpJSONDecoder{} + return &HttpJSONDecoder{ + pool: sync.Pool{ + New: func() (interface{}){ + return &Http{} + }, + }, + } } // ffjson: skip diff --git a/ammo/http_test.go b/ammo/http_test.go index 49f6b00b9..1e978b55f 100644 --- a/ammo/http_test.go +++ b/ammo/http_test.go @@ -82,7 +82,7 @@ var result Ammo func BenchmarkJsonDecoder(b *testing.B) { f, err := os.Open(httpTestFilename) - decoder := NewHttpJSONDecoder() + decoder := NewHttpJSONDecoder().(*HttpJSONDecoder) if err != nil { b.Fatal(err) } @@ -95,6 +95,6 @@ func BenchmarkJsonDecoder(b *testing.B) { var a Ammo for n := 0; n < b.N; n++ { a, _ = decoder.Decode(jsonDoc) + decoder.Release(a) } - result = a } From 8a75e221ad87f2e853ed139261b187b0e7542b61 Mon Sep 17 00:00:00 2001 From: Alexey Lavrenuke Date: Mon, 11 Jan 2016 18:02:36 +0300 Subject: [PATCH 03/11] formatting --- ammo/http.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ammo/http.go b/ammo/http.go index bee26098f..c9e982731 100644 --- a/ammo/http.go +++ b/ammo/http.go @@ -35,7 +35,7 @@ func (h *Http) Request() (*http.Request, error) { } // HttpJSONDecoder implements ammo.Decoder interface -type HttpJSONDecoder struct{ +type HttpJSONDecoder struct { pool sync.Pool } @@ -54,7 +54,7 @@ func (d *HttpJSONDecoder) Release(a Ammo) { func NewHttpJSONDecoder() Decoder { return &HttpJSONDecoder{ pool: sync.Pool{ - New: func() (interface{}){ + New: func() interface{} { return &Http{} }, }, From bb966813bb13dcaf06a3bdfcb9e0260c298d7105 Mon Sep 17 00:00:00 2001 From: Alexey Lavrenuke Date: Wed, 13 Jan 2016 14:26:17 +0300 Subject: [PATCH 04/11] object pool for AmmoProviders --- ammo/ammo.go | 22 +++++++++++++++++----- ammo/dummy.go | 6 +++--- ammo/http.go | 32 ++++++-------------------------- ammo/http_test.go | 34 ++++++++++++++++++++++++++++++---- 4 files changed, 56 insertions(+), 38 deletions(-) diff --git a/ammo/ammo.go b/ammo/ammo.go index b7b2c801f..f8905ab2d 100644 --- a/ammo/ammo.go +++ b/ammo/ammo.go @@ -1,27 +1,34 @@ package ammo -import "golang.org/x/net/context" +import ( + "sync" + + "golang.org/x/net/context" +) type Provider interface { Start(context.Context) error Source() <-chan Ammo + Release(Ammo) // return unused Ammo object to memory pool } type BaseProvider struct { decoder Decoder source <-chan Ammo + pool sync.Pool } type Ammo interface{} type Decoder interface { - Decode([]byte) (Ammo, error) + Decode([]byte, Ammo) (Ammo, error) } -func NewBaseProvider(source <-chan Ammo, decoder Decoder) *BaseProvider { +func NewBaseProvider(source <-chan Ammo, decoder Decoder, New func() interface{}) *BaseProvider { return &BaseProvider{ source: source, decoder: decoder, + pool: sync.Pool{New: New}, } } @@ -29,8 +36,13 @@ func (ap *BaseProvider) Source() <-chan Ammo { return ap.source } -func (ap *BaseProvider) Decode(src []byte) (Ammo, error) { - return ap.decoder.Decode(src) +func (ap *BaseProvider) Release(a Ammo) { + ap.pool.Put(a) +} + +func (ap *BaseProvider) decode(src []byte) (Ammo, error) { + a := ap.pool.Get() + return ap.decoder.Decode(src, a) } // Drain reads all ammos from ammo.Provider. Useful for tests. diff --git a/ammo/dummy.go b/ammo/dummy.go index 40a610a3b..e3ba236cd 100644 --- a/ammo/dummy.go +++ b/ammo/dummy.go @@ -16,8 +16,7 @@ type Log struct { // LogJSONDecoder implements ammo.Decoder interface type LogJSONDecoder struct{} -func (*LogJSONDecoder) Decode(jsonDoc []byte) (Ammo, error) { - a := &Log{} +func (*LogJSONDecoder) Decode(jsonDoc []byte, a Ammo) (Ammo, error) { err := json.Unmarshal(jsonDoc, a) return a, err } @@ -37,7 +36,7 @@ func (ap *LogProvider) Start(ctx context.Context) error { defer close(ap.sink) loop: for i := 0; i < ap.size; i++ { - if a, err := ap.Decode([]byte(fmt.Sprintf(`{"message": "Job #%d"}`, i))); err == nil { + if a, err := ap.decode([]byte(fmt.Sprintf(`{"message": "Job #%d"}`, i))); err == nil { select { case ap.sink <- a: case <-ctx.Done(): @@ -59,6 +58,7 @@ func NewLogAmmoProvider(c *config.AmmoProvider) (Provider, error) { BaseProvider: NewBaseProvider( ammoCh, NewLogJSONDecoder(), + func() interface{} { return &Log{} }, ), } return ap, nil diff --git a/ammo/http.go b/ammo/http.go index c9e982731..671cdf09f 100644 --- a/ammo/http.go +++ b/ammo/http.go @@ -8,7 +8,6 @@ import ( "log" "net/http" "os" - "sync" "github.com/yandex/pandora/config" "golang.org/x/net/context" @@ -24,7 +23,6 @@ type Http struct { } func (h *Http) Request() (*http.Request, error) { - // FIXME: something wrong here with https req, err := http.NewRequest(h.Method, "http://"+h.Host+h.Uri, nil) if err == nil { for k, v := range h.Headers { @@ -35,32 +33,13 @@ func (h *Http) Request() (*http.Request, error) { } // HttpJSONDecoder implements ammo.Decoder interface -type HttpJSONDecoder struct { - pool sync.Pool -} +type HttpJSONDecoder struct{} -func (d *HttpJSONDecoder) Decode(jsonDoc []byte) (Ammo, error) { - a := d.pool.Get().(*Http) - err := a.UnmarshalJSON(jsonDoc) +func (d *HttpJSONDecoder) Decode(jsonDoc []byte, a Ammo) (Ammo, error) { + err := a.(*Http).UnmarshalJSON(jsonDoc) return a, err } -// be polite and return unused Ammo to the pool -// be shure that you return Http because we don't make any checks here -func (d *HttpJSONDecoder) Release(a Ammo) { - d.pool.Put(a) -} - -func NewHttpJSONDecoder() Decoder { - return &HttpJSONDecoder{ - pool: sync.Pool{ - New: func() interface{} { - return &Http{} - }, - }, - } -} - // ffjson: skip type HttpProvider struct { *BaseProvider @@ -87,7 +66,7 @@ loop: scanner.Split(bufio.ScanLines) for scanner.Scan() && (ap.ammoLimit == 0 || ammoNumber < ap.ammoLimit) { data := scanner.Bytes() - if a, err := ap.Decode(data); err != nil { + if a, err := ap.decode(data); err != nil { return fmt.Errorf("failed to decode ammo: %v", err) } else { ammoNumber++ @@ -121,7 +100,8 @@ func NewHttpProvider(c *config.AmmoProvider) (Provider, error) { sink: ammoCh, BaseProvider: NewBaseProvider( ammoCh, - NewHttpJSONDecoder(), + &HttpJSONDecoder{}, + func() interface{} { return &Http{} }, ), } return ap, nil diff --git a/ammo/http_test.go b/ammo/http_test.go index 1e978b55f..5c2aa6882 100644 --- a/ammo/http_test.go +++ b/ammo/http_test.go @@ -4,6 +4,7 @@ import ( "bufio" "errors" "os" + "sync" "testing" "time" @@ -51,7 +52,8 @@ func TestHttpProvider(t *testing.T) { sink: ammoCh, BaseProvider: NewBaseProvider( ammoCh, - NewHttpJSONDecoder(), + &HttpJSONDecoder{}, + func() interface{} { return &Http{} }, ), } promise := utils.PromiseCtx(providerCtx, provider.Start) @@ -82,7 +84,7 @@ var result Ammo func BenchmarkJsonDecoder(b *testing.B) { f, err := os.Open(httpTestFilename) - decoder := NewHttpJSONDecoder().(*HttpJSONDecoder) + decoder := &HttpJSONDecoder{} if err != nil { b.Fatal(err) } @@ -94,7 +96,31 @@ func BenchmarkJsonDecoder(b *testing.B) { } var a Ammo for n := 0; n < b.N; n++ { - a, _ = decoder.Decode(jsonDoc) - decoder.Release(a) + a, _ = decoder.Decode(jsonDoc, &Http{}) } + _ = a +} + +func BenchmarkJsonDecoderWithPool(b *testing.B) { + f, err := os.Open(httpTestFilename) + decoder := &HttpJSONDecoder{} + if err != nil { + b.Fatal(err) + } + defer f.Close() + r := bufio.NewReader(f) + jsonDoc, isPrefix, err := r.ReadLine() + if err != nil || isPrefix { + b.Fatal(errors.New("Couldn't properly read ammo sample from data file")) + } + var a Ammo + pool := sync.Pool{ + New: func() interface{} { return &Http{} }, + } + for n := 0; n < b.N; n++ { + h := pool.Get().(*Http) + a, _ = decoder.Decode(jsonDoc, h) + pool.Put(h) + } + _ = a } From e9eceda29f957a76f38da77a5689d14690018cb2 Mon Sep 17 00:00:00 2001 From: Alexey Lavrenuke Date: Wed, 13 Jan 2016 14:43:13 +0300 Subject: [PATCH 05/11] release ammo after it was shooted --- engine/user.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/engine/user.go b/engine/user.go index b39e1f41d..83e467a85 100644 --- a/engine/user.go +++ b/engine/user.go @@ -33,14 +33,15 @@ func (u *User) Run(ctx context.Context) error { loop: for { select { - case j, more := <-source: + case ammo, more := <-source: if !more { log.Println("Ammo ended") break loop } _, more = <-control if more { - u.Gun.Shoot(ctx, j, sink) + u.Gun.Shoot(ctx, ammo, sink) + u.Ammunition.Release(ammo) } else { log.Println("Limiter ended.") break loop From c497a46e07bbce2e7c53f8392aa8fae8241ce786 Mon Sep 17 00:00:00 2001 From: Alexey Lavrenuke Date: Wed, 13 Jan 2016 15:24:57 +0300 Subject: [PATCH 06/11] move request convertation implementation to guns --- ammo/http.go | 11 ----------- gun/http/http.go | 16 ++++++++++------ gun/spdy/spdy.go | 4 +++- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/ammo/http.go b/ammo/http.go index 671cdf09f..b93227e48 100644 --- a/ammo/http.go +++ b/ammo/http.go @@ -6,7 +6,6 @@ import ( "bufio" "fmt" "log" - "net/http" "os" "github.com/yandex/pandora/config" @@ -22,16 +21,6 @@ type Http struct { Tag string } -func (h *Http) Request() (*http.Request, error) { - req, err := http.NewRequest(h.Method, "http://"+h.Host+h.Uri, nil) - if err == nil { - for k, v := range h.Headers { - req.Header.Set(k, v) - } - } - return req, err -} - // HttpJSONDecoder implements ammo.Decoder interface type HttpJSONDecoder struct{} diff --git a/gun/http/http.go b/gun/http/http.go index 4bc8e1bb6..55705d96d 100644 --- a/gun/http/http.go +++ b/gun/http/http.go @@ -57,18 +57,22 @@ func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, if ha.Tag != "" { ss.tag += "|" + ha.Tag } - req, err := ha.Request() + var uri string + if hg.ssl { + uri = "https://" + ha.Host + ha.Uri + } else { + uri = "http://" + ha.Host + ha.Uri + } + req, err := http.NewRequest(ha.Method, uri, nil) if err != nil { log.Printf("Error making HTTP request: %s\n", err) ss.err = err return err } - req.URL.Host = hg.target - if hg.ssl { - req.URL.Scheme = "https" - } else { - req.URL.Scheme = "http" + for k, v := range ha.Headers { + req.Header.Set(k, v) } + req.URL.Host = hg.target res, err := hg.client.Do(req) if err != nil { log.Printf("Error performing a request: %s\n", err) diff --git a/gun/spdy/spdy.go b/gun/spdy/spdy.go index e1614b668..cd328bc9d 100644 --- a/gun/spdy/spdy.go +++ b/gun/spdy/spdy.go @@ -7,6 +7,7 @@ import ( "io" "io/ioutil" "log" + "net/http" "time" "github.com/amahi/spdy" @@ -55,7 +56,8 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- aggreg if ha.Tag != "" { ss.tag += "|" + ha.Tag } - req, err := ha.Request() + + req, err := http.NewRequest(ha.Method, "https://"+ha.Host+ha.Uri, nil) if err != nil { log.Printf("Error making HTTP request: %s\n", err) ss.err = err From 43f7507d69148a837ebb049269618fa7620c09dd Mon Sep 17 00:00:00 2001 From: Alexey Lavrenuke Date: Wed, 13 Jan 2016 17:40:05 +0300 Subject: [PATCH 07/11] use byte buffer for phantom phout --- aggregate/phout.go | 42 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/aggregate/phout.go b/aggregate/phout.go index 12d67e900..3acaae47d 100644 --- a/aggregate/phout.go +++ b/aggregate/phout.go @@ -4,12 +4,18 @@ import ( "bufio" "fmt" "os" + "strconv" "time" "github.com/yandex/pandora/config" "golang.org/x/net/context" ) +const ( + phoutDelimiter = '\t' + phoutLineEnd = '\n' +) + type PhoutSample struct { TS float64 Tag string @@ -42,6 +48,34 @@ func (ps *PhoutSample) String() string { ) } +func (ps *PhoutSample) AppendTo(dst []byte) []byte { + dst = strconv.AppendFloat(dst, ps.TS, 'f', 3, 64) + dst = append(dst, phoutDelimiter) + dst = append(dst, ps.Tag...) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.RT), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Connect), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Send), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Latency), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Receive), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.IntervalEvent), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Egress), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Igress), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.NetCode), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.ProtoCode), 10) + dst = append(dst, phoutLineEnd) + return dst +} + type PhantomCompatible interface { Sample PhoutSample() *PhoutSample @@ -52,12 +86,15 @@ type PhoutResultListener struct { source <-chan Sample phout *bufio.Writer + buffer []byte } func (rl *PhoutResultListener) handle(r Sample) error { pc, ok := r.(PhantomCompatible) if ok { - _, err := rl.phout.WriteString(fmt.Sprintf("%s\n", pc.PhoutSample())) + rl.buffer = pc.PhoutSample().AppendTo(rl.buffer) + _, err := rl.phout.Write(rl.buffer) + rl.buffer = rl.buffer[:0] if err != nil { return err } @@ -115,7 +152,8 @@ func NewPhoutResultListener(filename string) (rl ResultListener, err error) { resultListener: resultListener{ sink: ch, }, - phout: writer, + phout: writer, + buffer: make([]byte, 0, 1024), }, nil } From 09182639eba8b5e9114908870391394815c62782 Mon Sep 17 00:00:00 2001 From: Alexey Lavrenuke Date: Wed, 13 Jan 2016 21:55:59 +0300 Subject: [PATCH 08/11] refactor aggregator --- aggregate/aggregate.go | 18 ++++--- aggregate/logger.go | 22 ++++----- aggregate/phout.go | 96 +++++--------------------------------- aggregate/sample.go | 83 +++++++++++++++++++++++++++++++++ gun/gun.go | 3 +- gun/http/http.go | 65 ++++++-------------------- gun/http/http_test.go | 22 ++++----- gun/log.go | 16 ++++--- gun/spdy/spdy.go | 103 ++++++++++++++--------------------------- gun/spdy/spdy_test.go | 24 +++++----- 10 files changed, 198 insertions(+), 254 deletions(-) create mode 100644 aggregate/sample.go diff --git a/aggregate/aggregate.go b/aggregate/aggregate.go index a97b26dd0..330bd7222 100644 --- a/aggregate/aggregate.go +++ b/aggregate/aggregate.go @@ -2,17 +2,21 @@ package aggregate import "golang.org/x/net/context" -type Sample interface { - String() string -} - type ResultListener interface { Start(context.Context) error - Sink() chan<- Sample + Sink() chan<- interface{} +} + +type resultListener struct { + sink chan<- interface{} +} + +func (rl *resultListener) Sink() chan<- interface{} { + return rl.sink } -func Drain(ctx context.Context, results <-chan Sample) []Sample { - samples := []Sample{} +func Drain(ctx context.Context, results <-chan interface{}) []interface{} { + samples := []interface{}{} loop: for { select { diff --git a/aggregate/logger.go b/aggregate/logger.go index 020383ae5..221ab9dfc 100644 --- a/aggregate/logger.go +++ b/aggregate/logger.go @@ -1,29 +1,27 @@ package aggregate import ( + "fmt" "log" "github.com/yandex/pandora/config" "golang.org/x/net/context" ) -type resultListener struct { - sink chan<- Sample -} - -func (rl *resultListener) Sink() chan<- Sample { - return rl.sink -} - // Implements ResultListener interface type LoggingResultListener struct { resultListener - source <-chan Sample + source <-chan interface{} } -func (rl *LoggingResultListener) handle(r Sample) { - log.Println(r) +func (rl *LoggingResultListener) handle(r interface{}) { + r, ok := r.(fmt.Stringer) + if !ok { + log.Println("Can't convert result to String") + } else { + log.Println(r) + } } func (rl *LoggingResultListener) Start(ctx context.Context) error { @@ -48,7 +46,7 @@ loop: } func NewLoggingResultListener(*config.ResultListener) (ResultListener, error) { - ch := make(chan Sample, 32) + ch := make(chan interface{}, 32) return &LoggingResultListener{ source: ch, resultListener: resultListener{ diff --git a/aggregate/phout.go b/aggregate/phout.go index 3acaae47d..2486549ce 100644 --- a/aggregate/phout.go +++ b/aggregate/phout.go @@ -2,106 +2,34 @@ package aggregate import ( "bufio" - "fmt" "os" - "strconv" "time" "github.com/yandex/pandora/config" "golang.org/x/net/context" ) -const ( - phoutDelimiter = '\t' - phoutLineEnd = '\n' -) - -type PhoutSample struct { - TS float64 - Tag string - RT int - Connect int - Send int - Latency int - Receive int - IntervalEvent int - Egress int - Igress int - NetCode int - ProtoCode int -} - -func (ps *PhoutSample) String() string { - return fmt.Sprintf( - "%.3f\t%s\t%d\t"+ - "%d\t%d\t"+ - "%d\t%d\t"+ - "%d\t"+ - "%d\t%d\t"+ - "%d\t%d", - ps.TS, ps.Tag, ps.RT, - ps.Connect, ps.Send, - ps.Latency, ps.Receive, - ps.IntervalEvent, - ps.Egress, ps.Igress, - ps.NetCode, ps.ProtoCode, - ) -} - -func (ps *PhoutSample) AppendTo(dst []byte) []byte { - dst = strconv.AppendFloat(dst, ps.TS, 'f', 3, 64) - dst = append(dst, phoutDelimiter) - dst = append(dst, ps.Tag...) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.RT), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.Connect), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.Send), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.Latency), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.Receive), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.IntervalEvent), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.Egress), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.Igress), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.NetCode), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.ProtoCode), 10) - dst = append(dst, phoutLineEnd) - return dst -} - -type PhantomCompatible interface { - Sample - PhoutSample() *PhoutSample +type PhoutSerializable interface { + AppendToPhout([]byte) []byte } type PhoutResultListener struct { resultListener - source <-chan Sample + source <-chan interface{} phout *bufio.Writer buffer []byte } -func (rl *PhoutResultListener) handle(r Sample) error { - pc, ok := r.(PhantomCompatible) - if ok { - rl.buffer = pc.PhoutSample().AppendTo(rl.buffer) - _, err := rl.phout.Write(rl.buffer) - rl.buffer = rl.buffer[:0] - if err != nil { - return err - } - } else { - return fmt.Errorf("Not phantom compatible sample") +func (rl *PhoutResultListener) handle(s interface{}) error { + ps, ok := s.(PhoutSerializable) + if !ok { + panic("Result sample is not PhoutSerializable") } - return nil + rl.buffer = ps.AppendToPhout(rl.buffer) + _, err := rl.phout.Write(rl.buffer) + rl.buffer = rl.buffer[:0] + return err } func (rl *PhoutResultListener) Start(ctx context.Context) error { @@ -146,7 +74,7 @@ func NewPhoutResultListener(filename string) (rl ResultListener, err error) { phoutFile, err = os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0666) } writer := bufio.NewWriterSize(phoutFile, 1024*512) // 512 KB - ch := make(chan Sample, 65536) + ch := make(chan interface{}, 65536) return &PhoutResultListener{ source: ch, resultListener: resultListener{ diff --git a/aggregate/sample.go b/aggregate/sample.go new file mode 100644 index 000000000..71a07f3cb --- /dev/null +++ b/aggregate/sample.go @@ -0,0 +1,83 @@ +package aggregate + +import ( + "fmt" + "strconv" + "sync" +) + +const ( + phoutDelimiter = '\t' + phoutNewLine = '\n' +) + +var samplePool = &sync.Pool{New: func() interface{} { return &Sample{} }} + +type Sample struct { + TS float64 + Tag string + RT int + Connect int + Send int + Latency int + Receive int + IntervalEvent int + Egress int + Igress int + NetCode int + ProtoCode int + Err error +} + +func AcquireSample() *Sample { + return samplePool.Get().(*Sample) +} + +func ReleaseSample(s *Sample) { + samplePool.Put(s) +} + +func (ps *Sample) String() string { + return fmt.Sprintf( + "%.3f\t%s\t%d\t"+ + "%d\t%d\t"+ + "%d\t%d\t"+ + "%d\t"+ + "%d\t%d\t"+ + "%d\t%d", + ps.TS, ps.Tag, ps.RT, + ps.Connect, ps.Send, + ps.Latency, ps.Receive, + ps.IntervalEvent, + ps.Egress, ps.Igress, + ps.NetCode, ps.ProtoCode, + ) +} + +func (ps *Sample) AppendToPhout(dst []byte) []byte { + dst = strconv.AppendFloat(dst, ps.TS, 'f', 3, 64) + dst = append(dst, phoutDelimiter) + dst = append(dst, ps.Tag...) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.RT), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Connect), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Send), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Latency), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Receive), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.IntervalEvent), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Egress), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Igress), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.NetCode), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.ProtoCode), 10) + dst = append(dst, phoutNewLine) + return dst +} diff --git a/gun/gun.go b/gun/gun.go index 95a898549..63e8b076e 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -1,11 +1,10 @@ package gun import ( - "github.com/yandex/pandora/aggregate" "github.com/yandex/pandora/ammo" "golang.org/x/net/context" ) type Gun interface { - Shoot(context.Context, ammo.Ammo, chan<- aggregate.Sample) error + Shoot(context.Context, ammo.Ammo, chan<- interface{}) error } diff --git a/gun/http/http.go b/gun/http/http.go index 55705d96d..3b8236b77 100644 --- a/gun/http/http.go +++ b/gun/http/http.go @@ -35,27 +35,25 @@ type HttpGun struct { // Shoot to target, this method is not thread safe func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, - results chan<- aggregate.Sample) error { + results chan<- interface{}) error { if hg.client == nil { hg.Connect(results) } start := time.Now() - ss := &HttpSample{ts: float64(start.UnixNano()) / 1e9, tag: "REQUEST"} + // TODO: acquire/release + ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "REQUEST"} defer func() { - ss.rt = int(time.Since(start).Seconds() * 1e6) + ss.RT = int(time.Since(start).Seconds() * 1e6) results <- ss }() // now send the request to obtain a http response ha, ok := a.(*ammo.Http) if !ok { - errStr := fmt.Sprintf("Got '%T' instead of 'HttpAmmo'", a) - log.Println(errStr) - ss.err = errors.New(errStr) - return ss.err + panic(fmt.Sprintf("Got '%T' instead of 'HttpAmmo'", a)) } if ha.Tag != "" { - ss.tag += "|" + ha.Tag + ss.Tag += "|" + ha.Tag } var uri string if hg.ssl { @@ -66,7 +64,8 @@ func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, req, err := http.NewRequest(ha.Method, uri, nil) if err != nil { log.Printf("Error making HTTP request: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } for k, v := range ha.Headers { @@ -76,14 +75,16 @@ func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, res, err := hg.client.Do(req) if err != nil { log.Printf("Error performing a request: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } defer res.Body.Close() _, err = io.Copy(ioutil.Discard, res.Body) if err != nil { log.Printf("Error reading response body: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } @@ -91,7 +92,7 @@ func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, //data := make([]byte, int(res.ContentLength)) // _, err = res.Body.(io.Reader).Read(data) // fmt.Println(string(data)) - ss.StatusCode = res.StatusCode + ss.ProtoCode = res.StatusCode return nil } @@ -99,7 +100,7 @@ func (hg *HttpGun) Close() { hg.client = nil } -func (hg *HttpGun) Connect(results chan<- aggregate.Sample) { +func (hg *HttpGun) Connect(results chan<- interface{}) { hg.Close() config := tls.Config{ InsecureSkipVerify: true, @@ -140,44 +141,6 @@ func (hg *HttpGun) Connect(results chan<- aggregate.Sample) { // results <- ss } -type HttpSample struct { - ts float64 // Unix Timestamp in seconds - rt int // response time in milliseconds - StatusCode int // protocol status code - tag string - err error -} - -func (ds *HttpSample) PhoutSample() *aggregate.PhoutSample { - var protoCode, netCode int - if ds.err != nil { - protoCode = 500 - netCode = 999 - log.Printf("Error code. %v\n", ds.err) - } else { - netCode = 0 - protoCode = ds.StatusCode - } - return &aggregate.PhoutSample{ - TS: ds.ts, - Tag: ds.tag, - RT: ds.rt, - Connect: 0, - Send: 0, - Latency: 0, - Receive: 0, - IntervalEvent: 0, - Egress: 0, - Igress: 0, - NetCode: netCode, - ProtoCode: protoCode, - } -} - -func (ds *HttpSample) String() string { - return fmt.Sprintf("rt: %d [%d] %s", ds.rt, ds.StatusCode, ds.tag) -} - func New(c *config.Gun) (gun.Gun, error) { params := c.Parameters if params == nil { diff --git a/gun/http/http_test.go b/gun/http/http_test.go index bf315b598..43e71c150 100644 --- a/gun/http/http_test.go +++ b/gun/http/http_test.go @@ -20,7 +20,7 @@ func TestHttpGunWithSsl(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan aggregate.Sample) + result := make(chan interface{}) requests := make(chan *http.Request) ts := httptest.NewTLSServer( @@ -52,11 +52,10 @@ func TestHttpGunWithSsl(t *testing.T) { }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) - rPhout, casted := (results[0]).(aggregate.PhantomCompatible) - require.True(t, casted, "Should be phantom compatible") - phoutSample := rPhout.PhoutSample() - assert.Equal(t, "REQUEST", phoutSample.Tag) - assert.Equal(t, 200, phoutSample.ProtoCode) + sample, casted := (results[0]).(*aggregate.Sample) + require.True(t, casted, "Should be *aggregate.Sample") + assert.Equal(t, "REQUEST", sample.Tag) + assert.Equal(t, 200, sample.ProtoCode) select { case r := <-requests: @@ -82,7 +81,7 @@ func TestHttpGunWithHttp(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan aggregate.Sample) + result := make(chan interface{}) requests := make(chan *http.Request) ts := httptest.NewServer( @@ -114,11 +113,10 @@ func TestHttpGunWithHttp(t *testing.T) { }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) - rPhout, casted := (results[0]).(aggregate.PhantomCompatible) - require.True(t, casted, "Should be phantom compatible") - phoutSample := rPhout.PhoutSample() - assert.Equal(t, "REQUEST", phoutSample.Tag) - assert.Equal(t, 200, phoutSample.ProtoCode) + sample, casted := (results[0]).(*aggregate.Sample) + require.True(t, casted, "Should be *aggregate.Sample") + assert.Equal(t, "REQUEST", sample.Tag) + assert.Equal(t, 200, sample.ProtoCode) select { case r := <-requests: diff --git a/gun/log.go b/gun/log.go index 174292f1c..a69813e98 100644 --- a/gun/log.go +++ b/gun/log.go @@ -3,6 +3,7 @@ package gun import ( "fmt" "log" + "strconv" "golang.org/x/net/context" @@ -13,9 +14,9 @@ import ( type LogGun struct{} -func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- aggregate.Sample) error { +func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interface{}) error { log.Println("Log message: ", a.(*ammo.Log).Message) - results <- &DummySample{0} + results <- &aggregate.Sample{} return nil } @@ -23,14 +24,17 @@ type DummySample struct { value int } -func (ds *DummySample) PhoutSample() *aggregate.PhoutSample { - return &aggregate.PhoutSample{} -} - func (ds *DummySample) String() string { return fmt.Sprintf("My value is %d", ds.value) } +func (ds *DummySample) AppendToPhout(dst []byte) []byte { + dst = append(dst, "My value is "...) + dst = strconv.AppendInt(dst, int64(ds.value), 10) + dst = append(dst, '\n') + return dst +} + func NewLogGunFromConfig(c *config.Gun) (g Gun, err error) { return &LogGun{}, nil } diff --git a/gun/spdy/spdy.go b/gun/spdy/spdy.go index cd328bc9d..3032cfe77 100644 --- a/gun/spdy/spdy.go +++ b/gun/spdy/spdy.go @@ -25,7 +25,7 @@ type SpdyGun struct { client *spdy.Client } -func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- aggregate.Sample) error { +func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interface{}) error { if sg.client == nil { if err := sg.connect(results); err != nil { return err @@ -40,33 +40,32 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- aggreg }() } start := time.Now() - ss := &SpdySample{ts: float64(start.UnixNano()) / 1e9, tag: "REQUEST"} + ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "REQUEST"} defer func() { - ss.rt = int(time.Since(start).Seconds() * 1e6) + ss.RT = int(time.Since(start).Seconds() * 1e6) results <- ss }() // now send the request to obtain a http response ha, ok := a.(*ammo.Http) if !ok { - errStr := fmt.Sprintf("Got '%T' instead of 'HttpAmmo'", a) - log.Println(errStr) - ss.err = errors.New(errStr) - return ss.err + panic(fmt.Sprintf("Got '%T' instead of 'HttpAmmo'", a)) } if ha.Tag != "" { - ss.tag += "|" + ha.Tag + ss.Tag += "|" + ha.Tag } req, err := http.NewRequest(ha.Method, "https://"+ha.Host+ha.Uri, nil) if err != nil { log.Printf("Error making HTTP request: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } res, err := sg.client.Do(req) if err != nil { log.Printf("Error performing a request: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } @@ -74,14 +73,15 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- aggreg _, err = io.Copy(ioutil.Discard, res.Body) if err != nil { log.Printf("Error reading response body: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } // TODO: make this an optional verbose answ_log output //data := make([]byte, int(res.ContentLength)) // _, err = res.Body.(io.Reader).Read(data) // fmt.Println(string(data)) - ss.StatusCode = res.StatusCode + ss.ProtoCode = res.StatusCode return err } @@ -91,36 +91,41 @@ func (sg *SpdyGun) Close() { } } -func (sg *SpdyGun) connect(results chan<- aggregate.Sample) error { +func (sg *SpdyGun) connect(results chan<- interface{}) error { // FIXME: rewrite connection logic, it isn't thread safe right now. - connectStart := time.Now() + start := time.Now() + ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "CONNECT"} + defer func() { + ss.RT = int(time.Since(start).Seconds() * 1e6) + results <- ss + }() config := tls.Config{ InsecureSkipVerify: true, NextProtos: []string{"spdy/3.1"}, } conn, err := tls.Dial("tcp", sg.target, &config) if err != nil { - return fmt.Errorf("client: dial: %s\n", err) + ss.Err = err + ss.NetCode = 999 + return err } client, err := spdy.NewClientConn(conn) if err != nil { - return fmt.Errorf("client: connect: %v\n", err) + ss.Err = err + ss.NetCode = 999 + return err + } else { + ss.ProtoCode = 200 } if sg.client != nil { sg.Close() } sg.client = client - ss := &SpdySample{ts: float64(connectStart.UnixNano()) / 1e9, tag: "CONNECT"} - ss.rt = int(time.Since(connectStart).Seconds() * 1e6) - ss.err = err - if ss.err == nil { - ss.StatusCode = 200 - } - results <- ss + return nil } -func (sg *SpdyGun) Ping(results chan<- aggregate.Sample) { +func (sg *SpdyGun) Ping(results chan<- interface{}) { if sg.client == nil { return } @@ -133,13 +138,14 @@ func (sg *SpdyGun) Ping(results chan<- aggregate.Sample) { if !pinged { log.Printf("client: ping: timed out\n") } - ss := &SpdySample{ts: float64(pingStart.UnixNano()) / 1e9, tag: "PING"} - ss.rt = int(time.Since(pingStart).Seconds() * 1e6) - ss.err = err - if ss.err == nil && pinged { - ss.StatusCode = 200 + ss := &aggregate.Sample{TS: float64(pingStart.UnixNano()) / 1e9, Tag: "PING"} + ss.RT = int(time.Since(pingStart).Seconds() * 1e6) + + if err == nil && pinged { + ss.ProtoCode = 200 } else { - ss.StatusCode = 500 + ss.Err = err + ss.ProtoCode = 500 } results <- ss if err != nil { @@ -147,43 +153,6 @@ func (sg *SpdyGun) Ping(results chan<- aggregate.Sample) { } } -type SpdySample struct { - ts float64 // Unix Timestamp in seconds - rt int // response time in milliseconds - StatusCode int // protocol status code - tag string - err error -} - -func (ds *SpdySample) PhoutSample() *aggregate.PhoutSample { - var protoCode, netCode int - if ds.err != nil { - protoCode = 500 - netCode = 999 - } else { - netCode = 0 - protoCode = ds.StatusCode - } - return &aggregate.PhoutSample{ - TS: ds.ts, - Tag: ds.tag, - RT: ds.rt, - Connect: 0, - Send: 0, - Latency: 0, - Receive: 0, - IntervalEvent: 0, - Egress: 0, - Igress: 0, - NetCode: netCode, - ProtoCode: protoCode, - } -} - -func (ds *SpdySample) String() string { - return fmt.Sprintf("rt: %d [%d] %s", ds.rt, ds.StatusCode, ds.tag) -} - func New(c *config.Gun) (gun.Gun, error) { params := c.Parameters if params == nil { diff --git a/gun/spdy/spdy_test.go b/gun/spdy/spdy_test.go index 4d93e95e2..07bf631dc 100644 --- a/gun/spdy/spdy_test.go +++ b/gun/spdy/spdy_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/SlyMarbo/spdy" // we specially use spdy server from another library - "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/yandex/pandora/aggregate" @@ -22,7 +21,7 @@ func TestSpdyGun(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan aggregate.Sample) + result := make(chan interface{}) gun := &SpdyGun{ target: "localhost:3000", @@ -47,20 +46,19 @@ func TestSpdyGun(t *testing.T) { require.Len(t, results, 2) { // first result is connect - rPhout, casted := (results[0]).(aggregate.PhantomCompatible) - require.True(t, casted, "Should be phantom compatible") - phoutSample := rPhout.PhoutSample() - assert.Equal(t, "CONNECT", phoutSample.Tag) - assert.Equal(t, 200, phoutSample.ProtoCode) + + sample, casted := (results[0]).(*aggregate.Sample) + require.True(t, casted, "Should be *aggregate.Sample") + assert.Equal(t, "CONNECT", sample.Tag) + assert.Equal(t, 200, sample.ProtoCode) } { // second result is request - rPhout, casted := (results[1]).(aggregate.PhantomCompatible) - require.True(t, casted, "Should be phantom compatible") - phoutSample := rPhout.PhoutSample() - spew.Dump(phoutSample) - assert.Equal(t, "REQUEST", phoutSample.Tag) - assert.Equal(t, 200, phoutSample.ProtoCode) + + sample, casted := (results[1]).(*aggregate.Sample) + require.True(t, casted, "Should be *aggregate.Sample") + assert.Equal(t, "REQUEST", sample.Tag) + assert.Equal(t, 200, sample.ProtoCode) } // TODO: test scenaries with errors From 9afb17d94be8b1cf71ed95120b9d6f329eaa28d7 Mon Sep 17 00:00:00 2001 From: Alexey Lavrenuke Date: Wed, 13 Jan 2016 22:21:01 +0300 Subject: [PATCH 09/11] aggregator sample pooling --- aggregate/aggregate.go | 10 +++++----- aggregate/logger.go | 15 +++++---------- aggregate/phout.go | 18 ++++++------------ aggregate/sample.go | 18 ++++++++++++++++-- gun/gun.go | 3 ++- gun/http/http.go | 9 ++++----- gun/http/http_test.go | 16 ++++++---------- gun/log.go | 2 +- gun/spdy/spdy.go | 12 ++++++------ gun/spdy/spdy_test.go | 14 +++++--------- 10 files changed, 56 insertions(+), 61 deletions(-) diff --git a/aggregate/aggregate.go b/aggregate/aggregate.go index 330bd7222..28dc6e3d4 100644 --- a/aggregate/aggregate.go +++ b/aggregate/aggregate.go @@ -4,19 +4,19 @@ import "golang.org/x/net/context" type ResultListener interface { Start(context.Context) error - Sink() chan<- interface{} + Sink() chan<- *Sample } type resultListener struct { - sink chan<- interface{} + sink chan<- *Sample } -func (rl *resultListener) Sink() chan<- interface{} { +func (rl *resultListener) Sink() chan<- *Sample { return rl.sink } -func Drain(ctx context.Context, results <-chan interface{}) []interface{} { - samples := []interface{}{} +func Drain(ctx context.Context, results <-chan *Sample) []*Sample { + samples := []*Sample{} loop: for { select { diff --git a/aggregate/logger.go b/aggregate/logger.go index 221ab9dfc..d9b60cc38 100644 --- a/aggregate/logger.go +++ b/aggregate/logger.go @@ -1,7 +1,6 @@ package aggregate import ( - "fmt" "log" "github.com/yandex/pandora/config" @@ -12,16 +11,12 @@ import ( type LoggingResultListener struct { resultListener - source <-chan interface{} + source <-chan *Sample } -func (rl *LoggingResultListener) handle(r interface{}) { - r, ok := r.(fmt.Stringer) - if !ok { - log.Println("Can't convert result to String") - } else { - log.Println(r) - } +func (rl *LoggingResultListener) handle(s *Sample) { + log.Println(s) + ReleaseSample(s) } func (rl *LoggingResultListener) Start(ctx context.Context) error { @@ -46,7 +41,7 @@ loop: } func NewLoggingResultListener(*config.ResultListener) (ResultListener, error) { - ch := make(chan interface{}, 32) + ch := make(chan *Sample, 32) return &LoggingResultListener{ source: ch, resultListener: resultListener{ diff --git a/aggregate/phout.go b/aggregate/phout.go index 2486549ce..d1c5aaaae 100644 --- a/aggregate/phout.go +++ b/aggregate/phout.go @@ -9,26 +9,20 @@ import ( "golang.org/x/net/context" ) -type PhoutSerializable interface { - AppendToPhout([]byte) []byte -} - type PhoutResultListener struct { resultListener - source <-chan interface{} + source <-chan *Sample phout *bufio.Writer buffer []byte } -func (rl *PhoutResultListener) handle(s interface{}) error { - ps, ok := s.(PhoutSerializable) - if !ok { - panic("Result sample is not PhoutSerializable") - } - rl.buffer = ps.AppendToPhout(rl.buffer) +func (rl *PhoutResultListener) handle(s *Sample) error { + + rl.buffer = s.AppendToPhout(rl.buffer) _, err := rl.phout.Write(rl.buffer) rl.buffer = rl.buffer[:0] + ReleaseSample(s) return err } @@ -74,7 +68,7 @@ func NewPhoutResultListener(filename string) (rl ResultListener, err error) { phoutFile, err = os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0666) } writer := bufio.NewWriterSize(phoutFile, 1024*512) // 512 KB - ch := make(chan interface{}, 65536) + ch := make(chan *Sample, 65536) return &PhoutResultListener{ source: ch, resultListener: resultListener{ diff --git a/aggregate/sample.go b/aggregate/sample.go index 71a07f3cb..ccca741b9 100644 --- a/aggregate/sample.go +++ b/aggregate/sample.go @@ -29,8 +29,22 @@ type Sample struct { Err error } -func AcquireSample() *Sample { - return samplePool.Get().(*Sample) +func AcquireSample(ts float64, tag string) *Sample { + s := samplePool.Get().(*Sample) + s.TS = ts + s.Tag = tag + s.RT = 0 + s.Connect = 0 + s.Send = 0 + s.Latency = 0 + s.Receive = 0 + s.IntervalEvent = 0 + s.Egress = 0 + s.Igress = 0 + s.NetCode = 0 + s.ProtoCode = 0 + s.Err = nil + return s } func ReleaseSample(s *Sample) { diff --git a/gun/gun.go b/gun/gun.go index 63e8b076e..d66ea56b2 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -1,10 +1,11 @@ package gun import ( + "github.com/yandex/pandora/aggregate" "github.com/yandex/pandora/ammo" "golang.org/x/net/context" ) type Gun interface { - Shoot(context.Context, ammo.Ammo, chan<- interface{}) error + Shoot(context.Context, ammo.Ammo, chan<- *aggregate.Sample) error } diff --git a/gun/http/http.go b/gun/http/http.go index 3b8236b77..438a0c79d 100644 --- a/gun/http/http.go +++ b/gun/http/http.go @@ -35,14 +35,13 @@ type HttpGun struct { // Shoot to target, this method is not thread safe func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, - results chan<- interface{}) error { + results chan<- *aggregate.Sample) error { if hg.client == nil { hg.Connect(results) } start := time.Now() - // TODO: acquire/release - ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "REQUEST"} + ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST") defer func() { ss.RT = int(time.Since(start).Seconds() * 1e6) results <- ss @@ -100,7 +99,7 @@ func (hg *HttpGun) Close() { hg.client = nil } -func (hg *HttpGun) Connect(results chan<- interface{}) { +func (hg *HttpGun) Connect(results chan<- *aggregate.Sample) { hg.Close() config := tls.Config{ InsecureSkipVerify: true, @@ -132,7 +131,7 @@ func (hg *HttpGun) Connect(results chan<- interface{}) { // log.Printf("client: connect: %s\n", err) // return // } - // ss := &HttpSample{ts: float64(connectStart.UnixNano()) / 1e9, tag: "CONNECT"} + // ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "CONNECT") // ss.rt = int(time.Since(connectStart).Seconds() * 1e6) // ss.err = err // if ss.err == nil { diff --git a/gun/http/http_test.go b/gun/http/http_test.go index 43e71c150..5a498118e 100644 --- a/gun/http/http_test.go +++ b/gun/http/http_test.go @@ -20,7 +20,7 @@ func TestHttpGunWithSsl(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan interface{}) + result := make(chan *aggregate.Sample) requests := make(chan *http.Request) ts := httptest.NewTLSServer( @@ -52,10 +52,8 @@ func TestHttpGunWithSsl(t *testing.T) { }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) - sample, casted := (results[0]).(*aggregate.Sample) - require.True(t, casted, "Should be *aggregate.Sample") - assert.Equal(t, "REQUEST", sample.Tag) - assert.Equal(t, 200, sample.ProtoCode) + assert.Equal(t, "REQUEST", results[0].Tag) + assert.Equal(t, 200, results[0].ProtoCode) select { case r := <-requests: @@ -81,7 +79,7 @@ func TestHttpGunWithHttp(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan interface{}) + result := make(chan *aggregate.Sample) requests := make(chan *http.Request) ts := httptest.NewServer( @@ -113,10 +111,8 @@ func TestHttpGunWithHttp(t *testing.T) { }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) - sample, casted := (results[0]).(*aggregate.Sample) - require.True(t, casted, "Should be *aggregate.Sample") - assert.Equal(t, "REQUEST", sample.Tag) - assert.Equal(t, 200, sample.ProtoCode) + assert.Equal(t, "REQUEST", results[0].Tag) + assert.Equal(t, 200, results[0].ProtoCode) select { case r := <-requests: diff --git a/gun/log.go b/gun/log.go index a69813e98..c6bb44ec7 100644 --- a/gun/log.go +++ b/gun/log.go @@ -14,7 +14,7 @@ import ( type LogGun struct{} -func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interface{}) error { +func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error { log.Println("Log message: ", a.(*ammo.Log).Message) results <- &aggregate.Sample{} return nil diff --git a/gun/spdy/spdy.go b/gun/spdy/spdy.go index 3032cfe77..d99e4e972 100644 --- a/gun/spdy/spdy.go +++ b/gun/spdy/spdy.go @@ -25,7 +25,7 @@ type SpdyGun struct { client *spdy.Client } -func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interface{}) error { +func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error { if sg.client == nil { if err := sg.connect(results); err != nil { return err @@ -40,7 +40,7 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interf }() } start := time.Now() - ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "REQUEST"} + ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST") defer func() { ss.RT = int(time.Since(start).Seconds() * 1e6) results <- ss @@ -91,10 +91,10 @@ func (sg *SpdyGun) Close() { } } -func (sg *SpdyGun) connect(results chan<- interface{}) error { +func (sg *SpdyGun) connect(results chan<- *aggregate.Sample) error { // FIXME: rewrite connection logic, it isn't thread safe right now. start := time.Now() - ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "CONNECT"} + ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "CONNECT") defer func() { ss.RT = int(time.Since(start).Seconds() * 1e6) results <- ss @@ -125,7 +125,7 @@ func (sg *SpdyGun) connect(results chan<- interface{}) error { return nil } -func (sg *SpdyGun) Ping(results chan<- interface{}) { +func (sg *SpdyGun) Ping(results chan<- *aggregate.Sample) { if sg.client == nil { return } @@ -138,7 +138,7 @@ func (sg *SpdyGun) Ping(results chan<- interface{}) { if !pinged { log.Printf("client: ping: timed out\n") } - ss := &aggregate.Sample{TS: float64(pingStart.UnixNano()) / 1e9, Tag: "PING"} + ss := aggregate.AcquireSample(float64(pingStart.UnixNano())/1e9, "PING") ss.RT = int(time.Since(pingStart).Seconds() * 1e6) if err == nil && pinged { diff --git a/gun/spdy/spdy_test.go b/gun/spdy/spdy_test.go index 07bf631dc..e843de802 100644 --- a/gun/spdy/spdy_test.go +++ b/gun/spdy/spdy_test.go @@ -21,7 +21,7 @@ func TestSpdyGun(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan interface{}) + result := make(chan *aggregate.Sample) gun := &SpdyGun{ target: "localhost:3000", @@ -47,18 +47,14 @@ func TestSpdyGun(t *testing.T) { { // first result is connect - sample, casted := (results[0]).(*aggregate.Sample) - require.True(t, casted, "Should be *aggregate.Sample") - assert.Equal(t, "CONNECT", sample.Tag) - assert.Equal(t, 200, sample.ProtoCode) + assert.Equal(t, "CONNECT", results[0].Tag) + assert.Equal(t, 200, results[0].ProtoCode) } { // second result is request - sample, casted := (results[1]).(*aggregate.Sample) - require.True(t, casted, "Should be *aggregate.Sample") - assert.Equal(t, "REQUEST", sample.Tag) - assert.Equal(t, 200, sample.ProtoCode) + assert.Equal(t, "REQUEST", results[1].Tag) + assert.Equal(t, 200, results[1].ProtoCode) } // TODO: test scenaries with errors From b1f3ce9dd6862b0de0ded99403ff8b4a9662d9c6 Mon Sep 17 00:00:00 2001 From: Alexey Lavrenuke Date: Fri, 15 Jan 2016 17:29:22 +0300 Subject: [PATCH 10/11] test connect and ping --- gun/spdy/spdy.go | 26 ++++++++++++++----------- gun/spdy/spdy_test.go | 45 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/gun/spdy/spdy.go b/gun/spdy/spdy.go index d99e4e972..fb1607542 100644 --- a/gun/spdy/spdy.go +++ b/gun/spdy/spdy.go @@ -27,18 +27,11 @@ type SpdyGun struct { func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error { if sg.client == nil { - if err := sg.connect(results); err != nil { + if err := sg.Connect(results); err != nil { return err } } - if sg.pingPeriod > 0 { - pingTimer := time.NewTicker(sg.pingPeriod) - go func() { - for range pingTimer.C { - sg.Ping(results) - } - }() - } + start := time.Now() ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST") defer func() { @@ -91,7 +84,7 @@ func (sg *SpdyGun) Close() { } } -func (sg *SpdyGun) connect(results chan<- *aggregate.Sample) error { +func (sg *SpdyGun) Connect(results chan<- *aggregate.Sample) error { // FIXME: rewrite connection logic, it isn't thread safe right now. start := time.Now() ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "CONNECT") @@ -149,7 +142,7 @@ func (sg *SpdyGun) Ping(results chan<- *aggregate.Sample) { } results <- ss if err != nil { - sg.connect(results) + sg.Connect(results) } } @@ -185,5 +178,16 @@ func New(c *config.Gun) (gun.Gun, error) { return nil, fmt.Errorf("Target is of the wrong type."+ " Expected 'string' got '%T'", t) } + // TODO: implement this logic somewhere + // if pingPeriod > 0 { + // go func() { + // for range time.NewTicker(pingPeriod).C { + // if g.closed { + // return + // } + // g.Ping(results) + // } + // }() + // } return g, nil } diff --git a/gun/spdy/spdy_test.go b/gun/spdy/spdy_test.go index e843de802..7ba67bad0 100644 --- a/gun/spdy/spdy_test.go +++ b/gun/spdy/spdy_test.go @@ -28,6 +28,7 @@ func TestSpdyGun(t *testing.T) { pingPeriod: time.Second * 5, } promise := utils.Promise(func() error { + defer gun.Close() defer close(result) return gun.Shoot(ctx, &ammo.Http{ Host: "example.org", @@ -58,7 +59,6 @@ func TestSpdyGun(t *testing.T) { } // TODO: test scenaries with errors - // TODO: test ping logic select { case err := <-promise: @@ -69,6 +69,49 @@ func TestSpdyGun(t *testing.T) { } +func TestSpdyConnectPing(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + result := make(chan *aggregate.Sample) + + gun := &SpdyGun{ + target: "localhost:3000", + pingPeriod: time.Second * 5, + } + promise := utils.Promise(func() error { + defer gun.Close() + defer close(result) + if err := gun.Connect(result); err != nil { + return err + } + gun.Ping(result) + return nil + }) + + results := aggregate.Drain(ctx, result) + require.Len(t, results, 2) + { + // first result is connect + + assert.Equal(t, "CONNECT", results[0].Tag) + assert.Equal(t, 200, results[0].ProtoCode) + } + { + // second result is PING + + assert.Equal(t, "PING", results[1].Tag) + assert.Equal(t, 200, results[1].ProtoCode) + } + select { + case err := <-promise: + require.NoError(t, err) + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } + +} + func runSpdyTestServer() { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain") From b474ee91964383a0205e53c261f952be959a20be Mon Sep 17 00:00:00 2001 From: Alexey Lavrenuke Date: Fri, 15 Jan 2016 18:00:14 +0300 Subject: [PATCH 11/11] bind results channel method in Gun interface, more tests --- config/config_test.go | 3 ++- engine/user.go | 4 ++-- gun/gun.go | 3 ++- gun/http/http.go | 20 +++++++++------- gun/http/http_test.go | 14 ++++++----- gun/log.go | 28 +++++++--------------- gun/spdy/spdy.go | 56 ++++++++++++++++++++++++------------------- gun/spdy/spdy_test.go | 39 ++++++++++++++++++++++++------ 8 files changed, 99 insertions(+), 68 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index 5743271ff..94545ca64 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -33,7 +33,8 @@ func TestGlobalConfig(t *testing.T) { gc := &Gun{ GunType: "spdy", Parameters: map[string]interface{}{ - "Target": "localhost:3000", + "Target": "localhost:3000", + "PingPeriod": "5", }, } globalConfig := &Global{ diff --git a/engine/user.go b/engine/user.go index 83e467a85..8736659b3 100644 --- a/engine/user.go +++ b/engine/user.go @@ -29,7 +29,7 @@ func (u *User) Run(ctx context.Context) error { }() control := u.Limiter.Control() source := u.Ammunition.Source() - sink := u.Results.Sink() + u.Gun.BindResultsTo(u.Results.Sink()) loop: for { select { @@ -40,7 +40,7 @@ loop: } _, more = <-control if more { - u.Gun.Shoot(ctx, ammo, sink) + u.Gun.Shoot(ctx, ammo) u.Ammunition.Release(ammo) } else { log.Println("Limiter ended.") diff --git a/gun/gun.go b/gun/gun.go index d66ea56b2..85f048b09 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -7,5 +7,6 @@ import ( ) type Gun interface { - Shoot(context.Context, ammo.Ammo, chan<- *aggregate.Sample) error + Shoot(context.Context, ammo.Ammo) error + BindResultsTo(chan<- *aggregate.Sample) } diff --git a/gun/http/http.go b/gun/http/http.go index 438a0c79d..8b3b42143 100644 --- a/gun/http/http.go +++ b/gun/http/http.go @@ -28,23 +28,27 @@ const ( ) type HttpGun struct { - target string - ssl bool - client *http.Client + target string + ssl bool + client *http.Client + results chan<- *aggregate.Sample +} + +func (hg *HttpGun) BindResultsTo(results chan<- *aggregate.Sample) { + hg.results = results } // Shoot to target, this method is not thread safe -func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, - results chan<- *aggregate.Sample) error { +func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo) error { if hg.client == nil { - hg.Connect(results) + hg.Connect() } start := time.Now() ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST") defer func() { ss.RT = int(time.Since(start).Seconds() * 1e6) - results <- ss + hg.results <- ss }() // now send the request to obtain a http response ha, ok := a.(*ammo.Http) @@ -99,7 +103,7 @@ func (hg *HttpGun) Close() { hg.client = nil } -func (hg *HttpGun) Connect(results chan<- *aggregate.Sample) { +func (hg *HttpGun) Connect() { hg.Close() config := tls.Config{ InsecureSkipVerify: true, diff --git a/gun/http/http_test.go b/gun/http/http_test.go index 5a498118e..8ebb5ceac 100644 --- a/gun/http/http_test.go +++ b/gun/http/http_test.go @@ -33,8 +33,9 @@ func TestHttpGunWithSsl(t *testing.T) { defer ts.Close() gun := &HttpGun{ - target: ts.Listener.Addr().String(), - ssl: true, + target: ts.Listener.Addr().String(), + ssl: true, + results: result, } promise := utils.Promise(func() error { defer close(result) @@ -48,7 +49,7 @@ func TestHttpGunWithSsl(t *testing.T) { "Host": "example.org", "User-Agent": "Pandora/0.0.1", }, - }, result) + }) }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) @@ -92,8 +93,9 @@ func TestHttpGunWithHttp(t *testing.T) { defer ts.Close() gun := &HttpGun{ - target: ts.Listener.Addr().String(), - ssl: false, + target: ts.Listener.Addr().String(), + ssl: false, + results: result, } promise := utils.Promise(func() error { defer close(result) @@ -107,7 +109,7 @@ func TestHttpGunWithHttp(t *testing.T) { "Host": "example.org", "User-Agent": "Pandora/0.0.1", }, - }, result) + }) }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) diff --git a/gun/log.go b/gun/log.go index c6bb44ec7..b05444366 100644 --- a/gun/log.go +++ b/gun/log.go @@ -1,9 +1,8 @@ package gun import ( - "fmt" "log" - "strconv" + "time" "golang.org/x/net/context" @@ -12,27 +11,18 @@ import ( "github.com/yandex/pandora/config" ) -type LogGun struct{} - -func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error { - log.Println("Log message: ", a.(*ammo.Log).Message) - results <- &aggregate.Sample{} - return nil +type LogGun struct { + results chan<- *aggregate.Sample } -type DummySample struct { - value int +func (l *LogGun) BindResultsTo(results chan<- *aggregate.Sample) { + l.results = results } -func (ds *DummySample) String() string { - return fmt.Sprintf("My value is %d", ds.value) -} - -func (ds *DummySample) AppendToPhout(dst []byte) []byte { - dst = append(dst, "My value is "...) - dst = strconv.AppendInt(dst, int64(ds.value), 10) - dst = append(dst, '\n') - return dst +func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo) error { + log.Println("Log message: ", a.(*ammo.Log).Message) + l.results <- aggregate.AcquireSample(float64(time.Now().UnixNano())/1e9, "REQUEST") + return nil } func NewLogGunFromConfig(c *config.Gun) (g Gun, err error) { diff --git a/gun/spdy/spdy.go b/gun/spdy/spdy.go index fb1607542..d78cb31c1 100644 --- a/gun/spdy/spdy.go +++ b/gun/spdy/spdy.go @@ -20,14 +20,19 @@ import ( ) type SpdyGun struct { - pingPeriod time.Duration - target string - client *spdy.Client + target string + client *spdy.Client + results chan<- *aggregate.Sample + closed bool } -func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error { +func (sg *SpdyGun) BindResultsTo(results chan<- *aggregate.Sample) { + sg.results = results +} + +func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo) error { if sg.client == nil { - if err := sg.Connect(results); err != nil { + if err := sg.Connect(); err != nil { return err } } @@ -36,7 +41,7 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggre ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST") defer func() { ss.RT = int(time.Since(start).Seconds() * 1e6) - results <- ss + sg.results <- ss }() // now send the request to obtain a http response ha, ok := a.(*ammo.Http) @@ -79,18 +84,19 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggre } func (sg *SpdyGun) Close() { + sg.closed = true if sg.client != nil { sg.client.Close() } } -func (sg *SpdyGun) Connect(results chan<- *aggregate.Sample) error { +func (sg *SpdyGun) Connect() error { // FIXME: rewrite connection logic, it isn't thread safe right now. start := time.Now() ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "CONNECT") defer func() { ss.RT = int(time.Since(start).Seconds() * 1e6) - results <- ss + sg.results <- ss }() config := tls.Config{ InsecureSkipVerify: true, @@ -118,7 +124,7 @@ func (sg *SpdyGun) Connect(results chan<- *aggregate.Sample) error { return nil } -func (sg *SpdyGun) Ping(results chan<- *aggregate.Sample) { +func (sg *SpdyGun) Ping() { if sg.client == nil { return } @@ -140,9 +146,22 @@ func (sg *SpdyGun) Ping(results chan<- *aggregate.Sample) { ss.Err = err ss.ProtoCode = 500 } - results <- ss + sg.results <- ss if err != nil { - sg.Connect(results) + sg.Connect() + } +} + +func (sg *SpdyGun) startAutoPing(pingPeriod time.Duration) { + if pingPeriod > 0 { + go func() { + for range time.NewTicker(pingPeriod).C { + if sg.closed { + return + } + sg.Ping() + } + }() } } @@ -171,23 +190,12 @@ func New(c *config.Gun) (gun.Gun, error) { switch t := target.(type) { case string: g = &SpdyGun{ - pingPeriod: pingPeriod, - target: target.(string), + target: target.(string), } default: return nil, fmt.Errorf("Target is of the wrong type."+ " Expected 'string' got '%T'", t) } - // TODO: implement this logic somewhere - // if pingPeriod > 0 { - // go func() { - // for range time.NewTicker(pingPeriod).C { - // if g.closed { - // return - // } - // g.Ping(results) - // } - // }() - // } + g.(*SpdyGun).startAutoPing(pingPeriod) return g, nil } diff --git a/gun/spdy/spdy_test.go b/gun/spdy/spdy_test.go index 7ba67bad0..c33482aa8 100644 --- a/gun/spdy/spdy_test.go +++ b/gun/spdy/spdy_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/yandex/pandora/aggregate" "github.com/yandex/pandora/ammo" + "github.com/yandex/pandora/config" "github.com/yandex/pandora/utils" "golang.org/x/net/context" ) @@ -24,8 +25,8 @@ func TestSpdyGun(t *testing.T) { result := make(chan *aggregate.Sample) gun := &SpdyGun{ - target: "localhost:3000", - pingPeriod: time.Second * 5, + target: "localhost:3000", + results: result, } promise := utils.Promise(func() error { defer gun.Close() @@ -40,7 +41,7 @@ func TestSpdyGun(t *testing.T) { "Host": "example.org", "User-Agent": "Pandora/0.0.1", }, - }, result) + }) }) results := aggregate.Drain(ctx, result) @@ -76,16 +77,16 @@ func TestSpdyConnectPing(t *testing.T) { result := make(chan *aggregate.Sample) gun := &SpdyGun{ - target: "localhost:3000", - pingPeriod: time.Second * 5, + target: "localhost:3000", + results: result, } promise := utils.Promise(func() error { defer gun.Close() defer close(result) - if err := gun.Connect(result); err != nil { + if err := gun.Connect(); err != nil { return err } - gun.Ping(result) + gun.Ping() return nil }) @@ -112,6 +113,30 @@ func TestSpdyConnectPing(t *testing.T) { } +func TestNewSpdyGun(t *testing.T) { + spdyConfig := &config.Gun{ + GunType: "spdy", + Parameters: map[string]interface{}{ + "Target": "localhost:3000", + "PingPeriod": 5.0, + }, + } + g, err := New(spdyConfig) + assert.NoError(t, err) + _, ok := g.(*SpdyGun) + assert.Equal(t, true, ok) + + failSpdyConfig := &config.Gun{ + GunType: "spdy", + Parameters: map[string]interface{}{ + "Target": "localhost:3000", + "PingPeriod": "not-a-number", + }, + } + _, err = New(failSpdyConfig) + assert.Error(t, err) +} + func runSpdyTestServer() { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain")