diff --git a/main.go b/main.go index b836c85..279940b 100644 --- a/main.go +++ b/main.go @@ -136,6 +136,13 @@ func run(ctx context.Context, args runArgs) error { defer ln.Close() srv := &server{dir: args.Dir, ch: make(chan json.RawMessage, 1000), wake: make(chan struct{})} + defer func() { + c, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := srv.uploadOnce(c, args.Bucket, args.Prefix, upl); err != nil { + log.Printf("final upload attempt: %v", err) + } + }() group, ctx := errgroup.WithContext(ctx) group.Go(func() error { <-ctx.Done(); return ln.Close() }) group.Go(func() error { return srv.ingest(ctx, args.Mb<<20, args.D) }) @@ -172,6 +179,21 @@ type server struct { } func (srv *server) upload(ctx context.Context, d time.Duration, bucket, prefix string, upl *manager.Uploader) error { + ticker := time.NewTicker(d) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + _ = srv.uploadOnce(ctx, bucket, prefix, upl) + case <-srv.wake: + _ = srv.uploadOnce(ctx, bucket, prefix, upl) + } + } +} + +func (srv *server) uploadOnce(ctx context.Context, bucket, prefix string, upl *manager.Uploader) error { walkFn := func(path string, d fs.DirEntry, err error) error { if err != nil || d.IsDir() || !strings.HasSuffix(path, ".json.gz") { return nil @@ -191,18 +213,7 @@ func (srv *server) upload(ctx context.Context, d time.Duration, bucket, prefix s } return nil } - ticker := time.NewTicker(d) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - _ = filepath.WalkDir(srv.dir, walkFn) - case <-srv.wake: - _ = filepath.WalkDir(srv.dir, walkFn) - } - } + return filepath.WalkDir(srv.dir, walkFn) } func uploadFile(ctx context.Context, upl *manager.Uploader, bucket, prefix, name string) error {