-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[pkg/stanza/fileconsumer] Add ability to read files asynchronously #25884
Changes from 13 commits
dbc668d
b991137
9b45d03
4e5dead
1e3c11e
93dc34a
2baec2e
c364493
879493c
bf39c05
7bd826c
4ec5086
28979bb
89609a6
4f24f25
2da8f53
dbc025f
6c7b121
89bb07a
cae3290
8a8cf31
506cefa
99e507c
c822f28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: 'enhancement' | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: fileconsumer | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: Added a new feature gate that enables a thread pool mechanism to respect the poll_interval parameter. | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [18908] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,10 +4,15 @@ | |
package fileconsumer | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/google/uuid" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" | ||
|
@@ -20,6 +25,10 @@ type fileInputBenchmark struct { | |
config func() *Config | ||
} | ||
|
||
// fileSizeBenchmark describes the two file sizes used by the varying-file-size
// benchmark. Each entry is a number of log messages; createFiles alternates
// between sizes[0] and sizes[1] as it generates files.
type fileSizeBenchmark struct { | |
sizes [2]int | |
} | |
|
||
type benchFile struct { | ||
*os.File | ||
log func(int) | ||
|
@@ -180,3 +189,66 @@ func BenchmarkFileInput(b *testing.B) { | |
}) | ||
} | ||
} | ||
|
||
// max reports the larger of the two integers. When both are equal, x is
// returned.
func max(x, y int) int {
	larger := x
	if y > larger {
		larger = y
	}
	return larger
}
|
||
func (fileSize fileSizeBenchmark) createFiles(b *testing.B, rootDir string) { | ||
// create 50 files, some with large file sizes, other's with rather small | ||
getMessage := func(m int) string { return fmt.Sprintf("message %d", m) } | ||
logs := make([]string, 0) | ||
for i := 0; i < max(fileSize.sizes[0], fileSize.sizes[1]); i++ { | ||
logs = append(logs, getMessage(i)) | ||
} | ||
|
||
for i := 0; i < 50; i++ { | ||
file := openFile(b, filepath.Join(rootDir, fmt.Sprintf("file_%s.log", uuid.NewString()))) | ||
file.WriteString(uuid.NewString() + strings.Join(logs[:fileSize.sizes[i%2]], "\n") + "\n") | ||
} | ||
} | ||
|
||
// BenchmarkFileSizeVarying times how long it takes for every log line from
// two batches of generated files to arrive on the emit channel.
//
// Setup: a manager is built over a temporary directory with StartAt set to
// "beginning" and MaxConcurrentFiles set to 50. The two file sizes are derived
// from b.N (b.N*5000 and b.N*10 messages). createFiles is called twice, each
// call followed by a poll, so totalLogs counts 50 files of each size.
//
// Measurement: a goroutine receives totalLogs entries from emitCalls and then
// stops the benchmark timer; the main goroutine waits for it before returning.
//
// NOTE(review): once.Do runs before the first receive from emitCalls, so the
// timer is reset as soon as the goroutine starts, not when the first log
// arrives as the inline comment says.
// NOTE(review): b.ResetTimer and b.StopTimer are called from the spawned
// goroutine rather than the benchmark goroutine - confirm this is intended.
// NOTE(review): there is no loop over b.N; b.N only scales the file sizes.
// NOTE(review): the second value returned by buildTestManager is discarded.
func BenchmarkFileSizeVarying(b *testing.B) { | |
fileSize := fileSizeBenchmark{ | |
sizes: [2]int{b.N * 5000, b.N * 10}, // Half the files will be huge, other half will be smaller | |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not convinced it makes sense to scale the size of files. A benchmark should be able to characterize a specific task. The way this is defined makes the task somewhat ambiguous and doesn't actually exercise the critical functionality very much. (i.e. only calls I think we should use fixed file sizes and have b.N represent the number of times we add a set of new logs and call poll. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I have misspelled There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I probably need to rename it to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @djaglowski I have made number of logs configurable and have added some more params to better scale this tests. |
||
} | |
rootDir := b.TempDir() | |
cfg := NewConfig().includeDir(rootDir) | |
cfg.StartAt = "beginning" | |
cfg.MaxConcurrentFiles = 50 | |
totalLogs := fileSize.sizes[0]*50 + fileSize.sizes[1]*50 | |
emitCalls := make(chan *emitParams, totalLogs+10) | |
|
||
operator, _ := buildTestManager(b, cfg, withEmitChan(emitCalls), withReaderChan()) | |
operator.persister = testutil.NewMockPersister("test") | |
defer func() { | |
require.NoError(b, operator.Stop()) | |
}() | |
var wg sync.WaitGroup | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
var once sync.Once | |
for i := 0; i < totalLogs; i++ { | |
once.Do(func() { | |
// Reset once we get the first log | |
b.ResetTimer() | |
}) | |
<-emitCalls | |
} | |
// Stop the timer, as we're measuring log throughput | |
b.StopTimer() | |
}() | |
// create first set of files | |
fileSize.createFiles(b, rootDir) | |
operator.poll(context.Background()) | |
|
||
// create new set of files, call poll() again | |
fileSize.createFiles(b, rootDir) | |
operator.poll(context.Background()) | |
|
||
wg.Wait() | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we generalize this a bit more so we can define multiple scenarios very easily? Something like: