-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Extend Volume Runtime Interface for cross-cluster Events (#1092)
- Loading branch information
1 parent
8402bfa
commit c2f923e
Showing
22 changed files
with
2,271 additions
and
127 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and IronCore contributors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package server | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
v1 "k8s.io/api/core/v1" | ||
"k8s.io/apimachinery/pkg/fields" | ||
"k8s.io/apimachinery/pkg/labels" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
|
||
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1" | ||
"github.com/ironcore-dev/ironcore/broker/volumebroker/apiutils" | ||
irievent "github.com/ironcore-dev/ironcore/iri/apis/event/v1alpha1" | ||
iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1" | ||
) | ||
|
||
const ( | ||
InvolvedObjectKind = "Volume" | ||
InvolvedObjectKindSelector = "involvedObject.kind" | ||
InvolvedObjectAPIVersionSelector = "involvedObject.apiVersion" | ||
) | ||
|
||
func (s *Server) listEvents(ctx context.Context) ([]*irievent.Event, error) { | ||
log := ctrl.LoggerFrom(ctx) | ||
volumeEventList := &v1.EventList{} | ||
selectorField := fields.Set{ | ||
InvolvedObjectKindSelector: InvolvedObjectKind, | ||
InvolvedObjectAPIVersionSelector: storagev1alpha1.SchemeGroupVersion.String(), | ||
} | ||
if err := s.client.List(ctx, volumeEventList, | ||
client.InNamespace(s.namespace), client.MatchingFieldsSelector{Selector: selectorField.AsSelector()}, | ||
); err != nil { | ||
return nil, err | ||
} | ||
|
||
var iriEvents []*irievent.Event | ||
for _, volumeEvent := range volumeEventList.Items { | ||
ironcoreVolume, err := s.getIronCoreVolume(ctx, volumeEvent.InvolvedObject.Name) | ||
if err != nil { | ||
log.V(1).Info("Unable to get ironcore volume", "VolumeName", volumeEvent.InvolvedObject.Name) | ||
continue | ||
} | ||
volumeObjectMetadata, err := apiutils.GetObjectMetadata(&ironcoreVolume.ObjectMeta) | ||
if err != nil { | ||
continue | ||
} | ||
iriEvent := &irievent.Event{ | ||
Spec: &irievent.EventSpec{ | ||
InvolvedObjectMeta: volumeObjectMetadata, | ||
Reason: volumeEvent.Reason, | ||
Message: volumeEvent.Message, | ||
Type: volumeEvent.Type, | ||
EventTime: volumeEvent.LastTimestamp.Unix(), | ||
}, | ||
} | ||
iriEvents = append(iriEvents, iriEvent) | ||
} | ||
return iriEvents, nil | ||
} | ||
|
||
func (s *Server) filterEvents(events []*irievent.Event, filter *iri.EventFilter) []*irievent.Event { | ||
if filter == nil { | ||
return events | ||
} | ||
|
||
var ( | ||
res []*irievent.Event | ||
sel = labels.SelectorFromSet(filter.LabelSelector) | ||
) | ||
for _, iriEvent := range events { | ||
if !sel.Matches(labels.Set(iriEvent.Spec.InvolvedObjectMeta.Labels)) { | ||
continue | ||
} | ||
|
||
if filter.EventsFromTime > 0 && filter.EventsToTime > 0 { | ||
if iriEvent.Spec.EventTime < filter.EventsFromTime || iriEvent.Spec.EventTime > filter.EventsToTime { | ||
continue | ||
} | ||
} | ||
|
||
res = append(res, iriEvent) | ||
} | ||
return res | ||
} | ||
|
||
func (s *Server) ListEvents(ctx context.Context, req *iri.ListEventsRequest) (*iri.ListEventsResponse, error) { | ||
iriEvents, err := s.listEvents(ctx) | ||
if err != nil { | ||
return nil, fmt.Errorf("error listing volume events : %w", err) | ||
} | ||
|
||
iriEvents = s.filterEvents(iriEvents, req.Filter) | ||
|
||
return &iri.ListEventsResponse{ | ||
Events: iriEvents, | ||
}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and IronCore contributors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package server_test | ||
|
||
import ( | ||
"time" | ||
|
||
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1" | ||
irievent "github.com/ironcore-dev/ironcore/iri/apis/event/v1alpha1" | ||
iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1" | ||
|
||
irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1" | ||
volumepoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/volumepoollet/api/v1alpha1" | ||
. "github.com/onsi/ginkgo/v2" | ||
. "github.com/onsi/gomega" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
"k8s.io/client-go/kubernetes/scheme" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
) | ||
|
||
var _ = Describe("ListEvents", func() { | ||
ns, srv := SetupTest() | ||
volumeClass := SetupVolumeClass() | ||
|
||
It("should correctly list events", func(ctx SpecContext) { | ||
By("creating volume") | ||
Expect(storagev1alpha1.AddToScheme(scheme.Scheme)).To(Succeed()) | ||
|
||
k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{ | ||
Scheme: scheme.Scheme, | ||
}) | ||
Expect(err).ToNot(HaveOccurred()) | ||
|
||
res, err := srv.CreateVolume(ctx, &iri.CreateVolumeRequest{ | ||
Volume: &iri.Volume{ | ||
Metadata: &irimeta.ObjectMetadata{ | ||
Labels: map[string]string{ | ||
volumepoolletv1alpha1.VolumeUIDLabel: "foobar", | ||
}, | ||
}, | ||
Spec: &iri.VolumeSpec{ | ||
Class: volumeClass.Name, | ||
Resources: &iri.VolumeResources{ | ||
StorageBytes: 100, | ||
}, | ||
}, | ||
}, | ||
}) | ||
|
||
Expect(err).NotTo(HaveOccurred()) | ||
Expect(res).NotTo(BeNil()) | ||
|
||
By("getting the ironcore volume") | ||
ironcoreVolume := &storagev1alpha1.Volume{} | ||
ironcoreVolumeKey := client.ObjectKey{Namespace: ns.Name, Name: res.Volume.Metadata.Id} | ||
Expect(k8sClient.Get(ctx, ironcoreVolumeKey, ironcoreVolume)).To(Succeed()) | ||
|
||
By("generating the volume events") | ||
eventGeneratedTime := time.Now() | ||
eventRecorder := k8sManager.GetEventRecorderFor("test-recorder") | ||
eventRecorder.Event(ironcoreVolume, corev1.EventTypeNormal, "testing", "this is test event") | ||
|
||
By("listing the volume events with no filters") | ||
Eventually(func(g Gomega) []*irievent.Event { | ||
resp, err := srv.ListEvents(ctx, &iri.ListEventsRequest{}) | ||
g.Expect(err).NotTo(HaveOccurred()) | ||
return resp.Events | ||
}).Should(ConsistOf( | ||
HaveField("Spec", SatisfyAll( | ||
HaveField("InvolvedObjectMeta.Id", Equal(ironcoreVolume.Name)), | ||
HaveField("Reason", Equal("testing")), | ||
HaveField("Message", Equal("this is test event")), | ||
HaveField("Type", Equal(corev1.EventTypeNormal)), | ||
)), | ||
), | ||
) | ||
|
||
By("listing the volume events with matching label and time filters") | ||
resp, err := srv.ListEvents(ctx, &iri.ListEventsRequest{Filter: &iri.EventFilter{ | ||
LabelSelector: map[string]string{volumepoolletv1alpha1.VolumeUIDLabel: "foobar"}, | ||
EventsFromTime: eventGeneratedTime.Unix(), | ||
EventsToTime: time.Now().Unix(), | ||
}}) | ||
|
||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
Expect(resp.Events).To(ConsistOf( | ||
HaveField("Spec", SatisfyAll( | ||
HaveField("InvolvedObjectMeta.Id", Equal(ironcoreVolume.Name)), | ||
HaveField("Reason", Equal("testing")), | ||
HaveField("Message", Equal("this is test event")), | ||
HaveField("Type", Equal(corev1.EventTypeNormal)), | ||
)), | ||
), | ||
) | ||
|
||
By("listing the volume events with non matching label filter") | ||
resp, err = srv.ListEvents(ctx, &iri.ListEventsRequest{Filter: &iri.EventFilter{ | ||
LabelSelector: map[string]string{"foo": "bar"}, | ||
EventsFromTime: eventGeneratedTime.Unix(), | ||
EventsToTime: time.Now().Unix(), | ||
}}) | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
Expect(resp.Events).To(BeEmpty()) | ||
|
||
By("listing the volume events with matching label filter and non matching time filter") | ||
resp, err = srv.ListEvents(ctx, &iri.ListEventsRequest{Filter: &iri.EventFilter{ | ||
LabelSelector: map[string]string{volumepoolletv1alpha1.VolumeUIDLabel: "foobar"}, | ||
EventsFromTime: eventGeneratedTime.Add(-10 * time.Minute).Unix(), | ||
EventsToTime: eventGeneratedTime.Add(-5 * time.Minute).Unix(), | ||
}}) | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
Expect(resp.Events).To(BeEmpty()) | ||
}) | ||
}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package server_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/ironcore-dev/controller-utils/buildutils" | ||
"github.com/ironcore-dev/controller-utils/modutils" | ||
corev1alpha1 "github.com/ironcore-dev/ironcore/api/core/v1alpha1" | ||
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1" | ||
"github.com/ironcore-dev/ironcore/broker/common/idgen" | ||
"github.com/ironcore-dev/ironcore/broker/volumebroker/server" | ||
utilsenvtest "github.com/ironcore-dev/ironcore/utils/envtest" | ||
"github.com/ironcore-dev/ironcore/utils/envtest/apiserver" | ||
. "github.com/onsi/ginkgo/v2" | ||
. "github.com/onsi/gomega" | ||
corev1 "k8s.io/api/core/v1" | ||
"k8s.io/apimachinery/pkg/api/resource" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/client-go/kubernetes/scheme" | ||
"k8s.io/client-go/rest" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/envtest" | ||
. "sigs.k8s.io/controller-runtime/pkg/envtest/komega" | ||
logf "sigs.k8s.io/controller-runtime/pkg/log" | ||
"sigs.k8s.io/controller-runtime/pkg/log/zap" | ||
) | ||
|
||
var ( | ||
cfg *rest.Config | ||
testEnv *envtest.Environment | ||
testEnvExt *utilsenvtest.EnvironmentExtensions | ||
k8sClient client.Client | ||
) | ||
|
||
const ( | ||
eventuallyTimeout = 3 * time.Second | ||
pollingInterval = 50 * time.Millisecond | ||
consistentlyDuration = 1 * time.Second | ||
apiServiceTimeout = 5 * time.Minute | ||
) | ||
|
||
func TestServer(t *testing.T) { | ||
SetDefaultConsistentlyPollingInterval(pollingInterval) | ||
SetDefaultEventuallyPollingInterval(pollingInterval) | ||
SetDefaultEventuallyTimeout(eventuallyTimeout) | ||
SetDefaultConsistentlyDuration(consistentlyDuration) | ||
|
||
RegisterFailHandler(Fail) | ||
RunSpecs(t, "Server Suite") | ||
} | ||
|
||
var _ = BeforeSuite(func() { | ||
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) | ||
|
||
var err error | ||
By("bootstrapping test environment") | ||
testEnv = &envtest.Environment{} | ||
testEnvExt = &utilsenvtest.EnvironmentExtensions{ | ||
APIServiceDirectoryPaths: []string{ | ||
modutils.Dir("github.com/ironcore-dev/ironcore", "config", "apiserver", "apiservice", "bases"), | ||
}, | ||
ErrorIfAPIServicePathIsMissing: true, | ||
} | ||
|
||
cfg, err = utilsenvtest.StartWithExtensions(testEnv, testEnvExt) | ||
Expect(err).NotTo(HaveOccurred()) | ||
Expect(cfg).NotTo(BeNil()) | ||
|
||
DeferCleanup(utilsenvtest.StopWithExtensions, testEnv, testEnvExt) | ||
|
||
Expect(storagev1alpha1.AddToScheme(scheme.Scheme)).To(Succeed()) | ||
|
||
// Init package-level k8sClient | ||
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) | ||
Expect(err).NotTo(HaveOccurred()) | ||
Expect(k8sClient).NotTo(BeNil()) | ||
SetClient(k8sClient) | ||
|
||
apiSrv, err := apiserver.New(cfg, apiserver.Options{ | ||
MainPath: "github.com/ironcore-dev/ironcore/cmd/ironcore-apiserver", | ||
BuildOptions: []buildutils.BuildOption{buildutils.ModModeMod}, | ||
ETCDServers: []string{testEnv.ControlPlane.Etcd.URL.String()}, | ||
Host: testEnvExt.APIServiceInstallOptions.LocalServingHost, | ||
Port: testEnvExt.APIServiceInstallOptions.LocalServingPort, | ||
CertDir: testEnvExt.APIServiceInstallOptions.LocalServingCertDir, | ||
}) | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
Expect(apiSrv.Start()).To(Succeed()) | ||
DeferCleanup(apiSrv.Stop) | ||
|
||
Expect(utilsenvtest.WaitUntilAPIServicesReadyWithTimeout(apiServiceTimeout, testEnvExt, k8sClient, scheme.Scheme)).To(Succeed()) | ||
}) | ||
|
||
func SetupTest() (*corev1.Namespace, *server.Server) { | ||
var ( | ||
ns = &corev1.Namespace{} | ||
srv = &server.Server{} | ||
volumePool = &storagev1alpha1.VolumePool{} | ||
) | ||
|
||
BeforeEach(func(ctx SpecContext) { | ||
*ns = corev1.Namespace{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
GenerateName: "test-ns-", | ||
}, | ||
} | ||
Expect(k8sClient.Create(ctx, ns)).To(Succeed(), "failed to create test namespace") | ||
DeferCleanup(k8sClient.Delete, ns) | ||
|
||
By("creating a volume pool") | ||
*volumePool = storagev1alpha1.VolumePool{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
GenerateName: "volumepool-", | ||
Labels: map[string]string{ | ||
"pool": "test-pool", | ||
}, | ||
}, | ||
Spec: storagev1alpha1.VolumePoolSpec{ | ||
ProviderID: "network-id", | ||
}, | ||
} | ||
Expect(k8sClient.Create(ctx, volumePool)).To(Succeed(), "failed to create test volume pool") | ||
DeferCleanup(k8sClient.Delete, volumePool) | ||
|
||
newSrv, err := server.New(cfg, server.Options{ | ||
Namespace: ns.Name, | ||
VolumePoolName: volumePool.Name, | ||
VolumePoolSelector: map[string]string{ | ||
"pool": "test-pool", | ||
}, | ||
IDGen: idgen.Default, | ||
}) | ||
Expect(err).NotTo(HaveOccurred()) | ||
*srv = *newSrv | ||
}) | ||
|
||
return ns, srv | ||
} | ||
|
||
func SetupVolumeClass() *storagev1alpha1.VolumeClass { | ||
volumeClass := &storagev1alpha1.VolumeClass{} | ||
|
||
BeforeEach(func(ctx SpecContext) { | ||
*volumeClass = storagev1alpha1.VolumeClass{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
GenerateName: "volume-class-", | ||
}, | ||
Capabilities: corev1alpha1.ResourceList{ | ||
corev1alpha1.ResourceIOPS: resource.MustParse("250Mi"), | ||
corev1alpha1.ResourceTPS: resource.MustParse("1500"), | ||
}, | ||
} | ||
Expect(k8sClient.Create(ctx, volumeClass)).To(Succeed()) | ||
DeferCleanup(func(ctx context.Context) error { | ||
return client.IgnoreNotFound(k8sClient.Delete(ctx, volumeClass)) | ||
}) | ||
}) | ||
|
||
return volumeClass | ||
} |
Oops, something went wrong.