Skip to content

Commit

Permalink
change phout buffer size, flush it every second
Browse files Browse the repository at this point in the history
  • Loading branch information
direvius committed Dec 27, 2015
1 parent 865daeb commit 6772bd4
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions aggregate/phout.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,18 @@ func (rl *PhoutResultListener) handle(r Sample) error {
}

func (rl *PhoutResultListener) Start(ctx context.Context) error {
go func() { // flush results file every second
for _ = range time.NewTicker(1 * time.Second).C {
rl.phout.Flush()
}
}()
loop:
for {
select {
case r := <-rl.source:
if err := rl.handle(r); err != nil {
return err
}
case <-time.After(1 * time.Second):
rl.phout.Flush()
case <-ctx.Done():
// Context is done, but we should read all data from source
for {
Expand All @@ -89,6 +92,7 @@ loop:
break loop
}
}
rl.phout.Flush()
}
}
return nil
Expand All @@ -101,7 +105,7 @@ func NewPhoutResultListener(filename string) (rl ResultListener, err error) {
} else {
phoutFile, err = os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0666)
}
writer := bufio.NewWriterSize(phoutFile, 1024*1024*4)
writer := bufio.NewWriterSize(phoutFile, 1024*512) // 512 KB
ch := make(chan Sample, 65536)
return &PhoutResultListener{
source: ch,
Expand Down

0 comments on commit 6772bd4

Please sign in to comment.