diff --git a/aggregate/aggregate.go b/aggregate/aggregate.go index a97b26dd0..28dc6e3d4 100644 --- a/aggregate/aggregate.go +++ b/aggregate/aggregate.go @@ -2,17 +2,21 @@ package aggregate import "golang.org/x/net/context" -type Sample interface { - String() string -} - type ResultListener interface { Start(context.Context) error - Sink() chan<- Sample + Sink() chan<- *Sample +} + +type resultListener struct { + sink chan<- *Sample +} + +func (rl *resultListener) Sink() chan<- *Sample { + return rl.sink } -func Drain(ctx context.Context, results <-chan Sample) []Sample { - samples := []Sample{} +func Drain(ctx context.Context, results <-chan *Sample) []*Sample { + samples := []*Sample{} loop: for { select { diff --git a/aggregate/logger.go b/aggregate/logger.go index 020383ae5..d9b60cc38 100644 --- a/aggregate/logger.go +++ b/aggregate/logger.go @@ -7,23 +7,16 @@ import ( "golang.org/x/net/context" ) -type resultListener struct { - sink chan<- Sample -} - -func (rl *resultListener) Sink() chan<- Sample { - return rl.sink -} - // Implements ResultListener interface type LoggingResultListener struct { resultListener - source <-chan Sample + source <-chan *Sample } -func (rl *LoggingResultListener) handle(r Sample) { - log.Println(r) +func (rl *LoggingResultListener) handle(s *Sample) { + log.Println(s) + ReleaseSample(s) } func (rl *LoggingResultListener) Start(ctx context.Context) error { @@ -48,7 +41,7 @@ loop: } func NewLoggingResultListener(*config.ResultListener) (ResultListener, error) { - ch := make(chan Sample, 32) + ch := make(chan *Sample, 32) return &LoggingResultListener{ source: ch, resultListener: resultListener{ diff --git a/aggregate/phout.go b/aggregate/phout.go index 12d67e900..d1c5aaaae 100644 --- a/aggregate/phout.go +++ b/aggregate/phout.go @@ -2,7 +2,6 @@ package aggregate import ( "bufio" - "fmt" "os" "time" @@ -10,61 +9,21 @@ import ( "golang.org/x/net/context" ) -type PhoutSample struct { - TS float64 - Tag string - RT int - Connect int - Send int - Latency int - Receive int - IntervalEvent int - Egress int - Igress int - NetCode int - ProtoCode int -} - -func (ps *PhoutSample) String() string { - return fmt.Sprintf( - "%.3f\t%s\t%d\t"+ - "%d\t%d\t"+ - "%d\t%d\t"+ - "%d\t"+ - "%d\t%d\t"+ - "%d\t%d", - ps.TS, ps.Tag, ps.RT, - ps.Connect, ps.Send, - ps.Latency, ps.Receive, - ps.IntervalEvent, - ps.Egress, ps.Igress, - ps.NetCode, ps.ProtoCode, - ) -} - -type PhantomCompatible interface { - Sample - PhoutSample() *PhoutSample -} - type PhoutResultListener struct { resultListener - source <-chan Sample + source <-chan *Sample phout *bufio.Writer + buffer []byte } -func (rl *PhoutResultListener) handle(r Sample) error { - pc, ok := r.(PhantomCompatible) - if ok { - _, err := rl.phout.WriteString(fmt.Sprintf("%s\n", pc.PhoutSample())) - if err != nil { - return err - } - } else { - return fmt.Errorf("Not phantom compatible sample") - } - return nil +func (rl *PhoutResultListener) handle(s *Sample) error { + + rl.buffer = s.AppendToPhout(rl.buffer) + _, err := rl.phout.Write(rl.buffer) + rl.buffer = rl.buffer[:0] + ReleaseSample(s) + return err } func (rl *PhoutResultListener) Start(ctx context.Context) error { @@ -109,13 +68,14 @@ func NewPhoutResultListener(filename string) (rl ResultListener, err error) { phoutFile, err = os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0666) } writer := bufio.NewWriterSize(phoutFile, 1024*512) // 512 KB - ch := make(chan Sample, 65536) + ch := make(chan *Sample, 65536) return &PhoutResultListener{ source: ch, resultListener: resultListener{ sink: ch, }, - phout: writer, + phout: writer, + buffer: make([]byte, 0, 1024), }, nil } diff --git a/aggregate/sample.go b/aggregate/sample.go new file mode 100644 index 000000000..ccca741b9 --- /dev/null +++ b/aggregate/sample.go @@ -0,0 +1,97 @@ +package aggregate + +import ( + "fmt" + "strconv" + "sync" +) + +const ( + phoutDelimiter = '\t' + phoutNewLine = '\n' +) + +var samplePool = &sync.Pool{New: func() interface{} { return &Sample{} }} + +type Sample struct { + TS float64 + Tag string + RT int + Connect int + Send int + Latency int + Receive int + IntervalEvent int + Egress int + Igress int + NetCode int + ProtoCode int + Err error +} + +func AcquireSample(ts float64, tag string) *Sample { + s := samplePool.Get().(*Sample) + s.TS = ts + s.Tag = tag + s.RT = 0 + s.Connect = 0 + s.Send = 0 + s.Latency = 0 + s.Receive = 0 + s.IntervalEvent = 0 + s.Egress = 0 + s.Igress = 0 + s.NetCode = 0 + s.ProtoCode = 0 + s.Err = nil + return s +} + +func ReleaseSample(s *Sample) { + samplePool.Put(s) +} + +func (ps *Sample) String() string { + return fmt.Sprintf( + "%.3f\t%s\t%d\t"+ + "%d\t%d\t"+ + "%d\t%d\t"+ + "%d\t"+ + "%d\t%d\t"+ + "%d\t%d", + ps.TS, ps.Tag, ps.RT, + ps.Connect, ps.Send, + ps.Latency, ps.Receive, + ps.IntervalEvent, + ps.Egress, ps.Igress, + ps.NetCode, ps.ProtoCode, + ) +} + +func (ps *Sample) AppendToPhout(dst []byte) []byte { + dst = strconv.AppendFloat(dst, ps.TS, 'f', 3, 64) + dst = append(dst, phoutDelimiter) + dst = append(dst, ps.Tag...) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.RT), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Connect), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Send), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Latency), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Receive), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.IntervalEvent), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Egress), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Igress), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.NetCode), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.ProtoCode), 10) + dst = append(dst, phoutNewLine) + return dst +} diff --git a/ammo/ammo.go b/ammo/ammo.go index 152c32c12..f8905ab2d 100644 --- a/ammo/ammo.go +++ b/ammo/ammo.go @@ -1,25 +1,34 @@ package ammo -import "golang.org/x/net/context" +import ( + "sync" + + "golang.org/x/net/context" +) type Provider interface { Start(context.Context) error Source() <-chan Ammo + Release(Ammo) // return unused Ammo object to memory pool } type BaseProvider struct { decoder Decoder source <-chan Ammo + pool sync.Pool } type Ammo interface{} -type Decoder func([]byte) (Ammo, error) +type Decoder interface { + Decode([]byte, Ammo) (Ammo, error) +} -func NewBaseProvider(source <-chan Ammo, decoder Decoder) *BaseProvider { +func NewBaseProvider(source <-chan Ammo, decoder Decoder, New func() interface{}) *BaseProvider { return &BaseProvider{ source: source, decoder: decoder, + pool: sync.Pool{New: New}, } } @@ -27,8 +36,13 @@ func (ap *BaseProvider) Source() <-chan Ammo { return ap.source } -func (ap *BaseProvider) Decode(src []byte) (Ammo, error) { - return ap.decoder(src) +func (ap *BaseProvider) Release(a Ammo) { + ap.pool.Put(a) +} + +func (ap *BaseProvider) decode(src []byte) (Ammo, error) { + a := ap.pool.Get() + return ap.decoder.Decode(src, a) } // Drain reads all ammos from ammo.Provider. Useful for tests. diff --git a/ammo/dummy.go b/ammo/dummy.go index e39df0b53..e3ba236cd 100644 --- a/ammo/dummy.go +++ b/ammo/dummy.go @@ -13,13 +13,18 @@ type Log struct { Message string } -// LogJSONDecode implements ammo.Decoder interface -func LogJSONDecode(jsonDoc []byte) (Ammo, error) { - a := &Log{} +// LogJSONDecoder implements ammo.Decoder interface +type LogJSONDecoder struct{} + +func (*LogJSONDecoder) Decode(jsonDoc []byte, a Ammo) (Ammo, error) { err := json.Unmarshal(jsonDoc, a) return a, err } +func NewLogJSONDecoder() Decoder { + return &LogJSONDecoder{} +} + type LogProvider struct { *BaseProvider @@ -31,7 +36,7 @@ func (ap *LogProvider) Start(ctx context.Context) error { defer close(ap.sink) loop: for i := 0; i < ap.size; i++ { - if a, err := ap.Decode([]byte(fmt.Sprintf(`{"message": "Job #%d"}`, i))); err == nil { + if a, err := ap.decode([]byte(fmt.Sprintf(`{"message": "Job #%d"}`, i))); err == nil { select { case ap.sink <- a: case <-ctx.Done(): @@ -52,7 +57,8 @@ func NewLogAmmoProvider(c *config.AmmoProvider) (Provider, error) { sink: ammoCh, BaseProvider: NewBaseProvider( ammoCh, - LogJSONDecode, + NewLogJSONDecoder(), + func() interface{} { return &Log{} }, ), } return ap, nil diff --git a/ammo/http.go b/ammo/http.go index 23531b173..b93227e48 100644 --- a/ammo/http.go +++ b/ammo/http.go @@ -6,7 +6,6 @@ import ( "bufio" "fmt" "log" - "net/http" "os" "github.com/yandex/pandora/config" @@ -22,21 +21,11 @@ type Http struct { Tag string } -func (h *Http) Request() (*http.Request, error) { - // FIXME: something wrong here with https - req, err := http.NewRequest(h.Method, "http://"+h.Host+h.Uri, nil) - if err == nil { - for k, v := range h.Headers { - req.Header.Set(k, v) - } - } - return req, err -} +// HttpJSONDecoder implements ammo.Decoder interface +type HttpJSONDecoder struct{} -// HttpJSONDecode implements ammo.Decoder interface -func HttpJSONDecode(jsonDoc []byte) (Ammo, error) { - a := &Http{} - err := a.UnmarshalJSON(jsonDoc) +func (d *HttpJSONDecoder) Decode(jsonDoc []byte, a Ammo) (Ammo, error) { + err := a.(*Http).UnmarshalJSON(jsonDoc) return a, err } @@ -66,7 +55,7 @@ loop: scanner.Split(bufio.ScanLines) for scanner.Scan() && (ap.ammoLimit == 0 || ammoNumber < ap.ammoLimit) { data := scanner.Bytes() - if a, err := ap.Decode(data); err != nil { + if a, err := ap.decode(data); err != nil { return fmt.Errorf("failed to decode ammo: %v", err) } else { ammoNumber++ @@ -100,7 +89,8 @@ func NewHttpProvider(c *config.AmmoProvider) (Provider, error) { sink: ammoCh, BaseProvider: NewBaseProvider( ammoCh, - HttpJSONDecode, + &HttpJSONDecoder{}, + func() interface{} { return &Http{} }, ), } return ap, nil diff --git a/ammo/http_test.go b/ammo/http_test.go index 14e106ed8..5c2aa6882 100644 --- a/ammo/http_test.go +++ b/ammo/http_test.go @@ -4,6 +4,7 @@ import ( "bufio" "errors" "os" + "sync" "testing" "time" @@ -51,7 +52,8 @@ func TestHttpProvider(t *testing.T) { sink: ammoCh, BaseProvider: NewBaseProvider( ammoCh, - HttpJSONDecode, + &HttpJSONDecoder{}, + func() interface{} { return &Http{} }, ), } promise := utils.PromiseCtx(providerCtx, provider.Start) @@ -82,6 +84,7 @@ var result Ammo func BenchmarkJsonDecoder(b *testing.B) { f, err := os.Open(httpTestFilename) + decoder := &HttpJSONDecoder{} if err != nil { b.Fatal(err) } @@ -93,7 +96,31 @@ func BenchmarkJsonDecoder(b *testing.B) { } var a Ammo for n := 0; n < b.N; n++ { - a, _ = HttpJSONDecode(jsonDoc) + a, _ = decoder.Decode(jsonDoc, &Http{}) } - result = a + _ = a +} + +func BenchmarkJsonDecoderWithPool(b *testing.B) { + f, err := os.Open(httpTestFilename) + decoder := &HttpJSONDecoder{} + if err != nil { + b.Fatal(err) + } + defer f.Close() + r := bufio.NewReader(f) + jsonDoc, isPrefix, err := r.ReadLine() + if err != nil || isPrefix { + b.Fatal(errors.New("Couldn't properly read ammo sample from data file")) + } + var a Ammo + pool := sync.Pool{ + New: func() interface{} { return &Http{} }, + } + for n := 0; n < b.N; n++ { + h := pool.Get().(*Http) + a, _ = decoder.Decode(jsonDoc, h) + pool.Put(h) + } + _ = a } diff --git a/config/config_test.go b/config/config_test.go index 5743271ff..94545ca64 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -33,7 +33,8 @@ func TestGlobalConfig(t *testing.T) { gc := &Gun{ GunType: "spdy", Parameters: map[string]interface{}{ - "Target": "localhost:3000", + "Target": "localhost:3000", + "PingPeriod": "5", }, } globalConfig := &Global{ diff --git a/engine/user.go b/engine/user.go index b39e1f41d..8736659b3 100644 --- a/engine/user.go +++ b/engine/user.go @@ -29,18 +29,19 @@ func (u *User) Run(ctx context.Context) error { }() control := u.Limiter.Control() source := u.Ammunition.Source() - sink := u.Results.Sink() + u.Gun.BindResultsTo(u.Results.Sink()) loop: for { select { - case j, more := <-source: + case ammo, more := <-source: if !more { log.Println("Ammo ended") break loop } _, more = <-control if more { - u.Gun.Shoot(ctx, j, sink) + u.Gun.Shoot(ctx, ammo) + u.Ammunition.Release(ammo) } else { log.Println("Limiter ended.") break loop diff --git a/gun/gun.go b/gun/gun.go index 95a898549..85f048b09 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -7,5 +7,6 @@ import ( ) type Gun interface { - Shoot(context.Context, ammo.Ammo, chan<- aggregate.Sample) error + Shoot(context.Context, ammo.Ammo) error + BindResultsTo(chan<- *aggregate.Sample) } diff --git a/gun/http/http.go b/gun/http/http.go index 4bc8e1bb6..8b3b42143 100644 --- a/gun/http/http.go +++ b/gun/http/http.go @@ -28,58 +28,66 @@ const ( ) type HttpGun struct { - target string - ssl bool - client *http.Client + target string + ssl bool + client *http.Client + results chan<- *aggregate.Sample +} + +func (hg *HttpGun) BindResultsTo(results chan<- *aggregate.Sample) { + hg.results = results } // Shoot to target, this method is not thread safe -func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, - results chan<- aggregate.Sample) error { +func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo) error { if hg.client == nil { - hg.Connect(results) + hg.Connect() } start := time.Now() - ss := &HttpSample{ts: float64(start.UnixNano()) / 1e9, tag: "REQUEST"} + ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST") defer func() { - ss.rt = int(time.Since(start).Seconds() * 1e6) - results <- ss + ss.RT = int(time.Since(start).Seconds() * 1e6) + hg.results <- ss }() // now send the request to obtain a http response ha, ok := a.(*ammo.Http) if !ok { - errStr := fmt.Sprintf("Got '%T' instead of 'HttpAmmo'", a) - log.Println(errStr) - ss.err = errors.New(errStr) - return ss.err + panic(fmt.Sprintf("Got '%T' instead of 'HttpAmmo'", a)) } if ha.Tag != "" { - ss.tag += "|" + ha.Tag + ss.Tag += "|" + ha.Tag + } + var uri string + if hg.ssl { + uri = "https://" + ha.Host + ha.Uri + } else { + uri = "http://" + ha.Host + ha.Uri } - req, err := ha.Request() + req, err := http.NewRequest(ha.Method, uri, nil) if err != nil { log.Printf("Error making HTTP request: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } - req.URL.Host = hg.target - if hg.ssl { - req.URL.Scheme = "https" - } else { - req.URL.Scheme = "http" + for k, v := range ha.Headers { + req.Header.Set(k, v) } + req.URL.Host = hg.target res, err := hg.client.Do(req) if err != nil { log.Printf("Error performing a request: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } defer res.Body.Close() _, err = io.Copy(ioutil.Discard, res.Body) if err != nil { log.Printf("Error reading response body: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } @@ -87,7 +95,7 @@ func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, //data := make([]byte, int(res.ContentLength)) // _, err = res.Body.(io.Reader).Read(data) // fmt.Println(string(data)) - ss.StatusCode = res.StatusCode + ss.ProtoCode = res.StatusCode return nil } @@ -95,7 +103,7 @@ func (hg *HttpGun) Close() { hg.client = nil } -func (hg *HttpGun) Connect(results chan<- aggregate.Sample) { +func (hg *HttpGun) Connect() { hg.Close() config := tls.Config{ InsecureSkipVerify: true, @@ -127,7 +135,7 @@ func (hg *HttpGun) Connect(results chan<- aggregate.Sample) { // log.Printf("client: connect: %s\n", err) // return // } - // ss := &HttpSample{ts: float64(connectStart.UnixNano()) / 1e9, tag: "CONNECT"} + // ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "CONNECT") // ss.rt = int(time.Since(connectStart).Seconds() * 1e6) // ss.err = err // if ss.err == nil { @@ -136,44 +144,6 @@ func (hg *HttpGun) Connect(results chan<- aggregate.Sample) { // results <- ss } -type HttpSample struct { - ts float64 // Unix Timestamp in seconds - rt int // response time in milliseconds - StatusCode int // protocol status code - tag string - err error -} - -func (ds *HttpSample) PhoutSample() *aggregate.PhoutSample { - var protoCode, netCode int - if ds.err != nil { - protoCode = 500 - netCode = 999 - log.Printf("Error code. %v\n", ds.err) - } else { - netCode = 0 - protoCode = ds.StatusCode - } - return &aggregate.PhoutSample{ - TS: ds.ts, - Tag: ds.tag, - RT: ds.rt, - Connect: 0, - Send: 0, - Latency: 0, - Receive: 0, - IntervalEvent: 0, - Egress: 0, - Igress: 0, - NetCode: netCode, - ProtoCode: protoCode, - } -} - -func (ds *HttpSample) String() string { - return fmt.Sprintf("rt: %d [%d] %s", ds.rt, ds.StatusCode, ds.tag) -} - func New(c *config.Gun) (gun.Gun, error) { params := c.Parameters if params == nil { diff --git a/gun/http/http_test.go b/gun/http/http_test.go index bf315b598..8ebb5ceac 100644 --- a/gun/http/http_test.go +++ b/gun/http/http_test.go @@ -20,7 +20,7 @@ func TestHttpGunWithSsl(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan aggregate.Sample) + result := make(chan *aggregate.Sample) requests := make(chan *http.Request) ts := httptest.NewTLSServer( @@ -33,8 +33,9 @@ func TestHttpGunWithSsl(t *testing.T) { defer ts.Close() gun := &HttpGun{ - target: ts.Listener.Addr().String(), - ssl: true, + target: ts.Listener.Addr().String(), + ssl: true, + results: result, } promise := utils.Promise(func() error { defer close(result) @@ -48,15 +49,12 @@ func TestHttpGunWithSsl(t *testing.T) { "Host": "example.org", "User-Agent": "Pandora/0.0.1", }, - }, result) + }) }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) - rPhout, casted := (results[0]).(aggregate.PhantomCompatible) - require.True(t, casted, "Should be phantom compatible") - phoutSample := rPhout.PhoutSample() - assert.Equal(t, "REQUEST", phoutSample.Tag) - assert.Equal(t, 200, phoutSample.ProtoCode) + assert.Equal(t, "REQUEST", results[0].Tag) + assert.Equal(t, 200, results[0].ProtoCode) select { case r := <-requests: @@ -82,7 +80,7 @@ func TestHttpGunWithHttp(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan aggregate.Sample) + result := make(chan *aggregate.Sample) requests := make(chan *http.Request) ts := httptest.NewServer( @@ -95,8 +93,9 @@ func TestHttpGunWithHttp(t *testing.T) { defer ts.Close() gun := &HttpGun{ - target: ts.Listener.Addr().String(), - ssl: false, + target: ts.Listener.Addr().String(), + ssl: false, + results: result, } promise := utils.Promise(func() error { defer close(result) @@ -110,15 +109,12 @@ func TestHttpGunWithHttp(t *testing.T) { "Host": "example.org", "User-Agent": "Pandora/0.0.1", }, - }, result) + }) }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) - rPhout, casted := (results[0]).(aggregate.PhantomCompatible) - require.True(t, casted, "Should be phantom compatible") - phoutSample := rPhout.PhoutSample() - assert.Equal(t, "REQUEST", phoutSample.Tag) - assert.Equal(t, 200, phoutSample.ProtoCode) + assert.Equal(t, "REQUEST", results[0].Tag) + assert.Equal(t, 200, results[0].ProtoCode) select { case r := <-requests: diff --git a/gun/log.go b/gun/log.go index 174292f1c..b05444366 100644 --- a/gun/log.go +++ b/gun/log.go @@ -1,8 +1,8 @@ package gun import ( - "fmt" "log" + "time" "golang.org/x/net/context" @@ -11,24 +11,18 @@ import ( "github.com/yandex/pandora/config" ) -type LogGun struct{} - -func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- aggregate.Sample) error { - log.Println("Log message: ", a.(*ammo.Log).Message) - results <- &DummySample{0} - return nil +type LogGun struct { + results chan<- *aggregate.Sample } -type DummySample struct { - value int +func (l *LogGun) BindResultsTo(results chan<- *aggregate.Sample) { + l.results = results } -func (ds *DummySample) PhoutSample() *aggregate.PhoutSample { - return &aggregate.PhoutSample{} -} - -func (ds *DummySample) String() string { - return fmt.Sprintf("My value is %d", ds.value) +func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo) error { + log.Println("Log message: ", a.(*ammo.Log).Message) + l.results <- aggregate.AcquireSample(float64(time.Now().UnixNano())/1e9, "REQUEST") + return nil } func NewLogGunFromConfig(c *config.Gun) (g Gun, err error) { diff --git a/gun/spdy/spdy.go b/gun/spdy/spdy.go index e1614b668..d78cb31c1 100644 --- a/gun/spdy/spdy.go +++ b/gun/spdy/spdy.go @@ -7,6 +7,7 @@ import ( "io" "io/ioutil" "log" + "net/http" "time" "github.com/amahi/spdy" @@ -19,52 +20,50 @@ import ( ) type SpdyGun struct { - pingPeriod time.Duration - target string - client *spdy.Client + target string + client *spdy.Client + results chan<- *aggregate.Sample + closed bool } -func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- aggregate.Sample) error { +func (sg *SpdyGun) BindResultsTo(results chan<- *aggregate.Sample) { + sg.results = results +} + +func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo) error { if sg.client == nil { - if err := sg.connect(results); err != nil { + if err := sg.Connect(); err != nil { return err } } - if sg.pingPeriod > 0 { - pingTimer := time.NewTicker(sg.pingPeriod) - go func() { - for range pingTimer.C { - sg.Ping(results) - } - }() - } + start := time.Now() - ss := &SpdySample{ts: float64(start.UnixNano()) / 1e9, tag: "REQUEST"} + ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST") defer func() { - ss.rt = int(time.Since(start).Seconds() * 1e6) - results <- ss + ss.RT = int(time.Since(start).Seconds() * 1e6) + sg.results <- ss }() // now send the request to obtain a http response ha, ok := a.(*ammo.Http) if !ok { - errStr := fmt.Sprintf("Got '%T' instead of 'HttpAmmo'", a) - log.Println(errStr) - ss.err = errors.New(errStr) - return ss.err + panic(fmt.Sprintf("Got '%T' instead of 'HttpAmmo'", a)) } if ha.Tag != "" { - ss.tag += "|" + ha.Tag + ss.Tag += "|" + ha.Tag } - req, err := ha.Request() + + req, err := http.NewRequest(ha.Method, "https://"+ha.Host+ha.Uri, nil) if err != nil { log.Printf("Error making HTTP request: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } res, err := sg.client.Do(req) if err != nil { log.Printf("Error performing a request: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } @@ -72,53 +71,60 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- aggreg _, err = io.Copy(ioutil.Discard, res.Body) if err != nil { log.Printf("Error reading response body: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } // TODO: make this an optional verbose answ_log output //data := make([]byte, int(res.ContentLength)) // _, err = res.Body.(io.Reader).Read(data) // fmt.Println(string(data)) - ss.StatusCode = res.StatusCode + ss.ProtoCode = res.StatusCode return err } func (sg *SpdyGun) Close() { + sg.closed = true if sg.client != nil { sg.client.Close() } } -func (sg *SpdyGun) connect(results chan<- aggregate.Sample) error { +func (sg *SpdyGun) Connect() error { // FIXME: rewrite connection logic, it isn't thread safe right now. - connectStart := time.Now() + start := time.Now() + ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "CONNECT") + defer func() { + ss.RT = int(time.Since(start).Seconds() * 1e6) + sg.results <- ss + }() config := tls.Config{ InsecureSkipVerify: true, NextProtos: []string{"spdy/3.1"}, } conn, err := tls.Dial("tcp", sg.target, &config) if err != nil { - return fmt.Errorf("client: dial: %s\n", err) + ss.Err = err + ss.NetCode = 999 + return err } client, err := spdy.NewClientConn(conn) if err != nil { - return fmt.Errorf("client: connect: %v\n", err) + ss.Err = err + ss.NetCode = 999 + return err + } else { + ss.ProtoCode = 200 } if sg.client != nil { sg.Close() } sg.client = client - ss := &SpdySample{ts: float64(connectStart.UnixNano()) / 1e9, tag: "CONNECT"} - ss.rt = int(time.Since(connectStart).Seconds() * 1e6) - ss.err = err - if ss.err == nil { - ss.StatusCode = 200 - } - results <- ss + return nil } -func (sg *SpdyGun) Ping(results chan<- aggregate.Sample) { +func (sg *SpdyGun) Ping() { if sg.client == nil { return } @@ -131,57 +137,34 @@ func (sg *SpdyGun) Ping(results chan<- aggregate.Sample) { if !pinged { log.Printf("client: ping: timed out\n") } - ss := &SpdySample{ts: float64(pingStart.UnixNano()) / 1e9, tag: "PING"} - ss.rt = int(time.Since(pingStart).Seconds() * 1e6) - ss.err = err - if ss.err == nil && pinged { - ss.StatusCode = 200 + ss := aggregate.AcquireSample(float64(pingStart.UnixNano())/1e9, "PING") + ss.RT = int(time.Since(pingStart).Seconds() * 1e6) + + if err == nil && pinged { + ss.ProtoCode = 200 } else { - ss.StatusCode = 500 + ss.Err = err + ss.ProtoCode = 500 } - results <- ss + sg.results <- ss if err != nil { - sg.connect(results) + sg.Connect() } } -type SpdySample struct { - ts float64 // Unix Timestamp in seconds - rt int // response time in milliseconds - StatusCode int // protocol status code - tag string - err error -} - -func (ds *SpdySample) PhoutSample() *aggregate.PhoutSample { - var protoCode, netCode int - if ds.err != nil { - protoCode = 500 - netCode = 999 - } else { - netCode = 0 - protoCode = ds.StatusCode - } - return &aggregate.PhoutSample{ - TS: ds.ts, - Tag: ds.tag, - RT: ds.rt, - Connect: 0, - Send: 0, - Latency: 0, - Receive: 0, - IntervalEvent: 0, - Egress: 0, - Igress: 0, - NetCode: netCode, - ProtoCode: protoCode, +func (sg *SpdyGun) startAutoPing(pingPeriod time.Duration) { + if pingPeriod > 0 { + go func() { + for range time.NewTicker(pingPeriod).C { + if sg.closed { + return + } + sg.Ping() + } + }() } } -func (ds *SpdySample) String() string { - return fmt.Sprintf("rt: %d [%d] %s", ds.rt, ds.StatusCode, ds.tag) -} - func New(c *config.Gun) (gun.Gun, error) { params := c.Parameters if params == nil { @@ -207,12 +190,12 @@ func New(c *config.Gun) (gun.Gun, error) { switch t := target.(type) { case string: g = &SpdyGun{ - pingPeriod: pingPeriod, - target: target.(string), + target: target.(string), } default: return nil, fmt.Errorf("Target is of the wrong type."+ " Expected 'string' got '%T'", t) } + g.(*SpdyGun).startAutoPing(pingPeriod) return g, nil } diff --git a/gun/spdy/spdy_test.go b/gun/spdy/spdy_test.go index 4d93e95e2..c33482aa8 100644 --- a/gun/spdy/spdy_test.go +++ b/gun/spdy/spdy_test.go @@ -9,11 +9,11 @@ import ( "time" "github.com/SlyMarbo/spdy" // we specially use spdy server from another library - "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/yandex/pandora/aggregate" "github.com/yandex/pandora/ammo" + "github.com/yandex/pandora/config" "github.com/yandex/pandora/utils" "golang.org/x/net/context" ) @@ -22,13 +22,14 @@ func TestSpdyGun(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan aggregate.Sample) + result := make(chan *aggregate.Sample) gun := &SpdyGun{ - target: "localhost:3000", - pingPeriod: time.Second * 5, + target: "localhost:3000", + results: result, } promise := utils.Promise(func() error { + defer gun.Close() defer close(result) return gun.Shoot(ctx, &ammo.Http{ Host: "example.org", @@ -40,31 +41,25 @@ func TestSpdyGun(t *testing.T) { "Host": "example.org", "User-Agent": "Pandora/0.0.1", }, - }, result) + }) }) results := aggregate.Drain(ctx, result) require.Len(t, results, 2) { // first result is connect - rPhout, casted := (results[0]).(aggregate.PhantomCompatible) - require.True(t, casted, "Should be phantom compatible") - phoutSample := rPhout.PhoutSample() - assert.Equal(t, "CONNECT", phoutSample.Tag) - assert.Equal(t, 200, phoutSample.ProtoCode) + + assert.Equal(t, "CONNECT", results[0].Tag) + assert.Equal(t, 200, results[0].ProtoCode) } { // second result is request - rPhout, casted := (results[1]).(aggregate.PhantomCompatible) - require.True(t, casted, "Should be phantom compatible") - phoutSample := rPhout.PhoutSample() - spew.Dump(phoutSample) - assert.Equal(t, "REQUEST", phoutSample.Tag) - assert.Equal(t, 200, phoutSample.ProtoCode) + + assert.Equal(t, "REQUEST", results[1].Tag) + assert.Equal(t, 200, results[1].ProtoCode) } // TODO: test scenaries with errors - // TODO: test ping logic select { case err := <-promise: @@ -75,6 +70,73 @@ func TestSpdyGun(t *testing.T) { } +func TestSpdyConnectPing(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + result := make(chan *aggregate.Sample) + + gun := &SpdyGun{ + target: "localhost:3000", + results: result, + } + promise := utils.Promise(func() error { + defer gun.Close() + defer close(result) + if err := gun.Connect(); err != nil { + return err + } + gun.Ping() + return nil + }) + + results := aggregate.Drain(ctx, result) + require.Len(t, results, 2) + { + // first result is connect + + assert.Equal(t, "CONNECT", results[0].Tag) + assert.Equal(t, 200, results[0].ProtoCode) + } + { + // second result is PING + + assert.Equal(t, "PING", results[1].Tag) + assert.Equal(t, 200, results[1].ProtoCode) + } + select { + case err := <-promise: + require.NoError(t, err) + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } + +} + +func TestNewSpdyGun(t *testing.T) { + spdyConfig := &config.Gun{ + GunType: "spdy", + Parameters: map[string]interface{}{ + "Target": "localhost:3000", + "PingPeriod": 5.0, + }, + } + g, err := New(spdyConfig) + assert.NoError(t, err) + _, ok := g.(*SpdyGun) + assert.Equal(t, true, ok) + + failSpdyConfig := &config.Gun{ + GunType: "spdy", + Parameters: map[string]interface{}{ + "Target": "localhost:3000", + "PingPeriod": "not-a-number", + }, + } + _, err = New(failSpdyConfig) + assert.Error(t, err) +} + func runSpdyTestServer() { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain")