Skip to content

Commit

Permalink
[chore][fileconsumer] Cleanup file handle when closed by reader (#26030)
Browse files Browse the repository at this point in the history
Reboot of #25912
  • Loading branch information
djaglowski authored Sep 5, 2023
1 parent 930b421 commit 9d5ec53
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
5 changes: 1 addition & 4 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
r.ReadToEnd(ctx)
// Delete a file if deleteAfterRead is enabled and we reached the end of the file
if m.deleteAfterRead && r.eof {
r.Close()
if err := os.Remove(r.file.Name()); err != nil {
m.Errorf("could not delete %s", r.file.Name())
}
r.Delete()
}
}(r)
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package fileconsumer

import (
"bytes"
"context"
"fmt"
"os"
Expand Down Expand Up @@ -1476,6 +1477,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
shortFileLine := tokenWithLength(bytesPerLine - 1)
longFileLines := 100000
longFileSize := longFileLines * bytesPerLine
longFileFirstLine := "first line of long file\n"

require.NoError(t, featuregate.GlobalRegistry().Set(allowFileDeletion.ID(), true))
defer func() {
Expand All @@ -1496,6 +1498,8 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
require.NoError(t, shortFile.Close())

longFile := openTemp(t, tempDir)
_, err = longFile.WriteString(longFileFirstLine)
require.NoError(t, err)
for line := 0; line < longFileLines; line++ {
_, err := longFile.WriteString(string(tokenWithLength(bytesPerLine-1)) + "\n")
require.NoError(t, err)
Expand Down Expand Up @@ -1538,8 +1542,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
// Verify that only long file is remembered and that (0 < offset < fileSize)
require.Equal(t, 1, len(operator.knownFiles))
reader := operator.knownFiles[0]
require.Equal(t, longFile.Name(), reader.file.Name())
require.Greater(t, reader.Offset, int64(0))
require.True(t, bytes.HasPrefix(reader.Fingerprint.FirstBytes, []byte(longFileFirstLine)))
require.Less(t, reader.Offset, int64(longFileSize))
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/stanza/fileconsumer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,25 @@ func (r *reader) finalizeHeader() {
r.HeaderFinalized = true
}

// Delete will close and delete the file
func (r *reader) Delete() {
if r.file == nil {
return
}
f := r.file
r.Close()
if err := os.Remove(f.Name()); err != nil {
r.Errorf("could not delete %s", f.Name())
}
}

// Close will close the file
func (r *reader) Close() {
if r.file != nil {
if err := r.file.Close(); err != nil {
r.Debugw("Problem closing reader", zap.Error(err))
}
r.file = nil
}

if r.headerReader != nil {
Expand Down

0 comments on commit 9d5ec53

Please sign in to comment.