diff --git a/aggregate/aggregate.go b/aggregate/aggregate.go index 330bd7222..28dc6e3d4 100644 --- a/aggregate/aggregate.go +++ b/aggregate/aggregate.go @@ -4,19 +4,19 @@ import "golang.org/x/net/context" type ResultListener interface { Start(context.Context) error - Sink() chan<- interface{} + Sink() chan<- *Sample } type resultListener struct { - sink chan<- interface{} + sink chan<- *Sample } -func (rl *resultListener) Sink() chan<- interface{} { +func (rl *resultListener) Sink() chan<- *Sample { return rl.sink } -func Drain(ctx context.Context, results <-chan interface{}) []interface{} { - samples := []interface{}{} +func Drain(ctx context.Context, results <-chan *Sample) []*Sample { + samples := []*Sample{} loop: for { select { diff --git a/aggregate/logger.go b/aggregate/logger.go index 221ab9dfc..d9b60cc38 100644 --- a/aggregate/logger.go +++ b/aggregate/logger.go @@ -1,7 +1,6 @@ package aggregate import ( - "fmt" "log" "github.com/yandex/pandora/config" @@ -12,16 +11,12 @@ import ( type LoggingResultListener struct { resultListener - source <-chan interface{} + source <-chan *Sample } -func (rl *LoggingResultListener) handle(r interface{}) { - r, ok := r.(fmt.Stringer) - if !ok { - log.Println("Can't convert result to String") - } else { - log.Println(r) - } +func (rl *LoggingResultListener) handle(s *Sample) { + log.Println(s) + ReleaseSample(s) } func (rl *LoggingResultListener) Start(ctx context.Context) error { @@ -46,7 +41,7 @@ loop: } func NewLoggingResultListener(*config.ResultListener) (ResultListener, error) { - ch := make(chan interface{}, 32) + ch := make(chan *Sample, 32) return &LoggingResultListener{ source: ch, resultListener: resultListener{ diff --git a/aggregate/phout.go b/aggregate/phout.go index 2486549ce..d1c5aaaae 100644 --- a/aggregate/phout.go +++ b/aggregate/phout.go @@ -9,26 +9,20 @@ import ( "golang.org/x/net/context" ) -type PhoutSerializable interface { - AppendToPhout([]byte) []byte -} - type PhoutResultListener struct { resultListener - source <-chan interface{} + source <-chan *Sample phout *bufio.Writer buffer []byte } -func (rl *PhoutResultListener) handle(s interface{}) error { - ps, ok := s.(PhoutSerializable) - if !ok { - panic("Result sample is not PhoutSerializable") - } - rl.buffer = ps.AppendToPhout(rl.buffer) +func (rl *PhoutResultListener) handle(s *Sample) error { + + rl.buffer = s.AppendToPhout(rl.buffer) _, err := rl.phout.Write(rl.buffer) rl.buffer = rl.buffer[:0] + ReleaseSample(s) return err } @@ -74,7 +68,7 @@ func NewPhoutResultListener(filename string) (rl ResultListener, err error) { phoutFile, err = os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0666) } writer := bufio.NewWriterSize(phoutFile, 1024*512) // 512 KB - ch := make(chan interface{}, 65536) + ch := make(chan *Sample, 65536) return &PhoutResultListener{ source: ch, resultListener: resultListener{ diff --git a/aggregate/sample.go b/aggregate/sample.go index 71a07f3cb..ccca741b9 100644 --- a/aggregate/sample.go +++ b/aggregate/sample.go @@ -29,8 +29,22 @@ type Sample struct { Err error } -func AcquireSample() *Sample { - return samplePool.Get().(*Sample) +func AcquireSample(ts float64, tag string) *Sample { + s := samplePool.Get().(*Sample) + s.TS = ts + s.Tag = tag + s.RT = 0 + s.Connect = 0 + s.Send = 0 + s.Latency = 0 + s.Receive = 0 + s.IntervalEvent = 0 + s.Egress = 0 + s.Igress = 0 + s.NetCode = 0 + s.ProtoCode = 0 + s.Err = nil + return s } func ReleaseSample(s *Sample) { diff --git a/gun/gun.go b/gun/gun.go index 63e8b076e..d66ea56b2 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -1,10 +1,11 @@ package gun import ( + "github.com/yandex/pandora/aggregate" "github.com/yandex/pandora/ammo" "golang.org/x/net/context" ) type Gun interface { - Shoot(context.Context, ammo.Ammo, chan<- interface{}) error + Shoot(context.Context, ammo.Ammo, chan<- *aggregate.Sample) error } diff --git a/gun/http/http.go b/gun/http/http.go index 3b8236b77..438a0c79d 100644 --- a/gun/http/http.go +++ b/gun/http/http.go @@ -35,14 +35,13 @@ type HttpGun struct { // Shoot to target, this method is not thread safe func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, - results chan<- interface{}) error { + results chan<- *aggregate.Sample) error { if hg.client == nil { hg.Connect(results) } start := time.Now() - // TODO: acquire/release - ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "REQUEST"} + ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST") defer func() { ss.RT = int(time.Since(start).Seconds() * 1e6) results <- ss @@ -100,7 +99,7 @@ func (hg *HttpGun) Close() { hg.client = nil } -func (hg *HttpGun) Connect(results chan<- interface{}) { +func (hg *HttpGun) Connect(results chan<- *aggregate.Sample) { hg.Close() config := tls.Config{ InsecureSkipVerify: true, @@ -132,7 +131,7 @@ func (hg *HttpGun) Connect(results chan<- interface{}) { // log.Printf("client: connect: %s\n", err) // return // } - // ss := &HttpSample{ts: float64(connectStart.UnixNano()) / 1e9, tag: "CONNECT"} + // ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "CONNECT") // ss.rt = int(time.Since(connectStart).Seconds() * 1e6) // ss.err = err // if ss.err == nil { diff --git a/gun/http/http_test.go b/gun/http/http_test.go index 43e71c150..5a498118e 100644 --- a/gun/http/http_test.go +++ b/gun/http/http_test.go @@ -20,7 +20,7 @@ func TestHttpGunWithSsl(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan interface{}) + result := make(chan *aggregate.Sample) requests := make(chan *http.Request) ts := httptest.NewTLSServer( @@ -52,10 +52,8 @@ func TestHttpGunWithSsl(t *testing.T) { }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) - sample, casted := (results[0]).(*aggregate.Sample) - require.True(t, casted, "Should be *aggregate.Sample") - assert.Equal(t, "REQUEST", sample.Tag) - assert.Equal(t, 200, sample.ProtoCode) + assert.Equal(t, "REQUEST", results[0].Tag) + assert.Equal(t, 200, results[0].ProtoCode) select { case r := <-requests: @@ -81,7 +79,7 @@ func TestHttpGunWithHttp(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan interface{}) + result := make(chan *aggregate.Sample) requests := make(chan *http.Request) ts := httptest.NewServer( @@ -113,10 +111,8 @@ func TestHttpGunWithHttp(t *testing.T) { }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) - sample, casted := (results[0]).(*aggregate.Sample) - require.True(t, casted, "Should be *aggregate.Sample") - assert.Equal(t, "REQUEST", sample.Tag) - assert.Equal(t, 200, sample.ProtoCode) + assert.Equal(t, "REQUEST", results[0].Tag) + assert.Equal(t, 200, results[0].ProtoCode) select { case r := <-requests: diff --git a/gun/log.go b/gun/log.go index a69813e98..c6bb44ec7 100644 --- a/gun/log.go +++ b/gun/log.go @@ -14,7 +14,7 @@ import ( type LogGun struct{} -func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interface{}) error { +func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error { log.Println("Log message: ", a.(*ammo.Log).Message) results <- &aggregate.Sample{} return nil diff --git a/gun/spdy/spdy.go b/gun/spdy/spdy.go index 3032cfe77..d99e4e972 100644 --- a/gun/spdy/spdy.go +++ b/gun/spdy/spdy.go @@ -25,7 +25,7 @@ type SpdyGun struct { client *spdy.Client } -func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interface{}) error { +func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error { if sg.client == nil { if err := sg.connect(results); err != nil { return err @@ -40,7 +40,7 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interf }() } start := time.Now() - ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "REQUEST"} + ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST") defer func() { ss.RT = int(time.Since(start).Seconds() * 1e6) results <- ss @@ -91,10 +91,10 @@ func (sg *SpdyGun) Close() { } } -func (sg *SpdyGun) connect(results chan<- interface{}) error { +func (sg *SpdyGun) connect(results chan<- *aggregate.Sample) error { // FIXME: rewrite connection logic, it isn't thread safe right now. start := time.Now() - ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "CONNECT"} + ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "CONNECT") defer func() { ss.RT = int(time.Since(start).Seconds() * 1e6) results <- ss @@ -125,7 +125,7 @@ func (sg *SpdyGun) connect(results chan<- interface{}) error { return nil } -func (sg *SpdyGun) Ping(results chan<- interface{}) { +func (sg *SpdyGun) Ping(results chan<- *aggregate.Sample) { if sg.client == nil { return } @@ -138,7 +138,7 @@ func (sg *SpdyGun) Ping(results chan<- interface{}) { if !pinged { log.Printf("client: ping: timed out\n") } - ss := &aggregate.Sample{TS: float64(pingStart.UnixNano()) / 1e9, Tag: "PING"} + ss := aggregate.AcquireSample(float64(pingStart.UnixNano())/1e9, "PING") ss.RT = int(time.Since(pingStart).Seconds() * 1e6) if err == nil && pinged { diff --git a/gun/spdy/spdy_test.go b/gun/spdy/spdy_test.go index 07bf631dc..e843de802 100644 --- a/gun/spdy/spdy_test.go +++ b/gun/spdy/spdy_test.go @@ -21,7 +21,7 @@ func TestSpdyGun(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan interface{}) + result := make(chan *aggregate.Sample) gun := &SpdyGun{ target: "localhost:3000", @@ -47,18 +47,14 @@ func TestSpdyGun(t *testing.T) { { // first result is connect - sample, casted := (results[0]).(*aggregate.Sample) - require.True(t, casted, "Should be *aggregate.Sample") - assert.Equal(t, "CONNECT", sample.Tag) - assert.Equal(t, 200, sample.ProtoCode) + assert.Equal(t, "CONNECT", results[0].Tag) + assert.Equal(t, 200, results[0].ProtoCode) } { // second result is request - sample, casted := (results[1]).(*aggregate.Sample) - require.True(t, casted, "Should be *aggregate.Sample") - assert.Equal(t, "REQUEST", sample.Tag) - assert.Equal(t, 200, sample.ProtoCode) + assert.Equal(t, "REQUEST", results[1].Tag) + assert.Equal(t, 200, results[1].ProtoCode) } // TODO: test scenaries with errors