Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: dmitry.lopatin <[email protected]>
  • Loading branch information
LopatinDmitr committed Oct 30, 2024
1 parent e599b02 commit 11bbb7f
Show file tree
Hide file tree
Showing 26 changed files with 353 additions and 67 deletions.
2 changes: 2 additions & 0 deletions api/core/v1alpha2/cvicondition/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
ClusterImageNotReady DatasourceReadyReason = "ClusterImageNotReady"
// VirtualDiskNotReady indicates that the `VirtualDisk` datasource is not ready, which prevents the import process from starting.
VirtualDiskNotReady DatasourceReadyReason = "VirtualDiskNotReady"
// VirtualDiskInUseInRunningVirtualMachine indicates that the `VirtualDisk` attached to running `VirtualMachine`
VirtualDiskInUseInRunningVirtualMachine DatasourceReadyReason = "VirtualDiskInUseInRunningVirtualMachine"

// WaitForUserUpload indicates that the `ClusterVirtualImage` is waiting for the user to upload a datasource for the import process to continue.
WaitForUserUpload ReadyReason = "WaitForUserUpload"
Expand Down
9 changes: 9 additions & 0 deletions api/core/v1alpha2/vdcondition/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
ResizedType Type = "Resized"
// SnapshottingType indicates whether the disk snapshotting operation is in progress.
SnapshottingType Type = "Snapshotting"
// InUseType indicates whether the VirtualDisk is attached to a running VirtualMachine or is being used in a process of a VirtualImage creation.
InUseType Type = "InUse"
)

type (
Expand All @@ -39,6 +41,8 @@ type (
ResizedReason = string
// SnapshottingReason represents the various reasons for the Snapshotting condition type.
SnapshottingReason = string
// InUseReason represents the various reasons for the InUse condition type.
InUseReason = string
)

const (
Expand Down Expand Up @@ -83,4 +87,9 @@ const (
Snapshotting SnapshottingReason = "Snapshotting"
// SnapshottingNotAvailable indicates that the snapshotting operation is not available for now.
SnapshottingNotAvailable SnapshottingReason = "NotAvailable"

// InUseByVirtualImage indicates that the VirtualDisk is being used in a process of a VirtualImage creation.
InUseByVirtualImage InUseReason = "InUseByVirtualImage"
// InUseByVirtualMachine indicates that the VirtualDisk is attached to a running VirtualMachine.
InUseByVirtualMachine InUseReason = "InUseByVirtualMachine"
)
2 changes: 2 additions & 0 deletions api/core/v1alpha2/vicondition/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
ClusterImageNotReady DatasourceReadyReason = "ClusterImageNotReady"
// VirtualDiskNotReady indicates that the `VirtualDisk` datasource is not ready, which prevents the import process from starting.
VirtualDiskNotReady DatasourceReadyReason = "VirtualDiskNotReady"
// VirtualDiskInUseInRunningVirtualMachine indicates that the `VirtualDisk` attached to running `VirtualMachine`
VirtualDiskInUseInRunningVirtualMachine DatasourceReadyReason = "VirtualDiskInUseInRunningVirtualMachine"

// WaitForUserUpload indicates that the `VirtualImage` is waiting for the user to upload a datasource for the import process to continue.
WaitForUserUpload ReadyReason = "WaitForUserUpload"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ func NewController(
protection := service.NewProtectionService(mgr.GetClient(), virtv2.FinalizerCVIProtection)
importer := service.NewImporterService(dvcr, mgr.GetClient(), importerImage, requirements, PodPullPolicy, PodVerbose, ControllerName, protection)
uploader := service.NewUploaderService(dvcr, mgr.GetClient(), uploaderImage, requirements, PodPullPolicy, PodVerbose, ControllerName, protection)
lockService := service.NewLockService(mgr.GetClient())

sources := source.NewSources()
sources.Set(virtv2.DataSourceTypeHTTP, source.NewHTTPDataSource(stat, importer, dvcr, ns))
sources.Set(virtv2.DataSourceTypeContainerImage, source.NewRegistryDataSource(stat, importer, dvcr, mgr.GetClient(), ns))
sources.Set(virtv2.DataSourceTypeObjectRef, source.NewObjectRefDataSource(stat, importer, dvcr, mgr.GetClient(), ns))
sources.Set(virtv2.DataSourceTypeObjectRef, source.NewObjectRefDataSource(stat, importer, dvcr, mgr.GetClient(), ns, lockService))
sources.Set(virtv2.DataSourceTypeUpload, source.NewUploadDataSource(stat, uploader, dvcr, ns))

reconciler := NewReconciler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/deckhouse/virtualization-controller/pkg/controller/watchers"
"github.com/deckhouse/virtualization-controller/pkg/logger"
virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2"
"github.com/deckhouse/virtualization/api/core/v1alpha2/vdcondition"
)

type Handler interface {
Expand Down Expand Up @@ -165,7 +166,14 @@ func (r *Reconciler) SetupController(_ context.Context, mgr manager.Manager, ctr
return false
}

return oldVD.Status.Phase != newVD.Status.Phase
oldInUseCondition, _ := service.GetCondition(vdcondition.InUseType, oldVD.Status.Conditions)
newInUseCondition, _ := service.GetCondition(vdcondition.InUseType, newVD.Status.Conditions)

if oldVD.Status.Phase != newVD.Status.Phase || len(oldVD.Status.AttachedToVirtualMachines) != len(newVD.Status.AttachedToVirtualMachines) || oldInUseCondition.Status != newInUseCondition.Status {
return true
}

return false
},
},
); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ func (h DatasourceReadyHandler) Handle(ctx context.Context, cvi *virtv2.ClusterV
condition.Reason = cvicondition.VirtualDiskNotReady
condition.Message = service.CapitalizeFirstLetter(err.Error())
return reconcile.Result{}, nil
case errors.As(err, &source.VirtualDiskAttachedToRunningVMError{}):
case errors.As(err, &source.VirtualDiskInUseError{}):
condition.Status = metav1.ConditionFalse
condition.Reason = cvicondition.VirtualDiskNotReady
condition.Reason = cvicondition.VirtualDiskInUseInRunningVirtualMachine
condition.Message = service.CapitalizeFirstLetter(err.Error())
return reconcile.Result{}, nil
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,16 @@ func NewVirtualDiskNotReadyError(name string) error {
}
}

type VirtualDiskAttachedToRunningVMError struct {
name string
vmName string
type VirtualDiskInUseError struct {
name string
}

func (e VirtualDiskAttachedToRunningVMError) Error() string {
return fmt.Sprintf("VirtualDisk %q attached to running VirtualMachine %q", e.name, e.vmName)
func (e VirtualDiskInUseError) Error() string {
return fmt.Sprintf("reading from the VirtualDisk is not possible while it is in use by the running VirtualMachine/%s", e.name)
}

func NewVirtualDiskAttachedToRunningVMError(name, vmName string) error {
return VirtualDiskAttachedToRunningVMError{
name: name,
vmName: vmName,
func NewVirtualDiskInUseError(name string) error {
return VirtualDiskInUseError{
name: name,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewObjectRefDataSource(
dvcrSettings *dvcr.Settings,
client client.Client,
controllerNamespace string,
lockService *service.LockService,
) *ObjectRefDataSource {
return &ObjectRefDataSource{
statService: statService,
Expand All @@ -65,7 +66,7 @@ func NewObjectRefDataSource(
controllerNamespace: controllerNamespace,

viOnPvcSyncer: NewObjectRefVirtualImageOnPvc(importerService, dvcrSettings, statService),
vdSyncer: NewObjectRefVirtualDisk(importerService, client, controllerNamespace, dvcrSettings, statService),
vdSyncer: NewObjectRefVirtualDisk(importerService, client, controllerNamespace, dvcrSettings, statService, lockService),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/deckhouse/virtualization-controller/pkg/sdk/framework/helper"
virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2"
"github.com/deckhouse/virtualization/api/core/v1alpha2/cvicondition"
"github.com/deckhouse/virtualization/api/core/v1alpha2/vdcondition"
"github.com/deckhouse/virtualization/api/core/v1alpha2/vicondition"
)

Expand All @@ -45,15 +46,17 @@ type ObjectRefVirtualDisk struct {
statService Stat
dvcrSettings *dvcr.Settings
controllerNamespace string
lockService *service.LockService
}

func NewObjectRefVirtualDisk(importerService Importer, client client.Client, controllerNamespace string, dvcrSettings *dvcr.Settings, statService Stat) *ObjectRefVirtualDisk {
func NewObjectRefVirtualDisk(importerService Importer, client client.Client, controllerNamespace string, dvcrSettings *dvcr.Settings, statService Stat, lockService *service.LockService) *ObjectRefVirtualDisk {
return &ObjectRefVirtualDisk{
importerService: importerService,
statService: statService,
dvcrSettings: dvcrSettings,
client: client,
controllerNamespace: controllerNamespace,
lockService: lockService,
}
}

Expand Down Expand Up @@ -81,12 +84,28 @@ func (ds ObjectRefVirtualDisk) Sync(ctx context.Context, cvi *virtv2.ClusterVirt
return reconcile.Result{}, err
}

err = ds.lockService.UnlockVirtualDisk(ctx, vdRef)
if err != nil {
return reconcile.Result{}, err
}

return CleanUp(ctx, cvi, ds)
case cc.IsTerminating(pod):
cvi.Status.Phase = virtv2.ImagePending

log.Info("Cleaning up...")

err = ds.lockService.UnlockVirtualDisk(ctx, vdRef)
if err != nil {
return reconcile.Result{}, err
}

case pod == nil:
err = ds.lockService.LockVirtualDiskByVirtualImage(ctx, vdRef)
if err != nil {
return reconcile.Result{}, err
}

cvi.Status.Progress = ds.statService.GetProgress(cvi.GetUID(), pod, cvi.Status.Progress)
cvi.Status.Target.RegistryURL = ds.statService.GetDVCRImageName(pod)

Expand All @@ -99,8 +118,16 @@ func (ds ObjectRefVirtualDisk) Sync(ctx context.Context, cvi *virtv2.ClusterVirt
case err == nil:
// OK.
case cc.ErrQuotaExceeded(err):
err = ds.lockService.UnlockVirtualDisk(ctx, vdRef)
if err != nil {
return reconcile.Result{}, err
}
return setQuotaExceededPhaseCondition(condition, &cvi.Status.Phase, err, cvi.CreationTimestamp), nil
default:
err = ds.lockService.UnlockVirtualDisk(ctx, vdRef)
if err != nil {
return reconcile.Result{}, err
}
setPhaseConditionToFailed(condition, &cvi.Status.Phase, fmt.Errorf("unexpected error: %w", err))
return reconcile.Result{}, err
}
Expand All @@ -114,6 +141,11 @@ func (ds ObjectRefVirtualDisk) Sync(ctx context.Context, cvi *virtv2.ClusterVirt

return reconcile.Result{Requeue: true}, nil
case cc.IsPodComplete(pod):
err = ds.lockService.UnlockVirtualDisk(ctx, vdRef)
if err != nil {
return reconcile.Result{}, err
}

err = ds.statService.CheckPod(pod)
if err != nil {
cvi.Status.Phase = virtv2.ImageFailed
Expand Down Expand Up @@ -144,6 +176,10 @@ func (ds ObjectRefVirtualDisk) Sync(ctx context.Context, cvi *virtv2.ClusterVirt
default:
err = ds.statService.CheckPod(pod)
if err != nil {
unLockErr := ds.lockService.UnlockVirtualDisk(ctx, vdRef)
if unLockErr != nil {
return reconcile.Result{}, unLockErr
}
cvi.Status.Phase = virtv2.ImageFailed

switch {
Expand Down Expand Up @@ -217,16 +253,10 @@ func (ds ObjectRefVirtualDisk) Validate(ctx context.Context, cvi *virtv2.Cluster
return NewVirtualDiskNotReadyError(cvi.Spec.DataSource.ObjectRef.Name)
}

if len(vd.Status.AttachedToVirtualMachines) != 0 {
vmName := vd.Status.AttachedToVirtualMachines[0]

vm, err := helper.FetchObject(ctx, types.NamespacedName{Name: vmName.Name, Namespace: vd.Namespace}, ds.client, &virtv2.VirtualMachine{})
if err != nil {
return err
}

if vm.Status.Phase != virtv2.MachineStopped {
return NewVirtualDiskAttachedToRunningVMError(vd.Name, vmName.Name)
inUseCondition, _ := service.GetCondition(vdcondition.InUseType, vd.Status.Conditions)
if inUseCondition.Status == metav1.ConditionTrue {
if inUseCondition.Reason == vdcondition.InUseByVirtualMachine {
return NewVirtualDiskInUseError(vd.Name)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package service

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2"
"github.com/deckhouse/virtualization/api/core/v1alpha2/vdcondition"
)

type LockService struct {
client client.Client
}

func NewLockService(
client client.Client,
) *LockService {
return &LockService{
client: client,
}
}

func (s LockService) LockVirtualDiskByVirtualImage(ctx context.Context, vd *virtv2.VirtualDisk) error {
inUseCondition, ok := GetCondition(vdcondition.InUseType, vd.Status.Conditions)

if !ok || inUseCondition.Status == metav1.ConditionUnknown {
inUseCondition = metav1.Condition{
Type: vdcondition.InUseType,
Reason: vdcondition.InUseByVirtualImage,
Status: metav1.ConditionTrue,
}

SetCondition(inUseCondition, &vd.Status.Conditions)
err := s.client.Status().Update(ctx, vd)
if err != nil {
return err
}
} else {
if inUseCondition.Reason != vdcondition.InUseByVirtualImage {
return fmt.Errorf("virtual disk %q already used by running virtual machine", vd.GetName())
}
}

return nil
}

func (s LockService) UnlockVirtualDisk(ctx context.Context, vd *virtv2.VirtualDisk) error {
inUseCondition, ok := GetCondition(vdcondition.InUseType, vd.Status.Conditions)
if ok {
inUseCondition = metav1.Condition{
Type: vdcondition.InUseType,
Status: metav1.ConditionUnknown,
}

SetCondition(inUseCondition, &vd.Status.Conditions)
err := s.client.Status().Update(ctx, vd)
if err != nil {
return err
}
}

return nil
}

func (s LockService) LockVirtualDiskByVirtualMachine(ctx context.Context, vd *virtv2.VirtualDisk) error {
inUseCondition, ok := GetCondition(vdcondition.InUseType, vd.Status.Conditions)
if !ok || inUseCondition.Status == metav1.ConditionUnknown {
inUseCondition = metav1.Condition{
Type: vdcondition.InUseType,
Reason: vdcondition.InUseByVirtualMachine,
Status: metav1.ConditionTrue,
}

SetCondition(inUseCondition, &vd.Status.Conditions)
err := s.client.Status().Update(ctx, vd)
if err != nil {
return err
}
} else {
if inUseCondition.Reason != vdcondition.InUseByVirtualMachine {
return fmt.Errorf("virtual disk %q already used by creating virtual disk", vd.GetName())
}
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package validators
package validator

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package validators
package validator

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package validators
package validator

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ func (r *Reconciler) SetupController(_ context.Context, mgr manager.Manager, ctr
return fmt.Errorf("error setting watch on CVIs: %w", err)
}

w := watcher.NewVirtualDiskSnapshotWatcher(mgr.GetClient())
if err := w.Watch(mgr, ctr); err != nil {
snapshotWatcher := watcher.NewVirtualDiskSnapshotWatcher(mgr.GetClient())
if err := snapshotWatcher.Watch(mgr, ctr); err != nil {
return fmt.Errorf("error setting watch on VDSnapshots: %w", err)
}

Expand Down
Loading

0 comments on commit 11bbb7f

Please sign in to comment.