Skip to content

Commit

Permalink
aggregator sample pooling
Browse files Browse the repository at this point in the history
  • Loading branch information
direvius committed Jan 13, 2016
1 parent 0918263 commit 9afb17d
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 61 deletions.
10 changes: 5 additions & 5 deletions aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import "golang.org/x/net/context"

type ResultListener interface {
Start(context.Context) error
Sink() chan<- interface{}
Sink() chan<- *Sample
}

type resultListener struct {
sink chan<- interface{}
sink chan<- *Sample
}

func (rl *resultListener) Sink() chan<- interface{} {
func (rl *resultListener) Sink() chan<- *Sample {
return rl.sink
}

func Drain(ctx context.Context, results <-chan interface{}) []interface{} {
samples := []interface{}{}
func Drain(ctx context.Context, results <-chan *Sample) []*Sample {
samples := []*Sample{}
loop:
for {
select {
Expand Down
15 changes: 5 additions & 10 deletions aggregate/logger.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package aggregate

import (
"fmt"
"log"

"github.com/yandex/pandora/config"
Expand All @@ -12,16 +11,12 @@ import (
type LoggingResultListener struct {
resultListener

source <-chan interface{}
source <-chan *Sample
}

func (rl *LoggingResultListener) handle(r interface{}) {
r, ok := r.(fmt.Stringer)
if !ok {
log.Println("Can't convert result to String")
} else {
log.Println(r)
}
func (rl *LoggingResultListener) handle(s *Sample) {
log.Println(s)
ReleaseSample(s)
}

func (rl *LoggingResultListener) Start(ctx context.Context) error {
Expand All @@ -46,7 +41,7 @@ loop:
}

func NewLoggingResultListener(*config.ResultListener) (ResultListener, error) {
ch := make(chan interface{}, 32)
ch := make(chan *Sample, 32)
return &LoggingResultListener{
source: ch,
resultListener: resultListener{
Expand Down
18 changes: 6 additions & 12 deletions aggregate/phout.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,20 @@ import (
"golang.org/x/net/context"
)

type PhoutSerializable interface {
AppendToPhout([]byte) []byte
}

type PhoutResultListener struct {
resultListener

source <-chan interface{}
source <-chan *Sample
phout *bufio.Writer
buffer []byte
}

func (rl *PhoutResultListener) handle(s interface{}) error {
ps, ok := s.(PhoutSerializable)
if !ok {
panic("Result sample is not PhoutSerializable")
}
rl.buffer = ps.AppendToPhout(rl.buffer)
func (rl *PhoutResultListener) handle(s *Sample) error {

rl.buffer = s.AppendToPhout(rl.buffer)
_, err := rl.phout.Write(rl.buffer)
rl.buffer = rl.buffer[:0]
ReleaseSample(s)
return err
}

Expand Down Expand Up @@ -74,7 +68,7 @@ func NewPhoutResultListener(filename string) (rl ResultListener, err error) {
phoutFile, err = os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0666)
}
writer := bufio.NewWriterSize(phoutFile, 1024*512) // 512 KB
ch := make(chan interface{}, 65536)
ch := make(chan *Sample, 65536)
return &PhoutResultListener{
source: ch,
resultListener: resultListener{
Expand Down
18 changes: 16 additions & 2 deletions aggregate/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,22 @@ type Sample struct {
Err error
}

func AcquireSample() *Sample {
return samplePool.Get().(*Sample)
func AcquireSample(ts float64, tag string) *Sample {
s := samplePool.Get().(*Sample)
s.TS = ts
s.Tag = tag
s.RT = 0
s.Connect = 0
s.Send = 0
s.Latency = 0
s.Receive = 0
s.IntervalEvent = 0
s.Egress = 0
s.Igress = 0
s.NetCode = 0
s.ProtoCode = 0
s.Err = nil
return s
}

func ReleaseSample(s *Sample) {
Expand Down
3 changes: 2 additions & 1 deletion gun/gun.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package gun

import (
"github.com/yandex/pandora/aggregate"
"github.com/yandex/pandora/ammo"
"golang.org/x/net/context"
)

type Gun interface {
Shoot(context.Context, ammo.Ammo, chan<- interface{}) error
Shoot(context.Context, ammo.Ammo, chan<- *aggregate.Sample) error
}
9 changes: 4 additions & 5 deletions gun/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@ type HttpGun struct {

// Shoot to target, this method is not thread safe
func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo,
results chan<- interface{}) error {
results chan<- *aggregate.Sample) error {

if hg.client == nil {
hg.Connect(results)
}
start := time.Now()
// TODO: acquire/release
ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "REQUEST"}
ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST")
defer func() {
ss.RT = int(time.Since(start).Seconds() * 1e6)
results <- ss
Expand Down Expand Up @@ -100,7 +99,7 @@ func (hg *HttpGun) Close() {
hg.client = nil
}

func (hg *HttpGun) Connect(results chan<- interface{}) {
func (hg *HttpGun) Connect(results chan<- *aggregate.Sample) {
hg.Close()
config := tls.Config{
InsecureSkipVerify: true,
Expand Down Expand Up @@ -132,7 +131,7 @@ func (hg *HttpGun) Connect(results chan<- interface{}) {
// log.Printf("client: connect: %s\n", err)
// return
// }
// ss := &HttpSample{ts: float64(connectStart.UnixNano()) / 1e9, tag: "CONNECT"}
// ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "CONNECT")
// ss.rt = int(time.Since(connectStart).Seconds() * 1e6)
// ss.err = err
// if ss.err == nil {
Expand Down
16 changes: 6 additions & 10 deletions gun/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestHttpGunWithSsl(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

result := make(chan interface{})
result := make(chan *aggregate.Sample)
requests := make(chan *http.Request)

ts := httptest.NewTLSServer(
Expand Down Expand Up @@ -52,10 +52,8 @@ func TestHttpGunWithSsl(t *testing.T) {
})
results := aggregate.Drain(ctx, result)
require.Len(t, results, 1)
sample, casted := (results[0]).(*aggregate.Sample)
require.True(t, casted, "Should be *aggregate.Sample")
assert.Equal(t, "REQUEST", sample.Tag)
assert.Equal(t, 200, sample.ProtoCode)
assert.Equal(t, "REQUEST", results[0].Tag)
assert.Equal(t, 200, results[0].ProtoCode)

select {
case r := <-requests:
Expand All @@ -81,7 +79,7 @@ func TestHttpGunWithHttp(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

result := make(chan interface{})
result := make(chan *aggregate.Sample)
requests := make(chan *http.Request)

ts := httptest.NewServer(
Expand Down Expand Up @@ -113,10 +111,8 @@ func TestHttpGunWithHttp(t *testing.T) {
})
results := aggregate.Drain(ctx, result)
require.Len(t, results, 1)
sample, casted := (results[0]).(*aggregate.Sample)
require.True(t, casted, "Should be *aggregate.Sample")
assert.Equal(t, "REQUEST", sample.Tag)
assert.Equal(t, 200, sample.ProtoCode)
assert.Equal(t, "REQUEST", results[0].Tag)
assert.Equal(t, 200, results[0].ProtoCode)

select {
case r := <-requests:
Expand Down
2 changes: 1 addition & 1 deletion gun/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

type LogGun struct{}

func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interface{}) error {
func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error {
log.Println("Log message: ", a.(*ammo.Log).Message)
results <- &aggregate.Sample{}
return nil
Expand Down
12 changes: 6 additions & 6 deletions gun/spdy/spdy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type SpdyGun struct {
client *spdy.Client
}

func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interface{}) error {
func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- *aggregate.Sample) error {
if sg.client == nil {
if err := sg.connect(results); err != nil {
return err
Expand All @@ -40,7 +40,7 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interf
}()
}
start := time.Now()
ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "REQUEST"}
ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "REQUEST")
defer func() {
ss.RT = int(time.Since(start).Seconds() * 1e6)
results <- ss
Expand Down Expand Up @@ -91,10 +91,10 @@ func (sg *SpdyGun) Close() {
}
}

func (sg *SpdyGun) connect(results chan<- interface{}) error {
func (sg *SpdyGun) connect(results chan<- *aggregate.Sample) error {
// FIXME: rewrite connection logic, it isn't thread safe right now.
start := time.Now()
ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "CONNECT"}
ss := aggregate.AcquireSample(float64(start.UnixNano())/1e9, "CONNECT")
defer func() {
ss.RT = int(time.Since(start).Seconds() * 1e6)
results <- ss
Expand Down Expand Up @@ -125,7 +125,7 @@ func (sg *SpdyGun) connect(results chan<- interface{}) error {
return nil
}

func (sg *SpdyGun) Ping(results chan<- interface{}) {
func (sg *SpdyGun) Ping(results chan<- *aggregate.Sample) {
if sg.client == nil {
return
}
Expand All @@ -138,7 +138,7 @@ func (sg *SpdyGun) Ping(results chan<- interface{}) {
if !pinged {
log.Printf("client: ping: timed out\n")
}
ss := &aggregate.Sample{TS: float64(pingStart.UnixNano()) / 1e9, Tag: "PING"}
ss := aggregate.AcquireSample(float64(pingStart.UnixNano())/1e9, "PING")
ss.RT = int(time.Since(pingStart).Seconds() * 1e6)

if err == nil && pinged {
Expand Down
14 changes: 5 additions & 9 deletions gun/spdy/spdy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestSpdyGun(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

result := make(chan interface{})
result := make(chan *aggregate.Sample)

gun := &SpdyGun{
target: "localhost:3000",
Expand All @@ -47,18 +47,14 @@ func TestSpdyGun(t *testing.T) {
{
// first result is connect

sample, casted := (results[0]).(*aggregate.Sample)
require.True(t, casted, "Should be *aggregate.Sample")
assert.Equal(t, "CONNECT", sample.Tag)
assert.Equal(t, 200, sample.ProtoCode)
assert.Equal(t, "CONNECT", results[0].Tag)
assert.Equal(t, 200, results[0].ProtoCode)
}
{
// second result is request

sample, casted := (results[1]).(*aggregate.Sample)
require.True(t, casted, "Should be *aggregate.Sample")
assert.Equal(t, "REQUEST", sample.Tag)
assert.Equal(t, 200, sample.ProtoCode)
assert.Equal(t, "REQUEST", results[1].Tag)
assert.Equal(t, 200, results[1].ProtoCode)
}

// TODO: test scenaries with errors
Expand Down

0 comments on commit 9afb17d

Please sign in to comment.