diff --git a/aggregate/phout.go b/aggregate/phout.go index e3ce63458..12d67e900 100644 --- a/aggregate/phout.go +++ b/aggregate/phout.go @@ -68,11 +68,8 @@ func (rl *PhoutResultListener) handle(r Sample) error { } func (rl *PhoutResultListener) Start(ctx context.Context) error { - go func() { // flush results file every second - for _ = range time.NewTicker(1 * time.Second).C { - rl.phout.Flush() - } - }() + defer rl.phout.Flush() + shouldFlush := time.NewTicker(1 * time.Second).C loop: for { select { @@ -80,6 +77,13 @@ loop: if err := rl.handle(r); err != nil { return err } + select { + case <-shouldFlush: + rl.phout.Flush() + default: + } + case <-time.After(1 * time.Second): + rl.phout.Flush() case <-ctx.Done(): // Context is done, but we should read all data from source for { @@ -94,7 +98,6 @@ loop: } } } - rl.phout.Flush() return nil }