diff --git a/aggregate/aggregate.go b/aggregate/aggregate.go index a97b26dd0..330bd7222 100644 --- a/aggregate/aggregate.go +++ b/aggregate/aggregate.go @@ -2,17 +2,21 @@ package aggregate import "golang.org/x/net/context" -type Sample interface { - String() string -} - type ResultListener interface { Start(context.Context) error - Sink() chan<- Sample + Sink() chan<- interface{} +} + +type resultListener struct { + sink chan<- interface{} +} + +func (rl *resultListener) Sink() chan<- interface{} { + return rl.sink } -func Drain(ctx context.Context, results <-chan Sample) []Sample { - samples := []Sample{} +func Drain(ctx context.Context, results <-chan interface{}) []interface{} { + samples := []interface{}{} loop: for { select { diff --git a/aggregate/logger.go b/aggregate/logger.go index 020383ae5..221ab9dfc 100644 --- a/aggregate/logger.go +++ b/aggregate/logger.go @@ -1,29 +1,27 @@ package aggregate import ( + "fmt" "log" "github.com/yandex/pandora/config" "golang.org/x/net/context" ) -type resultListener struct { - sink chan<- Sample -} - -func (rl *resultListener) Sink() chan<- Sample { - return rl.sink -} - // Implements ResultListener interface type LoggingResultListener struct { resultListener - source <-chan Sample + source <-chan interface{} } -func (rl *LoggingResultListener) handle(r Sample) { - log.Println(r) +func (rl *LoggingResultListener) handle(r interface{}) { + r, ok := r.(fmt.Stringer) + if !ok { + log.Println("Can't convert result to String") + } else { + log.Println(r) + } } func (rl *LoggingResultListener) Start(ctx context.Context) error { @@ -48,7 +46,7 @@ loop: } func NewLoggingResultListener(*config.ResultListener) (ResultListener, error) { - ch := make(chan Sample, 32) + ch := make(chan interface{}, 32) return &LoggingResultListener{ source: ch, resultListener: resultListener{ diff --git a/aggregate/phout.go b/aggregate/phout.go index 3acaae47d..2486549ce 100644 --- a/aggregate/phout.go +++ b/aggregate/phout.go @@ -2,106 +2,34 @@ package aggregate import ( "bufio" - "fmt" "os" - "strconv" "time" "github.com/yandex/pandora/config" "golang.org/x/net/context" ) -const ( - phoutDelimiter = '\t' - phoutLineEnd = '\n' -) - -type PhoutSample struct { - TS float64 - Tag string - RT int - Connect int - Send int - Latency int - Receive int - IntervalEvent int - Egress int - Igress int - NetCode int - ProtoCode int -} - -func (ps *PhoutSample) String() string { - return fmt.Sprintf( - "%.3f\t%s\t%d\t"+ - "%d\t%d\t"+ - "%d\t%d\t"+ - "%d\t"+ - "%d\t%d\t"+ - "%d\t%d", - ps.TS, ps.Tag, ps.RT, - ps.Connect, ps.Send, - ps.Latency, ps.Receive, - ps.IntervalEvent, - ps.Egress, ps.Igress, - ps.NetCode, ps.ProtoCode, - ) -} - -func (ps *PhoutSample) AppendTo(dst []byte) []byte { - dst = strconv.AppendFloat(dst, ps.TS, 'f', 3, 64) - dst = append(dst, phoutDelimiter) - dst = append(dst, ps.Tag...) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.RT), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.Connect), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.Send), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.Latency), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.Receive), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.IntervalEvent), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.Egress), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.Igress), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.NetCode), 10) - dst = append(dst, phoutDelimiter) - dst = strconv.AppendInt(dst, int64(ps.ProtoCode), 10) - dst = append(dst, phoutLineEnd) - return dst -} - -type PhantomCompatible interface { - Sample - PhoutSample() *PhoutSample +type PhoutSerializable interface { + AppendToPhout([]byte) []byte } type PhoutResultListener struct { resultListener - source <-chan Sample + source <-chan interface{} phout *bufio.Writer buffer []byte } -func (rl *PhoutResultListener) handle(r Sample) error { - pc, ok := r.(PhantomCompatible) - if ok { - rl.buffer = pc.PhoutSample().AppendTo(rl.buffer) - _, err := rl.phout.Write(rl.buffer) - rl.buffer = rl.buffer[:0] - if err != nil { - return err - } - } else { - return fmt.Errorf("Not phantom compatible sample") +func (rl *PhoutResultListener) handle(s interface{}) error { + ps, ok := s.(PhoutSerializable) + if !ok { + panic("Result sample is not PhoutSerializable") } - return nil + rl.buffer = ps.AppendToPhout(rl.buffer) + _, err := rl.phout.Write(rl.buffer) + rl.buffer = rl.buffer[:0] + return err } func (rl *PhoutResultListener) Start(ctx context.Context) error { @@ -146,7 +74,7 @@ func NewPhoutResultListener(filename string) (rl ResultListener, err error) { phoutFile, err = os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0666) } writer := bufio.NewWriterSize(phoutFile, 1024*512) // 512 KB - ch := make(chan Sample, 65536) + ch := make(chan interface{}, 65536) return &PhoutResultListener{ source: ch, resultListener: resultListener{ diff --git a/aggregate/sample.go b/aggregate/sample.go new file mode 100644 index 000000000..71a07f3cb --- /dev/null +++ b/aggregate/sample.go @@ -0,0 +1,83 @@ +package aggregate + +import ( + "fmt" + "strconv" + "sync" +) + +const ( + phoutDelimiter = '\t' + phoutNewLine = '\n' +) + +var samplePool = &sync.Pool{New: func() interface{} { return &Sample{} }} + +type Sample struct { + TS float64 + Tag string + RT int + Connect int + Send int + Latency int + Receive int + IntervalEvent int + Egress int + Igress int + NetCode int + ProtoCode int + Err error +} + +func AcquireSample() *Sample { + return samplePool.Get().(*Sample) +} + +func ReleaseSample(s *Sample) { + samplePool.Put(s) +} + +func (ps *Sample) String() string { + return fmt.Sprintf( + "%.3f\t%s\t%d\t"+ + "%d\t%d\t"+ + "%d\t%d\t"+ + "%d\t"+ + "%d\t%d\t"+ + "%d\t%d", + ps.TS, ps.Tag, ps.RT, + ps.Connect, ps.Send, + ps.Latency, ps.Receive, + ps.IntervalEvent, + ps.Egress, ps.Igress, + ps.NetCode, ps.ProtoCode, + ) +} + +func (ps *Sample) AppendToPhout(dst []byte) []byte { + dst = strconv.AppendFloat(dst, ps.TS, 'f', 3, 64) + dst = append(dst, phoutDelimiter) + dst = append(dst, ps.Tag...) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.RT), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Connect), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Send), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Latency), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Receive), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.IntervalEvent), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Egress), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.Igress), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.NetCode), 10) + dst = append(dst, phoutDelimiter) + dst = strconv.AppendInt(dst, int64(ps.ProtoCode), 10) + dst = append(dst, phoutNewLine) + return dst +} diff --git a/gun/gun.go b/gun/gun.go index 95a898549..63e8b076e 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -1,11 +1,10 @@ package gun import ( - "github.com/yandex/pandora/aggregate" "github.com/yandex/pandora/ammo" "golang.org/x/net/context" ) type Gun interface { - Shoot(context.Context, ammo.Ammo, chan<- aggregate.Sample) error + Shoot(context.Context, ammo.Ammo, chan<- interface{}) error } diff --git a/gun/http/http.go b/gun/http/http.go index 55705d96d..3b8236b77 100644 --- a/gun/http/http.go +++ b/gun/http/http.go @@ -35,27 +35,25 @@ type HttpGun struct { // Shoot to target, this method is not thread safe func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, - results chan<- aggregate.Sample) error { + results chan<- interface{}) error { if hg.client == nil { hg.Connect(results) } start := time.Now() - ss := &HttpSample{ts: float64(start.UnixNano()) / 1e9, tag: "REQUEST"} + // TODO: acquire/release + ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "REQUEST"} defer func() { - ss.rt = int(time.Since(start).Seconds() * 1e6) + ss.RT = int(time.Since(start).Seconds() * 1e6) results <- ss }() // now send the request to obtain a http response ha, ok := a.(*ammo.Http) if !ok { - errStr := fmt.Sprintf("Got '%T' instead of 'HttpAmmo'", a) - log.Println(errStr) - ss.err = errors.New(errStr) - return ss.err + panic(fmt.Sprintf("Got '%T' instead of 'HttpAmmo'", a)) } if ha.Tag != "" { - ss.tag += "|" + ha.Tag + ss.Tag += "|" + ha.Tag } var uri string if hg.ssl { @@ -66,7 +64,8 @@ func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, req, err := http.NewRequest(ha.Method, uri, nil) if err != nil { log.Printf("Error making HTTP request: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } for k, v := range ha.Headers { @@ -76,14 +75,16 @@ func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, res, err := hg.client.Do(req) if err != nil { log.Printf("Error performing a request: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } defer res.Body.Close() _, err = io.Copy(ioutil.Discard, res.Body) if err != nil { log.Printf("Error reading response body: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } @@ -91,7 +92,7 @@ func (hg *HttpGun) Shoot(ctx context.Context, a ammo.Ammo, //data := make([]byte, int(res.ContentLength)) // _, err = res.Body.(io.Reader).Read(data) // fmt.Println(string(data)) - ss.StatusCode = res.StatusCode + ss.ProtoCode = res.StatusCode return nil } @@ -99,7 +100,7 @@ func (hg *HttpGun) Close() { hg.client = nil } -func (hg *HttpGun) Connect(results chan<- aggregate.Sample) { +func (hg *HttpGun) Connect(results chan<- interface{}) { hg.Close() config := tls.Config{ InsecureSkipVerify: true, @@ -140,44 +141,6 @@ func (hg *HttpGun) Connect(results chan<- aggregate.Sample) { // results <- ss } -type HttpSample struct { - ts float64 // Unix Timestamp in seconds - rt int // response time in milliseconds - StatusCode int // protocol status code - tag string - err error -} - -func (ds *HttpSample) PhoutSample() *aggregate.PhoutSample { - var protoCode, netCode int - if ds.err != nil { - protoCode = 500 - netCode = 999 - log.Printf("Error code. %v\n", ds.err) - } else { - netCode = 0 - protoCode = ds.StatusCode - } - return &aggregate.PhoutSample{ - TS: ds.ts, - Tag: ds.tag, - RT: ds.rt, - Connect: 0, - Send: 0, - Latency: 0, - Receive: 0, - IntervalEvent: 0, - Egress: 0, - Igress: 0, - NetCode: netCode, - ProtoCode: protoCode, - } -} - -func (ds *HttpSample) String() string { - return fmt.Sprintf("rt: %d [%d] %s", ds.rt, ds.StatusCode, ds.tag) -} - func New(c *config.Gun) (gun.Gun, error) { params := c.Parameters if params == nil { diff --git a/gun/http/http_test.go b/gun/http/http_test.go index bf315b598..43e71c150 100644 --- a/gun/http/http_test.go +++ b/gun/http/http_test.go @@ -20,7 +20,7 @@ func TestHttpGunWithSsl(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan aggregate.Sample) + result := make(chan interface{}) requests := make(chan *http.Request) ts := httptest.NewTLSServer( @@ -52,11 +52,10 @@ func TestHttpGunWithSsl(t *testing.T) { }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) - rPhout, casted := (results[0]).(aggregate.PhantomCompatible) - require.True(t, casted, "Should be phantom compatible") - phoutSample := rPhout.PhoutSample() - assert.Equal(t, "REQUEST", phoutSample.Tag) - assert.Equal(t, 200, phoutSample.ProtoCode) + sample, casted := (results[0]).(*aggregate.Sample) + require.True(t, casted, "Should be *aggregate.Sample") + assert.Equal(t, "REQUEST", sample.Tag) + assert.Equal(t, 200, sample.ProtoCode) select { case r := <-requests: @@ -82,7 +81,7 @@ func TestHttpGunWithHttp(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan aggregate.Sample) + result := make(chan interface{}) requests := make(chan *http.Request) ts := httptest.NewServer( @@ -114,11 +113,10 @@ func TestHttpGunWithHttp(t *testing.T) { }) results := aggregate.Drain(ctx, result) require.Len(t, results, 1) - rPhout, casted := (results[0]).(aggregate.PhantomCompatible) - require.True(t, casted, "Should be phantom compatible") - phoutSample := rPhout.PhoutSample() - assert.Equal(t, "REQUEST", phoutSample.Tag) - assert.Equal(t, 200, phoutSample.ProtoCode) + sample, casted := (results[0]).(*aggregate.Sample) + require.True(t, casted, "Should be *aggregate.Sample") + assert.Equal(t, "REQUEST", sample.Tag) + assert.Equal(t, 200, sample.ProtoCode) select { case r := <-requests: diff --git a/gun/log.go b/gun/log.go index 174292f1c..a69813e98 100644 --- a/gun/log.go +++ b/gun/log.go @@ -3,6 +3,7 @@ package gun import ( "fmt" "log" + "strconv" "golang.org/x/net/context" @@ -13,9 +14,9 @@ import ( type LogGun struct{} -func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- aggregate.Sample) error { +func (l *LogGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interface{}) error { log.Println("Log message: ", a.(*ammo.Log).Message) - results <- &DummySample{0} + results <- &aggregate.Sample{} return nil } @@ -23,14 +24,17 @@ type DummySample struct { value int } -func (ds *DummySample) PhoutSample() *aggregate.PhoutSample { - return &aggregate.PhoutSample{} -} - func (ds *DummySample) String() string { return fmt.Sprintf("My value is %d", ds.value) } +func (ds *DummySample) AppendToPhout(dst []byte) []byte { + dst = append(dst, "My value is "...) + dst = strconv.AppendInt(dst, int64(ds.value), 10) + dst = append(dst, '\n') + return dst +} + func NewLogGunFromConfig(c *config.Gun) (g Gun, err error) { return &LogGun{}, nil } diff --git a/gun/spdy/spdy.go b/gun/spdy/spdy.go index cd328bc9d..3032cfe77 100644 --- a/gun/spdy/spdy.go +++ b/gun/spdy/spdy.go @@ -25,7 +25,7 @@ type SpdyGun struct { client *spdy.Client } -func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- aggregate.Sample) error { +func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- interface{}) error { if sg.client == nil { if err := sg.connect(results); err != nil { return err @@ -40,33 +40,32 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- aggreg }() } start := time.Now() - ss := &SpdySample{ts: float64(start.UnixNano()) / 1e9, tag: "REQUEST"} + ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "REQUEST"} defer func() { - ss.rt = int(time.Since(start).Seconds() * 1e6) + ss.RT = int(time.Since(start).Seconds() * 1e6) results <- ss }() // now send the request to obtain a http response ha, ok := a.(*ammo.Http) if !ok { - errStr := fmt.Sprintf("Got '%T' instead of 'HttpAmmo'", a) - log.Println(errStr) - ss.err = errors.New(errStr) - return ss.err + panic(fmt.Sprintf("Got '%T' instead of 'HttpAmmo'", a)) } if ha.Tag != "" { - ss.tag += "|" + ha.Tag + ss.Tag += "|" + ha.Tag } req, err := http.NewRequest(ha.Method, "https://"+ha.Host+ha.Uri, nil) if err != nil { log.Printf("Error making HTTP request: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } res, err := sg.client.Do(req) if err != nil { log.Printf("Error performing a request: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } @@ -74,14 +73,15 @@ func (sg *SpdyGun) Shoot(ctx context.Context, a ammo.Ammo, results chan<- aggreg _, err = io.Copy(ioutil.Discard, res.Body) if err != nil { log.Printf("Error reading response body: %s\n", err) - ss.err = err + ss.Err = err + ss.NetCode = 999 return err } // TODO: make this an optional verbose answ_log output //data := make([]byte, int(res.ContentLength)) // _, err = res.Body.(io.Reader).Read(data) // fmt.Println(string(data)) - ss.StatusCode = res.StatusCode + ss.ProtoCode = res.StatusCode return err } @@ -91,36 +91,41 @@ func (sg *SpdyGun) Close() { } } -func (sg *SpdyGun) connect(results chan<- aggregate.Sample) error { +func (sg *SpdyGun) connect(results chan<- interface{}) error { // FIXME: rewrite connection logic, it isn't thread safe right now. - connectStart := time.Now() + start := time.Now() + ss := &aggregate.Sample{TS: float64(start.UnixNano()) / 1e9, Tag: "CONNECT"} + defer func() { + ss.RT = int(time.Since(start).Seconds() * 1e6) + results <- ss + }() config := tls.Config{ InsecureSkipVerify: true, NextProtos: []string{"spdy/3.1"}, } conn, err := tls.Dial("tcp", sg.target, &config) if err != nil { - return fmt.Errorf("client: dial: %s\n", err) + ss.Err = err + ss.NetCode = 999 + return err } client, err := spdy.NewClientConn(conn) if err != nil { - return fmt.Errorf("client: connect: %v\n", err) + ss.Err = err + ss.NetCode = 999 + return err + } else { + ss.ProtoCode = 200 } if sg.client != nil { sg.Close() } sg.client = client - ss := &SpdySample{ts: float64(connectStart.UnixNano()) / 1e9, tag: "CONNECT"} - ss.rt = int(time.Since(connectStart).Seconds() * 1e6) - ss.err = err - if ss.err == nil { - ss.StatusCode = 200 - } - results <- ss + return nil } -func (sg *SpdyGun) Ping(results chan<- aggregate.Sample) { +func (sg *SpdyGun) Ping(results chan<- interface{}) { if sg.client == nil { return } @@ -133,13 +138,14 @@ func (sg *SpdyGun) Ping(results chan<- aggregate.Sample) { if !pinged { log.Printf("client: ping: timed out\n") } - ss := &SpdySample{ts: float64(pingStart.UnixNano()) / 1e9, tag: "PING"} - ss.rt = int(time.Since(pingStart).Seconds() * 1e6) - ss.err = err - if ss.err == nil && pinged { - ss.StatusCode = 200 + ss := &aggregate.Sample{TS: float64(pingStart.UnixNano()) / 1e9, Tag: "PING"} + ss.RT = int(time.Since(pingStart).Seconds() * 1e6) + + if err == nil && pinged { + ss.ProtoCode = 200 } else { - ss.StatusCode = 500 + ss.Err = err + ss.ProtoCode = 500 } results <- ss if err != nil { @@ -147,43 +153,6 @@ func (sg *SpdyGun) Ping(results chan<- aggregate.Sample) { } } -type SpdySample struct { - ts float64 // Unix Timestamp in seconds - rt int // response time in milliseconds - StatusCode int // protocol status code - tag string - err error -} - -func (ds *SpdySample) PhoutSample() *aggregate.PhoutSample { - var protoCode, netCode int - if ds.err != nil { - protoCode = 500 - netCode = 999 - } else { - netCode = 0 - protoCode = ds.StatusCode - } - return &aggregate.PhoutSample{ - TS: ds.ts, - Tag: ds.tag, - RT: ds.rt, - Connect: 0, - Send: 0, - Latency: 0, - Receive: 0, - IntervalEvent: 0, - Egress: 0, - Igress: 0, - NetCode: netCode, - ProtoCode: protoCode, - } -} - -func (ds *SpdySample) String() string { - return fmt.Sprintf("rt: %d [%d] %s", ds.rt, ds.StatusCode, ds.tag) -} - func New(c *config.Gun) (gun.Gun, error) { params := c.Parameters if params == nil { diff --git a/gun/spdy/spdy_test.go b/gun/spdy/spdy_test.go index 4d93e95e2..07bf631dc 100644 --- a/gun/spdy/spdy_test.go +++ b/gun/spdy/spdy_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/SlyMarbo/spdy" // we specially use spdy server from another library - "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/yandex/pandora/aggregate" @@ -22,7 +21,7 @@ func TestSpdyGun(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - result := make(chan aggregate.Sample) + result := make(chan interface{}) gun := &SpdyGun{ target: "localhost:3000", @@ -47,20 +46,19 @@ func TestSpdyGun(t *testing.T) { require.Len(t, results, 2) { // first result is connect - rPhout, casted := (results[0]).(aggregate.PhantomCompatible) - require.True(t, casted, "Should be phantom compatible") - phoutSample := rPhout.PhoutSample() - assert.Equal(t, "CONNECT", phoutSample.Tag) - assert.Equal(t, 200, phoutSample.ProtoCode) + + sample, casted := (results[0]).(*aggregate.Sample) + require.True(t, casted, "Should be *aggregate.Sample") + assert.Equal(t, "CONNECT", sample.Tag) + assert.Equal(t, 200, sample.ProtoCode) } { // second result is request - rPhout, casted := (results[1]).(aggregate.PhantomCompatible) - require.True(t, casted, "Should be phantom compatible") - phoutSample := rPhout.PhoutSample() - spew.Dump(phoutSample) - assert.Equal(t, "REQUEST", phoutSample.Tag) - assert.Equal(t, 200, phoutSample.ProtoCode) + + sample, casted := (results[1]).(*aggregate.Sample) + require.True(t, casted, "Should be *aggregate.Sample") + assert.Equal(t, "REQUEST", sample.Tag) + assert.Equal(t, 200, sample.ProtoCode) } // TODO: test scenaries with errors