Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix loadbalancer reentrant rlock #10511

Merged
Merged 4 commits on Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func APIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []str
return false, err
}
if len(addresses) == 0 {
logrus.Infof("Waiting for apiserver addresses")
logrus.Infof("Waiting for supervisor to provide apiserver addresses")
return false, nil
}
return true, nil
Expand Down Expand Up @@ -370,10 +370,9 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve configuration from server")
}

// If the supervisor and externally-facing apiserver are not on the same port, tell the proxy where to find the apiserver.
if controlConfig.SupervisorPort != controlConfig.HTTPSPort {
isIPv6 := utilsnet.IsIPv6(net.ParseIP([]string{envInfo.NodeIP.String()}[0]))
isIPv6 := utilsnet.IsIPv6(net.ParseIP(util.GetFirstValidIPString(envInfo.NodeIP)))
if err := proxy.SetAPIServerPort(controlConfig.HTTPSPort, isIPv6); err != nil {
return nil, errors.Wrapf(err, "failed to set apiserver port to %d", controlConfig.HTTPSPort)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/k3s-io/k3s/pkg/version"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -167,11 +168,12 @@ func (lb *LoadBalancer) dialContext(ctx context.Context, network, _ string) (net
if server == nil || targetServer == "" {
logrus.Debugf("Nil server for load balancer %s: %s", lb.serviceName, targetServer)
} else if allChecksFailed || server.healthCheck() {
dialTime := time.Now()
conn, err := server.dialContext(ctx, network, targetServer)
if err == nil {
return conn, nil
}
logrus.Debugf("Dial error from load balancer %s: %s", lb.serviceName, err)
logrus.Debugf("Dial error from load balancer %s after %s: %s", lb.serviceName, time.Now().Sub(dialTime), err)
// Don't close connections to the failed server if we're retrying with health checks ignored.
// We don't want to disrupt active connections if it is unlikely they will have anywhere to go.
if !allChecksFailed {
Expand Down
8 changes: 5 additions & 3 deletions pkg/agent/loadbalancer/servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,12 @@ func (lb *LoadBalancer) setServers(serverAddresses []string) bool {
return true
}

// nextServer attempts to get the next server in the loadbalancer server list.
// If another goroutine has already updated the current server address to point at
// a different address than just failed, nothing is changed. Otherwise, a new server address
// is stored to the currentServerAddress field, and returned for use.
// This function must always be called by a goroutine that holds a read lock on the loadbalancer mutex.
func (lb *LoadBalancer) nextServer(failedServer string) (string, error) {
lb.mutex.RLock()
defer lb.mutex.RUnlock()

// note: these fields are not protected by the mutex, so we clamp the index value and update
// the index/current address using local variables, to avoid time-of-check vs time-of-use
// race conditions caused by goroutine A incrementing it in between the time goroutine B
Expand Down
21 changes: 16 additions & 5 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func createProxyAndValidateToken(ctx context.Context, cfg *cmds.Agent) (proxy.Pr
if err := os.MkdirAll(agentDir, 0700); err != nil {
return nil, err
}
isIPv6 := utilsnet.IsIPv6(net.ParseIP([]string{cfg.NodeIP.String()}[0]))
isIPv6 := utilsnet.IsIPv6(net.ParseIP(util.GetFirstValidIPString(cfg.NodeIP)))

proxy, err := proxy.NewSupervisorProxy(ctx, !cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort, isIPv6)
if err != nil {
Expand Down Expand Up @@ -530,20 +530,31 @@ func setupTunnelAndRunAgent(ctx context.Context, nodeConfig *daemonconfig.Node,
}

func waitForAPIServerAddresses(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent, proxy proxy.Proxy) error {
var localSupervisorDefault bool
if addresses := proxy.SupervisorAddresses(); len(addresses) > 0 {
host, _, _ := net.SplitHostPort(addresses[0])
if host == "127.0.0.1" || host == "::1" {
localSupervisorDefault = true
}
}

for {
select {
case <-time.After(5 * time.Second):
logrus.Info("Waiting for apiserver addresses")
logrus.Info("Waiting for control-plane node to register apiserver addresses in etcd")
case addresses := <-cfg.APIAddressCh:
for i, a := range addresses {
host, _, err := net.SplitHostPort(a)
if err == nil {
addresses[i] = net.JoinHostPort(host, strconv.Itoa(nodeConfig.ServerHTTPSPort))
if i == 0 {
proxy.SetSupervisorDefault(addresses[i])
}
}
}
// If this is an etcd-only node that started up using its local supervisor,
// switch to using a control-plane node as the supervisor. Otherwise, leave the
// configured server address as the default.
if localSupervisorDefault && len(addresses) > 0 {
proxy.SetSupervisorDefault(addresses[0])
}
proxy.Update(addresses)
return nil
case <-ctx.Done():
Expand Down
21 changes: 18 additions & 3 deletions pkg/agent/tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,33 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
// The loadbalancer is only disabled when there is a local apiserver. Servers without a local
// apiserver load-balance to themselves initially, then switch over to an apiserver node as soon
// as we get some addresses from the code below.
var localSupervisorDefault bool
if addresses := proxy.SupervisorAddresses(); len(addresses) > 0 {
host, _, _ := net.SplitHostPort(addresses[0])
if host == "127.0.0.1" || host == "::1" {
localSupervisorDefault = true
}
}

if proxy.IsSupervisorLBEnabled() && proxy.SupervisorURL() != "" {
logrus.Info("Getting list of apiserver endpoints from server")
// If not running an apiserver locally, try to get a list of apiservers from the server we're
// connecting to. If that fails, fall back to querying the endpoints list from Kubernetes. This
// fallback requires that the server we're joining be running an apiserver, but is the only safe
// thing to do if its supervisor is down-level and can't provide us with an endpoint list.
if addresses := agentconfig.APIServers(ctx, config, proxy); len(addresses) > 0 {
proxy.SetSupervisorDefault(addresses[0])
addresses := agentconfig.APIServers(ctx, config, proxy)
logrus.Infof("Got apiserver addresses from supervisor: %v", addresses)

if len(addresses) > 0 {
if localSupervisorDefault {
proxy.SetSupervisorDefault(addresses[0])
}
proxy.Update(addresses)
} else {
if endpoint, _ := client.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{}); endpoint != nil {
if addresses := util.GetAddresses(endpoint); len(addresses) > 0 {
addresses = util.GetAddresses(endpoint)
logrus.Infof("Got apiserver addresses from kubernetes endpoints: %v", addresses)
if len(addresses) > 0 {
proxy.Update(addresses)
}
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/daemons/agent/agent_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ func createRootlessConfig(argsMap map[string]string, controllers map[string]bool

func kubeProxyArgs(cfg *config.Agent) map[string]string {
bindAddress := "127.0.0.1"
isIPv6 := utilsnet.IsIPv6(net.ParseIP([]string{cfg.NodeIP}[0]))
if isIPv6 {
if utilsnet.IsIPv6(net.ParseIP(cfg.NodeIP)) {
bindAddress = "::1"
}
argsMap := map[string]string{
Expand Down Expand Up @@ -67,8 +66,7 @@ func kubeProxyArgs(cfg *config.Agent) map[string]string {

func kubeletArgs(cfg *config.Agent) map[string]string {
bindAddress := "127.0.0.1"
isIPv6 := utilsnet.IsIPv6(net.ParseIP([]string{cfg.NodeIP}[0]))
if isIPv6 {
if utilsnet.IsIPv6(net.ParseIP(cfg.NodeIP)) {
bindAddress = "::1"
}
argsMap := map[string]string{
Expand Down
25 changes: 19 additions & 6 deletions pkg/daemons/agent/agent_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
package agent

import (
"net"
"os"
"path/filepath"
"strings"

"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
utilsnet "k8s.io/utils/net"
)

const (
Expand All @@ -21,8 +22,7 @@ const (

func kubeProxyArgs(cfg *config.Agent) map[string]string {
bindAddress := "127.0.0.1"
_, IPv6only, _ := util.GetFirstString([]string{cfg.NodeIP})
if IPv6only {
if utilsnet.IsIPv6(net.ParseIP(cfg.NodeIP)) {
bindAddress = "::1"
}
argsMap := map[string]string{
Expand Down Expand Up @@ -95,9 +95,22 @@ func kubeletArgs(cfg *config.Agent) map[string]string {
if cfg.NodeName != "" {
argsMap["hostname-override"] = cfg.NodeName
}
defaultIP, err := net.ChooseHostInterface()
if err != nil || defaultIP.String() != cfg.NodeIP {
argsMap["node-ip"] = cfg.NodeIP

// If the embedded CCM is disabled, don't assume that dual-stack node IPs are safe.
// When using an external CCM, the user wants dual-stack node IPs, they will need to set the node-ip kubelet arg directly.
// This should be fine since most cloud providers have their own way of finding node IPs that doesn't depend on the kubelet
// setting them.
if cfg.DisableCCM {
dualStack, err := utilsnet.IsDualStackIPs(cfg.NodeIPs)
if err == nil && !dualStack {
argsMap["node-ip"] = cfg.NodeIP
}
} else {
// Cluster is using the embedded CCM, we know that the feature-gate will be enabled there as well.
argsMap["feature-gates"] = util.AddFeatureGate(argsMap["feature-gates"], "CloudDualStackNodeIPs=true")
if nodeIPs := util.JoinIPs(cfg.NodeIPs); nodeIPs != "" {
argsMap["node-ip"] = util.JoinIPs(cfg.NodeIPs)
}
}

argsMap["node-labels"] = strings.Join(cfg.NodeLabels, ",")
Expand Down