Skip to content

Commit

Permalink
phout caching logic
Browse files Browse the repository at this point in the history
  • Loading branch information
direvius committed Dec 29, 2015
1 parent ea22bf8 commit d2e29c2
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions aggregate/phout.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,22 @@ func (rl *PhoutResultListener) handle(r Sample) error {
}

func (rl *PhoutResultListener) Start(ctx context.Context) error {
go func() { // flush results file every second
for _ = range time.NewTicker(1 * time.Second).C {
rl.phout.Flush()
}
}()
defer rl.phout.Flush()
shouldFlush := time.NewTicker(1 * time.Second).C
loop:
for {
select {
case r := <-rl.source:
if err := rl.handle(r); err != nil {
return err
}
select {
case <-shouldFlush:
rl.phout.Flush()
default:
}
case <-time.After(1 * time.Second):
rl.phout.Flush()
case <-ctx.Done():
// Context is done, but we should read all data from source
for {
Expand All @@ -94,7 +98,6 @@ loop:
}
}
}
rl.phout.Flush()
return nil
}

Expand Down

0 comments on commit d2e29c2

Please sign in to comment.