Skip to content

Commit

Permalink
controlplane/control: Support port updates
Browse files Browse the repository at this point in the history
This commit adds support for updating import target ports.

Signed-off-by: Or Ozeri <[email protected]>
  • Loading branch information
orozery committed Feb 28, 2024
1 parent 5db1161 commit a03a864
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 29 deletions.
2 changes: 1 addition & 1 deletion cmd/cl-controlplane/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (o *Options) Run() error {
return fmt.Errorf("cannot create authz controllers: %w", err)
}

controlManager := control.NewManager(mgr.GetClient())
controlManager := control.NewManager(mgr.GetClient(), o.CRDMode)

xdsManager := xds.NewManager()
xds.RegisterService(
Expand Down
6 changes: 6 additions & 0 deletions config/operator/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ rules:
- patch
- update
- watch
- apiGroups:
- clusterlink.net
resources:
- imports
verbs:
- update
- apiGroups:
- clusterlink.net
resources:
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/clusterlink.net/v1alpha1/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ImportSpec struct {
// Port of the imported service.
Port uint16 `json:"port"`
// TargetPort of the imported service.
// This is the internal (non user-facing) listening port used by the dataplane pods.
TargetPort uint16 `json:"targetPort,omitempty"`
// Sources to import from.
Sources []ImportSource `json:"sources"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/bootstrap/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
{{ if .crdMode }}
- apiGroups: ["clusterlink.net"]
resources: ["imports"]
verbs: ["update"]
{{ end }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down
69 changes: 48 additions & 21 deletions pkg/controlplane/control/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ import (
// This includes target port generation for imported services, as well as
// k8s service creation per imported service.
type Manager struct {
client client.Client
ports *portManager
client client.Client
crdMode bool
ports *portManager

logger *logrus.Entry
}
Expand Down Expand Up @@ -103,14 +104,6 @@ func (m *Manager) DeleteLegacyExport(namespace string, exportSpec *api.ExportSpe
func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) error {
m.logger.Infof("Adding import '%s/%s'.", imp.Namespace, imp.Name)

// TODO: port manager should map ports to imports, and be able to detect conflicts
port, err := m.ports.Lease(imp.Spec.TargetPort)
if err != nil {
return fmt.Errorf("cannot generate listening port: %w", err)
}

imp.Spec.TargetPort = port

newService := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: imp.Name,
Expand All @@ -130,26 +123,59 @@ func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) error {
}

var oldService v1.Service
err = m.client.Get(
var create bool
err := m.client.Get(
ctx,
types.NamespacedName{
Name: imp.Name,
Namespace: imp.Namespace,
},
&oldService)
if err != nil {
if errors.IsNotFound(err) {
return m.client.Create(ctx, newService)
if !errors.IsNotFound(err) {
return err
}

return err
create = true
}

if serviceChanged(&oldService, newService) {
return m.client.Update(ctx, newService)
// if service exists, and import specifies a random (0) target port,
// then use existing service target port instead of allocating a new port
if !create && len(oldService.Spec.Ports) == 1 && imp.Spec.TargetPort == 0 {
imp.Spec.TargetPort = uint16(oldService.Spec.Ports[0].TargetPort.IntVal)
}

return nil
newPort := imp.Spec.TargetPort == 0

fullName := imp.Namespace + "/" + imp.Name
port, err := m.ports.Lease(fullName, imp.Spec.TargetPort)
if err != nil {
return fmt.Errorf("cannot generate listening port: %w", err)
}

if newPort {
imp.Spec.TargetPort = port
newService.Spec.Ports[0].TargetPort = intstr.FromInt32(int32(port))

if m.crdMode {
if err := m.client.Update(ctx, imp); err != nil {
m.ports.Release(port)
return err
}
}
}

if create {
err = m.client.Create(ctx, newService)
} else if serviceChanged(&oldService, newService) {
err = m.client.Update(ctx, newService)
}

if err != nil && newPort {
m.ports.Release(port)
}

return err
}

// DeleteImport removes the listening socket of a previously imported service.
Expand Down Expand Up @@ -193,12 +219,13 @@ func serviceChanged(svc1, svc2 *v1.Service) bool {
}

// NewManager returns a new control manager.
func NewManager(cl client.Client) *Manager {
func NewManager(cl client.Client, crdMode bool) *Manager {
logger := logrus.WithField("component", "controlplane.control.manager")

return &Manager{
client: cl,
ports: newPortManager(),
logger: logger,
client: cl,
crdMode: crdMode,
ports: newPortManager(),
logger: logger,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
// portManager leases ports for use by imported services.
type portManager struct {
lock sync.Mutex
leasedPorts map[uint16]struct{}
leasedPorts map[uint16]string

logger *logrus.Entry
}
Expand Down Expand Up @@ -74,8 +74,8 @@ func (m *portManager) getRandomFreePort() uint16 {
return port
}

// Lease marks a port as taken. If port is 0, some random free port is returned.
func (m *portManager) Lease(port uint16) (uint16, error) {
// Lease marks a port as taken by the given name. If port is 0, some random free port is returned.
func (m *portManager) Lease(name string, port uint16) (uint16, error) {
m.logger.Infof("Leasing: %d.", port)

m.lock.Lock()
Expand All @@ -89,13 +89,13 @@ func (m *portManager) Lease(port uint16) (uint16, error) {
port = m.getRandomFreePort()
m.logger.Infof("Generated port: %d.", port)
} else {
if _, ok := m.leasedPorts[port]; ok {
return 0, fmt.Errorf("port %d is already leased", port)
if leaseName, ok := m.leasedPorts[port]; ok && leaseName != name {
return 0, fmt.Errorf("port %d is already leased to '%s'", port, leaseName)
}
}

// mark port is leased
m.leasedPorts[port] = struct{}{}
m.leasedPorts[port] = name

return port, nil
}
Expand All @@ -120,7 +120,7 @@ func newPortManager() *portManager {
).Info("Initialized.")

return &portManager{
leasedPorts: make(map[uint16]struct{}),
leasedPorts: make(map[uint16]string),
logger: logger,
}
}
6 changes: 6 additions & 0 deletions pkg/operator/controller/instance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type InstanceReconciler struct {
// +kubebuilder:rbac:groups="",resources=services;serviceaccounts,verbs=list;get;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=nodes,verbs=list;get;watch
// +kubebuilder:rbac:groups="",resources=pods,verbs=list;get;watch
// +kubebuilder:rbac:groups=clusterlink.net,resources=imports,verbs=update
// +kubebuilder:rbac:groups="apps",resources=deployments,verbs=list;get;watch;create;update;patch;delete
//nolint:lll // Ignore long line warning for Kubebuilder command.
// +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings,verbs=list;get;watch;create;update;patch;delete
Expand Down Expand Up @@ -456,6 +457,11 @@ func (r *InstanceReconciler) createAccessControl(ctx context.Context, name, name
Resources: []string{"pods"},
Verbs: []string{"get", "list", "watch"},
},
{
APIGroups: []string{"clusterlink.net"},
Resources: []string{"imports"},
Verbs: []string{"update"},
},
},
}

Expand Down

0 comments on commit a03a864

Please sign in to comment.