[pkg/stanza/fileconsumer] Add ability to read files asynchronously #25884
Conversation
@djaglowski PR is up for review ;)
Thanks @VihasMakwana. This is my initial pass at the review. I'm still trying to wrap my head around some aspects of this implementation but I wanted to leave initial thoughts.
I mentioned here that I think it's important for us to have benchmarks in our codebase that demonstrate the improvement we want to see from the change, but we also need to have some confidence that we're not meaningfully regressing in other cases such as those where files are typically about the same size.
func (m *Manager) isCurrentlyConsuming(fp *fingerprint.Fingerprint) bool {
    m.trieLock.RLock()
    defer m.trieLock.RUnlock()
    return m.trie.HasKey(fp.FirstBytes)
}

func (m *Manager) removePath(fp *fingerprint.Fingerprint) {
    m.trieLock.Lock()
    defer m.trieLock.Unlock()
    m.trie.Delete(fp.FirstBytes)
}
Should we just move the locking into the trie?
Well, we could, but I didn't go with that, as I was planning to add the Trie to the current (synchronous) model in the future as well.
That's a whole different thing; we can discuss it in a separate issue.
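For illustration only, a minimal sketch of what moving the locking into the trie itself might look like. The `HasKey` and `Delete` method names come from the snippet above; the `trieOps` interface, the `Put` method, and the wrapper type are assumptions, not the PR's actual code:

```go
package fileconsumer

import "sync"

// trieOps captures only the operations used in the snippets above (HasKey,
// Delete) plus an assumed Put; the concrete trie type may differ.
type trieOps interface {
    HasKey(key []byte) bool
    Put(key []byte)
    Delete(key []byte)
}

// threadSafeTrie is a hypothetical wrapper that owns its own lock, so the
// Manager would no longer need a separate trieLock field.
type threadSafeTrie struct {
    mu    sync.RWMutex
    inner trieOps
}

func (t *threadSafeTrie) HasKey(key []byte) bool {
    t.mu.RLock()
    defer t.mu.RUnlock()
    return t.inner.HasKey(key)
}

func (t *threadSafeTrie) Put(key []byte) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.inner.Put(key)
}

func (t *threadSafeTrie) Delete(key []byte) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.inner.Delete(key)
}
```

The trade-off discussed in this thread is whether the lock belongs to the trie (simpler call sites) or to the Manager (keeps the trie reusable in the synchronous model).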
@djaglowski I have added the benchmark test case.
type fileSizeBenchmark struct {
    sizes [2]int
}
Can we generalize this a bit more so we can define multiple scenarios very easily? Something like:
testCases := []fileSizeBenchmark{
    {
        name:          "varying_sizes",
        sizes:         map[int]int{10: 25, 5000: 25},
        maxConcurrent: 50,
    },
    {
        name:          "same_size_small",
        sizes:         map[int]int{10: 100},
        maxConcurrent: 50,
    },
    {
        name:          "same_size_small_throttled",
        sizes:         map[int]int{10: 100},
        maxConcurrent: 10,
    },
    {
        name:          "same_size_large",
        sizes:         map[int]int{5000: 50},
        maxConcurrent: 50,
    },
    {
        name:          "same_size_large_throttled",
        sizes:         map[int]int{5000: 50},
        maxConcurrent: 5,
    },
}
func BenchmarkFileSizeVarying(b *testing.B) {
    fileSize := fileSizeBenchmark{
        sizes: [2]int{b.N * 5000, b.N * 10}, // Half the files will be huge, other half will be smaller
I'm not convinced it makes sense to scale the size of files. A benchmark should be able to characterize a specific task.
The way this is defined makes the task somewhat ambiguous and doesn't actually exercise the critical functionality very much (i.e. it only calls poll twice).
I think we should use fixed file sizes and have b.N represent the number of times we add a set of new logs and call poll.
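A rough sketch of the shape being suggested, assuming fixed file sizes and b.N driving the number of append-and-poll cycles. `newTestManager` is a hypothetical setup helper standing in for the existing test utilities, and `operator.poll` mirrors the call used elsewhere in this PR; this is not the PR's actual benchmark:

```go
func BenchmarkFixedSizePoll(b *testing.B) {
    tempDir := b.TempDir()

    // Fixed set of files whose sizes do NOT scale with b.N.
    paths := make([]string, 50)
    for i := range paths {
        paths[i] = filepath.Join(tempDir, fmt.Sprintf("file%d.log", i))
    }

    appendLogs := func(path string, count, lineLen int) {
        f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
        if err != nil {
            b.Fatal(err)
        }
        line := strings.Repeat("a", lineLen) + "\n"
        for i := 0; i < count; i++ {
            if _, err := f.WriteString(line); err != nil {
                b.Fatal(err)
            }
        }
        f.Close()
    }

    operator := newTestManager(b, tempDir) // hypothetical helper wiring up a fileconsumer Manager

    b.ResetTimer()
    for n := 0; n < b.N; n++ {
        // b.N drives how many times we append a batch of new logs and call poll,
        // so each iteration measures one poll cycle over fixed-size files.
        for i, p := range paths {
            lineLen := 10
            if i%2 == 0 {
                lineLen = 5000 // half the files get long lines, half get short ones
            }
            appendLogs(p, 10, lineLen)
        }
        operator.poll(context.Background())
    }
}
```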
I think I have misnamed sizes; those values are actually the number of logs added to the files.
If those two are the number of logs, as you said, is that fine by you?
I probably need to rename it to logs or something.
@djaglowski I have made the number of logs configurable and added some more parameters to better scale these tests.
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" | ||
) | ||
|
||
type readerWrapper struct { |
Would this be a little more readable as readRequest, or something else more descriptive about its purpose?
readerEnvelope looks catchy, tbh. I have seen such naming in some client libraries that utilize channels.
Does that sound fine to you? We can go with either of them, tbh.
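For context, either name would presumably wrap a reader so it can be handed to worker goroutines over a channel. A hedged sketch of that idea, not the PR's actual type: `ReadToEnd`, `Fingerprint`, and `removePath` appear elsewhere in this discussion, while the `readerChan` field, the `reader` type reference, and the `worker` method are illustrative:

```go
// readRequest (the name suggested above; readerEnvelope was the alternative)
// bundles whatever a worker needs in order to consume one file.
type readRequest struct {
    reader *reader
}

// worker pulls requests off a shared channel, drains the file, and then
// removes the fingerprint from the trie so the file can be matched again.
func (m *Manager) worker(ctx context.Context) {
    for req := range m.readerChan {
        req.reader.ReadToEnd(ctx)
        m.removePath(req.reader.Fingerprint)
    }
}
```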
operator.poll(context.Background())
operator.poll(context.Background())
Does this test fail without these?
Yeah, it does. In the current implementation, roller.readLostFiles works per consume call, not per poll call. So, for every batch, we call readLostFiles for the previous batch; all of this happens within a single poll().
However, in the new implementation, we need to determine whether a reader is old or not.
That's done on a generation basis: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/25884/files#diff-a918db427e06075a5fa059d7893f54556bfa1504726ba77b3491b644a64ab818R163.
It will only detect an old reader once it's 3 poll cycles old ;(
so, you need 3 poll() calls.
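A hedged sketch of the generation idea described above: the field names knownFiles and generation come from the discussion, the 3-cycle threshold from the comment above, and the reader type and exact loop are illustrative rather than the actual diff:

```go
// clearOldReaders is an illustrative sketch: every poll cycle bumps each
// known reader's generation, and a reader is treated as "lost" only once it
// has gone three cycles without being re-matched.
func (m *Manager) clearOldReaders(ctx context.Context) {
    var stillKnown []*reader
    for _, r := range m.knownFiles {
        r.generation++
        if r.generation >= 3 {
            // Not seen for 3 poll cycles: consume whatever is left, then drop it.
            r.ReadToEnd(ctx)
            r.Close()
            continue
        }
        stillKnown = append(stillKnown, r)
    }
    m.knownFiles = stillKnown
}
```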
for _, r := range oldReaders {
    if m.isCurrentlyConsuming(r.Fingerprint) {
        r.Close()
    } else {
What if we are not currently reading the file, but we did read the file again since this reader was created?
This is actually somewhat expected, because we'll often see the same file in each poll interval. By the time we're discarding these old readers, we've probably read that file a couple more times.
Is there any chance that telling the old reader to read is going to start from an out-of-date offset?
In the current implementation, we only read again if we've determined that the file has been moved out of the matcher path. This implementation just tries to read regardless.
This implementation also reads it after we've determined that it's been moved out of the path, based on generation: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/25884/files#diff-a918db427e06075a5fa059d7893f54556bfa1504726ba77b3491b644a64ab818R163
What if we are not currently reading the file, but we did read the file again since this reader was created?
I don't think that's possible, because m.clearOldReadersConcurrent(ctx) updates knownFiles and clears out old readers before the readers for the current poll cycle are created.
The current poll() will begin only when the previous poll cycle has submitted all of its readers to the thread pool and the trie is up to date.
So, there's no chance that we'd add any new file to read between the previous poll() and the current clearOldReadersConcurrent().
I don't think that's possible, because m.clearOldReadersConcurrent(ctx) updates knownFiles and clears out old readers before the readers for the current poll cycle are created.
But it only removes readers from 3 poll cycles ago, so readers from the previous cycle are still there.
I don't think that's possible, because m.clearOldReadersConcurrent(ctx) updates knownFiles and clears out old readers before the readers for the current poll cycle are created.
But it only removes readers from 3 poll cycles ago, so readers from the previous cycle are still there.
I still think this is a problem. When we ReadToEnd as the file is discarded, it will start from an out-of-date offset. This will cause duplicate ingestion of data.
Dan, if you don't mind, can you give me a walkthrough with an example? I think we'll be able to sort it out fairly quickly then.
Sure, I'll try to be concise so hopefully this is clear enough.
Let's make the situation unrealistically simple and say that one line is written to a file in between each poll cycle. At each time, we create a reader that reads to the end of the file. Each of these is a reader for the same file, but they each belong to a different generation and have a different offset.
Time | Actual Content | Offset After Reading
---|---|---
t0 | foo0\n | 5
t1 | foo0\nfoo1\n | 10
t2 | foo0\nfoo1\nfoo2\n | 15
t3 | foo0\nfoo1\nfoo2\nfoo3\n | 20
So at t3, when we decide to discard the reader from t0, if we ReadToEnd on that reader, we will read foo1\n, foo2\n, ... again, even though foo1\n was already read at t1, foo2\n was read at t2, etc.
I see. What I can do is create a test case which simulates this and check it out. Should be simple.
Thanks 😊
With a tweak in logic, and respecting max_concurrent_files, we don't hold readers open for 3 poll() cycles anymore. So, I think we're good now.
To address the above-mentioned scenario, we remove a reader from m.knownFiles if we detect a match here.
Also, while copying a reader, we don't copy the generation.
So, every time a reader is copied, we remove the old reader from m.knownFiles and create a fresh reader with generation set to 0.
The scenario would look something like this with knownFiles:
Time | Actual Content | Offset After Reading | knownFiles (before poll()) | knownFiles (after poll())
---|---|---|---|---
t0 | foo0\n | 5 | [] | [reader{offset: 5, generation: 0}]
t1 | foo0\nfoo1\n | 10 | [reader{offset: 5, generation: 0}] | [reader{offset: 10, generation: 0}]
t2 | foo0\nfoo1\nfoo2\n | 15 | [reader{offset: 15, generation: 0}] | [reader{offset: 15, generation: 0}]
t3 | foo0\nfoo1\nfoo2\nfoo3\n | 20 | [reader{offset: 20, generation: 0}] | [reader{offset: 20, generation: 0}]
Note: It is assumed that we finish reading asynchronously before the next poll() is executed.
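For illustration, a minimal sketch of the copy behavior described above: the fresh reader inherits the old offset so nothing is re-read, but its generation is deliberately not carried over. The reader type and its field names here are assumptions, not the actual diff:

```go
// copyReader is an illustrative sketch: the new reader keeps the old
// fingerprint and offset, but its generation is reset to 0 so it is treated
// as current rather than as a lost reader from a previous cycle.
func copyReader(old *reader) *reader {
    return &reader{
        Fingerprint: old.Fingerprint,
        Offset:      old.Offset,
        generation:  0, // deliberately not copied from old.generation
    }
}
```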
I don't think this implementation is respecting max_concurrent_files. It uses this setting to size the pool of workers, which represents the number of files that can be read simultaneously. However, we have 3 generations of files sitting open in knownFiles without any regard for this setting. The setting is intended to cap the number of open files, not the number of workers / actively read files.
Okay, I got your point.
By the way, does reducing the lost reader's generation check from 3 cycles to 2 cycles make sense to you? It might, because if a file is unavailable for 2 straight polls(), it is unlikely to be available in the 3rd.
Probably, but I recall experiencing increased instability when dropping this in the past. Ultimately, I am hopeful we will find a design that gets away from this notion entirely.
@djaglowski I have re-introduced batching in this implementation. Respecting max_concurrent_files skipped my mind, my apologies. I have introduced a new helper to take care of copy-truncate duplications here. I have also tweaked the logic for clearing old readers to respect the max_concurrent_files setting: at the beginning of the batch, we close all the old readers.
In the meantime, I'm working to reduce redundancy in test cases.
}

func (m *Manager) clearOldReadersConcurrent(ctx context.Context) {
    m.knownFilesLock.Lock()
@djaglowski I have made clearOldReadersConcurrent() a blocking call of sorts, based on observations made while running benchmarks.
We wait for all aggregated readers to finish reading before moving forward.
Observations:
- If the number of matched files is significantly higher than maxConcurrentFiles, then we are likely going to batch the files more than a couple of times.
- This overwhelms the workers() and they'll all stay busy.
- If we send already finished readers to the workers() anyway, it causes a bottleneck, and benchmarks show a significant delay in receiving logs.
- It's better to create separate goroutines to process finished readers and let the workers() free up for the upcoming poll() cycle.
- In this way, we respect max_concurrent_files, the workers() are happy, and all's good. The benchmark performs as expected.
@djaglowski does the overall PR look stable to you, given that it respects the current parameters and passes all the test cases?
@djaglowski ping ^
@VihasMakwana, thanks for iterating on this. It seems like the design has changed quite a bit and I'm struggling to wrap my head around all the nuances. As much as I'd like to see this move forward, I'm uncomfortable with how fluid the design seems to be and I lack confidence that we're making the right changes. I think a large part of the problem is that we're having to rework several overlapping concerns at the same time. Do you see any opportunities to separate concerns into individual PRs?
Okay. On a quick note, I do think making individual PRs will help move this forward. I can think of the following:
I'm open to this, though I'm not sure I understand exactly what you have in mind. More generally, I think what we need is not necessarily a direct path to the design used in this PR, but rather a path that changes the way we manage other concerns so that this PR does not need to worry about them. For example, we have all these different sets of files, most of which don't necessarily need to be a particular concern of an asynchronous solution.
In other words, can we redesign the way we manage any of these in such a way that it is agnostic to the implementation of the others?
Okay. If I understand correctly, what we need is a way of restructuring the PR so that it addresses individual concerns independently, without relying on the others, as you said.
This PR was marked stale due to lack of activity. It will be closed in 14 days.
Closed as inactive. Feel free to reopen if this PR is still being worked on.
Description: Added a new feature gate that enables a thread pool mechanism so that the poll_interval parameter is respected.
Current scenario:
If a file takes longer than poll_interval to consume, the current implementation blocks until it is consumed entirely. In other words, it doesn't respect poll_interval.
Improvement using thread pooling:
In a thread pool model, the backend queues the files as it proceeds and doesn't wait for them to be consumed; all reading is asynchronous.
Link to tracking Issue: #18908
Testing: Nothing new added; existing tests are modified as per the feature gate.
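To make the thread-pool description concrete, a hedged sketch of the general shape: channel-fed workers whose count would be derived from max_concurrent_files, with poll() only enqueueing readers. Identifiers such as readerChan, numWorkers, and makeReaders are illustrative; only poll, ReadToEnd, and the overall shape are taken from the discussion above:

```go
// startWorkers launches the worker pool once; each worker drains readers as
// they arrive, so reading happens off the poll loop, asynchronously.
func (m *Manager) startWorkers(ctx context.Context, numWorkers int) {
    m.readerChan = make(chan *reader)
    for i := 0; i < numWorkers; i++ {
        go func() {
            for r := range m.readerChan {
                r.ReadToEnd(ctx)
            }
        }()
    }
}

// poll only enqueues readers and returns, so it can fire on every
// poll_interval tick even if some files take longer than one interval to read.
func (m *Manager) poll(ctx context.Context) {
    for _, r := range m.makeReaders(ctx) { // hypothetical helper producing readers for matched files
        m.readerChan <- r // queue and move on; don't wait for consumption
    }
}
```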