Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend Bucket Runtime Interface for Cross-Cluster Events #1091

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion broker/bucketbroker/server/bucket_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *Server) createIronCoreBucket(ctx context.Context, log logr.Logger, buck

log.V(1).Info("Patching ironcore bucket as created")
if err := apiutils.PatchCreated(ctx, s.client, bucket.Bucket); err != nil {
return fmt.Errorf("error patching ironcore machine as created: %w", err)
return fmt.Errorf("error patching ironcore bucket as created: %w", err)
}

// Reset cleaner since everything from now on operates on a consistent bucket
Expand Down
65 changes: 65 additions & 0 deletions broker/bucketbroker/server/bucket_create_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server_test

import (
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
bucketbrokerv1alpha1 "github.com/ironcore-dev/ironcore/broker/bucketbroker/api/v1alpha1"
"github.com/ironcore-dev/ironcore/broker/machinebroker/apiutils"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1"
bucketpoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/api/v1alpha1"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("CreateBucket", func() {
ns, _, srv := SetupTest()
bucketClass := SetupBucketClass("250Mi", "1500")

It("should correctly create a bucket", func(ctx SpecContext) {
By("creating a bucket")
res, err := srv.CreateBucket(ctx, &iri.CreateBucketRequest{
Bucket: &iri.Bucket{
Metadata: &irimeta.ObjectMetadata{
Labels: map[string]string{
bucketpoolletv1alpha1.BucketUIDLabel: "foobar",
},
},
Spec: &iri.BucketSpec{
Class: bucketClass.Name,
},
},
})

Expect(err).NotTo(HaveOccurred())
Expect(res).NotTo(BeNil())

By("getting the ironcore bucket")
ironcoreBucket := &storagev1alpha1.Bucket{}
ironcoreBucketKey := client.ObjectKey{Namespace: ns.Name, Name: res.Bucket.Metadata.Id}
Expect(k8sClient.Get(ctx, ironcoreBucketKey, ironcoreBucket)).To(Succeed())

By("inspecting the ironcore bucket")
Expect(ironcoreBucket.Labels).To(Equal(map[string]string{
bucketbrokerv1alpha1.CreatedLabel: "true",
bucketbrokerv1alpha1.ManagerLabel: bucketbrokerv1alpha1.BucketBrokerManager,
}))
encodedIRIAnnotations, err := apiutils.EncodeAnnotationsAnnotation(nil)
Expect(err).NotTo(HaveOccurred())
encodedIRILabels, err := apiutils.EncodeLabelsAnnotation(map[string]string{
bucketpoolletv1alpha1.BucketUIDLabel: "foobar",
})
Expect(err).NotTo(HaveOccurred())
Expect(ironcoreBucket.Annotations).To(Equal(map[string]string{
bucketbrokerv1alpha1.AnnotationsAnnotation: encodedIRIAnnotations,
bucketbrokerv1alpha1.LabelsAnnotation: encodedIRILabels,
}))
Expect(ironcoreBucket.Spec.BucketClassRef.Name).To(Equal(bucketClass.Name))

})
})
53 changes: 53 additions & 0 deletions broker/bucketbroker/server/bucket_delete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server_test

import (
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1"
bucketpoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/api/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("DeleteBucket", func() {
ns, _, srv := SetupTest()
bucketClass := SetupBucketClass("250Mi", "1500")

It("should correctly delete a bucket", func(ctx SpecContext) {
By("creating a bucket")
createRes, err := srv.CreateBucket(ctx, &iri.CreateBucketRequest{
Bucket: &iri.Bucket{
Metadata: &irimeta.ObjectMetadata{
Labels: map[string]string{
bucketpoolletv1alpha1.BucketUIDLabel: "foobar",
},
},
Spec: &iri.BucketSpec{
Class: bucketClass.Name,
},
},
})

Expect(err).NotTo(HaveOccurred())
Expect(createRes).NotTo(BeNil())

By("deleting the bucket")
deleteRes, err := srv.DeleteBucket(ctx, &iri.DeleteBucketRequest{
BucketId: createRes.Bucket.Metadata.Id,
})
Expect(err).NotTo(HaveOccurred())
Expect(deleteRes).NotTo(BeNil())

By("verifying the bucket is deleted")
ironcoreBucket := &storagev1alpha1.Bucket{}
ironcoreBucketKey := client.ObjectKey{Namespace: ns.Name, Name: createRes.Bucket.Metadata.Id}
err = k8sClient.Get(ctx, ironcoreBucketKey, ironcoreBucket)
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
})
16 changes: 16 additions & 0 deletions broker/bucketbroker/server/bucket_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
bucketbrokerv1alpha1 "github.com/ironcore-dev/ironcore/broker/bucketbroker/api/v1alpha1"
"github.com/ironcore-dev/ironcore/broker/bucketbroker/apiutils"
"github.com/ironcore-dev/ironcore/broker/common"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -112,6 +113,21 @@ func (s *Server) aggregateIronCoreBucket(
}, nil
}

func (s *Server) getIronCoreBucket(ctx context.Context, id string) (*storagev1alpha1.Bucket, error) {
ironcoreBucket := &storagev1alpha1.Bucket{}
ironcoreBucketKey := client.ObjectKey{Namespace: s.namespace, Name: id}
if err := s.client.Get(ctx, ironcoreBucketKey, ironcoreBucket); err != nil {
if !apierrors.IsNotFound(err) {
return nil, fmt.Errorf("error getting ironcore bucket %s: %w", id, err)
}
return nil, status.Errorf(codes.NotFound, "bucket %s not found", id)
}
if !apiutils.IsManagedBy(ironcoreBucket, bucketbrokerv1alpha1.BucketBrokerManager) || !apiutils.IsCreated(ironcoreBucket) {
return nil, status.Errorf(codes.NotFound, "bucket %s not found", id)
}
return ironcoreBucket, nil
}

func (s *Server) getAggregateIronCoreBucket(ctx context.Context, id string) (*AggregateIronCoreBucket, error) {
ironcoreBucket := &storagev1alpha1.Bucket{}
if err := s.getManagedAndCreated(ctx, id, ironcoreBucket); err != nil {
Expand Down
44 changes: 44 additions & 0 deletions broker/bucketbroker/server/bucket_list_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server_test

import (
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1"
bucketpoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/api/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("ListBuckets", func() {
_, _, srv := SetupTest()
bucketClass := SetupBucketClass("250Mi", "1500")

It("should correctly list buckets", func(ctx SpecContext) {
By("creating multiple buckets")
const noOfBuckets = 3

buckets := make([]any, noOfBuckets)
for i := 0; i < noOfBuckets; i++ {
res, err := srv.CreateBucket(ctx, &iri.CreateBucketRequest{
Bucket: &iri.Bucket{
Metadata: &irimeta.ObjectMetadata{
Labels: map[string]string{
bucketpoolletv1alpha1.BucketUIDLabel: "foobar",
},
},
Spec: &iri.BucketSpec{
Class: bucketClass.Name,
},
},
})
Expect(err).NotTo(HaveOccurred())
Expect(res).NotTo(BeNil())
buckets[i] = res.Bucket
}

By("listing the buckets")
Expect(srv.ListBuckets(ctx, &iri.ListBucketsRequest{})).To(HaveField("Buckets", ConsistOf(buckets...)))
})
})
36 changes: 36 additions & 0 deletions broker/bucketbroker/server/bucketclass_list_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server_test

import (
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
)

var _ = Describe("ListBucketClasses", func() {

_, bucketPool, srv := SetupTest()
bucketClass1 := SetupBucketClass("100Mi", "100")
bucketClass2 := SetupBucketClass("200Mi", "200")

It("should correctly list available bucket classes", func(ctx SpecContext) {
By("patching bucket classes in the bucket pool status")
bucketPool.Status.AvailableBucketClasses = []corev1.LocalObjectReference{
{Name: bucketClass1.Name},
{Name: bucketClass2.Name},
}
Expect(k8sClient.Status().Update(ctx, bucketPool)).To(Succeed())

res, err := srv.ListBucketClasses(ctx, &iri.ListBucketClassesRequest{})
Expect(err).NotTo(HaveOccurred())
Expect(res.BucketClasses).To(HaveLen(2))
Expect(res.BucketClasses).To(ContainElements(
&iri.BucketClass{Name: bucketClass1.Name, Capabilities: &iri.BucketClassCapabilities{Tps: 104857600, Iops: 100}},
&iri.BucketClass{Name: bucketClass2.Name, Capabilities: &iri.BucketClassCapabilities{Tps: 209715200, Iops: 200}},
))
})

})
102 changes: 102 additions & 0 deletions broker/bucketbroker/server/event_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
"github.com/ironcore-dev/ironcore/broker/bucketbroker/apiutils"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
irievent "github.com/ironcore-dev/ironcore/iri/apis/event/v1alpha1"
)

const (
InvolvedObjectKind = "Bucket"
InvolvedObjectKindSelector = "involvedObject.kind"
InvolvedObjectAPIVersionSelector = "involvedObject.apiVersion"
)

func (s *Server) listEvents(ctx context.Context) ([]*irievent.Event, error) {
log := ctrl.LoggerFrom(ctx)
bucketEventList := &v1.EventList{}
selectorField := fields.Set{
InvolvedObjectKindSelector: InvolvedObjectKind,
InvolvedObjectAPIVersionSelector: storagev1alpha1.SchemeGroupVersion.String(),
}
if err := s.client.List(ctx, bucketEventList,
client.InNamespace(s.namespace), client.MatchingFieldsSelector{Selector: selectorField.AsSelector()},
); err != nil {
return nil, err
}

var iriEvents []*irievent.Event
for _, bucketEvent := range bucketEventList.Items {
ironcoreBucket, err := s.getIronCoreBucket(ctx, bucketEvent.InvolvedObject.Name)
if err != nil {
log.V(1).Info("Unable to get ironcore bucket", "BucketName", bucketEvent.InvolvedObject.Name)
continue
}
bucketObjectMetadata, err := apiutils.GetObjectMetadata(&ironcoreBucket.ObjectMeta)
if err != nil {
continue
}
iriEvent := &irievent.Event{
Spec: &irievent.EventSpec{
InvolvedObjectMeta: bucketObjectMetadata,
Reason: bucketEvent.Reason,
Message: bucketEvent.Message,
Type: bucketEvent.Type,
EventTime: bucketEvent.LastTimestamp.Unix(),
},
}
iriEvents = append(iriEvents, iriEvent)
}
return iriEvents, nil
}

func (s *Server) filterEvents(events []*irievent.Event, filter *iri.EventFilter) []*irievent.Event {
if filter == nil {
return events
}

var (
res []*irievent.Event
sel = labels.SelectorFromSet(filter.LabelSelector)
)
for _, iriEvent := range events {
if !sel.Matches(labels.Set(iriEvent.Spec.InvolvedObjectMeta.Labels)) {
continue
}

if filter.EventsFromTime > 0 && filter.EventsToTime > 0 {
if iriEvent.Spec.EventTime < filter.EventsFromTime || iriEvent.Spec.EventTime > filter.EventsToTime {
continue
}
}

res = append(res, iriEvent)
}
return res
}

func (s *Server) ListEvents(ctx context.Context, req *iri.ListEventsRequest) (*iri.ListEventsResponse, error) {
iriEvents, err := s.listEvents(ctx)
if err != nil {
return nil, fmt.Errorf("error listing bucket events : %w", err)
}

iriEvents = s.filterEvents(iriEvents, req.Filter)

return &iri.ListEventsResponse{
Events: iriEvents,
}, nil
}
Loading