Skip to content

Commit

Permalink
Add support for block volumes (vmware-tanzu#6680)
Browse files Browse the repository at this point in the history
Signed-off-by: David Zaninovic <[email protected]>
  • Loading branch information
dzaninovic authored Sep 28, 2023
1 parent a22f28e commit 8e01d1b
Show file tree
Hide file tree
Showing 33 changed files with 615 additions and 192 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/6680-dzaninovic
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for block volumes with Kopia
6 changes: 6 additions & 0 deletions design/CLI/PoC/overlays/plugins/node-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ spec:
- mountPath: /host_pods
mountPropagation: HostToContainer
name: host-pods
- mountPath: /var/lib/kubelet/plugins
mountPropagation: HostToContainer
name: host-plugins
- mountPath: /scratch
name: scratch
- mountPath: /credentials
Expand All @@ -60,6 +63,9 @@ spec:
- hostPath:
path: /var/lib/kubelet/pods
name: host-pods
- hostPath:
path: /var/lib/kubelet/plugins
name: host-plugins
- emptyDir: {}
name: scratch
- name: cloud-credentials
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,33 +703,38 @@ type Provider interface {
In this case, we will extend the default kopia uploader so that, when a given volume is in block mode and is mapped as a device, it uses the [StreamingFile](https://pkg.go.dev/github.com/kopia/kopia/fs#StreamingFile) to stream the device and back it up to the kopia repository.
```go
func getLocalBlockEntry(kopiaEntry fs.Entry, log logrus.FieldLogger) (fs.Entry, error) {
path := kopiaEntry.LocalFilesystemPath()
func getLocalBlockEntry(sourcePath string) (fs.Entry, error) {
source, err := resolveSymlink(sourcePath)
if err != nil {
return nil, errors.Wrap(err, "resolveSymlink")
}

fileInfo, err := os.Lstat(path)
fileInfo, err := os.Lstat(source)
if err != nil {
return nil, errors.Wrapf(err, "Unable to get the source device information %s", path)
return nil, errors.Wrapf(err, "unable to get the source device information %s", source)
}

if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
return nil, errors.Errorf("Source path %s is not a block device", path)
return nil, errors.Errorf("source path %s is not a block device", source)
}
device, err := os.Open(path)

device, err := os.Open(source)
if err != nil {
if os.IsPermission(err) || err.Error() == ErrNotPermitted {
return nil, errors.Wrapf(err, "No permission to open the source device %s, make sure that node agent is running in privileged mode", path)
return nil, errors.Wrapf(err, "no permission to open the source device %s, make sure that node agent is running in privileged mode", source)
}
return nil, errors.Wrapf(err, "Unable to open the source device %s", path)
return nil, errors.Wrapf(err, "unable to open the source device %s", source)
}
return virtualfs.StreamingFileFromReader(kopiaEntry.Name(), device), nil

sf := virtualfs.StreamingFileFromReader(source, device)
return virtualfs.NewStaticDirectory(source, []fs.Entry{sf}), nil
}
```
In `pkg/uploader/kopia/snapshot.go`, this is used in the Backup call as follows:
```go
if volMode == PersistentVolumeFilesystem {
if volMode == uploader.PersistentVolumeFilesystem {
// to be consistent with restic when backup empty dir returns one error for upper logic handle
dirs, err := os.ReadDir(source)
if err != nil {
Expand All @@ -742,15 +747,17 @@ In the `pkg/uploader/kopia/snapshot.go` this is used in the Backup call like
source = filepath.Clean(source)
...

sourceEntry, err := getLocalFSEntry(source)
if err != nil {
return nil, false, errors.Wrap(err, "Unable to get local filesystem entry")
}
var sourceEntry fs.Entry

if volMode == PersistentVolumeBlock {
sourceEntry, err = getLocalBlockEntry(sourceEntry, log)
if volMode == uploader.PersistentVolumeBlock {
sourceEntry, err = getLocalBlockEntry(source)
if err != nil {
return nil, false, errors.Wrap(err, "unable to get local block device entry")
}
} else {
sourceEntry, err = getLocalFSEntry(source)
if err != nil {
return nil, false, errors.Wrap(err, "Unable to get local block device entry")
return nil, false, errors.Wrap(err, "unable to get local filesystem entry")
}
}

Expand All @@ -766,37 +773,24 @@ We only need to extend two functions the rest will be passed through.
```go
type BlockOutput struct {
*restore.FilesystemOutput

targetFileName string
}

var _ restore.Output = &BlockOutput{}

const bufferSize = 128 * 1024

func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remoteFile fs.File) error {

targetFileName, err := filepath.EvalSymlinks(o.TargetPath)
if err != nil {
return errors.Wrapf(err, "Unable to evaluate symlinks for %s", targetFileName)
}

fileInfo, err := os.Lstat(targetFileName)
if err != nil {
return errors.Wrapf(err, "Unable to get the target device information for %s", targetFileName)
}

if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
return errors.Errorf("Target file %s is not a block device", targetFileName)
}

remoteReader, err := remoteFile.Open(ctx)
if err != nil {
return errors.Wrapf(err, "Failed to open remote file %s", remoteFile.Name())
return errors.Wrapf(err, "failed to open remote file %s", remoteFile.Name())
}
defer remoteReader.Close()

targetFile, err := os.Create(targetFileName)
targetFile, err := os.Create(o.targetFileName)
if err != nil {
return errors.Wrapf(err, "Failed to open file %s", targetFileName)
return errors.Wrapf(err, "failed to open file %s", o.targetFileName)
}
defer targetFile.Close()

Expand All @@ -807,7 +801,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
bytesToWrite, err := remoteReader.Read(buffer)
if err != nil {
if err != io.EOF {
return errors.Wrapf(err, "Failed to read data from remote file %s", targetFileName)
return errors.Wrapf(err, "failed to read data from remote file %s", o.targetFileName)
}
readData = false
}
Expand All @@ -819,7 +813,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
bytesToWrite -= bytesWritten
offset += bytesWritten
} else {
return errors.Wrapf(err, "Failed to write data to file %s", targetFileName)
return errors.Wrapf(err, "failed to write data to file %s", o.targetFileName)
}
}
}
Expand All @@ -829,42 +823,43 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
}

func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error {
targetFileName, err := filepath.EvalSymlinks(o.TargetPath)
var err error
o.targetFileName, err = filepath.EvalSymlinks(o.TargetPath)
if err != nil {
return errors.Wrapf(err, "Unable to evaluate symlinks for %s", targetFileName)
return errors.Wrapf(err, "unable to evaluate symlinks for %s", o.targetFileName)
}

fileInfo, err := os.Lstat(targetFileName)
fileInfo, err := os.Lstat(o.targetFileName)
if err != nil {
return errors.Wrapf(err, "Unable to get the target device information for %s", o.TargetPath)
return errors.Wrapf(err, "unable to get the target device information for %s", o.TargetPath)
}

if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
return errors.Errorf("Target file %s is not a block device", o.TargetPath)
return errors.Errorf("target file %s is not a block device", o.TargetPath)
}

return nil
}
```
Of note, we do need to add root access to the daemon set node agent to access the new mount.
An additional mount is required in the node-agent specification to resolve symlinks to the block devices from the /host_pods/POD_ID/volumeDevices/kubernetes.io~csi directory.
```yaml
...
- mountPath: /var/lib/kubelet/plugins
mountPropagation: HostToContainer
name: host-plugins

....
- hostPath:
path: /var/lib/kubelet/plugins
name: host-plugins
```
Privileged mode is required to access the block devices in the /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish directory, as confirmed by testing on EKS and Minikube.
...
```yaml
SecurityContext: &corev1.SecurityContext{
Privileged: &c.privilegedAgent,
Privileged: &c.privilegedNodeAgent,
},

```
## Plugin Data Movers
Expand Down
6 changes: 6 additions & 0 deletions pkg/builder/persistent_volume_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ func (b *PersistentVolumeBuilder) StorageClass(name string) *PersistentVolumeBui
return b
}

// VolumeMode records volMode as the volume mode in the PersistentVolume's spec
// and returns the builder so that calls can be chained.
func (b *PersistentVolumeBuilder) VolumeMode(volMode corev1api.PersistentVolumeMode) *PersistentVolumeBuilder {
	// The spec field is a pointer, so store the address of a local copy.
	mode := volMode
	b.object.Spec.VolumeMode = &mode

	return b
}

// NodeAffinityRequired sets the PersistentVolume's NodeAffinity Requirement.
func (b *PersistentVolumeBuilder) NodeAffinityRequired(req *corev1api.NodeSelector) *PersistentVolumeBuilder {
b.object.Spec.NodeAffinity = &corev1api.VolumeNodeAffinity{
Expand Down
3 changes: 3 additions & 0 deletions pkg/cmd/cli/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Options struct {
BackupStorageConfig flag.Map
VolumeSnapshotConfig flag.Map
UseNodeAgent bool
PrivilegedNodeAgent bool
//TODO remove UseRestic when migration test out of using it
UseRestic bool
Wait bool
Expand Down Expand Up @@ -110,6 +111,7 @@ func (o *Options) BindFlags(flags *pflag.FlagSet) {
flags.BoolVar(&o.RestoreOnly, "restore-only", o.RestoreOnly, "Run the server in restore-only mode. Optional.")
flags.BoolVar(&o.DryRun, "dry-run", o.DryRun, "Generate resources, but don't send them to the cluster. Use with -o. Optional.")
flags.BoolVar(&o.UseNodeAgent, "use-node-agent", o.UseNodeAgent, "Create Velero node-agent daemonset. Optional. Velero node-agent hosts Velero modules that need to run in one or more nodes(i.e. Restic, Kopia).")
flags.BoolVar(&o.PrivilegedNodeAgent, "privileged-node-agent", o.PrivilegedNodeAgent, "Use privileged mode for the node agent. Optional. Required to backup block devices.")
flags.BoolVar(&o.Wait, "wait", o.Wait, "Wait for Velero deployment to be ready. Optional.")
flags.DurationVar(&o.DefaultRepoMaintenanceFrequency, "default-repo-maintain-frequency", o.DefaultRepoMaintenanceFrequency, "How often 'maintain' is run for backup repositories by default. Optional.")
flags.DurationVar(&o.GarbageCollectionFrequency, "garbage-collection-frequency", o.GarbageCollectionFrequency, "How often the garbage collection runs for expired backups.(default 1h)")
Expand Down Expand Up @@ -198,6 +200,7 @@ func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) {
SecretData: secretData,
RestoreOnly: o.RestoreOnly,
UseNodeAgent: o.UseNodeAgent,
PrivilegedNodeAgent: o.PrivilegedNodeAgent,
UseVolumeSnapshots: o.UseVolumeSnapshots,
BSLConfig: o.BackupStorageConfig.Data(),
VSLConfig: o.VolumeSnapshotConfig.Data(),
Expand Down
28 changes: 23 additions & 5 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,10 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

exposeParam := r.setupExposeParam(du)
exposeParam, err := r.setupExposeParam(du)
if err != nil {
return r.errorOut(ctx, du, err, "failed to set exposer parameters", log)
}

// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
Expand Down Expand Up @@ -735,18 +738,33 @@ func (r *DataUploadReconciler) closeDataPath(ctx context.Context, duName string)
r.dataPathMgr.RemoveAsyncBR(duName)
}

func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) interface{} {
func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) (interface{}, error) {
if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
pvc := &corev1.PersistentVolumeClaim{}
err := r.client.Get(context.Background(), types.NamespacedName{
Namespace: du.Spec.SourceNamespace,
Name: du.Spec.SourcePVC,
}, pvc)

if err != nil {
return nil, errors.Wrapf(err, "failed to get PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
}

accessMode := exposer.AccessModeFileSystem
if pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == corev1.PersistentVolumeBlock {
accessMode = exposer.AccessModeBlock
}

return &exposer.CSISnapshotExposeParam{
SnapshotName: du.Spec.CSISnapshot.VolumeSnapshot,
SourceNamespace: du.Spec.SourceNamespace,
StorageClass: du.Spec.CSISnapshot.StorageClass,
HostingPodLabels: map[string]string{velerov1api.DataUploadLabel: du.Name},
AccessMode: exposer.AccessModeFileSystem,
AccessMode: accessMode,
Timeout: du.Spec.OperationTimeout.Duration,
}
}, nil
}
return nil
return nil, nil
}

func (r *DataUploadReconciler) setupWaitExposePara(du *velerov2alpha1api.DataUpload) interface{} {
Expand Down
18 changes: 17 additions & 1 deletion pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func TestReconcile(t *testing.T) {
name string
du *velerov2alpha1api.DataUpload
pod *corev1.Pod
pvc *corev1.PersistentVolumeClaim
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataMgr *datapath.Manager
expectedProcessed bool
Expand Down Expand Up @@ -345,11 +346,21 @@ func TestReconcile(t *testing.T) {
}, {
name: "Dataupload should be accepted",
du: dataUploadBuilder().Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(),
pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
expectedRequeue: ctrl.Result{},
},
{
name: "Dataupload should fail to get PVC information",
du: dataUploadBuilder().Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "wrong-pvc"}).Result(),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "failed to get PVC",
},
{
name: "Dataupload should be prepared",
du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(),
Expand Down Expand Up @@ -448,6 +459,11 @@ func TestReconcile(t *testing.T) {
require.NoError(t, err)
}

if test.pvc != nil {
err = r.client.Create(ctx, test.pvc)
require.NoError(t, err)
}

if test.dataMgr != nil {
r.dataPathMgr = test.dataMgr
} else {
Expand Down
15 changes: 3 additions & 12 deletions pkg/datapath/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren
if !fs.initialized {
return errors.New("file system data path is not initialized")
}
volMode := getPersistentVolumeMode(source)

go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, volMode, fs)
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull,
parentSnapshot, source.VolMode, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand All @@ -155,10 +155,8 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
return errors.New("file system data path is not initialized")
}

volMode := getPersistentVolumeMode(target)

go func() {
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, volMode, fs)
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand All @@ -172,13 +170,6 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
return nil
}

// getPersistentVolumeMode reports the volume mode of an access point:
// block when its ByBlock field is non-empty, filesystem otherwise.
func getPersistentVolumeMode(source AccessPoint) uploader.PersistentVolumeMode {
	mode := uploader.PersistentVolumeFilesystem
	if source.ByBlock != "" {
		mode = uploader.PersistentVolumeBlock
	}

	return mode
}

// UpdateProgress which implement ProgressUpdater interface to update progress status
func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) {
if fs.callbacks.OnProgress != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/datapath/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Callbacks struct {
// AccessPoint represents an access point that has been exposed to a data path instance
type AccessPoint struct {
ByPath string
ByBlock string
VolMode uploader.PersistentVolumeMode
}

// AsyncBR is the interface for asynchronous data path methods
Expand Down
Loading

0 comments on commit 8e01d1b

Please sign in to comment.