Skip to content

Commit

Permalink
Support RabbitMQ operator policies (#752)
Browse files Browse the repository at this point in the history
* Bump Golang

Use Golang 1.21 locally on the dev machine and bump Golang to
1.21 in Dockerfile

make manifests

* Support RabbitMQ operator policies

Closes #202

* Add workaround for Go 1.21

since codeql does not (yet) support Go 1.21, see
github/codeql-action#1842

* Fix test flake

* Add missing test

* Apply feedback
  • Loading branch information
ansd authored Jan 29, 2024
1 parent 2f527d4 commit b8b67df
Show file tree
Hide file tree
Showing 53 changed files with 2,738 additions and 432 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-test-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ on:
tags: [ "v*" ]

env:
GO_VERSION: '1.20.x' # Require Go 1.20 minor
GO_VERSION: '1.21.x' # Require Go 1.21 minor

jobs:
unit_integration_tests:
Expand Down
7 changes: 7 additions & 0 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v4

# Manually install the right version of Go
# See https://github.com/github/codeql-action/issues/1842 and https://github.com/github/codeql/issues/13992
- name: Install Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
branches: [ main ]

env:
GO_VERSION: '1.20.x' # Require Go 1.20.x
GO_VERSION: '1.21.x' # Require Go 1.21.x

jobs:

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build the manager binary
FROM --platform=$BUILDPLATFORM golang:1.20 as builder
FROM --platform=$BUILDPLATFORM golang:1.21 as builder

WORKDIR /workspace
# Copy the Go Modules manifests
Expand Down
90 changes: 90 additions & 0 deletions api/v1beta1/operatorpolicy_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package v1beta1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// OperatorPolicySpec defines the desired state of OperatorPolicy
// https://www.rabbitmq.com/parameters.html#operator-policies
type OperatorPolicySpec struct {
// Required property; cannot be updated
// +kubebuilder:validation:Required
Name string `json:"name"`
// Default to vhost '/'; cannot be updated
// +kubebuilder:default:=/
Vhost string `json:"vhost,omitempty"`
// Regular expression pattern used to match queues, e.g. "^my-queue$".
// Required property.
// +kubebuilder:validation:Required
Pattern string `json:"pattern"`
// What this operator policy applies to: 'queues', 'classic_queues', 'quorum_queues', 'streams'.
// Default to 'queues'.
// +kubebuilder:validation:Enum=queues;classic_queues;quorum_queues;streams
// +kubebuilder:default:=queues
ApplyTo string `json:"applyTo,omitempty"`
// Default to '0'.
// In the event that more than one operator policy can match a given queue, the operator policy with the greatest priority applies.
// +kubebuilder:default:=0
Priority int `json:"priority,omitempty"`
// OperatorPolicy definition. Required property.
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
// +kubebuilder:validation:Required
Definition *runtime.RawExtension `json:"definition"`
// Reference to the RabbitmqCluster that the operator policy will be created in.
// Required property.
// +kubebuilder:validation:Required
RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"`
}

// OperatorPolicyStatus defines the observed state of OperatorPolicy
type OperatorPolicyStatus struct {
// observedGeneration is the most recent successful generation observed for this OperatorPolicy. It corresponds to the
// OperatorPolicy's generation, which is updated on mutation by the API Server.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
Conditions []Condition `json:"conditions,omitempty"`
}

// +genclient
// +kubebuilder:object:root=true
// +kubebuilder:resource:categories=all;rabbitmq
// +kubebuilder:subresource:status

// OperatorPolicy is the Schema for the operator policies API
type OperatorPolicy struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec OperatorPolicySpec `json:"spec,omitempty"`
Status OperatorPolicyStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// OperatorPolicyList contains a list of OperatorPolicy
type OperatorPolicyList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []OperatorPolicy `json:"items"`
}

func (p *OperatorPolicy) GroupResource() schema.GroupResource {
return schema.GroupResource{
Group: p.GroupVersionKind().Group,
Resource: p.GroupVersionKind().Kind,
}
}

func (p *OperatorPolicy) RabbitReference() RabbitmqClusterReference {
return p.Spec.RabbitmqClusterReference
}

func (p *OperatorPolicy) SetStatusConditions(c []Condition) {
p.Status.Conditions = c
}

func init() {
SchemeBuilder.Register(&OperatorPolicy{}, &OperatorPolicyList{})
}
117 changes: 117 additions & 0 deletions api/v1beta1/operatorpolicy_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package v1beta1

import (
"context"

"k8s.io/apimachinery/pkg/runtime"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

var _ = Describe("OperatorPolicy", func() {
var (
namespace = "default"
ctx = context.Background()
)

It("creates an operator policy with minimal configurations", func() {
policy := OperatorPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "test-operator-policy",
Namespace: namespace,
},
Spec: OperatorPolicySpec{
Name: "test-operator-policy",
Pattern: "^some-prefix",
Definition: &runtime.RawExtension{
Raw: []byte(`{"max-length": 10}`),
},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &policy)).To(Succeed())
fetched := &OperatorPolicy{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: policy.Name,
Namespace: policy.Namespace,
}, fetched)).To(Succeed())
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
Name: "some-cluster",
}))
Expect(fetched.Spec.Name).To(Equal("test-operator-policy"))
Expect(fetched.Spec.Vhost).To(Equal("/"))
Expect(fetched.Spec.Pattern).To(Equal("^some-prefix"))
Expect(fetched.Spec.ApplyTo).To(Equal("queues"))
Expect(fetched.Spec.Priority).To(Equal(0))
Expect(fetched.Spec.Definition.Raw).To(Equal([]byte(`{"max-length":10}`)))
})

It("creates operator policy with configurations", func() {
policy := OperatorPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "random-policy",
Namespace: namespace,
},
Spec: OperatorPolicySpec{
Name: "test-policy",
Vhost: "/hello",
Pattern: "*.",
ApplyTo: "quorum_queues",
Priority: 100,
Definition: &runtime.RawExtension{
Raw: []byte(`{"max-length":10}`),
},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "random-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &policy)).To(Succeed())
fetched := &OperatorPolicy{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: policy.Name,
Namespace: policy.Namespace,
}, fetched)).To(Succeed())

Expect(fetched.Spec.Name).To(Equal("test-policy"))
Expect(fetched.Spec.Vhost).To(Equal("/hello"))
Expect(fetched.Spec.Pattern).To(Equal("*."))
Expect(fetched.Spec.ApplyTo).To(Equal("quorum_queues"))
Expect(fetched.Spec.Priority).To(Equal(100))
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(
RabbitmqClusterReference{
Name: "random-cluster",
}))
Expect(fetched.Spec.Definition.Raw).To(Equal([]byte(`{"max-length":10}`)))
})

When("creating a policy with an invalid 'ApplyTo' value", func() {
It("fails with validation errors", func() {
policy := OperatorPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "invalid",
Namespace: namespace,
},
Spec: OperatorPolicySpec{
Name: "test-policy",
Pattern: "a-queue-name",
Definition: &runtime.RawExtension{
Raw: []byte(`{"max-length":10}`),
},
ApplyTo: "yo-yo",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &policy)).To(HaveOccurred())
Expect(k8sClient.Create(ctx, &policy)).To(MatchError(`OperatorPolicy.rabbitmq.com "invalid" is invalid: spec.applyTo: Unsupported value: "yo-yo": supported values: "queues", "classic_queues", "quorum_queues", "streams"`))
})
})

})
57 changes: 57 additions & 0 deletions api/v1beta1/operatorpolicy_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package v1beta1

import (
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

func (p *OperatorPolicy) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(p).
Complete()
}

// +kubebuilder:webhook:verbs=create;update,path=/validate-rabbitmq-com-v1beta1-operatorpolicy,mutating=false,failurePolicy=fail,groups=rabbitmq.com,resources=operatorpolicies,versions=v1beta1,name=voperatorpolicy.kb.io,sideEffects=none,admissionReviewVersions=v1

var _ webhook.Validator = &OperatorPolicy{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
// either rabbitmqClusterReference.name or rabbitmqClusterReference.connectionSecret must be provided but not both
func (p *OperatorPolicy) ValidateCreate() (admission.Warnings, error) {
return p.Spec.RabbitmqClusterReference.ValidateOnCreate(p.GroupResource(), p.Name)
}

// ValidateUpdate returns error type 'forbidden' for updates on operator policy name, vhost and rabbitmqClusterReference
func (p *OperatorPolicy) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
oldOperatorPolicy, ok := old.(*OperatorPolicy)
if !ok {
return nil, apierrors.NewBadRequest(fmt.Sprintf("expected an operator policy but got a %T", old))
}

detailMsg := "updates on name, vhost and rabbitmqClusterReference are all forbidden"
if p.Spec.Name != oldOperatorPolicy.Spec.Name {
return nil, apierrors.NewForbidden(p.GroupResource(), p.Name,
field.Forbidden(field.NewPath("spec", "name"), detailMsg))
}

if p.Spec.Vhost != oldOperatorPolicy.Spec.Vhost {
return nil, apierrors.NewForbidden(p.GroupResource(), p.Name,
field.Forbidden(field.NewPath("spec", "vhost"), detailMsg))
}

if !oldOperatorPolicy.Spec.RabbitmqClusterReference.Matches(&p.Spec.RabbitmqClusterReference) {
return nil, apierrors.NewForbidden(p.GroupResource(), p.Name,
field.Forbidden(field.NewPath("spec", "rabbitmqClusterReference"), detailMsg))
}
return nil, nil
}

func (p *OperatorPolicy) ValidateDelete() (admission.Warnings, error) {
return nil, nil
}
Loading

0 comments on commit b8b67df

Please sign in to comment.