Patch newly dynamically provisioned PV with volume info to restore custom setting of PV

Signed-off-by: allenxu404 <[email protected]>
allenxu404 committed Mar 18, 2024
1 parent 6ec1701 commit 67b5e82
Showing 10 changed files with 569 additions and 20 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/7504-allenxu404
@@ -0,0 +1 @@
Patch newly dynamically provisioned PV with volume info to restore custom setting of PV
6 changes: 6 additions & 0 deletions pkg/builder/persistent_volume_builder.go
@@ -108,3 +108,9 @@ func (b *PersistentVolumeBuilder) NodeAffinityRequired(req *corev1api.NodeSelect
}
return b
}

// Phase sets the PersistentVolume's phase.
func (b *PersistentVolumeBuilder) Phase(phase corev1api.PersistentVolumePhase) *PersistentVolumeBuilder {
b.object.Status.Phase = phase
return b
}
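A brief usage note: combined with the package's existing ForPersistentVolume and Result helpers, tests can now build a PV that is already in a given phase. A minimal sketch (the PV name is illustrative):

pv := builder.ForPersistentVolume("test-pv").
	Phase(corev1api.VolumeBound).
	Result()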
1 change: 1 addition & 0 deletions pkg/cmd/server/server.go
@@ -997,6 +997,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
newPluginManager,
backupStoreGetter,
s.metrics,
s.crClient,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.RestoreFinalizer)
}
187 changes: 184 additions & 3 deletions pkg/controller/restore_finalizer_controller.go
@@ -18,24 +18,36 @@ package controller

import (
"context"
"fmt"
"regexp"
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/clock"

internalVolume "github.com/vmware-tanzu/velero/internal/volume"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
"github.com/vmware-tanzu/velero/pkg/restore"
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/results"
)

const (
PVPatchMaximumDuration = 10 * time.Minute
)

type restoreFinalizerReconciler struct {
client.Client
namespace string
@@ -44,6 +56,7 @@ type restoreFinalizerReconciler struct {
backupStoreGetter persistence.ObjectBackupStoreGetter
metrics *metrics.ServerMetrics
clock clock.WithTickerAndDelayedExecution
crClient client.Client
}

func NewRestoreFinalizerReconciler(
@@ -53,6 +66,7 @@ func NewRestoreFinalizerReconciler(
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupStoreGetter persistence.ObjectBackupStoreGetter,
metrics *metrics.ServerMetrics,
crClient client.Client,
) *restoreFinalizerReconciler {
return &restoreFinalizerReconciler{
Client: client,
@@ -62,6 +76,7 @@ func NewRestoreFinalizerReconciler(
backupStoreGetter: backupStoreGetter,
metrics: metrics,
clock: &clock.RealClock{},
crClient: crClient,
}
}

@@ -123,7 +138,27 @@ func (r *restoreFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, errors.Wrap(err, "error getting backup store")
}

finalizerCtx := &finalizerContext{log: log}
volumeInfo, err := backupStore.GetBackupVolumeInfos(restore.Spec.BackupName)
if err != nil {
log.WithError(err).Errorf("error getting volumeInfo for backup %s", restore.Spec.BackupName)
return ctrl.Result{}, errors.Wrap(err, "error getting volumeInfo")
}

restoredResourceList, err := backupStore.GetRestoredResourceList(restore.Name)
if err != nil {
log.WithError(err).Error("error getting restoredResourceList")
return ctrl.Result{}, errors.Wrap(err, "error getting restoredResourceList")
}

restoredPVCList := getRestoredPVCFromRestoredResourceList(restoredResourceList)

finalizerCtx := &finalizerContext{
logger: log,
restore: restore,
crClient: r.crClient,
volumeInfo: volumeInfo,
restoredPVCList: restoredPVCList,
}
warnings, errs := finalizerCtx.execute()

warningCnt := len(warnings.Velero) + len(warnings.Cluster)
@@ -200,14 +235,160 @@ func (r *restoreFinalizerReconciler) finishProcessing(restorePhase velerov1api.R
// finalizerContext includes all the dependencies required by finalization tasks and
// a function execute() to orderly implement task logic.
type finalizerContext struct {
log logrus.FieldLogger
logger logrus.FieldLogger
restore *velerov1api.Restore
crClient client.Client
volumeInfo []*internalVolume.VolumeInfo
restoredPVCList map[string]struct{}
}

func (ctx *finalizerContext) execute() (results.Result, results.Result) { //nolint:unparam //temporarily ignore the lint report: result 0 is always nil (unparam)
warnings, errs := results.Result{}, results.Result{}

// implement finalization tasks
ctx.logger.Debug("Starting to run execute()")
pdpErrs := ctx.patchDynamicPVWithVolumeInfo()
errs.Merge(&pdpErrs)

return warnings, errs
}

// patchDynamicPVWithVolumeInfo patches newly dynamically provisioned PVs using volume info
// in order to restore custom settings that would otherwise be lost during dynamic PV recreation.
func (ctx *finalizerContext) patchDynamicPVWithVolumeInfo() (errs results.Result) {
ctx.logger.Info("started patching newly dynamically provisioned PVs")

var pvWaitGroup sync.WaitGroup
var resultLock sync.Mutex
maxConcurrency := 3
semaphore := make(chan struct{}, maxConcurrency)

for _, volumeItem := range ctx.volumeInfo {
if (volumeItem.BackupMethod == internalVolume.PodVolumeBackup || volumeItem.BackupMethod == internalVolume.CSISnapshot) && volumeItem.PVInfo != nil {
// Determine restored PVC namespace
restoredNamespace := volumeItem.PVCNamespace
if remapped, ok := ctx.restore.Spec.NamespaceMapping[restoredNamespace]; ok {
restoredNamespace = remapped
}

// Check if PVC was restored in previous phase
pvcKey := fmt.Sprintf("%s/%s", restoredNamespace, volumeItem.PVCName)
if _, restored := ctx.restoredPVCList[pvcKey]; !restored {
continue
}

pvWaitGroup.Add(1)
go func(volInfo internalVolume.VolumeInfo, restoredNamespace string) {
defer pvWaitGroup.Done()

semaphore <- struct{}{}

log := ctx.logger.WithField("PVC", volInfo.PVCName).WithField("PVCNamespace", restoredNamespace)
log.Debug("patching dynamic PV is in progress")

err := wait.PollUntilContextTimeout(context.Background(), 10*time.Second, PVPatchMaximumDuration, true, func(context.Context) (bool, error) {
// wait for PVC to be bound
pvc := &v1.PersistentVolumeClaim{}
err := ctx.crClient.Get(context.Background(), client.ObjectKey{Name: volInfo.PVCName, Namespace: restoredNamespace}, pvc)
if apierrors.IsNotFound(err) {
log.Debug("PVC not found yet, retrying")
return false, nil
}
if err != nil {
return false, err
}

if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
log.Debugf("PVC: %s not ready", pvc.Name)
return false, nil
}

// wait for PV to be bound
pvName := pvc.Spec.VolumeName
pv := &v1.PersistentVolume{}
err = ctx.crClient.Get(context.Background(), client.ObjectKey{Name: pvName}, pv)
if apierrors.IsNotFound(err) {
log.Debugf("PV %s not found yet, retrying", pvName)
return false, nil
}
if err != nil {
return false, err
}

if pv.Spec.ClaimRef == nil || pv.Status.Phase != v1.VolumeBound {
log.Debugf("PV: %s not ready", pvName)
return false, nil
}

// validate PV
if pv.Spec.ClaimRef.Name != pvc.Name || pv.Spec.ClaimRef.Namespace != restoredNamespace {
return false, fmt.Errorf("PV was bound by unexpected PVC: got %s/%s, expected %s/%s",
pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name, restoredNamespace, pvc.Name)
}

// patch the PV's reclaim policy and labels using the corresponding data stored in volume info
if needPatch(pv, volInfo.PVInfo) {
updatedPV := pv.DeepCopy()
updatedPV.Labels = volInfo.PVInfo.Labels
updatedPV.Spec.PersistentVolumeReclaimPolicy = v1.PersistentVolumeReclaimPolicy(volInfo.PVInfo.ReclaimPolicy)
if err := kubeutil.PatchResource(pv, updatedPV, ctx.crClient); err != nil {
return false, err
}
log.Infof("newly dynamically provisioned PV %s has been patched using volume info", pvName)
}

return true, nil
})

if err != nil {
err = fmt.Errorf("failed to patch dynamic PV, err: %v, PVC: %s, PV: %s", err, volInfo.PVCName, volInfo.PVName)
ctx.logger.WithError(errors.WithStack(err)).Error("error patching dynamic PV using volume info")
resultLock.Lock()
defer resultLock.Unlock()
errs.Add(restoredNamespace, err)
}

<-semaphore
}(*volumeItem, restoredNamespace)
}
}

pvWaitGroup.Wait()
ctx.logger.Info("finished patching newly dynamically provisioned PVs")

return errs
}
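For readers unfamiliar with the pattern above, the core idea is a buffered channel used as a counting semaphore to bound how many goroutines poll at once. A self-contained sketch under that reading, with patchOne and the item names purely illustrative:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// patchOne stands in for a single PV patch attempt; returning true stops the polling loop.
func patchOne(name string) (bool, error) {
	fmt.Println("patched", name)
	return true, nil
}

func main() {
	items := []string{"pv-1", "pv-2", "pv-3", "pv-4"}
	var wg sync.WaitGroup
	semaphore := make(chan struct{}, 3) // at most 3 concurrent pollers

	for _, item := range items {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			semaphore <- struct{}{}        // acquire a slot
			defer func() { <-semaphore }() // release it when done

			// Retry every second for up to ten seconds, mirroring the controller's
			// 10s interval / PVPatchMaximumDuration pair, scaled down.
			err := wait.PollUntilContextTimeout(context.Background(), time.Second, 10*time.Second, true,
				func(context.Context) (bool, error) { return patchOne(name) })
			if err != nil {
				fmt.Printf("giving up on %s: %v\n", name, err)
			}
		}(item)
	}
	wg.Wait()
}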

func getRestoredPVCFromRestoredResourceList(restoredResourceList map[string][]string) map[string]struct{} {
pvcKey := "v1/PersistentVolumeClaim"
pvcList := make(map[string]struct{})

// The format of each PVC string in restoredResourceList is "namespace/pvcName(status)";
// keep the substring before "(created)" when the status in the rightmost parentheses is "created".
r := regexp.MustCompile(`\(([^)]+)\)`)
for _, pvc := range restoredResourceList[pvcKey] {
	matches := r.FindAllStringSubmatch(pvc, -1)
	if len(matches) > 0 && matches[len(matches)-1][1] == restore.ItemRestoreResultCreated {
		pvcList[pvc[:len(pvc)-len("(created)")]] = struct{}{}
	}
}

return pvcList
}

func needPatch(newPV *v1.PersistentVolume, pvInfo *internalVolume.PVInfo) bool {
if newPV.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimPolicy(pvInfo.ReclaimPolicy) {
return true
}

newPVLabels, pvLabels := newPV.Labels, pvInfo.Labels
for k, v := range pvLabels {
	if currentValue, ok := newPVLabels[k]; !ok || currentValue != v {
		return true
	}
}

return false
}
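A quick sanity check with hypothetical values, assuming PVInfo carries the reclaim policy in the string form used above: a PV that was re-provisioned with Delete needs a patch when the backup recorded Retain. Note that needPatch only inspects the labels recorded in PVInfo, so labels added solely by the provisioner do not by themselves trigger a patch.

pv := &v1.PersistentVolume{
	Spec: v1.PersistentVolumeSpec{
		PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
	},
}
info := &internalVolume.PVInfo{
	ReclaimPolicy: string(v1.PersistentVolumeReclaimRetain),
}
_ = needPatch(pv, info) // true: the reclaim policy differs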