Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove temp files on error in Compactor.writeNewFiles (#26074) #26080

Merged
merged 1 commit into from
Feb 27, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,17 @@ func (c *Compactor) RemoveTmpFiles(files []string) error {
return errors.Join(errs...)
}

func (c *Compactor) RemoveTmpFilesOnErr(files []string, originalErrs ...error) error {
removeErr := c.RemoveTmpFiles(files)
if removeErr == nil {
return errors.Join(originalErrs...)
} else if errJoin, ok := removeErr.(interface{ Unwrap() []error }); ok {
return errors.Join(append(originalErrs, errJoin.Unwrap()...)...)
} else {
return errors.Join(append(originalErrs, removeErr)...)
}
}

// writeNewFiles writes from the iterator into new TSM files, rotating
// to a new file once it has reached the max TSM file size.
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, logger *zap.Logger) ([]string, error) {
Expand Down Expand Up @@ -1089,7 +1100,8 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
// If the file only contained tombstoned entries, then it would be a 0 length
// file that we can drop.
if err := os.RemoveAll(fileName); err != nil {
return nil, err
// Only return an error if we couldn't remove the temp files
return nil, c.RemoveTmpFilesOnErr(files, err)
}
break
} else if errors.As(err, &eInProgress) {
Expand All @@ -1100,27 +1112,14 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
// planner keeps track of which files are assigned to compaction plans now.
logger.Warn("file exists, compaction in progress already", zap.String("output_file", fileName))
}
return nil, err
} else if err != nil {
var errs []error
errs = append(errs, err)
// We hit an error and didn't finish the compaction. Abort.
// Remove any tmp files we already completed
// discard later errors to return the first one from the write() call
for _, f := range files {
err = os.RemoveAll(f)
if err != nil {
errs = append(errs, err)
}
}
// Remove the temp file
// discard later errors to return the first one from the write() call
err = os.RemoveAll(fileName)
if err != nil {
errs = append(errs, err)
}

return nil, errors.Join(errs...)
return nil, c.RemoveTmpFilesOnErr(files, err)
} else if err != nil {
// We hit an error and didn't finish the compaction. Abort.
// Remove any tmp files we already completed, as well as the current
// file we were writing to.
return nil, c.RemoveTmpFilesOnErr(files, err, os.RemoveAll(fileName))
}

files = append(files, fileName)
Expand Down