Skip to content

Commit

Permalink
Extend Bucket Runtime Interface for Cross-Cluster Events
Browse files Browse the repository at this point in the history
  • Loading branch information
sujeet01 committed Jul 22, 2024
1 parent f0a0d85 commit 7065fe1
Show file tree
Hide file tree
Showing 31 changed files with 2,723 additions and 127 deletions.
2 changes: 1 addition & 1 deletion broker/bucketbroker/server/bucket_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *Server) createIronCoreBucket(ctx context.Context, log logr.Logger, buck

log.V(1).Info("Patching ironcore bucket as created")
if err := apiutils.PatchCreated(ctx, s.client, bucket.Bucket); err != nil {
return fmt.Errorf("error patching ironcore machine as created: %w", err)
return fmt.Errorf("error patching ironcore bucket as created: %w", err)
}

// Reset cleaner since everything from now on operates on a consistent bucket
Expand Down
65 changes: 65 additions & 0 deletions broker/bucketbroker/server/bucket_create_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server_test

import (
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
bucketbrokerv1alpha1 "github.com/ironcore-dev/ironcore/broker/bucketbroker/api/v1alpha1"
"github.com/ironcore-dev/ironcore/broker/machinebroker/apiutils"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1"
bucketpoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/api/v1alpha1"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("CreateBucket", func() {
ns, _, srv := SetupTest()
bucketClass := SetupBucketClass("250Mi", "1500")

It("should correctly create a bucket", func(ctx SpecContext) {
By("creating a bucket")
res, err := srv.CreateBucket(ctx, &iri.CreateBucketRequest{
Bucket: &iri.Bucket{
Metadata: &irimeta.ObjectMetadata{
Labels: map[string]string{
bucketpoolletv1alpha1.BucketUIDLabel: "foobar",
},
},
Spec: &iri.BucketSpec{
Class: bucketClass.Name,
},
},
})

Expect(err).NotTo(HaveOccurred())
Expect(res).NotTo(BeNil())

By("getting the ironcore bucket")
ironcoreBucket := &storagev1alpha1.Bucket{}
ironcoreBucketKey := client.ObjectKey{Namespace: ns.Name, Name: res.Bucket.Metadata.Id}
Expect(k8sClient.Get(ctx, ironcoreBucketKey, ironcoreBucket)).To(Succeed())

By("inspecting the ironcore bucket")
Expect(ironcoreBucket.Labels).To(Equal(map[string]string{
bucketbrokerv1alpha1.CreatedLabel: "true",
bucketbrokerv1alpha1.ManagerLabel: bucketbrokerv1alpha1.BucketBrokerManager,
}))
encodedIRIAnnotations, err := apiutils.EncodeAnnotationsAnnotation(nil)
Expect(err).NotTo(HaveOccurred())
encodedIRILabels, err := apiutils.EncodeLabelsAnnotation(map[string]string{
bucketpoolletv1alpha1.BucketUIDLabel: "foobar",
})
Expect(err).NotTo(HaveOccurred())
Expect(ironcoreBucket.Annotations).To(Equal(map[string]string{
bucketbrokerv1alpha1.AnnotationsAnnotation: encodedIRIAnnotations,
bucketbrokerv1alpha1.LabelsAnnotation: encodedIRILabels,
}))
Expect(ironcoreBucket.Spec.BucketClassRef.Name).To(Equal(bucketClass.Name))

})
})
53 changes: 53 additions & 0 deletions broker/bucketbroker/server/bucket_delete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server_test

import (
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1"
bucketpoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/api/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("DeleteBucket", func() {
ns, _, srv := SetupTest()
bucketClass := SetupBucketClass("250Mi", "1500")

It("should correctly delete a bucket", func(ctx SpecContext) {
By("creating a bucket")
createRes, err := srv.CreateBucket(ctx, &iri.CreateBucketRequest{
Bucket: &iri.Bucket{
Metadata: &irimeta.ObjectMetadata{
Labels: map[string]string{
bucketpoolletv1alpha1.BucketUIDLabel: "foobar",
},
},
Spec: &iri.BucketSpec{
Class: bucketClass.Name,
},
},
})

Expect(err).NotTo(HaveOccurred())
Expect(createRes).NotTo(BeNil())

By("deleting the bucket")
deleteRes, err := srv.DeleteBucket(ctx, &iri.DeleteBucketRequest{
BucketId: createRes.Bucket.Metadata.Id,
})
Expect(err).NotTo(HaveOccurred())
Expect(deleteRes).NotTo(BeNil())

By("verifying the bucket is deleted")
ironcoreBucket := &storagev1alpha1.Bucket{}
ironcoreBucketKey := client.ObjectKey{Namespace: ns.Name, Name: createRes.Bucket.Metadata.Id}
err = k8sClient.Get(ctx, ironcoreBucketKey, ironcoreBucket)
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
})
16 changes: 16 additions & 0 deletions broker/bucketbroker/server/bucket_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
bucketbrokerv1alpha1 "github.com/ironcore-dev/ironcore/broker/bucketbroker/api/v1alpha1"
"github.com/ironcore-dev/ironcore/broker/bucketbroker/apiutils"
"github.com/ironcore-dev/ironcore/broker/common"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -112,6 +113,21 @@ func (s *Server) aggregateIronCoreBucket(
}, nil
}

func (s *Server) getIronCoreBucket(ctx context.Context, id string) (*storagev1alpha1.Bucket, error) {
ironcoreBucket := &storagev1alpha1.Bucket{}
ironcoreBucketKey := client.ObjectKey{Namespace: s.namespace, Name: id}
if err := s.client.Get(ctx, ironcoreBucketKey, ironcoreBucket); err != nil {
if !apierrors.IsNotFound(err) {
return nil, fmt.Errorf("error getting ironcore bucket %s: %w", id, err)
}
return nil, status.Errorf(codes.NotFound, "bucket %s not found", id)
}
if !apiutils.IsManagedBy(ironcoreBucket, bucketbrokerv1alpha1.BucketBrokerManager) || !apiutils.IsCreated(ironcoreBucket) {
return nil, status.Errorf(codes.NotFound, "bucket %s not found", id)
}
return ironcoreBucket, nil
}

func (s *Server) getAggregateIronCoreBucket(ctx context.Context, id string) (*AggregateIronCoreBucket, error) {
ironcoreBucket := &storagev1alpha1.Bucket{}
if err := s.getManagedAndCreated(ctx, id, ironcoreBucket); err != nil {
Expand Down
44 changes: 44 additions & 0 deletions broker/bucketbroker/server/bucket_list_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server_test

import (
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1"
bucketpoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/api/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("ListBuckets", func() {
_, _, srv := SetupTest()
bucketClass := SetupBucketClass("250Mi", "1500")

It("should correctly list buckets", func(ctx SpecContext) {
By("creating multiple buckets")
const noOfBuckets = 3

buckets := make([]any, noOfBuckets)
for i := 0; i < noOfBuckets; i++ {
res, err := srv.CreateBucket(ctx, &iri.CreateBucketRequest{
Bucket: &iri.Bucket{
Metadata: &irimeta.ObjectMetadata{
Labels: map[string]string{
bucketpoolletv1alpha1.BucketUIDLabel: "foobar",
},
},
Spec: &iri.BucketSpec{
Class: bucketClass.Name,
},
},
})
Expect(err).NotTo(HaveOccurred())
Expect(res).NotTo(BeNil())
buckets[i] = res.Bucket
}

By("listing the buckets")
Expect(srv.ListBuckets(ctx, &iri.ListBucketsRequest{})).To(HaveField("Buckets", ConsistOf(buckets...)))
})
})
36 changes: 36 additions & 0 deletions broker/bucketbroker/server/bucketclass_list_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server_test

import (
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
)

var _ = Describe("ListBucketClasses", func() {

_, bucketPool, srv := SetupTest()
bucketClass1 := SetupBucketClass("100Mi", "100")
bucketClass2 := SetupBucketClass("200Mi", "200")

It("should correctly list available bucket classes", func(ctx SpecContext) {
By("patching bucket classes in the bucket pool status")
bucketPool.Status.AvailableBucketClasses = []corev1.LocalObjectReference{
{Name: bucketClass1.Name},
{Name: bucketClass2.Name},
}
Expect(k8sClient.Status().Update(ctx, bucketPool)).To(Succeed())

res, err := srv.ListBucketClasses(ctx, &iri.ListBucketClassesRequest{})
Expect(err).NotTo(HaveOccurred())
Expect(res.BucketClasses).To(HaveLen(2))
Expect(res.BucketClasses).To(ContainElements(
&iri.BucketClass{Name: bucketClass1.Name, Capabilities: &iri.BucketClassCapabilities{Tps: 104857600, Iops: 100}},
&iri.BucketClass{Name: bucketClass2.Name, Capabilities: &iri.BucketClassCapabilities{Tps: 209715200, Iops: 200}},
))
})

})
102 changes: 102 additions & 0 deletions broker/bucketbroker/server/event_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
"github.com/ironcore-dev/ironcore/broker/bucketbroker/apiutils"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
irievent "github.com/ironcore-dev/ironcore/iri/apis/event/v1alpha1"
)

const (
InvolvedObjectKind = "Bucket"
InvolvedObjectKindSelector = "involvedObject.kind"
InvolvedObjectAPIVersionSelector = "involvedObject.apiVersion"
)

func (s *Server) listEvents(ctx context.Context) ([]*irievent.Event, error) {
log := ctrl.LoggerFrom(ctx)
bucketEventList := &v1.EventList{}
selectorField := fields.Set{
InvolvedObjectKindSelector: InvolvedObjectKind,
InvolvedObjectAPIVersionSelector: storagev1alpha1.SchemeGroupVersion.String(),
}
if err := s.client.List(ctx, bucketEventList,
client.InNamespace(s.namespace), client.MatchingFieldsSelector{Selector: selectorField.AsSelector()},
); err != nil {
return nil, err
}

var iriEvents []*irievent.Event
for _, bucketEvent := range bucketEventList.Items {
ironcoreBucket, err := s.getIronCoreBucket(ctx, bucketEvent.InvolvedObject.Name)
if err != nil {
log.V(1).Info("Unable to get ironcore bucket", "BucketName", bucketEvent.InvolvedObject.Name)
continue
}
bucketObjectMetadata, err := apiutils.GetObjectMetadata(&ironcoreBucket.ObjectMeta)
if err != nil {
continue
}
iriEvent := &irievent.Event{
Spec: &irievent.EventSpec{
InvolvedObjectMeta: bucketObjectMetadata,
Reason: bucketEvent.Reason,
Message: bucketEvent.Message,
Type: bucketEvent.Type,
EventTime: bucketEvent.LastTimestamp.Unix(),
},
}
iriEvents = append(iriEvents, iriEvent)
}
return iriEvents, nil
}

func (s *Server) filterEvents(events []*irievent.Event, filter *iri.EventFilter) []*irievent.Event {
if filter == nil {
return events
}

var (
res []*irievent.Event
sel = labels.SelectorFromSet(filter.LabelSelector)
)
for _, iriEvent := range events {
if !sel.Matches(labels.Set(iriEvent.Spec.InvolvedObjectMeta.Labels)) {
continue
}

if filter.EventsFromTime > 0 && filter.EventsToTime > 0 {
if iriEvent.Spec.EventTime < filter.EventsFromTime || iriEvent.Spec.EventTime > filter.EventsToTime {
continue
}
}

res = append(res, iriEvent)
}
return res
}

func (s *Server) ListEvents(ctx context.Context, req *iri.ListEventsRequest) (*iri.ListEventsResponse, error) {
iriEvents, err := s.listEvents(ctx)
if err != nil {
return nil, fmt.Errorf("error listing bucket events : %w", err)
}

iriEvents = s.filterEvents(iriEvents, req.Filter)

return &iri.ListEventsResponse{
Events: iriEvents,
}, nil
}
Loading

0 comments on commit 7065fe1

Please sign in to comment.