Skip to content

Commit

Permalink
Use Typha's discovery library.
Browse files Browse the repository at this point in the history
(cherry picked from commit 82d17c8)
  • Loading branch information
fasaxc committed Dec 7, 2020
1 parent eb0c16a commit 55a557a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 75 deletions.
8 changes: 8 additions & 0 deletions config/config_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/projectcalico/libcalico-go/lib/numorstring"

"github.com/projectcalico/felix/idalloc"
"github.com/projectcalico/typha/pkg/discovery"
)

var (
Expand Down Expand Up @@ -759,6 +760,13 @@ func (config *Config) OverrideParam(name, value string) (bool, error) {
return config.UpdateFrom(config.internalOverrides, InternalOverride)
}

func (config *Config) TyphaDiscoveryOpts() []discovery.Option {
return []discovery.Option{
discovery.WithAddrOverride(config.TyphaAddr),
discovery.WithKubeService(config.TyphaK8sNamespace, config.TyphaK8sServiceName),
}
}

func New() *Config {
if knownParams == nil {
loadParams()
Expand Down
91 changes: 16 additions & 75 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,9 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/projectcalico/felix/buildinfo"
"github.com/projectcalico/felix/calc"
"github.com/projectcalico/felix/config"
_ "github.com/projectcalico/felix/config"
dp "github.com/projectcalico/felix/dataplane"
"github.com/projectcalico/felix/jitter"
"github.com/projectcalico/felix/logutils"
"github.com/projectcalico/felix/policysync"
"github.com/projectcalico/felix/proto"
"github.com/projectcalico/felix/statusrep"
"github.com/projectcalico/felix/usagerep"
"github.com/projectcalico/libcalico-go/lib/apiconfig"
apiv3 "github.com/projectcalico/libcalico-go/lib/apis/v3"
"github.com/projectcalico/libcalico-go/lib/backend"
Expand All @@ -65,7 +53,20 @@ import (
"github.com/projectcalico/libcalico-go/lib/options"
"github.com/projectcalico/libcalico-go/lib/set"
"github.com/projectcalico/pod2daemon/binder"
"github.com/projectcalico/typha/pkg/discovery"
"github.com/projectcalico/typha/pkg/syncclient"

"github.com/projectcalico/felix/buildinfo"
"github.com/projectcalico/felix/calc"
"github.com/projectcalico/felix/config"
_ "github.com/projectcalico/felix/config"
dp "github.com/projectcalico/felix/dataplane"
"github.com/projectcalico/felix/jitter"
"github.com/projectcalico/felix/logutils"
"github.com/projectcalico/felix/policysync"
"github.com/projectcalico/felix/proto"
"github.com/projectcalico/felix/statusrep"
"github.com/projectcalico/felix/usagerep"
)

const (
Expand Down Expand Up @@ -1206,68 +1207,8 @@ func (fc *DataplaneConnector) Start() {
go fc.handleWireguardStatUpdateFromDataplane()
}

var ErrServiceNotReady = errors.New("Kubernetes service missing IP or port.")

func discoverTyphaAddr(configParams *config.Config, k8sClientSet kubernetes.Interface) (string, error) {
if configParams.TyphaAddr != "" {
// Explicit address; trumps other sources of config.
return configParams.TyphaAddr, nil
}

if configParams.TyphaK8sServiceName == "" {
// No explicit address, and no service name, not using Typha.
return "", nil
}

if k8sClientSet == nil {
return "", errors.New("failed to look up Typha, no Kubernetes client available")
}

// If we get here, we need to look up the Typha service endpoints using the k8s API.
epClient := k8sClientSet.CoreV1().Endpoints(configParams.TyphaK8sNamespace)
eps, err := epClient.Get(configParams.TyphaK8sServiceName, metav1.GetOptions{})
if err != nil {
log.WithError(err).Error("Unable to get Typha service endpoints from Kubernetes.")
return "", err
}

candidates := set.New()

for _, subset := range eps.Subsets {
var portForOurVersion int32
for _, port := range subset.Ports {
if port.Name == "calico-typha" {
portForOurVersion = port.Port
break
}
}

if portForOurVersion == 0 {
continue
}

// If we get here, this endpoint supports the typha port we're looking for.
for _, h := range subset.Addresses {
typhaAddr := net.JoinHostPort(h.IP, fmt.Sprint(portForOurVersion))
candidates.Add(typhaAddr)
}
}

if candidates.Len() == 0 {
log.Error("Didn't find any ready Typha instances.")
return "", ErrServiceNotReady
}

var addrs []string
candidates.Iter(func(item interface{}) error {
typhaAddr := item.(string)
addrs = append(addrs, typhaAddr)
return nil
})
log.WithField("addrs", addrs).Info("Found ready Typha addresses.")
n := rand.Intn(len(addrs))
chosenAddr := addrs[n]
log.WithField("choice", chosenAddr).Info("Chose Typha to connect to.")

return chosenAddr, nil
typhaDiscoveryOpts := configParams.TyphaDiscoveryOpts()
typhaDiscoveryOpts = append(typhaDiscoveryOpts, discovery.WithKubeClient(k8sClientSet))
return discovery.DiscoverTyphaAddr(typhaDiscoveryOpts...)
}

0 comments on commit 55a557a

Please sign in to comment.