diff --git a/.golangci.yaml b/.golangci.yaml index cf3087a..edbce10 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -14,8 +14,6 @@ linters-settings: disabled-checks: - unnamedResult - hugeParam - govet: - check-shadowing: true nolintlint: require-explanation: true require-specific: true diff --git a/.husky/hooks/pre-push b/.husky/hooks/pre-push index ac93d21..e0d0e98 100755 --- a/.husky/hooks/pre-push +++ b/.husky/hooks/pre-push @@ -4,7 +4,7 @@ if [[ "$SKIP_GIT_PUSH_HOOK" ]]; then exit 0; fi set -e -if [[ -n "$(git status --short)" ]]; then +if git status --short | grep -qv "??"; then git stash function unstash() { git reset --hard diff --git a/api/v1/diskconfig_types.go b/api/v1/diskconfig_types.go index 1aec062..d4689a8 100644 --- a/api/v1/diskconfig_types.go +++ b/api/v1/diskconfig_types.go @@ -18,6 +18,7 @@ package v1 import ( corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -35,11 +36,11 @@ type DiskConfigSpec struct { // Capacity represents the desired capacity of the underlying volume. //+kubebuilder:default:="1Gi" - //+kubebuilder:validation:Pattern:="^(\\d+)(m|Mi|g|Gi|t|Ti|p|Pi)$" //+kubebuilder:validation:Optional - Capacity string `json:"capacity,omitempty" yaml:"capacity,omitempty"` + Capacity resource.Quantity `json:"capacity,omitempty" yaml:"capacity,omitempty"` // MountPointPattern is the mount point of the disk. %d is optional and represents disk number in order. Will be automatically appended for second drive if missing. + // Reserved characters: ><|:&.+*!?^$()[]{}, only 1 %d allowed. //+kubebuilder:default:="/media/discoblocks/-%d" //+kubebuilder:validation:Pattern:="^/(.*)" //+kubebuilder:validation:Optional @@ -52,7 +53,7 @@ type DiskConfigSpec struct { AccessModes []corev1.PersistentVolumeAccessMode `json:"accessModes,omitempty" yaml:"accessModes,omitempty"` // AvailabilityMode defines the desired number of instances. - //+kubebuilder:default:="Multiple" + //+kubebuilder:default:="ReadWriteOnce" //+kubebuilder:validation:Optional AvailabilityMode AvailabilityMode `json:"availabilityMode,omitempty" yaml:"availabilityMode,omitempty"` @@ -78,18 +79,27 @@ type Policy struct { UpscaleTriggerPercentage uint8 `json:"upscaleTriggerPercentage,omitempty" yaml:"upscaleTriggerPercentage,omitempty"` // MaximumCapacityOfDisks defines maximum capacity of a disk. - //+kubebuilder:validation:Pattern:="^(\\d+)(m|Mi|g|Gi|t|Ti|p|Pi)$" - //+kubebuilder:default:="100Gi" + //+kubebuilder:default:="1000Gi" //+kubebuilder:validation:Optional - MaximumCapacityOfDisk string `json:"maximumCapacityOfDisk,omitempty" yaml:"maximumCapacityOfDisk,omitempty"` + MaximumCapacityOfDisk resource.Quantity `json:"maximumCapacityOfDisk,omitempty" yaml:"maximumCapacityOfDisk,omitempty"` // MaximumCapacityOfDisks defines maximum number of a disks. - //+kubebuilder:default:=10 + //+kubebuilder:default:=1 //+kubebuilder:validation:Minimum:=1 - //+kubebuilder:validation:Maximum:=1000 + //+kubebuilder:validation:Maximum:=150 //+kubebuilder:validation:Optional MaximumNumberOfDisks uint8 `json:"maximumNumberOfDisks,omitempty" yaml:"maximumNumberOfDisks,omitempty"` + // ExtendCapacity represents the capacity to extend with. + //+kubebuilder:default:="1Gi" + //+kubebuilder:validation:Optional + ExtendCapacity resource.Quantity `json:"extendCapacity,omitempty" yaml:"extendCapacity,omitempty"` + + // CoolDown defines temporary pause of scaling. + //+kubebuilder:default:="5m" + //+kubebuilder:validation:Optional + CoolDown metav1.Duration `json:"coolDown,omitempty" yaml:"coolDown,omitempty"` + // Pause disables autoscaling of disks. //+kubebuilder:default:=false //+kubebuilder:validation:Optional @@ -120,12 +130,12 @@ const ( Deleting Phase = "Deleting" ) -// +kubebuilder:validation:Enum=Singleton;Multiple +// +kubebuilder:validation:Enum=ReadWriteSame;ReadWriteOnce type AvailabilityMode string const ( - Singleton AvailabilityMode = "Singleton" - Multiple AvailabilityMode = "Multiple" + ReadWriteSame AvailabilityMode = "ReadWriteSame" + ReadWriteOnce AvailabilityMode = "ReadWriteOnce" ) //+kubebuilder:object:root=true diff --git a/api/v1/diskconfig_webhook.go b/api/v1/diskconfig_webhook.go index a281580..694e901 100644 --- a/api/v1/diskconfig_webhook.go +++ b/api/v1/diskconfig_webhook.go @@ -20,13 +20,14 @@ import ( "errors" "fmt" "reflect" + "regexp" + "strings" "time" "github.com/ondat/discoblocks/pkg/drivers" "golang.org/x/net/context" storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -35,7 +36,9 @@ import ( ) // log is for logging in this package -var diskConfigLog = logf.Log.WithName("DiskConfigWebhook") +var diskConfigLog = logf.Log.WithName("v1.DiskConfigWebhook") + +var reservedCharacters = regexp.MustCompile(`[>|<|||:|&|.|\+|\*|!|\?|\^|\$|\(|\)|\[|\]|\{|\}]`) // SetupWebhookWithManager sets up the webhook with the Manager. func (r *DiskConfig) SetupWebhookWithManager(mgr ctrl.Manager) error { @@ -68,44 +71,30 @@ func (r *DiskConfig) validate(old runtime.Object) error { logger.Info("Validate update...") defer logger.Info("Validated") - // TODO remove once we generate detault if r.Spec.StorageClassName == "" { logger.Info("StorageClass name is invalid") return errors.New("invalid StorageClass name") } - if _, err := resource.ParseQuantity(r.Spec.Policy.MaximumCapacityOfDisk); err != nil { - logger.Info("Max capacity is invalid") - return errors.New("invalid max capacity") - } - - newCapacity, err := resource.ParseQuantity(r.Spec.Capacity) - if err != nil { - logger.Info("Capacity is invalid") - return errors.New("invalid new capacity") - } - - maxCapacity, err := resource.ParseQuantity(r.Spec.Policy.MaximumCapacityOfDisk) - if err != nil { - logger.Info("Max capacity is invalid") - return errors.New("invalid max capacity") - } - - if maxCapacity.CmpInt64(0) != 0 && maxCapacity.Cmp(newCapacity) == -1 { + if r.Spec.Policy.MaximumCapacityOfDisk.CmpInt64(0) != 0 && r.Spec.Policy.MaximumCapacityOfDisk.Cmp(r.Spec.Capacity) == -1 { logger.Info("Capacity is more then max") return errors.New("invalid new capacity, more then max") } + if err := validateMountPattern(r.Spec.MountPointPattern); err != nil { + logger.Info("Invalid mount pattern", "error", err.Error()) + return err + } + if old != nil { oldDC, ok := old.(*DiskConfig) if !ok { - err = errors.New("invalid old object") + err := errors.New("invalid old object") logger.Error(err, "this should not happen") return err } if !reflect.DeepEqual(oldDC.Spec.AccessModes, r.Spec.AccessModes) { - // TODO count PVCs by label, if 0 mode is ok to change logger.Info("AccessModes is immutable") return errors.New("access modes is immutable field") } @@ -116,20 +105,11 @@ func (r *DiskConfig) validate(old runtime.Object) error { } if oldDC.Spec.MountPointPattern != r.Spec.MountPointPattern { - // TODO count PVCs by label, if 0 mode is ok to change logger.Info("Mount pattern of StorageClass is immutable") return errors.New("mount point pattern is immutable field") } - var oldCapacity resource.Quantity - oldCapacity, err = resource.ParseQuantity(oldDC.Spec.Capacity) - if err != nil { - err = errors.New("invalid old capacity") - logger.Error(err, "this should not happen") - return err - } - - if oldCapacity.CmpInt64(0) != 0 && oldCapacity.Cmp(newCapacity) == 1 { + if oldDC.Spec.Capacity.CmpInt64(0) != 0 && oldDC.Spec.Capacity.Cmp(r.Spec.Capacity) == 1 { logger.Info("Shrinking disk is not supported") return errors.New("shrinking disk is not supported") } @@ -146,7 +126,7 @@ func (r *DiskConfig) validate(old runtime.Object) error { logger.Info("Fetch StorageClass...") sc := storagev1.StorageClass{} - if err = diskConfigWebhookDependencies.client.Get(ctx, types.NamespacedName{Name: r.Spec.StorageClassName}, &sc); err != nil { + if err := diskConfigWebhookDependencies.client.Get(ctx, types.NamespacedName{Name: r.Spec.StorageClassName}, &sc); err != nil { if apierrors.IsNotFound(err) { logger.Info("StorageClass not found") } else { @@ -169,7 +149,7 @@ func (r *DiskConfig) validate(old runtime.Object) error { valid, err := driver.IsStorageClassValid(&sc) if err != nil { - logger.Error(err, "Failed to call driver") + logger.Error(err, "Failed to call driver", "method", "IsStorageClassValid") return fmt.Errorf("failed to call driver: %w", err) } else if !valid { logger.Info("Invalid StorageClass", "error", err.Error()) @@ -185,3 +165,15 @@ func (r *DiskConfig) ValidateDelete() error { return nil } + +func validateMountPattern(pattern string) error { + if strings.Count(pattern, "%d") > 1 { + return errors.New("invalid mount pattern, only one %d allowed") + } + + if reservedCharacters.MatchString(pattern) { + return errors.New("invalid mount pattern, contains reserved characters") + } + + return nil +} diff --git a/api/v1/webhook_suite_test.go b/api/v1/webhook_suite_test.go index dc61027..86195b1 100644 --- a/api/v1/webhook_suite_test.go +++ b/api/v1/webhook_suite_test.go @@ -59,7 +59,7 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func() { logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) - ctx, cancel = context.WithCancel(context.TODO()) + ctx, cancel = context.WithCancel(context.Background()) By("bootstrapping test environment") testEnv = &envtest.Environment{ diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 5bacc94..47bedf4 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -89,6 +89,7 @@ func (in *DiskConfigList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DiskConfigSpec) DeepCopyInto(out *DiskConfigSpec) { *out = *in + out.Capacity = in.Capacity.DeepCopy() if in.AccessModes != nil { in, out := &in.AccessModes, &out.AccessModes *out = make([]corev1.PersistentVolumeAccessMode, len(*in)) @@ -106,7 +107,7 @@ func (in *DiskConfigSpec) DeepCopyInto(out *DiskConfigSpec) { (*out)[key] = val } } - out.Policy = in.Policy + in.Policy.DeepCopyInto(&out.Policy) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DiskConfigSpec. @@ -151,6 +152,9 @@ func (in *DiskConfigStatus) DeepCopy() *DiskConfigStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Policy) DeepCopyInto(out *Policy) { *out = *in + out.MaximumCapacityOfDisk = in.MaximumCapacityOfDisk.DeepCopy() + out.ExtendCapacity = in.ExtendCapacity.DeepCopy() + out.CoolDown = in.CoolDown } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Policy. diff --git a/config/crd/bases/discoblocks.ondat.io_diskconfigs.yaml b/config/crd/bases/discoblocks.ondat.io_diskconfigs.yaml index 946b526..bb0f65d 100644 --- a/config/crd/bases/discoblocks.ondat.io_diskconfigs.yaml +++ b/config/crd/bases/discoblocks.ondat.io_diskconfigs.yaml @@ -44,23 +44,27 @@ spec: type: string type: array availabilityMode: - default: Multiple + default: ReadWriteOnce description: AvailabilityMode defines the desired number of instances. enum: - - Singleton - - Multiple + - ReadWriteSame + - ReadWriteOnce type: string capacity: + anyOf: + - type: integer + - type: string default: 1Gi description: Capacity represents the desired capacity of the underlying volume. - pattern: ^(\d+)(m|Mi|g|Gi|t|Ti|p|Pi)$ - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true mountPointPattern: default: /media/discoblocks/-%d - description: MountPointPattern is the mount point of the disk. %d + description: 'MountPointPattern is the mount point of the disk. %d is optional and represents disk number in order. Will be automatically - appended for second drive if missing. + appended for second drive if missing. Reserved characters: ><|:&.+*!?^$()[]{}, + only 1 %d allowed.' pattern: ^/(.*) type: string nodeSelector: @@ -118,17 +122,33 @@ spec: policy: description: Policy contains the disk scale policies. properties: + coolDown: + default: 5m + description: CoolDown defines temporary pause of scaling. + type: string + extendCapacity: + anyOf: + - type: integer + - type: string + default: 1Gi + description: ExtendCapacity represents the capacity to extend + with. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true maximumCapacityOfDisk: - default: 100Gi + anyOf: + - type: integer + - type: string + default: 1000Gi description: MaximumCapacityOfDisks defines maximum capacity of a disk. - pattern: ^(\d+)(m|Mi|g|Gi|t|Ti|p|Pi)$ - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true maximumNumberOfDisks: - default: 10 + default: 1 description: MaximumCapacityOfDisks defines maximum number of a disks. - maximum: 1000 + maximum: 150 minimum: 1 type: integer pause: diff --git a/config/default/kustomization.yaml b/config/default/kustomization.yaml index 17e808c..cfc4df4 100644 --- a/config/default/kustomization.yaml +++ b/config/default/kustomization.yaml @@ -79,7 +79,7 @@ patches: path: "/spec/template/spec/containers/0/env/0" value: name: SUPPORTED_CSI_DRIVERS - value: "ebs.csi.aws.com" + value: "ebs.csi.aws.com,csi.storageos.com" target: kind: Deployment namespace: system diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index 77f266a..09b492d 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -60,7 +60,7 @@ spec: resources: limits: cpu: 500m - memory: 256Mi + memory: 512Mi requests: cpu: 10m memory: 64Mi diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 3676fa4..01df2c6 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -39,11 +39,18 @@ rules: - persistentvolumeclaims/finalizers verbs: - update +- apiGroups: + - "" + resources: + - persistentvolumes + verbs: + - list - apiGroups: - "" resources: - pod verbs: + - delete - get - apiGroups: - "" @@ -67,6 +74,15 @@ rules: verbs: - list - watch +- apiGroups: + - batch + resources: + - jobs + verbs: + - create + - delete + - list + - watch - apiGroups: - discoblocks.ondat.io resources: @@ -120,3 +136,11 @@ rules: - storageclasses/finalizers verbs: - update +- apiGroups: + - storage.k8s.io + resources: + - volumeattachments + verbs: + - create + - list + - watch diff --git a/config/samples/core_v1_pod.yaml b/config/samples/core_v1_pod.yaml new file mode 100644 index 0000000..2690703 --- /dev/null +++ b/config/samples/core_v1_pod.yaml @@ -0,0 +1,23 @@ +apiVersion: v1 +kind: Pod +metadata: + labels: + discoblocks: diskconfig-sample + name: diskconfig-sample +spec: + # hostPID: true + containers: + - name: alpine + image: alpine:3.15.4 + command: + - sleep + - infinity + volumeMounts: + - mountPath: /host + name: host + securityContext: + privileged: true + volumes: + - hostPath: + path: / + name: host \ No newline at end of file diff --git a/config/samples/discoblocks.ondat.io_v1_diskconfig-csi.storageos.com.yaml b/config/samples/discoblocks.ondat.io_v1_diskconfig-csi.storageos.com.yaml index 3995cc9..2bb4b19 100644 --- a/config/samples/discoblocks.ondat.io_v1_diskconfig-csi.storageos.com.yaml +++ b/config/samples/discoblocks.ondat.io_v1_diskconfig-csi.storageos.com.yaml @@ -1,19 +1,18 @@ apiVersion: discoblocks.ondat.io/v1 kind: DiskConfig metadata: - name: diskconfig-sample-storageos - namespace: default + name: diskconfig-sample spec: storageClassName: storageos capacity: 1Gi + availabilityMode: ReadWriteSame # %d is optional, but will automatically appended for second drive if missing mountPointPattern: /media/discoblocks/sample-%d - nodeSelector: - matchLabels: - kubernetes.io/os: linux podSelector: discoblocks: diskconfig-sample policy: upscaleTriggerPercentage: 50 maximumCapacityOfDisk: 2Gi maximumNumberOfDisks: 3 + coolDown: 20s + pause: false diff --git a/config/samples/discoblocks.ondat.io_v1_diskconfig-ebs.csi.aws.com.yaml b/config/samples/discoblocks.ondat.io_v1_diskconfig-ebs.csi.aws.com.yaml index c9abf23..015b725 100644 --- a/config/samples/discoblocks.ondat.io_v1_diskconfig-ebs.csi.aws.com.yaml +++ b/config/samples/discoblocks.ondat.io_v1_diskconfig-ebs.csi.aws.com.yaml @@ -1,19 +1,21 @@ apiVersion: discoblocks.ondat.io/v1 kind: DiskConfig metadata: - name: diskconfig-sample-aws-ebs - namespace: default + name: diskconfig-sample spec: storageClassName: ebs-sc capacity: 1Gi + availabilityMode: ReadWriteSame # %d is optional, but will automatically appended for second drive if missing mountPointPattern: /media/discoblocks/sample-%d nodeSelector: matchLabels: kubernetes.io/os: linux podSelector: - discoblocks: diskconfig-sample + discoblocks: diskconfig-sample policy: upscaleTriggerPercentage: 50 maximumCapacityOfDisk: 2Gi maximumNumberOfDisks: 3 + coolDown: 1m + pause: false diff --git a/config/samples/pod.yaml b/config/samples/pod.yaml deleted file mode 100644 index 298ce95..0000000 --- a/config/samples/pod.yaml +++ /dev/null @@ -1,14 +0,0 @@ -apiVersion: v1 -kind: Pod -metadata: - labels: - discoblocks: diskconfig-sample - name: aws-ebs-pod - namespace: default -spec: - containers: - - name: aws-ebs-cont - image: alpine:3.15.4 - command: - - sleep - - infinity diff --git a/controllers/diskconfig_controller.go b/controllers/diskconfig_controller.go index 49f9d6c..25f114a 100644 --- a/controllers/diskconfig_controller.go +++ b/controllers/diskconfig_controller.go @@ -92,7 +92,7 @@ func (r *DiskConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) case config.DeletionTimestamp != nil: logger.Info("DiskConfig delete in progress") - logger.Info("Updating phase to Deleting...") + logger.Info("Update phase to Deleting...") config.Status.Phase = discoblocksondatiov1.Deleting if err = r.Client.Status().Update(ctx, &config); err != nil { @@ -103,7 +103,7 @@ func (r *DiskConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{RequeueAfter: time.Second}, nil } - logger.Info("Updating phase to Running...") + logger.Info("Update phase to Running...") config.Status.Phase = discoblocksondatiov1.Running if err = r.Client.Status().Update(ctx, &config); err != nil { @@ -114,7 +114,7 @@ func (r *DiskConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) var result ctrl.Result result, err = r.reconcileUpdate(ctx, &config, logger.WithValues("mode", "update")) if err == nil { - logger.Info("Updating phase to Ready...") + logger.Info("Update phase to Ready...") config.Status.Phase = discoblocksondatiov1.Ready if err = r.Client.Status().Update(ctx, &config); err != nil { @@ -137,13 +137,12 @@ func (r *DiskConfigReconciler) reconcileDelete(ctx context.Context, configName, scFinalizer := utils.RenderFinalizer(configName, configNamespace) for i := range scList.Items { - if !controllerutil.ContainsFinalizer(&scList.Items[i], scFinalizer) { + if scList.Items[i].DeletionTimestamp != nil || !controllerutil.ContainsFinalizer(&scList.Items[i], scFinalizer) { continue } controllerutil.RemoveFinalizer(&scList.Items[i], scFinalizer) - //nolint:govet // logger is ok to shadowing logger := logger.WithValues("sc_name", scList.Items[i].Name) logger.Info("Remove StorageClass finalizer...", "finalizer", scFinalizer) @@ -153,6 +152,14 @@ func (r *DiskConfigReconciler) reconcileDelete(ctx context.Context, configName, } } + finalizer := utils.RenderFinalizer(configName) + + logger.Info("Update PVCs...") + + sem := utils.CreateSemaphore(concurrency, time.Nanosecond) + errChan := make(chan error) + wg := sync.WaitGroup{} + logger.Info("Fetch PVCs...") label, err := labels.NewRequirement("discoblocks", selection.Equals, []string{configName}) @@ -171,36 +178,6 @@ func (r *DiskConfigReconciler) reconcileDelete(ctx context.Context, configName, return ctrl.Result{}, fmt.Errorf("unable to list PVCs: %w", err) } - finalizer := utils.RenderFinalizer(configName) - - logger.Info("Delete Service...") - - service := corev1.Service{} - if err := r.Get(ctx, types.NamespacedName{Name: configName, Namespace: configNamespace}, &service); err != nil && !apierrors.IsNotFound(err) { - logger.Info("Unable to fetch Service", "error", err.Error()) - return ctrl.Result{}, fmt.Errorf("unable to fetch Service: %w", err) - } else if err == nil { - if controllerutil.ContainsFinalizer(&service, finalizer) { - controllerutil.RemoveFinalizer(&service, finalizer) - - if err = r.Client.Update(ctx, &service); err != nil { - logger.Info("Unable to update Service", "error", err.Error()) - return ctrl.Result{}, fmt.Errorf("unable to update Service: %w", err) - } - } - - if err = r.Client.Delete(ctx, &service); err != nil { - logger.Info("Unable to delete Service", "error", err.Error()) - return ctrl.Result{}, fmt.Errorf("unable to delete Service: %w", err) - } - } - - logger.Info("Update PVCs...") - - sem := utils.CreateSemaphore(concurrency, time.Nanosecond) - errChan := make(chan error) - wg := sync.WaitGroup{} - for i := range pvcList.Items { if !controllerutil.ContainsFinalizer(&pvcList.Items[i], utils.RenderFinalizer(pvcList.Items[i].Labels["discoblocks"])) { logger.Info("PVC not managed by", "config", pvcList.Items[i].Labels["discoblocks"]) @@ -223,8 +200,6 @@ func (r *DiskConfigReconciler) reconcileDelete(ctx context.Context, configName, if controllerutil.ContainsFinalizer(&pvcList.Items[i], finalizer) { controllerutil.RemoveFinalizer(&pvcList.Items[i], finalizer) - - //nolint:govet // logger is ok to shadowing logger := logger.WithValues("pvc_name", pvcList.Items[i].Name, "pvc_namespace", pvcList.Items[i].Namespace) logger.Info("Update PVC finalizer...", "finalizer", finalizer) @@ -264,7 +239,6 @@ func (r *DiskConfigReconciler) reconcileUpdate(ctx context.Context, config *disc sc := storagev1.StorageClass{} if err := r.Get(ctx, types.NamespacedName{Name: config.Spec.StorageClassName}, &sc); err != nil { if apierrors.IsNotFound(err) { - // TODO create default storageclass logger.Info("StorageClass not found") return ctrl.Result{RequeueAfter: time.Minute}, nil } @@ -287,33 +261,20 @@ func (r *DiskConfigReconciler) reconcileUpdate(ctx context.Context, config *disc } } - service, err := utils.RenderMetricsService(config.Name, config.Namespace) - if err != nil { - logger.Error(err, "Failed to render Service") - return ctrl.Result{}, fmt.Errorf("unable to render Service: %w", err) - } - - controllerutil.AddFinalizer(service, utils.RenderFinalizer(config.Name)) - service.Labels = map[string]string{ - "discoblocks": config.Name, - } - service.Spec.Selector = map[string]string{ - "discoblocks/metrics": config.Name, - } - - logger.Info("Create Service...") - if err = r.Client.Create(ctx, service); err != nil { - if !apierrors.IsAlreadyExists(err) { - logger.Info("Failed to create Service", "error", err.Error()) - return ctrl.Result{}, fmt.Errorf("unable to create Service: %w", err) - } - - logger.Info("Service already exists") - } - return ctrl.Result{}, nil } +// SetupWithManager sets up the controller with the Manager. +func (r *DiskConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&discoblocksondatiov1.DiskConfig{}). + WithEventFilter(diskConfigEventFilter{logger: mgr.GetLogger().WithName("DiskConfigReconciler")}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 1, + }). + Complete(r) +} + type diskConfigEventFilter struct { logger logr.Logger } @@ -339,7 +300,6 @@ func (ef diskConfigEventFilter) Update(e event.UpdateEvent) bool { return false } - // TODO maybe there is a mor eperformant way to compare newRawSpec, err := json.Marshal(newObj.Spec) if err != nil { ef.logger.Error(errors.New("invalid content"), "Unable to marshal new object") @@ -358,14 +318,3 @@ func (ef diskConfigEventFilter) Update(e event.UpdateEvent) bool { func (ef diskConfigEventFilter) Generic(_ event.GenericEvent) bool { return false } - -// SetupWithManager sets up the controller with the Manager. -func (r *DiskConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&discoblocksondatiov1.DiskConfig{}). - WithEventFilter(diskConfigEventFilter{logger: mgr.GetLogger().WithName("DiskConfigReconciler")}). - WithOptions(controller.Options{ - MaxConcurrentReconciles: 1, - }). - Complete(r) -} diff --git a/controllers/job_controller.go b/controllers/job_controller.go new file mode 100644 index 0000000..7ffe30f --- /dev/null +++ b/controllers/job_controller.go @@ -0,0 +1,124 @@ +package controllers + +import ( + "context" + "errors" + "fmt" + + "github.com/go-logr/logr" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// JobReconciler reconciles a Job object +type JobReconciler struct { + client.Client + Scheme *runtime.Scheme +} + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// Modify the Reconcile function to compare the state specified by +// the DiskConfig object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile +func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx).WithName("JobReconciler").WithValues("name", req.Name, "namespace", req.Name) + + logger.Info("Reconcile job...") + defer logger.Info("Reconciled") + + label, err := labels.NewRequirement("job-name", selection.Equals, []string{req.Name}) + if err != nil { + logger.Error(err, "Unable to parse Job label selector") + return ctrl.Result{}, nil + } + jobSelector := labels.NewSelector().Add(*label) + + logger.Info("Fetch Pods...") + + podList := corev1.PodList{} + if err = r.List(ctx, &podList, &client.ListOptions{ + Namespace: req.Namespace, + LabelSelector: jobSelector, + }); err != nil { + logger.Info("Failed to list Jobs", "error", err.Error()) + return ctrl.Result{}, fmt.Errorf("unable to list Jobs: %w", err) + } + + for i := range podList.Items { + logger.Info("Delete Pod...", "name", podList.Items[i].Name) + + if err := r.Client.Delete(ctx, &podList.Items[i]); err != nil { + if apierrors.IsNotFound(err) { + continue + } + + logger.Info("Failed to delete pod", "name", podList.Items[i].Name, "error", err.Error()) + return ctrl.Result{}, fmt.Errorf("unable to delete pod %s: %w", podList.Items[i].Name, err) + } + } + + logger.Info("Delete Job...") + + return ctrl.Result{}, r.Client.Delete(ctx, &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: req.Name, + Namespace: req.Namespace, + }, + }) +} + +// SetupWithManager sets up the controller with the Manager. +func (r *JobReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&batchv1.Job{}). + WithEventFilter(jobEventFilter{logger: mgr.GetLogger().WithName("JobReconciler")}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 1, + }). + Complete(r) +} + +type jobEventFilter struct { + logger logr.Logger +} + +func (ef jobEventFilter) Create(_ event.CreateEvent) bool { + return false +} + +func (ef jobEventFilter) Delete(_ event.DeleteEvent) bool { + return false +} + +func (ef jobEventFilter) Update(e event.UpdateEvent) bool { + newObj, ok := e.ObjectNew.(*batchv1.Job) + if !ok { + ef.logger.Error(errors.New("unsupported type"), "Unable to cast old object") + return false + } + + if app, ok := newObj.Labels["app"]; !ok || app != "discoblocks" { + return false + } + + return newObj.Status.CompletionTime != nil && newObj.Status.Succeeded == 1 +} + +func (ef jobEventFilter) Generic(_ event.GenericEvent) bool { + return false +} diff --git a/controllers/node_controller.go b/controllers/node_controller.go new file mode 100644 index 0000000..a0a916f --- /dev/null +++ b/controllers/node_controller.go @@ -0,0 +1,107 @@ +package controllers + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/util/workqueue" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// NodeReconciler reconciles a Node object +type NodeReconciler struct { + nodes map[string]string + nodesLock chan bool + client.Client + Scheme *runtime.Scheme +} + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// Modify the Reconcile function to compare the state specified by +// the DiskConfig object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile +func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + return ctrl.Result{}, nil +} + +// GetNodesByIP returns the actual set of nodes +func (r *NodeReconciler) GetNodesByIP() map[string]string { + r.nodesLock <- true + defer func() { + <-r.nodesLock + }() + + nodes := map[string]string{} + for k, v := range r.nodes { + nodes[v] = k + } + + return nodes +} + +// SetupWithManager sets up the controller with the Manager. +func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.nodes = map[string]string{} + r.nodesLock = make(chan bool, 1) + + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Node{}). + Watches(&source.Kind{Type: &corev1.Node{}}, nodeEventHandler{r}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 1, + }). + Complete(r) +} + +type nodeEventHandler struct { + *NodeReconciler +} + +func (eh nodeEventHandler) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) { + eh.nodesLock <- true + defer func() { + <-eh.nodesLock + }() + + node, ok := e.Object.(*corev1.Node) + if !ok { + panic("Invalid Node object type") + } + + var nodeName, nodeIP string + for i := range node.Status.Addresses { + if node.Status.Addresses[i].Type == corev1.NodeHostName { + nodeName = node.Status.Addresses[i].Address + } else if node.Status.Addresses[i].Type == corev1.NodeInternalIP { + nodeIP = node.Status.Addresses[i].Address + } + } + + eh.nodes[nodeName] = nodeIP +} + +// Update detects StorageOS related config changes. +func (eh nodeEventHandler) Update(e event.UpdateEvent, r workqueue.RateLimitingInterface) { + eh.Create(event.CreateEvent{Object: e.ObjectNew}, r) +} + +func (eh nodeEventHandler) Delete(e event.DeleteEvent, _ workqueue.RateLimitingInterface) { + eh.nodesLock <- true + defer func() { + <-eh.nodesLock + }() + + delete(eh.nodes, e.Object.GetName()) +} + +func (eh nodeEventHandler) Generic(event.GenericEvent, workqueue.RateLimitingInterface) {} diff --git a/controllers/pod_controller.go b/controllers/pod_controller.go new file mode 100644 index 0000000..677d381 --- /dev/null +++ b/controllers/pod_controller.go @@ -0,0 +1,141 @@ +package controllers + +import ( + "context" + "errors" + "strings" + + "github.com/go-logr/logr" + "github.com/ondat/discoblocks/pkg/utils" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +// PodReconciler reconciles a Pod object +type PodReconciler struct { + client.Client + Scheme *runtime.Scheme +} + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// Modify the Reconcile function to compare the state specified by +// the DiskConfig object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile +func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := logf.FromContext(ctx).WithName("PodReconciler").WithValues("name", req.Name, "namespace", req.Name) + + logger.Info("Reconcile pod...") + defer logger.Info("Reconciled") + + logger.Info("Fetch Pod...") + + pod := &corev1.Pod{} + if err := r.Client.Get(ctx, types.NamespacedName{Name: req.Name, Namespace: req.Namespace}, pod); err != nil { + if apierrors.IsNotFound(err) { + logger.Error(err, "Failed to find Pod") + return ctrl.Result{}, nil + } + + logger.Error(err, "Failed to get Pod") + return ctrl.Result{}, err + } + + serviceName, err := utils.RenderResourceName(true, req.Name, req.Namespace) + if err != nil { + logger.Error(err, "Failed to render resource name") + return ctrl.Result{}, nil + } + + service, err := utils.RenderMetricsService(serviceName, req.Namespace) + if err != nil { + logger.Error(err, "Failed to render Service") + return ctrl.Result{}, nil + } + + service.Labels = map[string]string{ + "discoblocks": req.Name, + } + for k, v := range pod.Labels { + if strings.HasPrefix(k, "discoblocks/") { + service.Labels[k] = v + } + } + + service.Spec.Selector = map[string]string{ + "discoblocks-metrics": req.Name, + } + + service.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: pod.APIVersion, + Kind: pod.Kind, + Name: pod.Name, + UID: pod.UID, + }, + } + + logger.Info("Create Service...") + + if err = r.Client.Create(ctx, service); err != nil { + if !apierrors.IsAlreadyExists(err) { + logger.Error(err, "Failed to create Service") + return ctrl.Result{}, err + } + + logger.Info("Service already exists") + } + + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Pod{}). + WithEventFilter(podEventFilter{logger: mgr.GetLogger().WithName("PodReconciler")}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 1, + }). + Complete(r) +} + +type podEventFilter struct { + logger logr.Logger +} + +func (ef podEventFilter) Create(e event.CreateEvent) bool { + obj, ok := e.Object.(*corev1.Pod) + if !ok { + ef.logger.Error(errors.New("unsupported type"), "Unable to cast old object") + return false + } + + _, ok = obj.Labels["discoblocks-metrics"] + + return ok +} + +func (ef podEventFilter) Delete(_ event.DeleteEvent) bool { + return false +} + +func (ef podEventFilter) Update(_ event.UpdateEvent) bool { + return false +} + +func (ef podEventFilter) Generic(_ event.GenericEvent) bool { + return false +} diff --git a/controllers/pvc_controller.go b/controllers/pvc_controller.go index ce906fc..348b47b 100644 --- a/controllers/pvc_controller.go +++ b/controllers/pvc_controller.go @@ -22,15 +22,21 @@ import ( "fmt" "io" "net/http" + "sort" "strings" + "sync" "time" "github.com/go-logr/logr" discoblocksondatiov1 "github.com/ondat/discoblocks/api/v1" + "github.com/ondat/discoblocks/pkg/drivers" "github.com/ondat/discoblocks/pkg/utils" corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" @@ -43,8 +49,14 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" ) +type nodeCache interface { + GetNodesByIP() map[string]string +} + // PVCReconciler reconciles a PVC object type PVCReconciler struct { + NodeCache nodeCache + InProgress sync.Map client.Client Scheme *runtime.Scheme } @@ -114,9 +126,7 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R config.Status.PersistentVolumeClaims[pvc.Name] = pvc.Status.Phase } - // TODO update conditions - - logger.Info("Updating DiskConfig status...") + logger.Info("Update DiskConfig status...") if err := r.Client.Status().Update(ctx, &config); err != nil { logger.Info("Unable to update PVC status", "error", err.Error()) @@ -128,11 +138,12 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R return ctrl.Result{}, nil } +// MonitorVolumes monitors volumes periodycally //nolint:gocyclo // It is complex we know func (r *PVCReconciler) MonitorVolumes() { logger := logf.Log.WithName("VolumeMonitor") - logger.Info("Monitoring Volumes...") + logger.Info("Monitor Volumes...") defer logger.Info("Monitor done") ctx, cancel := context.WithTimeout(context.Background(), time.Minute-time.Second) @@ -145,6 +156,8 @@ func (r *PVCReconciler) MonitorVolumes() { } endpointSelector := labels.NewSelector().Add(*label) + logger.Info("Fetch Endpoints...") + endpoints := corev1.EndpointsList{} if err := r.Client.List(ctx, &endpoints, &client.ListOptions{ LabelSelector: endpointSelector, @@ -156,7 +169,9 @@ func (r *PVCReconciler) MonitorVolumes() { discoblocks := map[types.NamespacedName][]string{} metrics := map[types.NamespacedName][]string{} for i := range endpoints.Items { - // TODO detect not managed, finalizer like PVC if possible + if endpoints.Items[i].DeletionTimestamp != nil { + continue + } for _, ss := range endpoints.Items[i].Subsets { for _, ip := range ss.Addresses { @@ -165,24 +180,32 @@ func (r *PVCReconciler) MonitorVolumes() { if _, ok := discoblocks[podName]; !ok { discoblocks[podName] = []string{} } - discoblocks[podName] = append(discoblocks[podName], endpoints.Items[i].Labels["discoblocks"]) + for k, v := range endpoints.Items[i].Labels { + if strings.HasPrefix(k, "discoblocks/") { + discoblocks[podName] = append(discoblocks[podName], v) + } + } - //nolint:govet // logger is ok to shadowing - logger := logger.WithValues("pod_name", podName.String(), "ep_name", endpoints.Items[i].Name, "ip", ip.IP) + logger := logger.WithValues("pod_name", podName.String(), "ep_name", endpoints.Items[i].Name, "IP", ip.IP) - // TODO https support would be nice req, err := http.NewRequest("GET", fmt.Sprintf("http://%s:9100/metrics", ip.IP), http.NoBody) if err != nil { logger.Error(err, "Request error") continue } - // TODO shorter context would be great per request - resp, err := http.DefaultClient.Do(req.WithContext(ctx)) + const div = 4 + callCtx, cancel := context.WithTimeout(ctx, time.Minute/div) + + logger.Info("Call Endpoint...") + + resp, err := http.DefaultClient.Do(req.WithContext(callCtx)) if err != nil { + cancel() logger.Error(err, "Connection error") continue } + cancel() rawBody, err := io.ReadAll(resp.Body) if err != nil { @@ -195,10 +218,23 @@ func (r *PVCReconciler) MonitorVolumes() { } for _, line := range strings.Split(string(rawBody), "\n") { - if strings.HasPrefix(line, "#") || !strings.Contains(line, "node_filesystem_avail_bytes") { + if !strings.HasPrefix(line, "node_filesystem_avail_bytes") { + continue + } + + mf, err := utils.ParsePrometheusMetric(line) + if err != nil { + logger.Error(err, "Failed to parse metrics") + continue + } + + if _, ok := mf["node_filesystem_avail_bytes"]; !ok { + logger.Error(err, "Failed to find node_filesystem_avail_bytes", "metric", line) continue } + logger.Info("Metric found", "content", line) + if _, ok := metrics[podName]; !ok { metrics[podName] = []string{} } @@ -216,9 +252,10 @@ func (r *PVCReconciler) MonitorVolumes() { diskConfigCache := map[types.NamespacedName]discoblocksondatiov1.DiskConfig{} for podName, diskConfigNames := range discoblocks { - //nolint:govet // logger is ok to shadowing logger := logger.WithValues("pod_name", podName.String()) + logger.Info("Fetch Pod...") + pod := corev1.Pod{} if err := r.Client.Get(ctx, podName, &pod); err != nil { logger.Error(err, "Failed to fetch pod error") @@ -228,11 +265,12 @@ func (r *PVCReconciler) MonitorVolumes() { for _, diskConfigName := range diskConfigNames { diskConfigName := types.NamespacedName{Namespace: pod.Namespace, Name: diskConfigName} - //nolint:govet // logger is ok to shadowing logger := logger.WithValues("dc_name", diskConfigName.String()) config, ok := diskConfigCache[diskConfigName] if !ok { + logger.Info("Fetch DiskConfig...") + config = discoblocksondatiov1.DiskConfig{} if err := r.Client.Get(ctx, diskConfigName, &config); err != nil { logger.Error(err, "Failed to fetch DiskConfig error") @@ -246,6 +284,65 @@ func (r *PVCReconciler) MonitorVolumes() { continue } + last, loaded := r.InProgress.Load(config.Name) + if loaded && last.(time.Time).Add(config.Spec.Policy.CoolDown.Duration).After(time.Now()) { + logger.Info("Autoscaling cooldown") + continue + } + + label, err := labels.NewRequirement("discoblocks", selection.Equals, []string{config.Name}) + if err != nil { + logger.Error(err, "Unable to parse PVC label selector") + continue + } + pvcSelector := labels.NewSelector().Add(*label) + + logger.Info("Fetch PVCs...") + + pvcs := corev1.PersistentVolumeClaimList{} + if err = r.Client.List(ctx, &pvcs, &client.ListOptions{ + Namespace: config.Namespace, + LabelSelector: pvcSelector, + }); err != nil { + logger.Error(err, "Unable to fetch PVCs") + continue + } + + livePVCs := []*corev1.PersistentVolumeClaim{} + for i := range pvcs.Items { + if pvcs.Items[i].DeletionTimestamp != nil || + !controllerutil.ContainsFinalizer(&pvcs.Items[i], utils.RenderFinalizer(config.Name)) || + pvcs.Items[i].Status.ResizeStatus != nil && *pvcs.Items[i].Status.ResizeStatus != corev1.PersistentVolumeClaimNoExpansionInProgress { + continue + } + + livePVCs = append(livePVCs, &pvcs.Items[i]) + + logger.Info("Volume found", "pvc_name", pvcs.Items[i].Name) + } + + if len(livePVCs) == 0 { + logger.Error(err, "Unable to find any PVC") + continue + } + + sort.Slice(livePVCs, func(i, j int) bool { + return livePVCs[i].CreationTimestamp.UnixNano() < livePVCs[j].CreationTimestamp.UnixNano() + }) + + const hundred = 100 + + lastPVC := livePVCs[len(livePVCs)-1] + actualCapacity := lastPVC.Spec.Resources.Requests[corev1.ResourceStorage] + treshold := actualCapacity.AsApproximateFloat64() * float64(config.Spec.Policy.UpscaleTriggerPercentage) / hundred + + lastDiskDetails := struct { + metrics string + mountpoint string + }{ + metrics: "", + mountpoint: "", + } for _, metric := range metrics[podName] { mf, err := utils.ParsePrometheusMetric(metric) if err != nil { @@ -253,113 +350,388 @@ func (r *PVCReconciler) MonitorVolumes() { continue } - if _, ok := mf["node_filesystem_avail_bytes"]; !ok { - logger.Error(err, "Failed to find metric", "metric", metric) - continue - } - - mountpoint := "" for _, m := range mf["node_filesystem_avail_bytes"].Metric { for _, l := range m.Label { - if *l.Name == "mountpoint" { - mountpoint = *l.Value + if l.Name == nil || l.Value == nil || *l.Name != "mountpoint" || + utils.GetMountPointIndex(config.Spec.MountPointPattern, config.Name, *l.Value) < 0 || + utils.CompareStringNaturalOrder(*l.Value, lastDiskDetails.mountpoint) { + continue + } + + lastDiskDetails = struct { + metrics string + mountpoint string + }{ + metrics: metric, + mountpoint: *l.Value, } } } - if mountpoint == "" { - logger.Error(err, "Failed to find mountpoint") + } + + if lastDiskDetails.metrics == "" { + logger.Error(err, "Unable to find metrics") + continue + } + + logger.Info("Last PVC metric", "metric", lastDiskDetails.metrics) + + logger = logger.WithValues("last_mountpoint", lastDiskDetails.mountpoint) + + available, err := utils.ParsePrometheusMetricValue(lastDiskDetails.metrics) + if err != nil { + logger.Error(err, "Metric is invalid") + continue + } + + logger.Info("Capacities", "actual", fmt.Sprintf("%.2f", actualCapacity.AsApproximateFloat64()), "available", fmt.Sprintf("%.2f", available), "treshold", fmt.Sprintf("%.2f", treshold)) + + if available > treshold { + logger.Info("Disk size ok") + continue + } + + newCapacity := config.Spec.Policy.ExtendCapacity + newCapacity.Add(actualCapacity) + + logger = logger.WithValues("new_capacity", newCapacity.String(), "max_capacity", config.Spec.Policy.MaximumCapacityOfDisk.String(), "no_disks", len(livePVCs), "max_disks", config.Spec.Policy.MaximumNumberOfDisks) + + logger.Info("Find Node name") + + nodeName := r.NodeCache.GetNodesByIP()[pod.Status.HostIP] + if nodeName == "" { + logger.Error(errors.New("node not found: "+pod.Status.HostIP), "Node not found", "IP", pod.Status.HostIP) + continue + } + + logger = logger.WithValues("node_name", nodeName) + + if newCapacity.Cmp(config.Spec.Policy.MaximumCapacityOfDisk) == 1 { + if config.Spec.Policy.MaximumNumberOfDisks > 0 && len(livePVCs) >= int(config.Spec.Policy.MaximumNumberOfDisks) { + logger.Info("Already maximum number of disks", "number", config.Spec.Policy.MaximumNumberOfDisks) continue } - if mountpoint != utils.RenderMountPoint(config.Spec.MountPointPattern, config.Name, 0) { - continue + logger.Info("New disk needed") + + nextIndex := 1 + if last := utils.GetMountPointIndex(config.Spec.MountPointPattern, config.Name, lastDiskDetails.mountpoint); last >= 0 { + nextIndex = last + 1 } - var pvcName types.NamespacedName - for i := range pod.Spec.Containers[0].VolumeMounts { - vm := pod.Spec.Containers[0].VolumeMounts[i] + logger.Info("Next index", "index", nextIndex) - if vm.MountPath == mountpoint { - pvcName = types.NamespacedName{Namespace: pod.Namespace, Name: vm.Name} - break + containerIDs := []string{} + for i := range pod.Status.ContainerStatuses { + cID := pod.Status.ContainerStatuses[i].ContainerID + for _, prefix := range []string{"containerd://", "docker://"} { + cID = strings.TrimPrefix(cID, prefix) } - } - if pvcName.Name == "" { - logger.Error(err, "Volume not found") - continue - } - // TODO maybe cache them and resize to the biggest in one step - pvc := corev1.PersistentVolumeClaim{} - if err = r.Client.Get(ctx, pvcName, &pvc); err != nil { - logger.Error(err, "Failed to fetch PVC") - continue + containerIDs = append(containerIDs, cID) } - logger = logger.WithValues("pvc_name", pvc.Name) - if !controllerutil.ContainsFinalizer(&pvc, utils.RenderFinalizer(config.Name)) { - logger.Info("PVC not managed by", "config", pvc.Labels["discoblocks"]) - continue - } + r.InProgress.Store(config.Name, time.Now()) - // TODO abort if resizing by condition or pvc.Status.ResizeStatus + go r.createPVC(&config, livePVCs[0], containerIDs, nodeName, nextIndex, logger) - available, err := utils.ParsePrometheusMetricValue(metric) - if err != nil { - logger.Error(err, "Metric is invalid") - continue - } + continue + } - maxCapacity, err := resource.ParseQuantity(config.Spec.Policy.MaximumCapacityOfDisk) - if err != nil { - logger.Error(err, "Max capacity is invalid") - continue - } + logger.Info("Resize needed") - const hundred = 100 + r.InProgress.Store(config.Name, time.Now()) - actualCapacity := pvc.Status.Capacity.Storage() - treshold := actualCapacity.AsApproximateFloat64() * float64(config.Spec.Policy.UpscaleTriggerPercentage) / hundred + go r.resizePVC(&config, newCapacity, lastPVC, nodeName, logger) + } + } +} - logger.Info("Capacities", "available", available, "treshold", treshold, "actual", actualCapacity.AsApproximateFloat64(), "max", maxCapacity.AsApproximateFloat64()) +func (r *PVCReconciler) createPVC(config *discoblocksondatiov1.DiskConfig, parentPVC *corev1.PersistentVolumeClaim, containerIDs []string, nodeName string, nextIndex int, logger logr.Logger) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() - if treshold > actualCapacity.AsApproximateFloat64()-available { - logger.Info("Disk size ok") - continue - } + logger.Info("Fetch StorageClass...") - if actualCapacity.Cmp(maxCapacity) == 0 { - logger.Info("New disk needed") - continue - } + sc := storagev1.StorageClass{} + if err := r.Client.Get(ctx, types.NamespacedName{Name: config.Spec.StorageClassName}, &sc); err != nil { + if apierrors.IsNotFound(err) { + logger.Error(err, "StorageClass not found", "name", config.Spec.StorageClassName) + return + } + logger.Error(err, "Unable to fetch StorageClass") + return + } + logger = logger.WithValues("provisioner", sc.Provisioner) - logger.Info("Resize needed") - newCapacity, err := resource.ParseQuantity("1Gi") - if err != nil { - logger.Error(err, "Extend capacity is invalid") - continue - } - newCapacity.Add(*actualCapacity) + driver := drivers.GetDriver(sc.Provisioner) + if driver == nil { + logger.Error(errors.New("driver not found: "+sc.Provisioner), "Driver not found") + return + } - if maxCapacity.Cmp(newCapacity) == -1 { - logger.Info("Set to max capacity") + prefix := utils.GetNamePrefix(discoblocksondatiov1.ReadWriteOnce, "") - newCapacity = maxCapacity - } + pvc, err := utils.NewPVC(config, prefix, driver) + if err != nil { + logger.Error(err, "Unable to construct new PVC") + return + } + logger = logger.WithValues("pvc_name", pvc.Name) + + pvc.Labels["discoblocks-parent"] = parentPVC.Name + pvc.Labels["discoblocks-index"] = fmt.Sprintf("%d", nextIndex) + + pvc.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: parentPVC.APIVersion, + Kind: parentPVC.Kind, + Name: parentPVC.Name, + UID: parentPVC.UID, + }, + } - logger.Info("Updating PVC...", "capacity", actualCapacity.AsApproximateFloat64()) + logger.Info("Create PVC...") - pvc.Spec.Resources.Requests[corev1.ResourceStorage] = newCapacity + if err = r.Create(ctx, pvc); err != nil { + logger.Error(err, "Failed to create PVC") + return + } - if err = r.Update(ctx, &pvc); err != nil { - logger.Error(err, "Failed to update PVC") - continue - } + waitCtx, cancel := context.WithTimeout(context.Background(), time.Hour) + defer cancel() + + logger.Info("Wait PVC...") + +WAIT_PVC: + for { + select { + case <-waitCtx.Done(): + logger.Error(waitCtx.Err(), "PVC creation wait timeout") + return + default: + if err = r.Get(ctx, types.NamespacedName{Namespace: pvc.Namespace, Name: pvc.Name}, pvc); err == nil && + pvc.Spec.VolumeName != "" { + break WAIT_PVC + } + + <-time.NewTicker(time.Second).C + } + } + + vaName, err := utils.RenderResourceName(true, config.Name, pvc.Name, pvc.Namespace) + if err != nil { + logger.Error(err, "Failed to render VolumeAttachment name") + return + } + + logger.Info("Find PersistentVolume...") + + pv, err := r.getPersistentVolume(ctx, pvc.Name) + if err != nil { + logger.Error(err, "Failed to find PersistentVolume") + return + } else if pv.Spec.CSI == nil { + logger.Error(err, "Failed to find pv.spec.csi") + return + } + + volumeAttachment := &storagev1.VolumeAttachment{ + ObjectMeta: metav1.ObjectMeta{ + Name: vaName, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: pv.APIVersion, + Kind: pv.Kind, + Name: pv.Name, + UID: pv.UID, + }, + }, + }, + Spec: storagev1.VolumeAttachmentSpec{ + Attacher: sc.Provisioner, + Source: storagev1.VolumeAttachmentSource{ + PersistentVolumeName: &pvc.Spec.VolumeName, + }, + NodeName: nodeName, + }, + } + + logger.Info("Create VolumeAttachment...", "attacher", sc.Provisioner, "node_name", nodeName) + + if err = r.Create(ctx, volumeAttachment); err != nil { + logger.Error(err, "Failed to create volume attachment") + return + } + + waitForMeta, err := driver.WaitForVolumeAttachmentMeta() + if err != nil { + logger.Error(err, "Failed to call driver", "method", "WaitForVolumeAttachmentMeta") + return + } + + logger.Info("Wait VolumeAttachment...", "waitForMeta", waitForMeta) + + var dev string +WAIT_VA: + for { + select { + case <-waitCtx.Done(): + logger.Error(waitCtx.Err(), "VolumeAttachment creation wait timeout") + return + default: + if err = r.Get(ctx, types.NamespacedName{Name: vaName}, volumeAttachment); err != nil || + !volumeAttachment.Status.Attached || + waitForMeta != "" && volumeAttachment.Status.AttachmentMetadata[waitForMeta] == "" { + <-time.NewTicker(time.Second).C + + continue } + + dev = volumeAttachment.Status.AttachmentMetadata[waitForMeta] + + break WAIT_VA + } + } + + mountpoint := utils.RenderMountPoint(config.Spec.MountPointPattern, config.Name, nextIndex) + + mountJob, err := utils.RenderHostJob(pvc.Name, pvc.Namespace, nodeName, dev, pv.Spec.CSI.FSType, mountpoint, containerIDs, driver.GetMountCommand) + if err != nil { + logger.Error(err, "Unable to render mount job") + return + } else if mountJob == nil { + return + } + + logger.Info("Create mount Job", "containers", containerIDs, "mountpoint", mountpoint) + + if err := r.Create(ctx, mountJob); err != nil { + logger.Error(err, "Failed to create mount job") + return + } +} + +func (r *PVCReconciler) resizePVC(config *discoblocksondatiov1.DiskConfig, capacity resource.Quantity, pvc *corev1.PersistentVolumeClaim, nodeName string, logger logr.Logger) { + logger.Info("Update PVC...", "capacity", capacity.AsApproximateFloat64()) + + pvc.Spec.Resources.Requests[corev1.ResourceStorage] = capacity + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + if err := r.Update(ctx, pvc); err != nil { + logger.Error(err, "Failed to update PVC") + return + } + + if _, ok := pvc.Labels["discoblocks-parent"]; !ok { + logger.Info("Parent PVC is managed by CSI driver") + return + } + + sc := storagev1.StorageClass{} + if err := r.Client.Get(ctx, types.NamespacedName{Name: config.Spec.StorageClassName}, &sc); err != nil { + if apierrors.IsNotFound(err) { + logger.Error(err, "StorageClass not found", "name", config.Spec.StorageClassName) + return } + logger.Error(err, "Unable to fetch StorageClass") + return + } + logger = logger.WithValues("provisioner", sc.Provisioner) + + driver := drivers.GetDriver(sc.Provisioner) + if driver == nil { + logger.Error(errors.New("driver not found: "+sc.Provisioner), "Driver not found") + return + } + + if c, err := driver.GetResizeCommand(); err != nil { + logger.Error(err, "Failed to call GetResizeCommand") + return + } else if c == "" { + return + } + + waitForMeta, err := driver.WaitForVolumeAttachmentMeta() + if err != nil { + logger.Error(err, "Failed to call driver", "method", "WaitForVolumeAttachmentMeta") + return + } + + dev := "" + if waitForMeta != "" { + volumeAttachment := &storagev1.VolumeAttachment{} + + vaName, err := utils.RenderResourceName(true, config.Name, pvc.Name, pvc.Namespace) + if err != nil { + logger.Error(err, "Failed to render VolumeAttachment name") + return + } + + logger.Info("Fetch VolumeAttachment...") + + if err = r.Get(ctx, types.NamespacedName{Name: vaName}, volumeAttachment); err != nil { + logger.Error(err, "Failed to fetch VolumeAttachment") + return + } + + if m, ok := volumeAttachment.Status.AttachmentMetadata[waitForMeta]; !ok || m == "" { + logger.Error(errors.New("failed to find VolumeAttachment meta"), "Failed to find VolumeAttachment meta") + return + } + + dev = volumeAttachment.Status.AttachmentMetadata[waitForMeta] + } + + logger.Info("Determine file-system...") + + logger.Info("Find PersistentVolume...") + + pv, err := r.getPersistentVolume(ctx, pvc.Name) + if err != nil { + logger.Error(err, "Failed to find PersistentVolume") + return + } else if pv.Spec.CSI == nil { + logger.Error(err, "Failed to find pv.spec.csi") + return + } + + resizeJob, err := utils.RenderHostJob(pvc.Name, pvc.Namespace, nodeName, dev, pv.Spec.CSI.FSType, "", []string{}, driver.GetResizeCommand) + if err != nil { + logger.Error(err, "Unable to render mount job") + return + } else if resizeJob == nil { + return + } + + logger.Info("Create resize Job") + + if err := r.Create(ctx, resizeJob); err != nil { + logger.Error(err, "Failed to create resize job") + return } } +func (r *PVCReconciler) getPersistentVolume(ctx context.Context, pvcName string) (*corev1.PersistentVolume, error) { + pvList := corev1.PersistentVolumeList{} + if err := r.List(ctx, &pvList, &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("spec.claimRef.name", pvcName), + }); err != nil { + return nil, fmt.Errorf("failed to list PVs: %w", err) + } + + switch { + case len(pvList.Items) == 0: + return nil, errors.New("failed to find PV") + case len(pvList.Items) > 1: + return nil, errors.New("more than one PV attached to PVC") + } + + return &pvList.Items[0], nil +} + type pvcEventFilter struct { logger logr.Logger } @@ -409,7 +781,8 @@ func (r *PVCReconciler) SetupWithManager(mgr ctrl.Manager) (chan<- bool, error) closeChan := make(chan bool) go func() { - ticker := time.NewTicker(time.Minute) + const two = 2 + ticker := time.NewTicker(time.Minute / two) defer ticker.Stop() for { @@ -422,6 +795,24 @@ func (r *PVCReconciler) SetupWithManager(mgr ctrl.Manager) (chan<- bool, error) } }() + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + if err := mgr.GetFieldIndexer().IndexField(ctx, &corev1.PersistentVolume{}, "spec.claimRef.name", func(rawObj client.Object) []string { + pv, ok := rawObj.(*corev1.PersistentVolume) + if !ok { + return nil + } + + if pv.Spec.ClaimRef == nil { + return nil + } + + return []string{pv.Spec.ClaimRef.Name} + }); err != nil { + return nil, err + } + return closeChan, ctrl.NewControllerManagedBy(mgr). For(&corev1.PersistentVolumeClaim{}). WithEventFilter(pvcEventFilter{logger: mgr.GetLogger().WithName("PVCReconciler")}). diff --git a/drivers/csi.storageos.com/go.mod b/drivers/csi.storageos.com/go.mod index 7d7890d..0471206 100644 --- a/drivers/csi.storageos.com/go.mod +++ b/drivers/csi.storageos.com/go.mod @@ -1,3 +1,5 @@ module csi.storageos.com go 1.18 + +require github.com/valyala/fastjson v1.6.3 diff --git a/drivers/csi.storageos.com/go.sum b/drivers/csi.storageos.com/go.sum index e69de29..3706301 100644 --- a/drivers/csi.storageos.com/go.sum +++ b/drivers/csi.storageos.com/go.sum @@ -0,0 +1,2 @@ +github.com/valyala/fastjson v1.6.3 h1:tAKFnnwmeMGPbwJ7IwxcTPCNr3uIzoIj3/Fh90ra4xc= +github.com/valyala/fastjson v1.6.3/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= diff --git a/drivers/csi.storageos.com/main.go b/drivers/csi.storageos.com/main.go index 79bd532..1da4cc4 100644 --- a/drivers/csi.storageos.com/main.go +++ b/drivers/csi.storageos.com/main.go @@ -3,12 +3,22 @@ package main import ( "fmt" "os" + + "github.com/valyala/fastjson" ) func main() {} //export IsStorageClassValid func IsStorageClassValid() { + json := []byte(os.Getenv("STORAGE_CLASS_JSON")) + + if !fastjson.Exists(json, "allowVolumeExpansion") || !fastjson.GetBool(json, "allowVolumeExpansion") { + fmt.Fprint(os.Stderr, "only allowVolumeExpansion true is supported") + fmt.Fprint(os.Stdout, false) + return + } + fmt.Fprint(os.Stdout, true) } @@ -37,3 +47,26 @@ func GetCSIDriverNamespace() { func GetCSIDriverPodLabels() { fmt.Fprint(os.Stdout, `{ "app": "storageos", "app.kubernetes.io/component": "csi" }`) } + +//export GetMountCommand +func GetMountCommand() { + fmt.Fprint(os.Stdout, `DEV=$(chroot /host ls /var/lib/storageos/volumes/ -Atr | tail -1) && +chroot /host nsenter --target 1 --mount mkdir -p /var/lib/kubelet/plugins/kubernetes.io/csi/pv/${PVC_NAME} && +chroot /host nsenter --target 1 --mount mount /var/lib/storageos/volumes/${DEV} /var/lib/kubelet/plugins/kubernetes.io/csi/pv/${PVC_NAME} && +DEV_MAJOR=$(chroot /host nsenter --target 1 --mount cat /proc/self/mountinfo | grep ${DEV} | awk '{print $3}' | awk '{split($0,a,":"); print a[1]}') && +DEV_MINOR=$(chroot /host nsenter --target 1 --mount cat /proc/self/mountinfo | grep ${DEV} | awk '{print $3}' | awk '{split($0,a,":"); print a[2]}') && +for CONTAINER_ID in ${CONTAINER_IDS}; do + PID=$(docker inspect -f '{{.State.Pid}}' ${CONTAINER_ID} || crictl inspect --output go-template --template '{{.info.pid}}' ${CONTAINER_ID}) && + chroot /host nsenter --target ${PID} --mount mkdir -p ${DEV} ${MOUNT_POINT} && + chroot /host nsenter --target ${PID} --mount mknod ${DEV}/mount b ${DEV_MAJOR} ${DEV_MINOR} && + chroot /host nsenter --target ${PID} --mount mount ${DEV}/mount ${MOUNT_POINT} +done`) +} + +//export GetResizeCommand +func GetResizeCommand() {} + +//export WaitForVolumeAttachmentMeta +func WaitForVolumeAttachmentMeta() { + fmt.Fprint(os.Stdout, "") +} diff --git a/drivers/ebs.csi.aws.com/main.go b/drivers/ebs.csi.aws.com/main.go index 3f4542c..b36ac9e 100644 --- a/drivers/ebs.csi.aws.com/main.go +++ b/drivers/ebs.csi.aws.com/main.go @@ -13,8 +13,8 @@ func main() {} func IsStorageClassValid() { json := []byte(os.Getenv("STORAGE_CLASS_JSON")) - if fastjson.Exists(json, "volumeBindingMode") && fastjson.GetString(json, "volumeBindingMode") != "WaitForFirstConsumer" { - fmt.Fprint(os.Stderr, "only volumeBindingMode WaitForFirstConsumer is supported") + if fastjson.Exists(json, "volumeBindingMode") && fastjson.GetString(json, "volumeBindingMode") != "Immediate" { + fmt.Fprint(os.Stderr, "only volumeBindingMode Immediate is supported") fmt.Fprint(os.Stdout, false) return } @@ -51,5 +51,40 @@ func GetCSIDriverNamespace() { //export GetCSIDriverPodLabels func GetCSIDriverPodLabels() { - fmt.Fprint(os.Stdout, `{ "app.kubernetes.io/component": "ebs-csi-controller", "app.kubernetes.io/name": "aws-ebs-csi-driver" }`) + fmt.Fprint(os.Stdout, `{ "app": "ebs-csi-controller" }`) +} + +//export GetMountCommand +func GetMountCommand() { + fmt.Fprint(os.Stdout, `DEV=$(chroot /host nsenter --target 1 --mount readlink -f ${DEV} | sed "s|.*/||") && +chroot /host nsenter --target 1 --mount mkfs.${FS} /dev/${DEV} && +chroot /host nsenter --target 1 --mount mkdir -p /var/lib/kubelet/plugins/kubernetes.io/csi/pv/${PVC_NAME} && +chroot /host nsenter --target 1 --mount mount /dev/${DEV} /var/lib/kubelet/plugins/kubernetes.io/csi/pv/${PVC_NAME} && +DEV_MAJOR=$(chroot /host nsenter --target 1 --mount cat /proc/self/mountinfo | grep ${DEV} | awk '{print $3}' | awk '{split($0,a,":"); print a[1]}') && +DEV_MINOR=$(chroot /host nsenter --target 1 --mount cat /proc/self/mountinfo | grep ${DEV} | awk '{print $3}' | awk '{split($0,a,":"); print a[2]}') && +for CONTAINER_ID in ${CONTAINER_IDS}; do + PID=$(docker inspect -f '{{.State.Pid}}' ${CONTAINER_ID} || crictl inspect --output go-template --template '{{.info.pid}}' ${CONTAINER_ID}) && + chroot /host nsenter --target ${PID} --mount mkdir -p /dev ${MOUNT_POINT} && + chroot /host nsenter --target ${PID} --pid --mount mknod /dev/${DEV} b ${DEV_MAJOR} ${DEV_MINOR} && + chroot /host nsenter --target ${PID} --mount mount /dev/${DEV} ${MOUNT_POINT} +done`) +} + +//export GetResizeCommand +func GetResizeCommand() { + fmt.Fprint(os.Stdout, `DEV=$(chroot /host nsenter --target 1 --mount readlink -f ${DEV}) && +( + ([ "${FS}" = "ext3" ] && chroot /host nsenter --target 1 --mount resize2fs ${DEV}) || + ([ "${FS}" = "ext4" ] && chroot /host nsenter --target 1 --mount resize2fs ${DEV}) || + ([ "${FS}" = "xfs" ] && chroot /host nsenter --target 1 --mount xfs_growfs -d ${DEV}) || + ([ "${FS}" = "btrfs" ] && chroot /host nsenter --target 1 --mount btrfs filesystem resize max ${DEV}) || + echo unsupported file-system $FS +) + `) + fmt.Fprint(os.Stdout, `DEV=$(chroot /host nsenter --target 1 --mount readlink -f ${DEV}) && chroot /host nsenter --target 1 --mount resize2fs ${DEV}`) +} + +//export WaitForVolumeAttachmentMeta +func WaitForVolumeAttachmentMeta() { + fmt.Fprint(os.Stdout, "devicePath") } diff --git a/go.mod b/go.mod index 86a9149..531c99c 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ replace ( require ( github.com/go-logr/logr v1.2.3 + github.com/moby/moby v20.10.18+incompatible github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.19.0 github.com/prometheus/client_model v0.2.0 diff --git a/go.sum b/go.sum index a071c84..7f8ffe6 100644 --- a/go.sum +++ b/go.sum @@ -456,6 +456,8 @@ github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:F github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/ipvs v1.0.1/go.mod h1:2pngiyseZbIKXNv7hsKj3O9UEz30c53MT9005gt2hxQ= +github.com/moby/moby v20.10.18+incompatible h1:aAQ5lDb+SDrhVDnoMbR3kSzswd+41X34pex8VRJXvHg= +github.com/moby/moby v20.10.18+incompatible/go.mod h1:fDXVQ6+S340veQPv35CzDahGBmHsiclFwfEygB/TWMc= github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= github.com/moby/sys/mountinfo v0.4.1/go.mod h1:rEr8tzG/lsIZHBtN/JjGG+LMYx9eXgW2JI+6q0qou+A= github.com/moby/term v0.0.0-20201216013528-df9cb8a40635/go.mod h1:FBS0z0QWA44HXygs7VXDUOGoN/1TV3RuWkLO04am3wc= diff --git a/go.work.sum b/go.work.sum index 184290c..e7b96ff 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1,2 +1,11 @@ github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/container-storage-interface/spec v1.5.0 h1:lvKxe3uLgqQeVQcrnL2CPQKISoKjTJxojEs9cBk+HXo= +github.com/containerd/containerd v1.4.11 h1:QCGOUN+i70jEEL/A6JVIbhy4f4fanzAzSR4kNG7SlcE= +github.com/containerd/fifo v1.0.0 h1:6PirWBr9/L7GDamKr+XM0IeUFXu5mf3M/BPpH9gaLBU= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8= +github.com/docker/docker v20.10.7+incompatible h1:Z6O9Nhsjv+ayUEeI1IojKbYcsGdgYSNqxe1s2MYzUhQ= +github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= +github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= +github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVojFA6h/TRcI= +golang.org/x/exp v0.0.0-20210220032938-85be41e4509f h1:GrkO5AtFUU9U/1f5ctbIBXtBGeSJbWwIYfIsTcFMaX4= diff --git a/main.go b/main.go index 75c0e27..cffd48b 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,7 @@ import ( "os" "strconv" "strings" + "sync" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -56,14 +57,17 @@ var ( //+kubebuilder:rbac:groups=discoblocks.ondat.io,resources=diskconfigs,verbs=get;list;watch;create;update;delete //+kubebuilder:rbac:groups=discoblocks.ondat.io,resources=diskconfigs/status,verbs=update //+kubebuilder:rbac:groups=discoblocks.ondat.io,resources=diskconfigs/finalizers,verbs=update -//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update -//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims/finalizers,verbs=update +//+kubebuilder:rbac:groups="storage.k8s.io",resources=volumeattachments,verbs=create;list;watch //+kubebuilder:rbac:groups="storage.k8s.io",resources=storageclasses,verbs=get;update //+kubebuilder:rbac:groups="storage.k8s.io",resources=storageclasses/finalizers,verbs=update +//+kubebuilder:rbac:groups="batch",resources=jobs,verbs=create;list;watch;delete +//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update +//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims/finalizers,verbs=update +//+kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=list //+kubebuilder:rbac:groups="",resources=services,verbs=create;update;delete //+kubebuilder:rbac:groups="",resources=services/finalizers,verbs=update //+kubebuilder:rbac:groups="",resources=endpoints,verbs=list;watch -//+kubebuilder:rbac:groups="",resources=pod,verbs=get +//+kubebuilder:rbac:groups="",resources=pod,verbs=get;delete // indirect rbac //+kubebuilder:rbac:groups="",resources=namespaces;services;pods;persistentvolumes;replicationcontrollers,verbs=list;watch @@ -110,6 +114,31 @@ func main() { os.Exit(1) } + if err = (&controllers.PodReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Pod") + os.Exit(1) + } + + if err = (&controllers.JobReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Job") + os.Exit(1) + } + + nodeReconciler := &controllers.NodeReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + } + if err = nodeReconciler.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Node") + os.Exit(1) + } + if err = (&controllers.DiskConfigReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -118,10 +147,11 @@ func main() { os.Exit(1) } - // TODO close not handled if _, err = (&controllers.PVCReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + NodeCache: nodeReconciler, + InProgress: sync.Map{}, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PVC") os.Exit(1) @@ -150,7 +180,7 @@ func main() { setupLog.Error(err, "unable to set up health check") os.Exit(1) } - // TODO proper ready check would be nice + if err = mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up ready check") os.Exit(1) @@ -169,7 +199,7 @@ func main() { os.Exit(1) }() - setupLog.Info("starting manager") + setupLog.Info("Start manager") if err = mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) diff --git a/mutators/pod_webhooks.go b/mutators/pod_webhooks.go index 4880234..459351c 100644 --- a/mutators/pod_webhooks.go +++ b/mutators/pod_webhooks.go @@ -6,23 +6,30 @@ import ( "errors" "fmt" "net/http" + "sort" + "strconv" + "strings" + "sync" "time" + "github.com/moby/moby/pkg/namesgenerator" discoblocksondatiov1 "github.com/ondat/discoblocks/api/v1" "github.com/ondat/discoblocks/pkg/drivers" "github.com/ondat/discoblocks/pkg/utils" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) // log is for logging in this package -var podMutatorLog = logf.Log.WithName("PodMutator") +var podMutatorLog = logf.Log.WithName("mutators.PodMutator") type PodMutator struct { Client client.Client @@ -33,8 +40,9 @@ type PodMutator struct { //+kubebuilder:webhook:path=/mutate-v1-pod,mutating=true,sideEffects=none,failurePolicy=fail,groups="",resources=pods,verbs=create,versions=v1,admissionReviewVersions=v1,name=mpod.kb.io // Handle pod mutation +//nolint:gocyclo // It is complex we know func (a *PodMutator) Handle(ctx context.Context, req admission.Request) admission.Response { - logger := podMutatorLog.WithValues("name", req.Name, "namespace", req.Namespace) + logger := podMutatorLog.WithValues("req_name", req.Name, "req_namespace", req.Namespace) logger.Info("Handling...") defer logger.Info("Handled") @@ -44,6 +52,10 @@ func (a *PodMutator) Handle(ctx context.Context, req admission.Request) admissio return admission.Errored(http.StatusBadRequest, fmt.Errorf("unable to decode request: %w", err)) } + // In some cases decoded pod doesn't have these fields + pod.Name = req.Name + pod.Namespace = req.Namespace + ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() @@ -56,6 +68,10 @@ func (a *PodMutator) Handle(ctx context.Context, req admission.Request) admissio return admission.Errored(http.StatusInternalServerError, fmt.Errorf("unable to fetch configs: %w", err)) } + if len(diskConfigs.Items) == 0 { + return admission.Allowed("DiskConfig not found in namespace: " + pod.Namespace) + } + errorMode := func(code int32, reason string, err error) admission.Response { if a.strict { return admission.Errored(code, err) @@ -64,35 +80,62 @@ func (a *PodMutator) Handle(ctx context.Context, req admission.Request) admissio return admission.Allowed(reason) } + nameGeneratorOnce := sync.Once{} + volumes := map[string]string{} for i := range diskConfigs.Items { + if diskConfigs.Items[i].DeletionTimestamp != nil { + continue + } + config := diskConfigs.Items[i] if !utils.IsContainsAll(pod.Labels, config.Spec.PodSelector) { continue } - if pod.Labels == nil { - pod.Labels = map[string]string{} + logger := logger.WithValues("name", config.Name, "sc_name", config.Spec.StorageClassName) + + if pod.Spec.HostPID && !config.Spec.Policy.Pause { + msg := "Autoscaling and Pod.Spec.HostPID are not supported together" + logger.Info(msg) + return errorMode(http.StatusBadRequest, msg, errors.New(strings.ToLower(msg))) } - pod.Labels["discoblocks/metrics"] = config.Name - //nolint:govet // logger is ok to shadowing - logger := logger.WithValues("name", config.Name, "sc_name", config.Spec.StorageClassName) - logger.Info("Attach volume to workload...") + if pod.Name == "" { + var nameGenErr error + nameGeneratorOnce.Do(func() { + if len(pod.OwnerReferences) == 0 { + pod.Name = fmt.Sprintf("%s-%d", namesgenerator.GetRandomName(0), time.Now().UnixNano()) + } else { + nameParts := []string{pod.OwnerReferences[0].Name} + for _, r := range pod.OwnerReferences { + nameParts = append(nameParts, r.Name) + } + nameParts = append(nameParts, fmt.Sprintf("%d", time.Now().UnixNano())) + + pod.Name, nameGenErr = utils.RenderResourceName(false, nameParts...) + } + + logger = podMutatorLog.WithValues("name", pod.Name, "namespace", pod.Namespace) + }) + if nameGenErr != nil { + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("unable to render resource name: %w", nameGenErr)) + } + } - capacity, err := resource.ParseQuantity(config.Spec.Capacity) - if err != nil { - logger.Error(err, "Capacity is invalid") - return errorMode(http.StatusNotAcceptable, "Capacity is invalid:"+config.Spec.Capacity, err) + if pod.Labels == nil { + pod.Labels = map[string]string{} } + pod.Labels[utils.RenderDiskConfigLabel(config.Name)] = config.Name + pod.Labels["discoblocks-metrics"] = pod.Name logger.Info("Fetch StorageClass...") sc := storagev1.StorageClass{} - if err = a.Client.Get(ctx, types.NamespacedName{Name: config.Spec.StorageClassName}, &sc); err != nil { + if err := a.Client.Get(ctx, types.NamespacedName{Name: config.Spec.StorageClassName}, &sc); err != nil { if apierrors.IsNotFound(err) { - logger.Info("StorageClass not found") + logger.Info("StorageClass not found", "name", config.Spec.StorageClassName) return errorMode(http.StatusNotFound, "StorageClass not found: "+config.Spec.StorageClassName, err) } logger.Info("Unable to fetch StorageClass", "error", err.Error()) @@ -103,39 +146,22 @@ func (a *PodMutator) Handle(ctx context.Context, req admission.Request) admissio driver := drivers.GetDriver(sc.Provisioner) if driver == nil { logger.Info("Driver not found") - return errorMode(http.StatusNotFound, "Driver not found: "+sc.Provisioner, errors.New("driver not found: "+sc.Provisioner)) + return errorMode(http.StatusInternalServerError, "Driver not found: "+sc.Provisioner, fmt.Errorf("driver not found: %s", sc.Provisioner)) } - preFix := config.CreationTimestamp.String() - if config.Spec.AvailabilityMode != discoblocksondatiov1.Singleton { - preFix = time.Now().String() - } + logger.Info("Attach volume to workload...") - pvcName, err := utils.RenderPVCName(preFix, config.Name, config.Namespace) - if err != nil { - logger.Error(err, "Unable to calculate hash") - return errorMode(http.StatusInternalServerError, "unable to calculate hash", err) - } - logger = logger.WithValues("pvc_name", pvcName) + prefix := utils.GetNamePrefix(config.Spec.AvailabilityMode, config.CreationTimestamp.String()) - pvc, err := driver.GetPVCStub(pvcName, config.Namespace, config.Spec.StorageClassName) + var pvc *corev1.PersistentVolumeClaim + pvc, err := utils.NewPVC(&config, prefix, driver) if err != nil { - logger.Error(err, "Unable to init a PVC", "provisioner", sc.Provisioner) - return errorMode(http.StatusInternalServerError, "Unable to init a PVC", err) + return errorMode(http.StatusInternalServerError, err.Error(), err) } + logger = logger.WithValues("pvc_name", pvc.Name) - pvc.Finalizers = []string{utils.RenderFinalizer(config.Name)} - pvc.Labels = map[string]string{ - "discoblocks": config.Name, - } - pvc.Spec.Resources = corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceStorage: capacity, - }, - } - pvc.Spec.AccessModes = config.Spec.AccessModes - if len(pvc.Spec.AccessModes) == 0 { - pvc.Spec.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce} + pvcNamesWithMount := map[string]string{ + pvc.Name: utils.RenderMountPoint(config.Spec.MountPointPattern, pvc.Name, 0), } logger.Info("Create PVC...") @@ -146,25 +172,73 @@ func (a *PodMutator) Handle(ctx context.Context, req admission.Request) admissio } logger.Info("PVC already exists") - } - mountpoint := utils.RenderMountPoint(config.Spec.MountPointPattern, pvc.Name, 0) - for name, mp := range volumes { - if mp == mountpoint { - logger.Info("Mount point already added", "exists", name, "actual", pvcName, "mountpoint", sc.Provisioner) - return errorMode(http.StatusInternalServerError, "Unable to init a PVC", err) + if config.Spec.AvailabilityMode != discoblocksondatiov1.ReadWriteOnce { + label, err := labels.NewRequirement("discoblocks-parent", selection.Equals, []string{pvc.Name}) + if err != nil { + logger.Error(err, "Unable to parse PVC label selector") + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("unable to parse PVC label selectors: %w", err)) + } + pvcSelector := labels.NewSelector().Add(*label) + + logger.Info("Fetch PVCs...") + + pvcs := corev1.PersistentVolumeClaimList{} + if err = a.Client.List(ctx, &pvcs, &client.ListOptions{ + Namespace: config.Namespace, + LabelSelector: pvcSelector, + }); err != nil { + logger.Error(err, "Unable to fetch PVCs") + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("unable to fetch PVCs: %w", err)) + } + + sort.Slice(pvcs.Items, func(i, j int) bool { + return pvcs.Items[i].CreationTimestamp.UnixNano() < pvcs.Items[j].CreationTimestamp.UnixNano() + }) + + for i := range pvcs.Items { + if pvcs.Items[i].DeletionTimestamp != nil || !controllerutil.ContainsFinalizer(&pvcs.Items[i], utils.RenderFinalizer(config.Name)) { + continue + } + + if _, ok := pvcs.Items[i].Labels["discoblocks-index"]; !ok { + err = errors.New("volume index not found") + logger.Error(err, "Volume index not found") + return admission.Errored(http.StatusInternalServerError, err) + } + + index, err := strconv.Atoi(pvcs.Items[i].Labels["discoblocks-index"]) + if err != nil { + logger.Error(err, "Unable to convert index") + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("unable to convert index: %w", err)) + } + + pvcNamesWithMount[pvcs.Items[i].Name] = utils.RenderMountPoint(config.Spec.MountPointPattern, pvcs.Items[i].Name, index) + + logger.Info("Volume found", "pvc_name", pvcs.Items[i].Name, "mountpoint", pvcNamesWithMount[pvcs.Items[i].Name]) + } } } - volumes[pvcName] = mountpoint - pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{ - Name: pvcName, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvcName, + for pvcName, mountpoint := range pvcNamesWithMount { + for name, mp := range volumes { + if mp == mountpoint { + logger.Info("Mount point already added", "exists", name, "actual", pvcName, "mountpoint", sc.Provisioner) + return errorMode(http.StatusInternalServerError, "Unable to init a PVC", fmt.Errorf("mount point already added: %s:/%s", pvcName, name)) + } + } + + volumes[pvcName] = mountpoint + + pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{ + Name: pvcName, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvcName, + }, }, - }, - }) + }) + } } if len(volumes) == 0 { @@ -182,22 +256,6 @@ func (a *PodMutator) Handle(ctx context.Context, req admission.Request) admissio } pod.Spec.Containers = append(pod.Spec.Containers, *metricsSideCar) - // TODO manager sidecar is needed to mount disks on the fly - // managerSideCar, err := utils.RenderManagerSidecar() - // if err != nil { - // logger.Error(err, "Manager sidecar template invalid") - // return admission.Allowed("Manager sidecar template invalid") - // } - // pod.Spec.Containers = append(pod.Spec.Containers, *managerSideCar) - // pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{ - // Name: "dev", - // VolumeSource: corev1.VolumeSource{ - // HostPath: &corev1.HostPathVolumeSource{ - // Path: "/dev", - // }, - // }, - // }) - logger.Info("Attach volume mounts...") for i := range pod.Spec.Containers { diff --git a/pkg/drivers/drivers.go b/pkg/drivers/drivers.go index 13c2a60..a7ed9b2 100644 --- a/pkg/drivers/drivers.go +++ b/pkg/drivers/drivers.go @@ -186,6 +186,81 @@ func (d *Driver) GetCSIDriverDetails() (string, map[string]string, error) { return string(namespace), labels, nil } +// GetMountCommand creates a PersistentVolumeClaim for driver +func (d *Driver) GetMountCommand() (string, error) { + wasiEnv, instance, err := d.init(nil) + if err != nil { + return "", fmt.Errorf("unable to init instance: %w", err) + } + + getMountCommand, err := instance.Exports.GetRawFunction("GetMountCommand") + if err != nil { + return "", fmt.Errorf("unable to find GetMountCommand: %w", err) + } + + _, err = getMountCommand.Native()() + if err != nil { + return "", fmt.Errorf("unable to call GetMountCommand: %w", err) + } + + errOut := string(wasiEnv.ReadStderr()) + if errOut != "" { + return "", fmt.Errorf("function error GetMountCommand: %s", errOut) + } + + return string(wasiEnv.ReadStdout()), nil +} + +// GetResizeCommand gets resize command to execute on the host +func (d *Driver) GetResizeCommand() (string, error) { + wasiEnv, instance, err := d.init(nil) + if err != nil { + return "", fmt.Errorf("unable to init instance: %w", err) + } + + getResizeCommand, err := instance.Exports.GetRawFunction("GetResizeCommand") + if err != nil { + return "", fmt.Errorf("unable to find GetResizeCommand: %w", err) + } + + _, err = getResizeCommand.Native()() + if err != nil { + return "", fmt.Errorf("unable to call GetResizeCommand: %w", err) + } + + errOut := string(wasiEnv.ReadStderr()) + if errOut != "" { + return "", fmt.Errorf("function error GetResizeCommand: %s", errOut) + } + + return string(wasiEnv.ReadStdout()), nil +} + +// WaitForVolumeAttachmentMeta defines wait for device info of plugin +func (d *Driver) WaitForVolumeAttachmentMeta() (string, error) { + wasiEnv, instance, err := d.init(nil) + if err != nil { + return "", fmt.Errorf("unable to init instance: %w", err) + } + + waitCommand, err := instance.Exports.GetRawFunction("WaitForVolumeAttachmentMeta") + if err != nil { + return "", fmt.Errorf("unable to find WaitForVolumeAttachmentMeta: %w", err) + } + + _, err = waitCommand.Native()() + if err != nil { + return "", fmt.Errorf("unable to call WaitForVolumeAttachmentMeta: %w", err) + } + + errOut := string(wasiEnv.ReadStderr()) + if errOut != "" { + return "", fmt.Errorf("function error WaitForVolumeAttachmentMeta: %s", errOut) + } + + return string(wasiEnv.ReadStdout()), nil +} + func (d *Driver) init(envs map[string]string) (*wasmer.WasiEnvironment, *wasmer.Instance, error) { builder := wasmer.NewWasiStateBuilder("wasi-program"). CaptureStdout().CaptureStderr() diff --git a/pkg/utils/helpers.go b/pkg/utils/helpers.go index 486f50e..cb6f70e 100644 --- a/pkg/utils/helpers.go +++ b/pkg/utils/helpers.go @@ -1,59 +1,21 @@ package utils import ( + "errors" "fmt" "math/big" + "regexp" "strings" + "time" + discoblocksondatiov1 "github.com/ondat/discoblocks/api/v1" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" - corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/yaml" ) -const defaultMountPattern = "/media/discoblocks/%s-%d" +const maxName = 253 -// TODO on this way on case of multiple discoblocks on a pod, -// all service would capture all disks leads to redundant data -const metricsServiceTemplate = `kind: Service -apiVersion: v1 -metadata: - name: %s - namespace: %s - annotations: - prometheus.io/path: "/metrics" - prometheus.io/scrape: "true" - prometheus.io/port: "9100" -spec: - ports: - - name: node-exporter - protocol: TCP - port: 9100 - targetPort: 9100` - -// TODO limit filesystem reports to discoblocks (ignored-mount-points) -const metricsTeamplate = `name: discoblocks-metrics -image: bitnami/node-exporter:1.3.1 -ports: -- containerPort: 9100 - protocol: TCP -command: -- /opt/bitnami/node-exporter/bin/node_exporter -- --collector.disable-defaults -- --collector.filesystem` - -// TODO maybe a config map for templates makes sense -const sidecarTeamplate = `name: discoblocks-manager -image: alpine:3.15.4 -command: -- sleep -- infinity -volumeMounts: -- name: dev - mountPath: /host/dev -securityContext: - allowPrivilegeEscalation: true - privileged: true` +const defaultMountPattern = "/media/discoblocks/%s-%d" // RenderMountPoint calculates mount point func RenderMountPoint(pattern, name string, index int) string { @@ -62,7 +24,7 @@ func RenderMountPoint(pattern, name string, index int) string { } if index != 0 && !strings.Contains(pattern, "%d") { - pattern = pattern + "-%d" + pattern += "-%d" } if !strings.Contains(pattern, "%d") { @@ -72,6 +34,23 @@ func RenderMountPoint(pattern, name string, index int) string { return fmt.Sprintf(pattern, index) } +// GetMountPointIndex calculates index by mount point +func GetMountPointIndex(pattern, name, mountPoint string) int { + if mountPoint == RenderMountPoint(pattern, name, 0) { + return 0 + } + + const maxDisks = 500 + + for i := 1; i < maxDisks; i++ { + if mountPoint == RenderMountPoint(pattern, name, i) { + return i + } + } + + return -1 +} + // RenderFinalizer calculates finalizer name func RenderFinalizer(name string, extras ...string) string { finalizer := fmt.Sprintf("discoblocks.io/%s", name) @@ -83,10 +62,19 @@ func RenderFinalizer(name string, extras ...string) string { return finalizer } -// RenderPVCName calculates PVC name -func RenderPVCName(elems ...string) (string, error) { +// RenderResourceName calculates resource name +func RenderResourceName(prefix bool, elems ...string) (string, error) { builder := strings.Builder{} - builder.WriteString("discoblocks") + + if len(elems) == 0 { + return "", errors.New("missing name elements") + } + + if prefix { + builder.WriteString("discoblocks") + } else { + builder.WriteString(elems[0]) + } for _, e := range elems { hash, err := Hash(e) @@ -97,37 +85,22 @@ func RenderPVCName(elems ...string) (string, error) { builder.WriteString(fmt.Sprintf("-%d", hash)) } - return builder.String(), nil -} - -// RenderMetricsService returns the metrics service -func RenderMetricsService(name, namespace string) (*corev1.Service, error) { - service := corev1.Service{} - if err := yaml.Unmarshal([]byte(fmt.Sprintf(metricsServiceTemplate, name, namespace)), &service); err != nil { - return nil, err - } - - return &service, nil -} - -// RenderMetricsSidecar returns the metrics sidecar -func RenderMetricsSidecar() (*corev1.Container, error) { - sidecar := corev1.Container{} - if err := yaml.Unmarshal([]byte(metricsTeamplate), &sidecar); err != nil { - return nil, err + l := builder.Len() + if l > maxName { + l = maxName } - return &sidecar, nil + return builder.String()[:l], nil } -// RenderManagerSidecar returns the manager sidecar -func RenderManagerSidecar() (*corev1.Container, error) { - sidecar := corev1.Container{} - if err := yaml.Unmarshal([]byte(sidecarTeamplate), &sidecar); err != nil { - return nil, err +// RenderDiskConfigLabel renders DiskConfig label +func RenderDiskConfigLabel(name string) string { + hash, err := Hash(name) + if err != nil { + panic("Unable to calculate hash, better to say good bye!") } - return &sidecar, nil + return fmt.Sprintf("discoblocks/%d", hash) } // IsContainsAll finds for a contains all b @@ -142,6 +115,18 @@ func IsContainsAll(a, b map[string]string) bool { return match == len(b) } +// GetNamePrefix returns the prefix by availability type +func GetNamePrefix(am discoblocksondatiov1.AvailabilityMode, uniquePerConfig string) string { + switch am { + case discoblocksondatiov1.ReadWriteOnce: + return time.Now().String() + case discoblocksondatiov1.ReadWriteSame: + return uniquePerConfig + default: + panic("Missing availability mode implementation: " + string(am)) + } +} + // ParsePrometheusMetric parses Prometheus metrisc details func ParsePrometheusMetric(metric string) (map[string]*dto.MetricFamily, error) { var parser expfmt.TextParser @@ -167,3 +152,23 @@ func ParsePrometheusMetricValue(metric string) (float64, error) { return f, err } + +// CompareStringNaturalOrder compares string in natural order +func CompareStringNaturalOrder(a, b string) bool { + numberRegex := regexp.MustCompile(`\d+`) + + convert := func(i string) string { + numbers := map[string]bool{} + for _, n := range numberRegex.FindAll([]byte(i), -1) { + numbers[string(n)] = true + } + + for n := range numbers { + i = strings.ReplaceAll(i, n, fmt.Sprintf("%09s", n)) + } + + return i + } + + return convert(a) < convert(b) +} diff --git a/pkg/utils/helpers_test.go b/pkg/utils/helpers_test.go index a4d860c..5367555 100644 --- a/pkg/utils/helpers_test.go +++ b/pkg/utils/helpers_test.go @@ -53,10 +53,56 @@ func TestRenderMountPoint(t *testing.T) { } } -func TestGetSidecarStub(t *testing.T) { - _, err := RenderMetricsSidecar() +func TestGetMountPointIndex(t *testing.T) { + t.Parallel() + + cases := map[string]struct { + pattern string + mountPoint string + expectedIndex int + }{ + "without order": { + pattern: "/media/discoblocks/foo", + mountPoint: "/media/discoblocks/foo", + expectedIndex: 0, + }, + "without high order": { + pattern: "/media/discoblocks/foo", + mountPoint: "/media/discoblocks/foo-99", + expectedIndex: 99, + }, + "with order": { + pattern: "/media/discoblocks/foo-%d", + mountPoint: "/media/discoblocks/foo-0", + expectedIndex: 0, + }, + "with higher order": { + pattern: "/media/discoblocks/foo-%d", + mountPoint: "/media/discoblocks/foo-99", + expectedIndex: 99, + }, + "not found": { + pattern: "/media/discoblocks/foo", + mountPoint: "/media/discoblocks/bar", + expectedIndex: -1, + }, + "too many": { + pattern: "/media/discoblocks/foo", + mountPoint: "/media/discoblocks/foo-1000", + expectedIndex: -1, + }, + } - assert.Nil(t, err, "invalid sidecar template") + for n, c := range cases { + c := c + t.Run(n, func(t *testing.T) { + t.Parallel() + + index := GetMountPointIndex(c.pattern, "", c.mountPoint) + + assert.Equal(t, c.expectedIndex, index, "invalid index") + }) + } } func TestParsePrometheusMetric(t *testing.T) { @@ -81,3 +127,8 @@ func TestParsePrometheusMetricValue(t *testing.T) { assert.Nil(t, err, "invalid metric") assert.Equal(t, float64(1020678144), value) } + +func TestCompareStringNaturalOrder(t *testing.T) { + assert.True(t, CompareStringNaturalOrder("", "foo-1")) + assert.True(t, CompareStringNaturalOrder("foo-2", "foo-10")) +} diff --git a/pkg/utils/kube.go b/pkg/utils/kube.go new file mode 100644 index 0000000..ce764b4 --- /dev/null +++ b/pkg/utils/kube.go @@ -0,0 +1,180 @@ +package utils + +import ( + "fmt" + "regexp" + "strings" + "time" + + discoblocksondatiov1 "github.com/ondat/discoblocks/api/v1" + "github.com/ondat/discoblocks/pkg/drivers" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/yaml" +) + +// Used for Yaml indentation +const hostCommandPrefix = "\n " + +var hostCommandReplacePattern = regexp.MustCompile(`\n`) + +const metricsServiceTemplate = `kind: Service +apiVersion: v1 +metadata: + name: "%s" + namespace: "%s" + annotations: + prometheus.io/path: "/metrics" + prometheus.io/scrape: "true" + prometheus.io/port: "9100" +spec: + ports: + - name: node-exporter + protocol: TCP + port: 9100 + targetPort: 9100 +` + +const metricsTeamplate = `name: discoblocks-metrics +image: bitnami/node-exporter:1.3.1 +ports: +- containerPort: 9100 + protocol: TCP +command: +- /opt/bitnami/node-exporter/bin/node_exporter +- --collector.disable-defaults +- --collector.filesystem +` + +const hostJobTemplate = `apiVersion: batch/v1 +kind: Job +metadata: + name: "%s" + namespace: "%s" + labels: + app: discoblocks +spec: + template: + spec: + hostPID: true + nodeName: "%s" + containers: + - name: mount + image: nixery.dev/shell/gawk/gnugrep/gnused/coreutils-full/cri-tools/docker-client + env: + - name: MOUNT_POINT + value: "%s" + - name: CONTAINER_IDS + value: "%s" + - name: PVC_NAME + value: "%s" + - name: DEV + value: "%s" + - name: FS + value: "%s" + command: + - bash + - -exc + - | + %s + volumeMounts: + - mountPath: /run/containerd/containerd.sock + name: containerd-socket + readOnly: true + - mountPath: /var/run/docker.sock + name: docker-socket + readOnly: true + - mountPath: /host + name: host + securityContext: + privileged: true + restartPolicy: Never + volumes: + - hostPath: + path: /run/containerd/containerd.sock + name: containerd-socket + - hostPath: + path: /var/run/docker.sock + name: docker-socket + - hostPath: + path: / + name: host + backoffLimit: 10 +` + +// RenderMetricsService returns the metrics service +func RenderMetricsService(name, namespace string) (*corev1.Service, error) { + service := corev1.Service{} + if err := yaml.Unmarshal([]byte(fmt.Sprintf(metricsServiceTemplate, name, namespace)), &service); err != nil { + return nil, fmt.Errorf("unable to unmarshal service: %w", err) + } + + return &service, nil +} + +// RenderMetricsSidecar returns the metrics sidecar +func RenderMetricsSidecar() (*corev1.Container, error) { + sidecar := corev1.Container{} + if err := yaml.Unmarshal([]byte(metricsTeamplate), &sidecar); err != nil { + return nil, fmt.Errorf("unable to unmarshal container: %w", err) + } + + return &sidecar, nil +} + +// RenderHostJob returns the job executed on host +func RenderHostJob(pvcName, namespace, nodeName, dev, fs, mountPoint string, containerIDs []string, getCommand func() (string, error)) (*batchv1.Job, error) { + hostCommand, err := getCommand() + if err != nil { + return nil, fmt.Errorf("unable to get command: %w", err) + } + + hostCommand = string(hostCommandReplacePattern.ReplaceAll([]byte(hostCommand), []byte(hostCommandPrefix))) + + jobName, err := RenderResourceName(true, fmt.Sprintf("%d", time.Now().UnixNano()), pvcName, namespace) + if err != nil { + return nil, fmt.Errorf("unable to render resource name: %w", err) + } + + template := fmt.Sprintf(hostJobTemplate, jobName, namespace, nodeName, mountPoint, strings.Join(containerIDs, " "), pvcName, dev, fs, hostCommand) + + job := batchv1.Job{} + if err := yaml.Unmarshal([]byte(template), &job); err != nil { + println(template) + return nil, fmt.Errorf("unable to unmarshal job: %w", err) + } + + return &job, nil +} + +// NewPVC constructs a new PVC instance +func NewPVC(config *discoblocksondatiov1.DiskConfig, prefix string, driver *drivers.Driver) (*corev1.PersistentVolumeClaim, error) { + pvcName, err := RenderResourceName(true, prefix, config.Name, config.Namespace) + if err != nil { + return nil, fmt.Errorf("unable to calculate hash: %w", err) + } + + pvc, err := driver.GetPVCStub(pvcName, config.Namespace, config.Spec.StorageClassName) + if err != nil { + return nil, fmt.Errorf("unable to init a PVC: %w", err) + } + + pvc.Finalizers = []string{RenderFinalizer(config.Name)} + + pvc.Labels = map[string]string{ + "discoblocks": config.Name, + } + + pvc.Spec.Resources = corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: config.Spec.Capacity, + }, + } + + pvc.Spec.AccessModes = config.Spec.AccessModes + if len(pvc.Spec.AccessModes) == 0 { + pvc.Spec.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce} + } + + return pvc, nil +} diff --git a/pkg/utils/kube_test.go b/pkg/utils/kube_test.go new file mode 100644 index 0000000..8ece7bc --- /dev/null +++ b/pkg/utils/kube_test.go @@ -0,0 +1,25 @@ +package utils + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRenderMetricsService(t *testing.T) { + _, err := RenderMetricsService("name", "namespace") + + assert.Nil(t, err, "invalid metrics service template") +} + +func TestRenderMetricsSidecar(t *testing.T) { + _, err := RenderMetricsSidecar() + + assert.Nil(t, err, "invalid sidecar template") +} + +func TestRenderMountJob(t *testing.T) { + _, err := RenderHostJob("name", "namespace", "nodeName", "dev", "fs", "mountPoint", []string{"c1", "c2"}, func() (string, error) { return "command", nil }) + + assert.Nil(t, err, "invalid mount job template") +} diff --git a/schedulers/pod.go b/schedulers/pod.go index 02ca2bf..b248fd1 100644 --- a/schedulers/pod.go +++ b/schedulers/pod.go @@ -31,7 +31,7 @@ func (s *podFilter) Name() string { // Filter does the filtering func (s *podFilter) Filter(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { - logger := s.logger.WithValues("pod", pod.Name, "namespace", pod.Name, "node", nodeInfo.Node().Name) + logger := s.logger.WithValues("pod", pod.Name, "namespace", pod.Namespace, "node", nodeInfo.Node().Name) errorStatus := framework.Success if s.strict { @@ -59,7 +59,7 @@ func (s *podFilter) Filter(ctx context.Context, state *framework.CycleState, pod for i := range diskConfigs.Items { config := diskConfigs.Items[i] - if !utils.IsContainsAll(pod.Labels, config.Spec.PodSelector) { + if config.DeletionTimestamp != nil || !utils.IsContainsAll(pod.Labels, config.Spec.PodSelector) { continue } @@ -83,7 +83,6 @@ func (s *podFilter) Filter(ctx context.Context, state *framework.CycleState, pod } for scName := range storageClasses { - //nolint:govet // logger is ok to shadowing logger := logger.WithValues("sc_name", scName) logger.Info("Fetch StorageClass...") @@ -109,7 +108,7 @@ func (s *podFilter) Filter(ctx context.Context, state *framework.CycleState, pod namespace, podLabels, err := driver.GetCSIDriverDetails() if err != nil { - logger.Error(err, "Failed to call driver") + logger.Error(err, "Failed to call driver", "method", "GetCSIDriverDetails") return framework.NewStatus(errorStatus, "failed to call driver: "+sc.Provisioner) } diff --git a/schedulers/scheduler.go b/schedulers/scheduler.go index 2c35697..47509a7 100644 --- a/schedulers/scheduler.go +++ b/schedulers/scheduler.go @@ -11,7 +11,7 @@ import ( ) // log is for logging in this package -var schedulerLog = logf.Log.WithName("Scheduler") +var schedulerLog = logf.Log.WithName("schedulers.Scheduler") // Scheduler HTTP service for schedulers type Scheduler struct { diff --git a/tests/e2e/kuttl/kuttl-config-1.23.yaml b/tests/e2e/kuttl/kuttl-config-1.23.yaml index 2523e60..3701e83 100644 --- a/tests/e2e/kuttl/kuttl-config-1.23.yaml +++ b/tests/e2e/kuttl/kuttl-config-1.23.yaml @@ -6,4 +6,4 @@ kindConfig: tests/e2e/kind/kind-config-1.23.yaml startKIND: true kindContainers: - local/discoblocks:e2e -timeout: 300 +timeout: 400 diff --git a/tests/e2e/stable/storageos/00-install-dependencies.yaml b/tests/e2e/stable/storageos/00-install-dependencies.yaml index 6a3f725..2c2061f 100644 --- a/tests/e2e/stable/storageos/00-install-dependencies.yaml +++ b/tests/e2e/stable/storageos/00-install-dependencies.yaml @@ -5,4 +5,4 @@ commands: - command: kubectl apply -f https://raw.githubusercontent.com/kubernetes-csi/external-snapshotter/v6.0.1/client/config/crd/snapshot.storage.k8s.io_volumesnapshots.yaml - command: kubectl apply -f https://raw.githubusercontent.com/kubernetes-csi/external-snapshotter/v6.0.1/client/config/crd/snapshot.storage.k8s.io_volumesnapshotcontents.yaml - command: kubectl apply -f https://raw.githubusercontent.com/kubernetes-csi/external-snapshotter/v6.0.1/client/config/crd/snapshot.storage.k8s.io_volumesnapshotclasses.yaml - - command: kubectl storageos install --include-etcd --etcd-replicas 1 --stos-version=v2.8.0 + - command: kubectl storageos install --include-etcd --etcd-replicas 1 --stos-version=develop diff --git a/tests/e2e/stable/storageos/01-assert.yaml b/tests/e2e/stable/storageos/01-assert.yaml index eeeccb0..6cfcd85 100644 --- a/tests/e2e/stable/storageos/01-assert.yaml +++ b/tests/e2e/stable/storageos/01-assert.yaml @@ -26,15 +26,6 @@ spec: runAsNonRoot: true serviceAccount: discoblocks-controller-manager serviceAccountName: discoblocks-controller-manager - volumes: - - name: cert - secret: - defaultMode: 420 - secretName: webhook-server-cert - - configMap: - defaultMode: 420 - name: discoblocks-scheduler - name: config-volume status: availableReplicas: 1 --- diff --git a/tests/e2e/stable/storageos/01-install-discoblocks.yaml b/tests/e2e/stable/storageos/01-install-discoblocks.yaml index acbe3f3..f69e9ff 100644 --- a/tests/e2e/stable/storageos/01-install-discoblocks.yaml +++ b/tests/e2e/stable/storageos/01-install-discoblocks.yaml @@ -1,5 +1,4 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - - command: awk '{gsub("ebs.csi.aws.com", "csi.storageos.com", $$0); print > "../../../../config/default/kustomization.yaml"}' ../../../../config/default/kustomization.yaml - command: sh -c 'IMG=local/discoblocks:e2e make -C ../../../.. deploy' diff --git a/tests/e2e/stable/storageos/02-assert.yaml b/tests/e2e/stable/storageos/02-assert.yaml index 945df70..9806e55 100644 --- a/tests/e2e/stable/storageos/02-assert.yaml +++ b/tests/e2e/stable/storageos/02-assert.yaml @@ -1,12 +1,12 @@ apiVersion: discoblocks.ondat.io/v1 kind: DiskConfig metadata: - name: diskconfig-sample-storageos + name: diskconfig-sample-storageos-once namespace: default spec: storageClassName: storageos capacity: 1Gi - mountPointPattern: /media/discoblocks/sample-%d + mountPointPattern: /media/discoblocks/once-%d nodeSelector: matchLabels: kubernetes.io/os: linux @@ -15,7 +15,29 @@ spec: policy: upscaleTriggerPercentage: 50 maximumCapacityOfDisk: 2Gi - maximumNumberOfDisks: 3 + maximumNumberOfDisks: 2 + coolDown: 1s +--- +apiVersion: discoblocks.ondat.io/v1 +kind: DiskConfig +metadata: + name: diskconfig-sample-storageos-same + namespace: default +spec: + storageClassName: storageos + capacity: 1Gi + availabilityMode: ReadWriteSame + mountPointPattern: /media/discoblocks/same-%d + nodeSelector: + matchLabels: + kubernetes.io/os: linux + podSelector: + app: nginx + policy: + upscaleTriggerPercentage: 50 + maximumCapacityOfDisk: 2Gi + maximumNumberOfDisks: 2 + coolDown: 1s --- apiVersion: apps/v1 kind: Deployment diff --git a/tests/e2e/stable/storageos/02-install-workload.yaml b/tests/e2e/stable/storageos/02-install-workload.yaml index ca279f8..7facd81 100644 --- a/tests/e2e/stable/storageos/02-install-workload.yaml +++ b/tests/e2e/stable/storageos/02-install-workload.yaml @@ -1,5 +1,6 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - - command: kubectl apply -f diskconfig.yaml - - command: kubectl create deployment --image=nginx nginx + - command: kubectl apply -f diskconfig-readwriteonce.yaml + - command: kubectl apply -f diskconfig-readwritesame.yaml + - command: kubectl create deployment --image=nginx:1.23 nginx diff --git a/tests/e2e/stable/storageos/03-assert.tpl.yaml b/tests/e2e/stable/storageos/03-assert.tpl.yaml index 289c01c..9731f7a 100644 --- a/tests/e2e/stable/storageos/03-assert.tpl.yaml +++ b/tests/e2e/stable/storageos/03-assert.tpl.yaml @@ -7,11 +7,41 @@ metadata: volume.beta.kubernetes.io/storage-provisioner: csi.storageos.com volume.kubernetes.io/storage-provisioner: csi.storageos.com finalizers: - - discoblocks.io/diskconfig-sample-storageos + - discoblocks.io/diskconfig-sample-storageos-once - kubernetes.io/pvc-protection labels: - discoblocks: diskconfig-sample-storageos - name: #PVC_NAME# + discoblocks: diskconfig-sample-storageos-once + name: #PVC_ONCE_NAME# + namespace: default +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi + storageClassName: storageos + volumeMode: Filesystem +status: + accessModes: + - ReadWriteOnce + capacity: + storage: 1Gi + phase: Bound +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + annotations: + pv.kubernetes.io/bind-completed: "yes" + pv.kubernetes.io/bound-by-controller: "yes" + volume.beta.kubernetes.io/storage-provisioner: csi.storageos.com + volume.kubernetes.io/storage-provisioner: csi.storageos.com + finalizers: + - discoblocks.io/diskconfig-sample-storageos-same + - kubernetes.io/pvc-protection + labels: + discoblocks: diskconfig-sample-storageos-same + name: #PVC_SAME_NAME# namespace: default spec: accessModes: @@ -34,7 +64,25 @@ metadata: finalizers: - kubernetes.io/pv-protection - external-attacher/csi-storageos-com - name: #PV_NAME# + name: #PV_ONCE_NAME# +spec: + accessModes: + - ReadWriteOnce + capacity: + storage: 1Gi + persistentVolumeReclaimPolicy: Delete + storageClassName: storageos + volumeMode: Filesystem +status: + phase: Bound +--- +apiVersion: v1 +kind: PersistentVolume +metadata: + finalizers: + - kubernetes.io/pv-protection + - external-attacher/csi-storageos-com + name: #PV_SAME_NAME# spec: accessModes: - ReadWriteOnce @@ -51,26 +99,43 @@ kind: Pod metadata: labels: app: nginx - discoblocks/metrics: diskconfig-sample-storageos + discoblocks/2498526953: diskconfig-sample-storageos-once + discoblocks/619933494: diskconfig-sample-storageos-same + discoblocks-metrics: #POD_NAME# name: #POD_NAME# namespace: default spec: containers: - - image: nginx + - image: nginx:1.23 volumeMounts: - mountPath: /var/run/secrets/kubernetes.io/serviceaccount - - mountPath: /media/discoblocks/sample-0 - name: #PVC_NAME# + - mountPath: /media/discoblocks/once-0 + - mountPath: /media/discoblocks/same-0 - image: bitnami/node-exporter:1.3.1 volumeMounts: - - mountPath: /media/discoblocks/sample-0 - name: #PVC_NAME# + - mountPath: /media/discoblocks/once-0 + - mountPath: /media/discoblocks/same-0 - mountPath: /var/run/secrets/kubernetes.io/serviceaccount volumes: - projected: defaultMode: 420 - - name: #PVC_NAME# + - name: #PVC_ONCE_NAME# + persistentVolumeClaim: + claimName: #PVC_ONCE_NAME# + - name: #PVC_SAME_NAME# persistentVolumeClaim: - claimName: #PVC_NAME# + claimName: #PVC_SAME_NAME# status: phase: Running +--- +apiVersion: v1 +kind: Service +metadata: + labels: + discoblocks/2498526953: diskconfig-sample-storageos-once + discoblocks/619933494: diskconfig-sample-storageos-same + name: #SERVICE_NAME# + namespace: default +spec: + selector: + discoblocks-metrics: #POD_NAME# diff --git a/tests/e2e/stable/storageos/03-validate.yaml b/tests/e2e/stable/storageos/03-validate.yaml index 4baa8af..1587a85 100644 --- a/tests/e2e/stable/storageos/03-validate.yaml +++ b/tests/e2e/stable/storageos/03-validate.yaml @@ -1,5 +1,13 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - - command: sh -c 'cat 03-assert.tpl.yaml | sed -e "s/#PVC_NAME#/$(kubectl get pvc -l discoblocks=diskconfig-sample-storageos --no-headers -o custom-columns=":metadata.name")/" -e "s/#PV_NAME#/$(kubectl get pvc -l discoblocks=diskconfig-sample-storageos --no-headers -o custom-columns=":spec.volumeName")/" -e "s/#POD_NAME#/$(kubectl get po -l app=nginx --no-headers -o custom-columns=":metadata.name")/" > workload/00-assert.yaml' + - command: | + sh -c 'cat 03-assert.tpl.yaml | sed \ + -e "s/#PVC_ONCE_NAME#/$(kubectl get pvc -l discoblocks=diskconfig-sample-storageos-once --no-headers -o custom-columns=":metadata.name")/" \ + -e "s/#PV_ONCE_NAME#/$(kubectl get pvc -l discoblocks=diskconfig-sample-storageos-once --no-headers -o custom-columns=":spec.volumeName")/" \ + -e "s/#PVC_SAME_NAME#/$(kubectl get pvc -l discoblocks=diskconfig-sample-storageos-same --no-headers -o custom-columns=":metadata.name")/" \ + -e "s/#PV_SAME_NAME#/$(kubectl get pvc -l discoblocks=diskconfig-sample-storageos-same --no-headers -o custom-columns=":spec.volumeName")/" \ + -e "s/#POD_NAME#/$(kubectl get po -l app=nginx --no-headers -o custom-columns=":metadata.name")/" \ + -e "s/#SERVICE_NAME#/$(kubectl get service -l discoblocks --no-headers -o custom-columns=":metadata.name")/" \ + > workload/00-assert.yaml' - command: kubectl-kuttl assert workload \ No newline at end of file diff --git a/tests/e2e/stable/storageos/04-fill-volume.yaml b/tests/e2e/stable/storageos/04-fill-volume.yaml new file mode 100644 index 0000000..3c240dc --- /dev/null +++ b/tests/e2e/stable/storageos/04-fill-volume.yaml @@ -0,0 +1,11 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - command: sh -c "kubectl exec $(kubectl get po --no-headers | tail -1 | awk '{print $1}') -- dd if=/dev/zero of=/media/discoblocks/same-0/data0 count=1000000" + - command: sleep 40 + - command: sh -c "kubectl exec $(kubectl get po --no-headers | tail -1 | awk '{print $1}') -- dd if=/dev/zero of=/media/discoblocks/same-0/data0 count=3000000" + - command: sleep 40 + - command: sh -c "kubectl exec $(kubectl get po --no-headers | tail -1 | awk '{print $1}') -- dd if=/dev/zero of=/media/discoblocks/same-1/data1 count=1000000" + - command: sleep 40 + - command: sh -c "kubectl exec $(kubectl get po --no-headers | tail -1 | awk '{print $1}') -- dd if=/dev/zero of=/media/discoblocks/same-1/data1 count=3000000" + - command: sleep 40 \ No newline at end of file diff --git a/tests/e2e/stable/storageos/05-assert.yaml b/tests/e2e/stable/storageos/05-assert.yaml new file mode 100644 index 0000000..6a4b585 --- /dev/null +++ b/tests/e2e/stable/storageos/05-assert.yaml @@ -0,0 +1,9 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: nginx + name: nginx + namespace: default +status: + availableReplicas: 1 diff --git a/tests/e2e/stable/storageos/05-restart-workload.yaml b/tests/e2e/stable/storageos/05-restart-workload.yaml new file mode 100644 index 0000000..f6fe2aa --- /dev/null +++ b/tests/e2e/stable/storageos/05-restart-workload.yaml @@ -0,0 +1,4 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - command: sh -c "kubectl delete po $(kubectl get po -l app=nginx --no-headers -o custom-columns=":metadata.name")" \ No newline at end of file diff --git a/tests/e2e/stable/storageos/06-validate-volumes.yaml b/tests/e2e/stable/storageos/06-validate-volumes.yaml new file mode 100644 index 0000000..fb3e5a4 --- /dev/null +++ b/tests/e2e/stable/storageos/06-validate-volumes.yaml @@ -0,0 +1,5 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - command: sh -c "kubectl exec $(kubectl get po | tail -1 | awk '{print $1}') -- ls -l /media/discoblocks/same-0/data0" + - command: sh -c "kubectl exec $(kubectl get po | tail -1 | awk '{print $1}') -- ls -l /media/discoblocks/same-1/data1" \ No newline at end of file diff --git a/tests/e2e/stable/storageos/diskconfig.yaml b/tests/e2e/stable/storageos/diskconfig-readwriteonce.yaml similarity index 63% rename from tests/e2e/stable/storageos/diskconfig.yaml rename to tests/e2e/stable/storageos/diskconfig-readwriteonce.yaml index 0746d73..5895902 100644 --- a/tests/e2e/stable/storageos/diskconfig.yaml +++ b/tests/e2e/stable/storageos/diskconfig-readwriteonce.yaml @@ -1,12 +1,13 @@ apiVersion: discoblocks.ondat.io/v1 kind: DiskConfig metadata: - name: diskconfig-sample-storageos - namespace: default + name: diskconfig-sample-storageos-once + labels: + discoblocks: ok spec: storageClassName: storageos capacity: 1Gi - mountPointPattern: /media/discoblocks/sample-%d + mountPointPattern: /media/discoblocks/once-%d nodeSelector: matchLabels: kubernetes.io/os: linux @@ -15,4 +16,5 @@ spec: policy: upscaleTriggerPercentage: 50 maximumCapacityOfDisk: 2Gi - maximumNumberOfDisks: 3 + maximumNumberOfDisks: 2 + coolDown: 1s diff --git a/tests/e2e/stable/storageos/diskconfig-readwritesame.yaml b/tests/e2e/stable/storageos/diskconfig-readwritesame.yaml new file mode 100644 index 0000000..e563280 --- /dev/null +++ b/tests/e2e/stable/storageos/diskconfig-readwritesame.yaml @@ -0,0 +1,21 @@ +apiVersion: discoblocks.ondat.io/v1 +kind: DiskConfig +metadata: + name: diskconfig-sample-storageos-same + labels: + discoblocks: ok +spec: + storageClassName: storageos + capacity: 1Gi + availabilityMode: ReadWriteSame + mountPointPattern: /media/discoblocks/same-%d + nodeSelector: + matchLabels: + kubernetes.io/os: linux + podSelector: + app: nginx + policy: + upscaleTriggerPercentage: 50 + maximumCapacityOfDisk: 2Gi + maximumNumberOfDisks: 2 + coolDown: 1s