Skip to content

Commit

Permalink
Attempt to upload accumulated files at exit
Browse files Browse the repository at this point in the history
  • Loading branch information
artyom committed Mar 27, 2022
1 parent d520b12 commit 509cda9
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ func run(ctx context.Context, args runArgs) error {
defer ln.Close()

srv := &server{dir: args.Dir, ch: make(chan json.RawMessage, 1000), wake: make(chan struct{})}
defer func() {
c, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := srv.uploadOnce(c, args.Bucket, args.Prefix, upl); err != nil {
log.Printf("final upload attempt: %v", err)
}
}()
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error { <-ctx.Done(); return ln.Close() })
group.Go(func() error { return srv.ingest(ctx, args.Mb<<20, args.D) })
Expand Down Expand Up @@ -172,6 +179,21 @@ type server struct {
}

func (srv *server) upload(ctx context.Context, d time.Duration, bucket, prefix string, upl *manager.Uploader) error {
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
_ = srv.uploadOnce(ctx, bucket, prefix, upl)
case <-srv.wake:
_ = srv.uploadOnce(ctx, bucket, prefix, upl)
}
}
}

func (srv *server) uploadOnce(ctx context.Context, bucket, prefix string, upl *manager.Uploader) error {
walkFn := func(path string, d fs.DirEntry, err error) error {
if err != nil || d.IsDir() || !strings.HasSuffix(path, ".json.gz") {
return nil
Expand All @@ -191,18 +213,7 @@ func (srv *server) upload(ctx context.Context, d time.Duration, bucket, prefix s
}
return nil
}
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
_ = filepath.WalkDir(srv.dir, walkFn)
case <-srv.wake:
_ = filepath.WalkDir(srv.dir, walkFn)
}
}
return filepath.WalkDir(srv.dir, walkFn)
}

func uploadFile(ctx context.Context, upl *manager.Uploader, bucket, prefix, name string) error {
Expand Down

0 comments on commit 509cda9

Please sign in to comment.