
Proposal: make memberlist TCP Transport non blocking #192

Closed
pracucci opened this issue Jul 18, 2022 · 10 comments · Fixed by #525

@pracucci
Contributor

This proposal is based on a theory I have, but I haven't proved yet whether it can significantly improve message propagation.

dskit's memberlist client propagates changes (e.g. hash ring changes) using broadcast messages:

func (m *KV) queueBroadcast(key string, content []string, version uint, message []byte) {
    l := len(message)
    b := ringBroadcast{
        key:     key,
        content: content,
        version: version,
        msg:     message,
        finished: func(b ringBroadcast) {
            m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l))
        },
        logger: m.logger,
    }
    m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l))
    m.broadcasts.QueueBroadcast(b)
}

Messages are pushed to a queue, along with messages received from other nodes and enqueued for re-broadcasting, and then a single goroutine periodically pulls messages from the queue and sends them to some other (random) nodes:
https://github.com/grafana/memberlist/blob/09ffed8adbbed3ee3ace7de977c5e1cade90da87/state.go#L604-L626

The gossip() function, responsible for picking up messages from the queue and sending them to some random nodes, is called at a regular interval, configured by -memberlist.gossip-interval (200ms by default):
https://github.com/grafana/memberlist/blob/09ffed8adbbed3ee3ace7de977c5e1cade90da87/state.go#L131-L135
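For illustration, here is a minimal sketch (hypothetical names, not memberlist's actual code) of the single-goroutine, ticker-driven pattern the linked code follows; the key property is that anything blocking inside one gossip round delays the following rounds:

// Sketch only: a single goroutine gossips on a fixed interval.
package gossip

import "time"

func gossipLoop(interval time.Duration, stop <-chan struct{}, gossipOnce func()) {
    t := time.NewTicker(interval)
    defer t.Stop()
    for {
        select {
        case <-t.C:
            gossipOnce() // if this blocks (e.g. on a TCP dial), the next round is delayed
        case <-stop:
            return
        }
    }
}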

For each gossip interval, the gossip() function picks up messages up to UDPBufferSize, which is hardcoded to 10MB in dskit (because dskit uses a custom transport based on TCP):

// Memberlist uses UDPBufferSize to figure out how many messages it can put into single "packet".
// As we don't use UDP for sending packets, we can use higher value here.
mlCfg.UDPBufferSize = 10 * 1024 * 1024

We assume that we broadcast new messages and re-broadcast received messages every -memberlist.gossip-interval=200ms, but that's not guaranteed: the gossip() function may take significantly longer, because dskit's custom TCP transport blocks on each call to rawSendMsgPacket(). In particular, the call to rawSendMsgPacket() blocks for the whole duration it takes the TCP transport to create a new TCP connection, write the data and close it:

func (t *TCPTransport) writeTo(b []byte, addr string) error {
// Open connection, write packet header and data, data hash, close. Simple.

For example, if it can't reach a node and the connection times out, gossiping is blocked for 5 seconds (when running with the default config -memberlist.packet-dial-timeout=5s).
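To make the blocking path concrete, here is a simplified sketch of a per-packet send over TCP: dial (bounded by the dial timeout), write, close. Names, timeouts and error handling are illustrative, not dskit's actual writeTo():

package transport

import (
    "net"
    "time"
)

func blockingWriteTo(b []byte, addr string, dialTimeout time.Duration) error {
    // The dial blocks for up to dialTimeout if the remote node is unreachable...
    conn, err := net.DialTimeout("tcp", addr, dialTimeout)
    if err != nil {
        return err
    }
    defer conn.Close()

    // ...and the write blocks until the data is handed off to the kernel.
    _, err = conn.Write(b)
    return err
}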

Proposal

The gossip() function in the memberlist library assumes a UDP transport, where sending a packet is "fire and forget" and basically doesn't block for long. On the other hand, dskit's TCP transport blocks, even though it swallows any error:

// WriteTo is used to send "UDP" packets. Since we use TCP, we can detect more errors,
// but memberlist library doesn't seem to cope with that very well. That is why we return nil instead.
return time.Now(), nil

In practice, it's currently the worst of both worlds: with TCP we could detect more delivery errors, at the cost of blocking the call, but we never return the error.

I propose to make TCP transport's WriteTo() non-blocking. It preserves the current "fire and forget" behaviour without blocking the gossiping of messages to other nodes if a node is unresponsive.
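As a rough illustration of the idea (a sketch only, not the implementation that eventually landed in #525), the existing blocking path could be dispatched to a goroutine so that WriteTo() returns immediately:

// Hypothetical sketch, building on the TCPTransport.writeTo() quoted above;
// errors keep being swallowed to preserve the "fire and forget" behaviour.
func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) {
    buf := make([]byte, len(b))
    copy(buf, b) // copy, because the caller may reuse b after WriteTo returns

    go func() {
        // Log and drop errors, as the blocking implementation already does.
        _ = t.writeTo(buf, addr)
    }()

    return time.Now(), nil
}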

@pracucci
Contributor Author

Looking at the timer_memberlist_gossip metric

The duration of gossip() execution is tracked by the timer_memberlist_gossip summary metric. It's configured with a max age of 10s, while in our infrastructure we use a larger scrape interval, so we're missing some data points, but it gives good insights anyway:

[Screenshot: timer_memberlist_gossip metric during a Mimir cluster rolling update]

At time 11:22 UTC we started a Mimir cluster rolling update. As you can see, the gossip() duration spikes (both max and average computed on the 99th percentile over the 10s observation window).

@pracucci
Contributor Author

I propose to make TCP transport's WriteTo() non-blocking. It preserves the current "fire and forget" behaviour without blocking the gossiping of messages to other nodes if a node is unresponsive.

On Slack, @dimitarvdimitrov pointed out that we still need to wait until messages are successfully sent on shutdown, when we send the "dead node" (leave) message. He's right. Since we're in control of the TCP transport, I think this is something we can easily do.
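For example (a hypothetical sketch, not dskit's actual code), in-flight asynchronous sends could be tracked with a WaitGroup and flushed on shutdown, so the final leave message isn't dropped:

package transport

import "sync"

type asyncSender struct {
    wg    sync.WaitGroup
    write func(b []byte, addr string) error // the blocking per-packet write
}

func (s *asyncSender) WriteToAsync(b []byte, addr string) {
    buf := append([]byte(nil), b...)
    s.wg.Add(1)
    go func() {
        defer s.wg.Done()
        _ = s.write(buf, addr) // errors swallowed, as today
    }()
}

// Shutdown waits for all pending packets (e.g. the leave message) to be sent.
func (s *asyncSender) Shutdown() {
    s.wg.Wait()
}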

@bboreham
Contributor

What happens when the TCP send buffer fills up?

@stevesg
Contributor

stevesg commented Jul 18, 2022

I don't know if using non-blocking writes will help - I believe we create and close a connection for every WriteTo. I don't know off the top of my head what happens to the bytes in the buffer, whether closing the socket blocks or the bytes are discarded.

@pstibrany
Member

pstibrany commented Jul 18, 2022

What happens when the TCP send buffer fills up?

In our current implementation we open a new TCP connection for each memberlist "packet" or "stream". But gossiping works in a single goroutine (as this issue points out), so if the remote side doesn't read the data quickly, gossiping gets stuck.

However, the remote side creates a listener in a goroutine and reads the entire "packet" fully before passing it to memberlist.
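For context, here is a simplified sketch of that receive path (hypothetical names, not dskit's actual listener code): each connection is read in full before the packet is handed over, so a slow reader back-pressures the sender:

package transport

import (
    "io"
    "log"
    "net"
)

func acceptLoop(ln net.Listener, deliver func(packet []byte)) {
    for {
        conn, err := ln.Accept()
        if err != nil {
            return
        }
        go func(c net.Conn) {
            defer c.Close()
            packet, err := io.ReadAll(c) // read the entire "packet" first
            if err != nil {
                log.Printf("failed to read packet: %v", err)
                return
            }
            deliver(packet) // only then pass it to memberlist
        }(conn)
    }
}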

@pracucci
Contributor Author

I don't know if using non-blocking writes will help - I believe we create and close a connection for every WriteTo

Exactly, and that's a blocking operation. All gossiping from a node blocks on each such operation.

@stevesg
Contributor

stevesg commented Jul 18, 2022

Ah I see - the connect/disconnect is async too.

It would be more efficient to use UDP when possible; opening a new connection for every packet is quite wasteful.

If we go this route, we would have to limit the number of concurrent operations, otherwise we could risk holding a large number of TCP connections open. If unbounded, then a system with e.g. N memberlist members would be able to open N TCP connections.

Looking at it from a different angle: If we're comfortable opening N connections, then why not hold the connections open to save the connect/disconnect time.

@pracucci
Contributor Author

If we go this route, we would have to limit the number of concurrent operations, otherwise we could risk holding a large number of TCP connections open.

Sure. It should be limited, and block once over the limit.
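For illustration, one possible shape of that limit (hypothetical sketch, not an actual implementation) is a buffered channel used as a semaphore, bounding the number of concurrent asynchronous sends and blocking the caller once the limit is reached:

package transport

type boundedSender struct {
    sem   chan struct{} // capacity = max concurrent sends
    write func(b []byte, addr string) error
}

func newBoundedSender(limit int, write func([]byte, string) error) *boundedSender {
    return &boundedSender{sem: make(chan struct{}, limit), write: write}
}

func (s *boundedSender) WriteToAsync(b []byte, addr string) {
    buf := append([]byte(nil), b...)
    s.sem <- struct{}{} // blocks once `limit` sends are already in flight
    go func() {
        defer func() { <-s.sem }()
        _ = s.write(buf, addr)
    }()
}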

Looking at it from a different angle: If we're comfortable opening N connections, then why not hold the connections open to save the connect/disconnect time.

💯 agree, and I would like to discuss it in a separate issue. I think that's another issue we have to address, but IMO we may suffer from a blocking implementation anyway, which is why I think making the transport's WriteTo() async is desirable regardless.

@pracucci
Contributor Author

Looking at it from a different angle: If we're comfortable opening N connections, then why not hold the connections open to save the connect/disconnect time.

Opened a dedicated issue: #193

@pracucci
Contributor Author

pracucci commented Aug 3, 2022

A PoC of this proposal was tested by @dimitarvdimitrov in grafana/mimir#1597 and reported as a significant improvement, reducing propagation times during rollouts.
