From a03a864845ae88afbe84f5e491ed0dd3c7e5e3b5 Mon Sep 17 00:00:00 2001 From: Or Ozeri Date: Thu, 22 Feb 2024 14:56:04 +0200 Subject: [PATCH] controlplane/control: Support port updates This commit adds support for updating import target ports. Signed-off-by: Or Ozeri --- cmd/cl-controlplane/app/server.go | 2 +- config/operator/rbac/role.yaml | 6 ++ pkg/apis/clusterlink.net/v1alpha1/import.go | 1 + pkg/bootstrap/platform/k8s.go | 5 ++ pkg/controlplane/control/manager.go | 69 +++++++++++++------ .../control/{portmanager.go => port.go} | 14 ++-- .../controller/instance_controller.go | 6 ++ 7 files changed, 74 insertions(+), 29 deletions(-) rename pkg/controlplane/control/{portmanager.go => port.go} (87%) diff --git a/cmd/cl-controlplane/app/server.go b/cmd/cl-controlplane/app/server.go index bf5ea8367..416bf4311 100644 --- a/cmd/cl-controlplane/app/server.go +++ b/cmd/cl-controlplane/app/server.go @@ -175,7 +175,7 @@ func (o *Options) Run() error { return fmt.Errorf("cannot create authz controllers: %w", err) } - controlManager := control.NewManager(mgr.GetClient()) + controlManager := control.NewManager(mgr.GetClient(), o.CRDMode) xdsManager := xds.NewManager() xds.RegisterService( diff --git a/config/operator/rbac/role.yaml b/config/operator/rbac/role.yaml index 7ffaadc81..30b65199d 100644 --- a/config/operator/rbac/role.yaml +++ b/config/operator/rbac/role.yaml @@ -57,6 +57,12 @@ rules: - patch - update - watch +- apiGroups: + - clusterlink.net + resources: + - imports + verbs: + - update - apiGroups: - clusterlink.net resources: diff --git a/pkg/apis/clusterlink.net/v1alpha1/import.go b/pkg/apis/clusterlink.net/v1alpha1/import.go index 57e86093b..ec61e21aa 100644 --- a/pkg/apis/clusterlink.net/v1alpha1/import.go +++ b/pkg/apis/clusterlink.net/v1alpha1/import.go @@ -44,6 +44,7 @@ type ImportSpec struct { // Port of the imported service. Port uint16 `json:"port"` // TargetPort of the imported service. + // This is the internal (non user-facing) listening port used by the dataplane pods. TargetPort uint16 `json:"targetPort,omitempty"` // Sources to import from. Sources []ImportSource `json:"sources"` diff --git a/pkg/bootstrap/platform/k8s.go b/pkg/bootstrap/platform/k8s.go index ce3232067..1b86cb5cb 100644 --- a/pkg/bootstrap/platform/k8s.go +++ b/pkg/bootstrap/platform/k8s.go @@ -276,6 +276,11 @@ rules: - apiGroups: [""] resources: ["pods"] verbs: ["get", "list", "watch"] +{{ if .crdMode }} +- apiGroups: ["clusterlink.net"] + resources: ["imports"] + verbs: ["update"] +{{ end }} --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/pkg/controlplane/control/manager.go b/pkg/controlplane/control/manager.go index 3b1e7528b..b124209d7 100644 --- a/pkg/controlplane/control/manager.go +++ b/pkg/controlplane/control/manager.go @@ -36,8 +36,9 @@ import ( // This includes target port generation for imported services, as well as // k8s service creation per imported service. type Manager struct { - client client.Client - ports *portManager + client client.Client + crdMode bool + ports *portManager logger *logrus.Entry } @@ -103,14 +104,6 @@ func (m *Manager) DeleteLegacyExport(namespace string, exportSpec *api.ExportSpe func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) error { m.logger.Infof("Adding import '%s/%s'.", imp.Namespace, imp.Name) - // TODO: port manager should map ports to imports, and be able to detect conflicts - port, err := m.ports.Lease(imp.Spec.TargetPort) - if err != nil { - return fmt.Errorf("cannot generate listening port: %w", err) - } - - imp.Spec.TargetPort = port - newService := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: imp.Name, @@ -130,7 +123,8 @@ func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) error { } var oldService v1.Service - err = m.client.Get( + var create bool + err := m.client.Get( ctx, types.NamespacedName{ Name: imp.Name, @@ -138,18 +132,50 @@ func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) error { }, &oldService) if err != nil { - if errors.IsNotFound(err) { - return m.client.Create(ctx, newService) + if !errors.IsNotFound(err) { + return err } - return err + create = true } - if serviceChanged(&oldService, newService) { - return m.client.Update(ctx, newService) + // if service exists, and import specifies a random (0) target port, + // then use existing service target port instead of allocating a new port + if !create && len(oldService.Spec.Ports) == 1 && imp.Spec.TargetPort == 0 { + imp.Spec.TargetPort = uint16(oldService.Spec.Ports[0].TargetPort.IntVal) } - return nil + newPort := imp.Spec.TargetPort == 0 + + fullName := imp.Namespace + "/" + imp.Name + port, err := m.ports.Lease(fullName, imp.Spec.TargetPort) + if err != nil { + return fmt.Errorf("cannot generate listening port: %w", err) + } + + if newPort { + imp.Spec.TargetPort = port + newService.Spec.Ports[0].TargetPort = intstr.FromInt32(int32(port)) + + if m.crdMode { + if err := m.client.Update(ctx, imp); err != nil { + m.ports.Release(port) + return err + } + } + } + + if create { + err = m.client.Create(ctx, newService) + } else if serviceChanged(&oldService, newService) { + err = m.client.Update(ctx, newService) + } + + if err != nil && newPort { + m.ports.Release(port) + } + + return err } // DeleteImport removes the listening socket of a previously imported service. @@ -193,12 +219,13 @@ func serviceChanged(svc1, svc2 *v1.Service) bool { } // NewManager returns a new control manager. -func NewManager(cl client.Client) *Manager { +func NewManager(cl client.Client, crdMode bool) *Manager { logger := logrus.WithField("component", "controlplane.control.manager") return &Manager{ - client: cl, - ports: newPortManager(), - logger: logger, + client: cl, + crdMode: crdMode, + ports: newPortManager(), + logger: logger, } } diff --git a/pkg/controlplane/control/portmanager.go b/pkg/controlplane/control/port.go similarity index 87% rename from pkg/controlplane/control/portmanager.go rename to pkg/controlplane/control/port.go index 75d2c9cb1..6a64f52b9 100644 --- a/pkg/controlplane/control/portmanager.go +++ b/pkg/controlplane/control/port.go @@ -38,7 +38,7 @@ const ( // portManager leases ports for use by imported services. type portManager struct { lock sync.Mutex - leasedPorts map[uint16]struct{} + leasedPorts map[uint16]string logger *logrus.Entry } @@ -74,8 +74,8 @@ func (m *portManager) getRandomFreePort() uint16 { return port } -// Lease marks a port as taken. If port is 0, some random free port is returned. -func (m *portManager) Lease(port uint16) (uint16, error) { +// Lease marks a port as taken by the given name. If port is 0, some random free port is returned. +func (m *portManager) Lease(name string, port uint16) (uint16, error) { m.logger.Infof("Leasing: %d.", port) m.lock.Lock() @@ -89,13 +89,13 @@ func (m *portManager) Lease(port uint16) (uint16, error) { port = m.getRandomFreePort() m.logger.Infof("Generated port: %d.", port) } else { - if _, ok := m.leasedPorts[port]; ok { - return 0, fmt.Errorf("port %d is already leased", port) + if leaseName, ok := m.leasedPorts[port]; ok && leaseName != name { + return 0, fmt.Errorf("port %d is already leased to '%s'", port, leaseName) } } // mark port is leased - m.leasedPorts[port] = struct{}{} + m.leasedPorts[port] = name return port, nil } @@ -120,7 +120,7 @@ func newPortManager() *portManager { ).Info("Initialized.") return &portManager{ - leasedPorts: make(map[uint16]struct{}), + leasedPorts: make(map[uint16]string), logger: logger, } } diff --git a/pkg/operator/controller/instance_controller.go b/pkg/operator/controller/instance_controller.go index 5d229357d..3dcfa0205 100644 --- a/pkg/operator/controller/instance_controller.go +++ b/pkg/operator/controller/instance_controller.go @@ -71,6 +71,7 @@ type InstanceReconciler struct { // +kubebuilder:rbac:groups="",resources=services;serviceaccounts,verbs=list;get;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=nodes,verbs=list;get;watch // +kubebuilder:rbac:groups="",resources=pods,verbs=list;get;watch +// +kubebuilder:rbac:groups=clusterlink.net,resources=imports,verbs=update // +kubebuilder:rbac:groups="apps",resources=deployments,verbs=list;get;watch;create;update;patch;delete //nolint:lll // Ignore long line warning for Kubebuilder command. // +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings,verbs=list;get;watch;create;update;patch;delete @@ -456,6 +457,11 @@ func (r *InstanceReconciler) createAccessControl(ctx context.Context, name, name Resources: []string{"pods"}, Verbs: []string{"get", "list", "watch"}, }, + { + APIGroups: []string{"clusterlink.net"}, + Resources: []string{"imports"}, + Verbs: []string{"update"}, + }, }, }