Skip to content

Commit

Permalink
Merge pull request #21 from yandex/pooling
Browse files Browse the repository at this point in the history
Object pools
  • Loading branch information
direvius committed Jan 19, 2016
2 parents d2e29c2 + b474ee9 commit e3606ac
Show file tree
Hide file tree
Showing 16 changed files with 401 additions and 302 deletions.
18 changes: 11 additions & 7 deletions aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package aggregate

import "golang.org/x/net/context"

type Sample interface {
String() string
}

type ResultListener interface {
Start(context.Context) error
Sink() chan<- Sample
Sink() chan<- *Sample
}

type resultListener struct {
sink chan<- *Sample
}

func (rl *resultListener) Sink() chan<- *Sample {
return rl.sink
}

func Drain(ctx context.Context, results <-chan Sample) []Sample {
samples := []Sample{}
func Drain(ctx context.Context, results <-chan *Sample) []*Sample {
samples := []*Sample{}
loop:
for {
select {
Expand Down
17 changes: 5 additions & 12 deletions aggregate/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,16 @@ import (
"golang.org/x/net/context"
)

type resultListener struct {
sink chan<- Sample
}

func (rl *resultListener) Sink() chan<- Sample {
return rl.sink
}

// Implements ResultListener interface
type LoggingResultListener struct {
resultListener

source <-chan Sample
source <-chan *Sample
}

func (rl *LoggingResultListener) handle(r Sample) {
log.Println(r)
func (rl *LoggingResultListener) handle(s *Sample) {
log.Println(s)
ReleaseSample(s)
}

func (rl *LoggingResultListener) Start(ctx context.Context) error {
Expand All @@ -48,7 +41,7 @@ loop:
}

func NewLoggingResultListener(*config.ResultListener) (ResultListener, error) {
ch := make(chan Sample, 32)
ch := make(chan *Sample, 32)
return &LoggingResultListener{
source: ch,
resultListener: resultListener{
Expand Down
64 changes: 12 additions & 52 deletions aggregate/phout.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,28 @@ package aggregate

import (
"bufio"
"fmt"
"os"
"time"

"github.com/yandex/pandora/config"
"golang.org/x/net/context"
)

type PhoutSample struct {
TS float64
Tag string
RT int
Connect int
Send int
Latency int
Receive int
IntervalEvent int
Egress int
Igress int
NetCode int
ProtoCode int
}

func (ps *PhoutSample) String() string {
return fmt.Sprintf(
"%.3f\t%s\t%d\t"+
"%d\t%d\t"+
"%d\t%d\t"+
"%d\t"+
"%d\t%d\t"+
"%d\t%d",
ps.TS, ps.Tag, ps.RT,
ps.Connect, ps.Send,
ps.Latency, ps.Receive,
ps.IntervalEvent,
ps.Egress, ps.Igress,
ps.NetCode, ps.ProtoCode,
)
}

type PhantomCompatible interface {
Sample
PhoutSample() *PhoutSample
}

type PhoutResultListener struct {
resultListener

source <-chan Sample
source <-chan *Sample
phout *bufio.Writer
buffer []byte
}

func (rl *PhoutResultListener) handle(r Sample) error {
pc, ok := r.(PhantomCompatible)
if ok {
_, err := rl.phout.WriteString(fmt.Sprintf("%s\n", pc.PhoutSample()))
if err != nil {
return err
}
} else {
return fmt.Errorf("Not phantom compatible sample")
}
return nil
func (rl *PhoutResultListener) handle(s *Sample) error {

rl.buffer = s.AppendToPhout(rl.buffer)
_, err := rl.phout.Write(rl.buffer)
rl.buffer = rl.buffer[:0]
ReleaseSample(s)
return err
}

func (rl *PhoutResultListener) Start(ctx context.Context) error {
Expand Down Expand Up @@ -109,13 +68,14 @@ func NewPhoutResultListener(filename string) (rl ResultListener, err error) {
phoutFile, err = os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0666)
}
writer := bufio.NewWriterSize(phoutFile, 1024*512) // 512 KB
ch := make(chan Sample, 65536)
ch := make(chan *Sample, 65536)
return &PhoutResultListener{
source: ch,
resultListener: resultListener{
sink: ch,
},
phout: writer,
phout: writer,
buffer: make([]byte, 0, 1024),
}, nil
}

Expand Down
97 changes: 97 additions & 0 deletions aggregate/sample.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package aggregate

import (
"fmt"
"strconv"
"sync"
)

const (
phoutDelimiter = '\t'
phoutNewLine = '\n'
)

var samplePool = &sync.Pool{New: func() interface{} { return &Sample{} }}

type Sample struct {
TS float64
Tag string
RT int
Connect int
Send int
Latency int
Receive int
IntervalEvent int
Egress int
Igress int
NetCode int
ProtoCode int
Err error
}

func AcquireSample(ts float64, tag string) *Sample {
s := samplePool.Get().(*Sample)
s.TS = ts
s.Tag = tag
s.RT = 0
s.Connect = 0
s.Send = 0
s.Latency = 0
s.Receive = 0
s.IntervalEvent = 0
s.Egress = 0
s.Igress = 0
s.NetCode = 0
s.ProtoCode = 0
s.Err = nil
return s
}

func ReleaseSample(s *Sample) {
samplePool.Put(s)
}

func (ps *Sample) String() string {
return fmt.Sprintf(
"%.3f\t%s\t%d\t"+
"%d\t%d\t"+
"%d\t%d\t"+
"%d\t"+
"%d\t%d\t"+
"%d\t%d",
ps.TS, ps.Tag, ps.RT,
ps.Connect, ps.Send,
ps.Latency, ps.Receive,
ps.IntervalEvent,
ps.Egress, ps.Igress,
ps.NetCode, ps.ProtoCode,
)
}

func (ps *Sample) AppendToPhout(dst []byte) []byte {
dst = strconv.AppendFloat(dst, ps.TS, 'f', 3, 64)
dst = append(dst, phoutDelimiter)
dst = append(dst, ps.Tag...)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.RT), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Connect), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Send), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Latency), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Receive), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.IntervalEvent), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Egress), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.Igress), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.NetCode), 10)
dst = append(dst, phoutDelimiter)
dst = strconv.AppendInt(dst, int64(ps.ProtoCode), 10)
dst = append(dst, phoutNewLine)
return dst
}
24 changes: 19 additions & 5 deletions ammo/ammo.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,48 @@
package ammo

import "golang.org/x/net/context"
import (
"sync"

"golang.org/x/net/context"
)

type Provider interface {
Start(context.Context) error
Source() <-chan Ammo
Release(Ammo) // return unused Ammo object to memory pool
}

type BaseProvider struct {
decoder Decoder
source <-chan Ammo
pool sync.Pool
}

type Ammo interface{}

type Decoder func([]byte) (Ammo, error)
type Decoder interface {
Decode([]byte, Ammo) (Ammo, error)
}

func NewBaseProvider(source <-chan Ammo, decoder Decoder) *BaseProvider {
func NewBaseProvider(source <-chan Ammo, decoder Decoder, New func() interface{}) *BaseProvider {
return &BaseProvider{
source: source,
decoder: decoder,
pool: sync.Pool{New: New},
}
}

func (ap *BaseProvider) Source() <-chan Ammo {
return ap.source
}

func (ap *BaseProvider) Decode(src []byte) (Ammo, error) {
return ap.decoder(src)
func (ap *BaseProvider) Release(a Ammo) {
ap.pool.Put(a)
}

func (ap *BaseProvider) decode(src []byte) (Ammo, error) {
a := ap.pool.Get()
return ap.decoder.Decode(src, a)
}

// Drain reads all ammos from ammo.Provider. Useful for tests.
Expand Down
16 changes: 11 additions & 5 deletions ammo/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ type Log struct {
Message string
}

// LogJSONDecode implements ammo.Decoder interface
func LogJSONDecode(jsonDoc []byte) (Ammo, error) {
a := &Log{}
// LogJSONDecoder implements ammo.Decoder interface
type LogJSONDecoder struct{}

func (*LogJSONDecoder) Decode(jsonDoc []byte, a Ammo) (Ammo, error) {
err := json.Unmarshal(jsonDoc, a)
return a, err
}

func NewLogJSONDecoder() Decoder {
return &LogJSONDecoder{}
}

type LogProvider struct {
*BaseProvider

Expand All @@ -31,7 +36,7 @@ func (ap *LogProvider) Start(ctx context.Context) error {
defer close(ap.sink)
loop:
for i := 0; i < ap.size; i++ {
if a, err := ap.Decode([]byte(fmt.Sprintf(`{"message": "Job #%d"}`, i))); err == nil {
if a, err := ap.decode([]byte(fmt.Sprintf(`{"message": "Job #%d"}`, i))); err == nil {
select {
case ap.sink <- a:
case <-ctx.Done():
Expand All @@ -52,7 +57,8 @@ func NewLogAmmoProvider(c *config.AmmoProvider) (Provider, error) {
sink: ammoCh,
BaseProvider: NewBaseProvider(
ammoCh,
LogJSONDecode,
NewLogJSONDecoder(),
func() interface{} { return &Log{} },
),
}
return ap, nil
Expand Down
Loading

0 comments on commit e3606ac

Please sign in to comment.