Skip to content

Commit

Permalink
refactor aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
direvius committed Jan 13, 2016
1 parent 43f7507 commit 0918263
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 254 deletions.
18 changes: 11 additions & 7 deletions aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package aggregate

import "golang.org/x/net/context"

type Sample interface {
String() string
}

type ResultListener interface {
Start(context.Context) error
Sink() chan<- Sample
Sink() chan<- interface{}
}

type resultListener struct {
sink chan<- interface{}
}

func (rl *resultListener) Sink() chan<- interface{} {
return rl.sink
}

func Drain(ctx context.Context, results <-chan Sample) []Sample {
samples := []Sample{}
func Drain(ctx context.Context, results <-chan interface{}) []interface{} {
samples := []interface{}{}
loop:
for {
select {
Expand Down
22 changes: 10 additions & 12 deletions aggregate/logger.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
package aggregate

import (
"fmt"
"log"

"github.com/yandex/pandora/config"
"golang.org/x/net/context"
)

type resultListener struct {
sink chan<- Sample
}

func (rl *resultListener) Sink() chan<- Sample {
return rl.sink
}

// Implements ResultListener interface
type LoggingResultListener struct {
resultListener

source <-chan Sample
source <-chan interface{}
}

func (rl *LoggingResultListener) handle(r Sample) {
log.Println(r)
func (rl *LoggingResultListener) handle(r interface{}) {
r, ok := r.(fmt.Stringer)
if !ok {
log.Println("Can't convert result to String")
} else {
log.Println(r)
}
}

func (rl *LoggingResultListener) Start(ctx context.Context) error {
Expand All @@ -48,7 +46,7 @@ loop:
}

func NewLoggingResultListener(*config.ResultListener) (ResultListener, error) {
ch := make(chan Sample, 32)
ch := make(chan interface{}, 32)
return &LoggingResultListener{
source: ch,
resultListener: resultListener{
Expand Down
96 changes: 12 additions & 84 deletions aggregate/phout.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,106 +2,34 @@ package aggregate

import (
"bufio"
"fmt"
"os"
"strconv"
"time"

"github.com/yandex/pandora/config"
"golang.org/x/net/context"
)

const (
phoutDelimiter = '\t'
phoutLineEnd = '\n'
)

type PhoutSample struct {
TS float64
Tag string
RT int
Connect int
Send int
Latency int
Receive int
IntervalEvent int
Egress int
Igress int
NetCode int
ProtoCode int
}

func (ps *PhoutSample) String() string {
return fmt.Sprintf(
"%.3f\t%s\t%d\t"+
"%d\t%d\t"+
"%d\t%d\t"+
"%d\t"+
"%d\t%d\t"+
"%d\t%d",
ps.TS, ps.Tag, ps.RT,
ps.Connect, ps.Send,
ps.Latency, ps.Receive,
ps.IntervalEvent,
ps.Egress, ps.Igress,
ps.NetCode, ps.ProtoCode,
)
}

func (ps *PhoutSample) AppendTo(dst []byte) []byte {
dst = strconv.AppendFloat(dst, ps.TS, 'f', 3, 64)
dst = append(dst, phoutDelimiter)
dst = append(dst, ps.Tag...)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.RT), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Connect), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Send), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Latency), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Receive), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.IntervalEvent), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Egress), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Igress), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.NetCode), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.ProtoCode), 10)
dst = append(dst, phoutLineEnd)
return dst
}

type PhantomCompatible interface {
Sample
PhoutSample() *PhoutSample
type PhoutSerializable interface {
AppendToPhout([]byte) []byte
}

type PhoutResultListener struct {
resultListener

source <-chan Sample
source <-chan interface{}
phout *bufio.Writer
buffer []byte
}

func (rl *PhoutResultListener) handle(r Sample) error {
pc, ok := r.(PhantomCompatible)
if ok {
rl.buffer = pc.PhoutSample().AppendTo(rl.buffer)
_, err := rl.phout.Write(rl.buffer)
rl.buffer = rl.buffer[:0]
if err != nil {
return err
}
} else {
return fmt.Errorf("Not phantom compatible sample")
func (rl *PhoutResultListener) handle(s interface{}) error {
ps, ok := s.(PhoutSerializable)
if !ok {
panic("Result sample is not PhoutSerializable")
}
return nil
rl.buffer = ps.AppendToPhout(rl.buffer)
_, err := rl.phout.Write(rl.buffer)
rl.buffer = rl.buffer[:0]
return err
}

func (rl *PhoutResultListener) Start(ctx context.Context) error {
Expand Down Expand Up @@ -146,7 +74,7 @@ func NewPhoutResultListener(filename string) (rl ResultListener, err error) {
phoutFile, err = os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0666)
}
writer := bufio.NewWriterSize(phoutFile, 1024*512) // 512 KB
ch := make(chan Sample, 65536)
ch := make(chan interface{}, 65536)
return &PhoutResultListener{
source: ch,
resultListener: resultListener{
Expand Down
83 changes: 83 additions & 0 deletions aggregate/sample.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package aggregate

import (
"fmt"
"strconv"
"sync"
)

const (
phoutDelimiter = '\t'
phoutNewLine = '\n'
)

var samplePool = &sync.Pool{New: func() interface{} { return &Sample{} }}

type Sample struct {
TS float64
Tag string
RT int
Connect int
Send int
Latency int
Receive int
IntervalEvent int
Egress int
Igress int
NetCode int
ProtoCode int
Err error
}

func AcquireSample() *Sample {
return samplePool.Get().(*Sample)
}

func ReleaseSample(s *Sample) {
samplePool.Put(s)
}

func (ps *Sample) String() string {
return fmt.Sprintf(
"%.3f\t%s\t%d\t"+
"%d\t%d\t"+
"%d\t%d\t"+
"%d\t"+
"%d\t%d\t"+
"%d\t%d",
ps.TS, ps.Tag, ps.RT,
ps.Connect, ps.Send,
ps.Latency, ps.Receive,
ps.IntervalEvent,
ps.Egress, ps.Igress,
ps.NetCode, ps.ProtoCode,
)
}

func (ps *Sample) AppendToPhout(dst []byte) []byte {
dst = strconv.AppendFloat(dst, ps.TS, 'f', 3, 64)
dst = append(dst, phoutDelimiter)
dst = append(dst, ps.Tag...)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.RT), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Connect), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Send), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Latency), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Receive), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.IntervalEvent), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Egress), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Igress), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.NetCode), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.ProtoCode), 10)
dst = append(dst, phoutNewLine)
return dst
}
3 changes: 1 addition & 2 deletions gun/gun.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package gun

import (
"github.com/yandex/pandora/aggregate"
"github.com/yandex/pandora/ammo"
"golang.org/x/net/context"
)

type Gun interface {
Shoot(context.Context, ammo.Ammo, chan<- aggregate.Sample) error
Shoot(context.Context, ammo.Ammo, chan<- interface{}) error
}
Loading

0 comments on commit 0918263

Please sign in to comment.