Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Weighted load balancing policy #1922

Draft
wants to merge 1 commit into
base: 4.x
Choose a base branch
from
Draft

Conversation

akhaku
Copy link
Contributor

@akhaku akhaku commented Mar 25, 2024

This PR open-sources the work Joey Lynch and I did and presented at ApacheCon 2022. It requires a bit of work before it's mergeable, but hoping to get some feedback on a couple areas before I put in much more work. A couple major things left to do:

  • Port the tests we added internally. We also have some jmh tests but I'm not sure if those will fit in the driver codebase
  • Pass the local rack through to the driver. Internally we passed it through the context, for OSS perhaps the right thing it to implement something like the optional/required local DC pattern.
  • The documentation could probably use some more work, but I'd love guidance on where to focus that: the implementation class? Or reference.conf?

@akhaku
Copy link
Contributor Author

akhaku commented Mar 25, 2024

Additionally, this will tackle JAVA-3040

@aratno
Copy link
Contributor

aratno commented Mar 25, 2024

The documentation could probably use some more work, but I'd love guidance on where to focus that: the implementation class? Or reference.conf?

I'd put configuration documentation in reference.conf. For more context on why someone would benefit from this LBP over the default, put that in manual/core/load_balancing/README.md.

*
* The default weights are good for the vast majority of use cases, but you can tweak them to get different behavior.
*/
public class RackAwareWeightedLoadBalancingPolicy extends DefaultLoadBalancingPolicy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is going to be most useful in environments like AWS, where Cassandra racks correspond to AWS AZs, and there are relatively few racks, and a high likelihood that an application has some instances running in the same rack as Cassandra. In other environments where racks correspond to actual racks in a datacenter, there are many more racks, and the likelihood of an application running in the same rack is low, there's not as much benefit to prioritizing rack-alignment as heavily.

Does that sound right based on your experience?

It would be useful to have a metric for the count of requests that go to each category (rack-aligned, replica-aligned, instance-starting, instance-unhealthy), so users can know whether they're setting local rack correctly, set weights correctly, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's certainly more valuable for cases where the rack->rack latency is higher. However, being rack-aware is just one part of it, I think the mechanism around preferring nodes with fewer in-flight requests is an improvement over the round-robin mechanism (assuming non-token-aware, or all replicas) in DefaultLoadBalancingPolicy/BasicLoadBalancingPolicy. Additionally, the getAndUpdate in those balancers ends up being a bottleneck in high-throughput situations. It's been a while since our ApacheCon talk and there's no recording but the slides are available here https://www.apachecon.com/acna2022/slides/04-Khaku-Lynch_Improving-Cassandra-Client.pdf and they go into the problems a bit.

My eventual goal is to make this the default load balancing policy in the driver but for now I'll settle for getting it in and making it an option.

Regarding the metrics - that's perhaps a little tricky since we're creating a query plan here rather than firing off requests. Additionally, the scoring/ordering is decoupled from the characteristics that resulted in the score - perhaps some trace logging with the characteristics? In practice we saw latencies drop immediately when we deployed this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think the changes outside of rack-awareness make this worth recommending broadly. I wouldn't support making it the default until it's been available for at least a release, but worth pushing it in that direction.


// By default we will only score this many nodes, the rest will get added on without scoring.
// We don't usually need to score every single node if there are more than a few.
static final int DEFAULT_SCORED_PLAN_SIZE = 8;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it would make sense to increase this value if using a large cluster and lots of token-unaware queries (ALLOW FILTERING, WHERE TOKEN, WHERE pk IN), since replica-local does not apply but rack-local does, and in a larger cluster the likelihood of an 8-node random sample including a rack-local Cassandra host is lower than some might like. Increasing the scored plan size will increase the likelihood that a rack-local host is used out of the sample.

Does that sound right to you? Any other situations to tune this based on your experience?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair, this default was chosen to fit with our requirements but a larger default may make sense for a more general use case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a reasonable default - just thinking through what we should document for tuning.

private final double weightNonRack;
private final double weightNonReplica;
private final double weightStarting;
private final double weightUnhealthy;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like something that was lost in the 4.x rewrite was the error-aware LBP: https://github.com/apache/cassandra-java-driver/blob/3.x/driver-core/src/main/java/com/datastax/driver/core/policies/ErrorAwarePolicy.java

Not requesting any change here - just realized that this doesn't exist anymore. Could be useful to incorporate those ideas in the future.

}

protected double getWeightedScore(Node node, Session session, long nowMillis, long nowNanos, boolean isReplica) {
int base = Math.min(32768, 1 + getInFlight(node, session));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getInFlight calls InFlightHandler#getInFlight, which isn't thread-safe. StreamIdGenerator#maxAvailableIds is a plain int, so could be cached on a calling thread. Risks of this value being inaccurate are low, could update maxAvailableIds to be a volatile, either way not a huge concern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxAvailableIds is a final instance variable on StreamIdGenerator so I think we're safe here, unless I misunderstand how that works. And availableIds() returns the value of an AtomicInteger, so I think that means that InFlightHandler#getInFlight is thread-safe

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right - it's final, missed that.

}

// Take a copy of nodes and reference to replicas since the node map is concurrent
Set<Node> dcNodeSet = getLiveNodes().dc(getLocalDatacenter());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a user sets a local DC, the current implementation only plans and scores for Cassandra hosts in the local DC. If a user doesn't set a local DC, there's no weighting to encourage the query plan to select a coordinator in the local DC over one in a remote DC, right? If there's no local DC configured, getLocalDatacenter is empty, and dcNodeSet ends up being all nodes.

If a user uses all non-local consistency levels (QUORUM, etc.) and doesn't want all traffic to go to a coordinator in the local DC but does want to encourage the query planner to pick coordinators in the local DC, how would they configure that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently the RackAwareWeightedLoadBalancingPolicy extends DefaultLoadBalancingPolicy, which uses MandatoryLocalDcHelper and so requires a local DC to be set. I can however see an argument where it instead extends BasicLoadBalancingPolicy, in a world where RAWLBP is a more general solution with sensible defaults (kinda like UCS is for Cassandra compaction?) and you can tweak weights to get what you want.

Regarding non-local consistency level - if we allow scoring non-local-DC nodes, I think we'd still want to weight the local ones higher and rely on the server-side snitch to figure out a quorum partner if needed.

The weighted load balancing policy uses a number of factors
as weights along with the number of in-flight requests for
each host to score and then sort the list of known live
hosts.
@akhaku akhaku changed the title Rack-aware weighted load balancing policy Weighted load balancing policy Mar 25, 2024
@akhaku akhaku marked this pull request as draft April 9, 2024 18:21
@tolbertam tolbertam self-requested a review April 12, 2024 18:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants