Skip to content

Commit

Permalink
Merge pull request #323 from orozery/xds
Browse files Browse the repository at this point in the history
controlplane/xds: Introduce Manager
  • Loading branch information
orozery authored Feb 21, 2024
2 parents b689d8e + 4bf4cca commit 290562f
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 73 deletions.
8 changes: 5 additions & 3 deletions cmd/cl-controlplane/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func (o *Options) Run() error {
runnableManager.AddServer(grpcServerAddress, grpcServer)
runnableManager.AddServer(controlplaneServerListenAddress, sniProxy)

xdsManager := xds.NewManager()
xds.RegisterService(
context.Background(), xdsManager, grpcServer.GetGRPCServer())

// open store
kvStore, err := bolt.Open(StoreFile)
if err != nil {
Expand All @@ -178,14 +182,12 @@ func (o *Options) Run() error {

storeManager := kv.NewManager(kvStore)

cp, err := controlplane.NewInstance(parsedCertData, storeManager, namespace)
cp, err := controlplane.NewInstance(parsedCertData, storeManager, xdsManager, namespace)
if err != nil {
return err
}

authz.RegisterHandlers(cp, &httpServer.Server)
xds.RegisterService(
context.Background(), cp, grpcServer.GetGRPCServer())
cprest.RegisterHandlers(cp, httpServer)

return runnableManager.Run()
Expand Down
96 changes: 63 additions & 33 deletions pkg/controlplane/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ import (
"fmt"
"sync"

"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/lestrrat-go/jwx/jwk"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/clusterlink-net/clusterlink/cmd/cl-dataplane/app"
"github.com/clusterlink-net/clusterlink/pkg/api"
"github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1"
"github.com/clusterlink-net/clusterlink/pkg/controlplane/peer"
cpstore "github.com/clusterlink-net/clusterlink/pkg/controlplane/store"
"github.com/clusterlink-net/clusterlink/pkg/controlplane/xds"
"github.com/clusterlink-net/clusterlink/pkg/platform/k8s"
"github.com/clusterlink-net/clusterlink/pkg/policyengine"
"github.com/clusterlink-net/clusterlink/pkg/policyengine/policytypes"
Expand All @@ -41,7 +44,8 @@ const (

// Instance of a controlplane, where all API servers delegate their requested actions to.
type Instance struct {
peerTLS *tls.ParsedCertData
namespace string
peerTLS *tls.ParsedCertData

peers *cpstore.Peers
exports *cpstore.Exports
Expand All @@ -53,7 +57,7 @@ type Instance struct {
peerLock sync.RWMutex
peerClient map[string]*peer.Client

xdsManager *xdsManager
xdsManager *xds.Manager
ports *portManager
policyDecider policyengine.PolicyDecider
platform *k8s.Platform
Expand All @@ -66,6 +70,35 @@ type Instance struct {
logger *logrus.Entry
}

func toK8SImport(imp *cpstore.Import, namespace string) *v1alpha1.Import {
return &v1alpha1.Import{
ObjectMeta: metav1.ObjectMeta{
Name: imp.Name,
Namespace: namespace,
},
Spec: v1alpha1.ImportSpec{
Port: imp.ImportSpec.Service.Port,
TargetPort: imp.Port,
},
}
}

func toK8SPeer(pr *cpstore.Peer) *v1alpha1.Peer {
k8sPeer := &v1alpha1.Peer{
ObjectMeta: metav1.ObjectMeta{Name: pr.Name},
Spec: v1alpha1.PeerSpec{
Gateways: make([]v1alpha1.Endpoint, len(pr.Gateways)),
},
}

for i, gw := range pr.PeerSpec.Gateways {
k8sPeer.Spec.Gateways[i].Host = gw.Host
k8sPeer.Spec.Gateways[i].Port = gw.Port
}

return k8sPeer
}

// CreatePeer defines a new route target for egress dataplane connections.
func (cp *Instance) CreatePeer(pr *cpstore.Peer) error {
cp.logger.Infof("Creating peer '%s'.", pr.Name)
Expand All @@ -83,7 +116,7 @@ func (cp *Instance) CreatePeer(pr *cpstore.Peer) error {
cp.peerClient[pr.Name] = client
cp.peerLock.Unlock()

if err := cp.xdsManager.AddPeer(pr); err != nil {
if err := cp.xdsManager.AddPeer(toK8SPeer(pr)); err != nil {
// practically impossible
return err
}
Expand Down Expand Up @@ -120,7 +153,7 @@ func (cp *Instance) UpdatePeer(pr *cpstore.Peer) error {
cp.peerClient[pr.Name] = client
cp.peerLock.Unlock()

if err := cp.xdsManager.AddPeer(pr); err != nil {
if err := cp.xdsManager.AddPeer(toK8SPeer(pr)); err != nil {
// practically impossible
return err
}
Expand Down Expand Up @@ -192,12 +225,8 @@ func (cp *Instance) CreateExport(export *cpstore.Export) error {
}
}

if err := cp.xdsManager.AddExport(export); err != nil {
// practically impossible
return err
}

return nil
return cp.xdsManager.AddLegacyExport(
export.Name, cp.namespace, export.Service.Host, export.Service.Port)
}

// UpdateExport updates a new route target for ingress dataplane connections.
Expand Down Expand Up @@ -225,12 +254,7 @@ func (cp *Instance) UpdateExport(export *cpstore.Export) error {
cp.platform.UpdateExternalService(exportPrefix+export.Name, eSpec.Service.Host, eSpec.ExternalService)
}

if err := cp.xdsManager.AddExport(export); err != nil {
// practically impossible
return err
}

return nil
return cp.xdsManager.AddLegacyExport(export.Name, cp.namespace, export.Service.Host, export.Service.Port)
}

// GetExport returns an existing export.
Expand Down Expand Up @@ -259,7 +283,11 @@ func (cp *Instance) DeleteExport(name string) (*cpstore.Export, error) {
}
}

if err := cp.xdsManager.DeleteExport(name); err != nil {
namespacedName := types.NamespacedName{
Name: name,
Namespace: cp.namespace,
}
if err := cp.xdsManager.DeleteExport(namespacedName); err != nil {
// practically impossible
return export, err
}
Expand Down Expand Up @@ -293,7 +321,8 @@ func (cp *Instance) CreateImport(imp *cpstore.Import) error {
}
}

if err := cp.xdsManager.AddImport(imp); err != nil {
k8sImp := toK8SImport(imp, cp.namespace)
if err := cp.xdsManager.AddImport(k8sImp); err != nil {
// practically impossible
return err
}
Expand All @@ -318,7 +347,8 @@ func (cp *Instance) UpdateImport(imp *cpstore.Import) error {
return err
}

if err := cp.xdsManager.AddImport(imp); err != nil {
k8sImp := toK8SImport(imp, cp.namespace)
if err := cp.xdsManager.AddImport(k8sImp); err != nil {
// practically impossible
return err
}
Expand Down Expand Up @@ -346,7 +376,11 @@ func (cp *Instance) DeleteImport(name string) (*cpstore.Import, error) {
return nil, nil
}

if err := cp.xdsManager.DeleteImport(name); err != nil {
namespacedName := types.NamespacedName{
Name: name,
Namespace: cp.namespace,
}
if err := cp.xdsManager.DeleteImport(namespacedName); err != nil {
// practically impossible
return imp, err
}
Expand Down Expand Up @@ -540,16 +574,6 @@ func (cp *Instance) GetAllLBPolicies() []*cpstore.LBPolicy {
return cp.lbPolicies.GetAll()
}

// GetXDSClusterManager returns the xDS cluster manager.
func (cp *Instance) GetXDSClusterManager() cache.Cache {
return cp.xdsManager.clusters
}

// GetXDSListenerManager returns the xDS listener manager.
func (cp *Instance) GetXDSListenerManager() cache.Cache {
return cp.xdsManager.listeners
}

// init initializes the controlplane manager.
func (cp *Instance) init() error {
// generate the JWK key
Expand Down Expand Up @@ -630,7 +654,12 @@ func (cp *Instance) generateJWK() error {
}

// NewInstance returns a new controlplane instance.
func NewInstance(peerTLS *tls.ParsedCertData, storeManager store.Manager, namespace string) (*Instance, error) {
func NewInstance(
peerTLS *tls.ParsedCertData,
storeManager store.Manager,
xdsManager *xds.Manager,
namespace string,
) (*Instance, error) {
logger := logrus.WithField("component", "controlplane")

// initialize platform
Expand Down Expand Up @@ -676,6 +705,7 @@ func NewInstance(peerTLS *tls.ParsedCertData, storeManager store.Manager, namesp
logger.Infof("Loaded %d load-balancing policies.", lbPolicies.Len())

cp := &Instance{
namespace: namespace,
peerTLS: peerTLS,
peerClient: make(map[string]*peer.Client),
peers: peers,
Expand All @@ -684,7 +714,7 @@ func NewInstance(peerTLS *tls.ParsedCertData, storeManager store.Manager, namesp
bindings: bindings,
acPolicies: acPolicies,
lbPolicies: lbPolicies,
xdsManager: newXDSManager(),
xdsManager: xdsManager,
ports: newPortManager(),
policyDecider: policyengine.NewPolicyHandler(),
platform: pp,
Expand Down
Loading

0 comments on commit 290562f

Please sign in to comment.