feat(plugin-server): Preserve distinct ID locality on overflow rerouting #20945
Conversation
```diff
@@ -255,7 +255,7 @@ async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]
     queue.pluginsServer.kafkaProducer.produce({
         topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
         value: message.value,
-        key: null, // No locality guarantees in overflow
+        key: message.key,
```
`message.key` can be empty if capture-side overflow detection triggers.

As we want to evaluate impact and probably don't want to invest a lot for now, I'm fine with:
- checking how much capture-side detection triggers (on both new and old capture) vs plugin-server-side
- disabling capture-side detection on both captures for now while we evaluate this

Last Monday's incident has shown us that we can read & unmarshal really fast (1.6M/minute with 8 historical pods dropping on token), so capture-side might not really be necessary.

The alternative would be to re-compute a key if missing, but then that's a third copy of that code to maintain; I'd rather avoid it.
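For context, a minimal sketch of what re-computing a key for key-less overflow messages could look like. It assumes the capture-side key format is `token:distinct_id` and that the payload carries `token` and `distinct_id` fields; the helper name is illustrative, and this would be exactly the third copy of routing logic the comment argues against maintaining:

```ts
// Hypothetical sketch only: derive a partition key when capture has stripped it.
// Assumes events carry `token` and `distinct_id` and that the capture-side key
// format is `${token}:${distinct_id}` -- both assumptions, not confirmed here.
function overflowKeyFor(message: { key: Buffer | null; value: Buffer | null }): Buffer | null {
    if (message.key && message.key.length > 0) {
        return message.key // keep the key capture already assigned
    }
    try {
        const event = JSON.parse(message.value?.toString() ?? '')
        if (event.token && event.distinct_id) {
            return Buffer.from(`${event.token}:${event.distinct_id}`)
        }
    } catch {
        // Unparseable payloads fall through to a null key (random partition).
    }
    return null
}
```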
Going to hold off on merging this for a bit — I had assumed there was already a method to turn off overflow routing wholesale on both captures but it doesn't look like that was a valid assumption for me to make. Seems like it'd make sense to take care of that first.
It can be turned off in Python, but you are right that it cannot be turned off in Rust yet. I'd put very high thresholds in the Rust config while working on a PR to add a boolean config to completely disable it.

BTW, this capture-side detection is kind of a scalability time bomb, as its memory usage is O(active distinct_ids), so it needs to eventually be phased out anyway.
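To illustrate the scaling concern, here is a simplified sketch, not the actual capture implementation; the threshold, window, and names are made up:

```ts
// Naive per-distinct_id rate tracking: the map gains one entry per distinct_id
// seen in the current window, so peak memory is O(active distinct_ids).
const eventCounts = new Map<string, number>()
const OVERFLOW_THRESHOLD = 1000 // hypothetical events-per-window limit

function shouldOverflow(token: string, distinctId: string): boolean {
    const key = `${token}:${distinctId}`
    const count = (eventCounts.get(key) ?? 0) + 1
    eventCounts.set(key, count)
    return count > OVERFLOW_THRESHOLD
}

// Clearing the window bounds growth over time, but memory within a window
// still scales with however many distinct IDs are active.
setInterval(() => eventCounts.clear(), 60_000)
```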
> It can be turned off in Python, but you are right that it cannot be turned off in Rust yet.

Ah, I was thinking we'd want to bypass this entire conditional, but maybe that's overkill.

> BTW, this capture-side detection is kind of a scalability time bomb, as its memory usage is O(active distinct_ids), so it needs to eventually be phased out anyway.

Good point, I hadn't really considered that — thanks for mentioning it.
Good call, we'd need to skip the check against `LIKELY_ANONYMOUS_IDS` too.
The ability to turn off random partitioning completely in old capture is here: #21168. (My initial thought was to keep that PR separate, but in retrospect, I suppose this could have been part of this change too.)
Well-written description. Seems like a win to me. In the future, with more reliability visibility, we should (auto) blackhole extremely loud distinct_ids to alleviate the risk of a single partition becoming a huge issue.
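As a rough illustration of that suggestion (purely hypothetical, not part of this PR; the env var and helper are made up), a blackhole guard might look like:

```ts
// Hypothetical: drop events for distinct_ids flagged as pathologically loud,
// sourced here from an operator-managed env var for simplicity.
const BLACKHOLED_DISTINCT_IDS = new Set(
    (process.env.BLACKHOLED_DISTINCT_IDS ?? '').split(',').filter(Boolean)
)

function shouldBlackhole(token: string, distinctId: string): boolean {
    return BLACKHOLED_DISTINCT_IDS.has(`${token}:${distinctId}`)
}
```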
… reverse compatibility and no surprises during deploy
```diff
 ingestionOverflowingMessagesTotal.inc(kafkaMessages.length)
 await Promise.all(
     kafkaMessages.map((message) =>
         queue.pluginsServer.kafkaProducer.produce({
             topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
             value: message.value,
-            key: null, // No locality guarantees in overflow
+            key: useRandomPartitioner ? undefined : message.key,
```
There's a change here where we set it to `undefined` instead of `null`.
As far as I can tell, this was the issue.

The `produce` call here eventually forwards to a `HighLevelProducer` instance, which does some extra stuff with the key in `produce`. If the key ends up being `undefined`, it just quietly doesn't produce the message, and the callback is never called.

This is also the behavior I saw when testing this manually in the Node REPL:
```
> const { HighLevelProducer } = require('node-rdkafka')
undefined
> const p = new HighLevelProducer({'bootstrap.servers': 'kafka:9092'})
undefined
> p.connect()
// truncated
> p.produce('garbage-topic', null, Buffer.from('message'), 'key', undefined, (...a) => { console.log('callback:', a) })
undefined
> callback: [ null, 1 ]
> p.produce('garbage-topic', null, Buffer.from('message'), undefined, undefined, (...a) => { console.log('callback:', a) })
undefined
// nothing ever happens here
```
What I figure happened is that any message that should have been routed to overflow never resolved its promise, and the consumers simply stopped making forward progress once they saw a batch containing one of those messages.
The `key` property is typed as `MessageKey`, which does include `undefined`, and the `HighLevelProducer.produce` signature accepts `any`, which explains why this wasn't caught by the type checker.
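A minimal sketch of guarding against this (an assumed fix shape, not necessarily the change that landed): normalize the key before producing so node-rdkafka never sees `undefined`, with `null` preserving the random-partition behaviour:

```ts
type OverflowKey = Buffer | string | null | undefined

// Coalesce missing keys to null: node-rdkafka treats null as "pick a partition",
// whereas an undefined key silently drops the produce, as shown above.
function normalizeOverflowKey(key: OverflowKey, useRandomPartitioner: boolean): Buffer | string | null {
    if (useRandomPartitioner) {
        return null
    }
    return key ?? null
}

// e.g. inside emitToOverflow:
//   queue.pluginsServer.kafkaProducer.produce({
//       topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
//       value: message.value,
//       key: normalizeOverflowKey(message.key, useRandomPartitioner),
//   })
```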
…low rerouting (#20945) Turned off by default for backwards compatibility for now.
Problem
When messages are rerouted from the main topic to the overflow topic, we've historically stripped them of their `key` so that they are uniformly distributed over all partitions within the overflow topic, rather than being targeted at a single partition. This is helpful for distributing CPU-intensive workloads or other workloads that are not reliant on shared state, as it allows the parallelizable parts of the processing loop to run independently, resulting in overall throughput improvements.

However, the hotspots in our workload these days typically seem to be person property updates, which require row-level locks in Postgres to be held while performing row updates. This can cause problems when paired with messages that are uniformly distributed over all of the partitions in the overflow topic.
Changes
Preserve the message key when publishing to the overflow topic, so that semantic routing/locality is preserved over distinct IDs.
This isn't a perfect solution; it does have its tradeoffs:
Advantages
Disadvantages
- `retention.bytes` may need to be increased to avoid losing messages to retention if one partition is extremely overloaded.

Does this work well for both Cloud and self-hosted?
Yep!
How did you test this code?
Updated tests.