Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: get volumegroup in secondary cluster #5118

Open
wants to merge 3 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/csi-addons/rbd/encryptionkeyrotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (ekrs *EncryptionKeyRotationServer) EncryptionKeyRotate(
rbdVol, err := mgr.GetVolumeByID(ctx, volID)
if err != nil {
switch {
case errors.Is(err, rbd.ErrImageNotFound):
case errors.Is(err, util.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume ID %s not found", volID)
case errors.Is(err, util.ErrPoolNotFound):
log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err)
Expand Down
8 changes: 4 additions & 4 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
sts, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil {
// the image gets recreated after issuing resync
if errors.Is(err, corerbd.ErrImageNotFound) {
if errors.Is(err, util.ErrImageNotFound) {
// caller retries till RBD syncs an initial version of the image to
// report its status in the resync call. Ideally, this line will not
// be executed as the error would get returned due to getMirroringInfo
Expand Down Expand Up @@ -785,7 +785,7 @@ func getGRPCError(err error) error {
}

errorStatusMap := map[error]codes.Code{
corerbd.ErrImageNotFound: codes.NotFound,
util.ErrImageNotFound: codes.NotFound,
util.ErrPoolNotFound: codes.NotFound,
corerbd.ErrInvalidArgument: codes.InvalidArgument,
corerbd.ErrFlattenInProgress: codes.Aborted,
Expand Down Expand Up @@ -835,7 +835,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
log.ErrorLog(ctx, "failed to get volume with id %q: %v", volumeID, err)

switch {
case errors.Is(err, corerbd.ErrImageNotFound):
case errors.Is(err, util.ErrImageNotFound):
err = status.Error(codes.NotFound, err.Error())
case errors.Is(err, util.ErrPoolNotFound):
err = status.Error(codes.NotFound, err.Error())
Expand Down Expand Up @@ -872,7 +872,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
if err != nil {
log.ErrorLog(ctx, "failed to get status for mirror %q: %v", mirror, err)

if errors.Is(err, corerbd.ErrImageNotFound) {
if errors.Is(err, util.ErrImageNotFound) {
return nil, status.Error(codes.Aborted, err.Error())
}

Expand Down
4 changes: 2 additions & 2 deletions internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,8 @@ func TestGetGRPCError(t *testing.T) {
},
{
name: "ErrImageNotFound",
err: corerbd.ErrImageNotFound,
expectedErr: status.Error(codes.NotFound, corerbd.ErrImageNotFound.Error()),
err: util.ErrImageNotFound,
expectedErr: status.Error(codes.NotFound, util.ErrImageNotFound.Error()),
},
{
name: "ErrPoolNotFound",
Expand Down
3 changes: 2 additions & 1 deletion internal/rbd/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"

"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log"

Expand Down Expand Up @@ -66,7 +67,7 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)

return true, nil

case errors.Is(err, ErrImageNotFound):
case errors.Is(err, util.ErrImageNotFound):
// as the temp clone does not exist,check snapshot exists on parent volume
// snapshot name is same as temporary clone image
snap.RbdImageName = tempClone.RbdImageName
Expand Down
14 changes: 7 additions & 7 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C
func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
snaps, children, err := rbdVol.listSnapAndChildren()
if err != nil {
if errors.Is(err, ErrImageNotFound) {
if errors.Is(err, util.ErrImageNotFound) {
return status.Error(codes.InvalidArgument, err.Error())
}

Expand Down Expand Up @@ -831,7 +831,7 @@ func checkContentSource(
rbdvol, err := GenVolFromVolID(ctx, volID, cr, req.GetSecrets())
if err != nil {
log.ErrorLog(ctx, "failed to get backend image for %s: %v", volID, err)
if !errors.Is(err, ErrImageNotFound) {
if !errors.Is(err, util.ErrImageNotFound) {
return nil, nil, status.Error(codes.Internal, err.Error())
}

Expand Down Expand Up @@ -871,7 +871,7 @@ func (cs *ControllerServer) checkErrAndUndoReserve(
return &csi.DeleteVolumeResponse{}, nil
}

if errors.Is(err, ErrImageNotFound) {
if errors.Is(err, util.ErrImageNotFound) {
notFoundErr := rbdVol.ensureImageCleanup(ctx)
if notFoundErr != nil {
return nil, status.Errorf(codes.Internal, "failed to cleanup image %q: %v", rbdVol, notFoundErr)
Expand Down Expand Up @@ -946,7 +946,7 @@ func (cs *ControllerServer) DeleteVolume(
return nil, status.Error(codes.InvalidArgument, pErr.Error())
}
pErr = deleteMigratedVolume(ctx, pmVolID, cr)
if pErr != nil && !errors.Is(pErr, ErrImageNotFound) {
if pErr != nil && !errors.Is(pErr, util.ErrImageNotFound) {
return nil, status.Error(codes.Internal, pErr.Error())
}

Expand Down Expand Up @@ -1118,7 +1118,7 @@ func (cs *ControllerServer) CreateSnapshot(
}()
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
case errors.Is(err, util.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "source Volume ID %s not found", req.GetSourceVolumeId())
case errors.Is(err, util.ErrPoolNotFound):
log.ErrorLog(ctx, "failed to get backend volume for %s: %v", req.GetSourceVolumeId(), err)
Expand Down Expand Up @@ -1459,7 +1459,7 @@ func (cs *ControllerServer) DeleteSnapshot(

// if the error is ErrImageNotFound, We need to cleanup the image from
// trash and remove the metadata in OMAP.
if errors.Is(err, ErrImageNotFound) {
if errors.Is(err, util.ErrImageNotFound) {
log.UsefulLog(ctx, "cleaning up leftovers of snapshot %s: %v", snapshotID, err)

err = cleanUpImageAndSnapReservation(ctx, rbdSnap, cr)
Expand Down Expand Up @@ -1562,7 +1562,7 @@ func (cs *ControllerServer) ControllerExpandVolume(
rbdVol, err := genVolFromVolIDWithMigration(ctx, volID, cr, req.GetSecrets())
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
case errors.Is(err, util.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume ID %s not found", volID)
case errors.Is(err, util.ErrPoolNotFound):
log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err)
Expand Down
2 changes: 0 additions & 2 deletions internal/rbd/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package rbd
import "errors"

var (
// ErrImageNotFound is returned when image name is not found in the cluster on the given pool and/or namespace.
ErrImageNotFound = errors.New("image not found")
// ErrSnapNotFound is returned when snap name passed is not found in the list of snapshots for the
// given image.
ErrSnapNotFound = errors.New("snapshot not found")
Expand Down
112 changes: 95 additions & 17 deletions internal/rbd/group/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/ceph/go-ceph/rados"
Expand Down Expand Up @@ -63,18 +64,8 @@ type commonVolumeGroup struct {
journal journal.VolumeGroupJournal
}

func (cvg *commonVolumeGroup) initCommonVolumeGroup(
ctx context.Context,
id string,
csiDriver string,
creds *util.Credentials,
) error {
csiID := util.CSIIdentifier{}
err := csiID.DecomposeCSIID(id)
if err != nil {
return fmt.Errorf("failed to decompose volume group id %q: %w", id, err)
}

// generateVolumeGroup generates a commonVolumeGroup structure from the volumeGroup identifier.
func (cvg *commonVolumeGroup) generateVolumeGroup(csiID util.CSIIdentifier) error {
mons, err := util.Mons(util.CsiConfigFile, csiID.ClusterID)
if err != nil {
return fmt.Errorf("failed to get MONs for cluster id %q: %w", csiID.ClusterID, err)
Expand All @@ -85,19 +76,106 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup(
return fmt.Errorf("failed to get RADOS namespace for cluster id %q: %w", csiID.ClusterID, err)
}

pool, err := util.GetPoolName(mons, creds, csiID.LocationID)
pool, err := util.GetPoolName(mons, cvg.credentials, csiID.LocationID)
if err != nil {
return fmt.Errorf("failed to get pool for volume group id %q: %w", cvg.id, err)
}

cvg.monitors = mons
cvg.namespace = namespace
cvg.pool = pool

return nil
}

// generateVolumeGroupFromMapping checks the clusterID and poolID mapping and
// generates commonVolumeGroup structure for the mapped clusterID and poolID.
//
// Each ClusterIDMapping entry is matched against csiID.ClusterID through
// util.GetMappedID. For every match, each RBDpoolIDMappingInfo entry of the
// same mapping is matched against csiID.LocationID (formatted as a decimal
// string). A matching clusterID/poolID pair is written into a copy of csiID
// and passed to generateVolumeGroup. The first attempt for which
// util.ShouldRetryVolumeGeneration reports false ends the search, and the
// error value of that attempt is returned as-is.
//
// util.ErrPoolNotFound is returned when mapping is nil, or when no mapped
// clusterID/poolID combination ended the search. A mapped pool ID that is
// not a valid base-10 integer aborts the search with a wrapped parse error.
func (cvg *commonVolumeGroup) generateVolumeGroupFromMapping(
	ctx context.Context,
	csiID util.CSIIdentifier,
	mapping *[]util.ClusterMappingInfo,
) error {
	// A nil mapping offers no candidates; treat it like an empty mapping
	// instead of panicking on the dereference below.
	if mapping == nil {
		return util.ErrPoolNotFound
	}

	mcsiID := csiID
	existingClusterID := csiID.ClusterID
	existingPoolID := strconv.FormatInt(csiID.LocationID, 10)

	for _, cm := range *mapping {
		for key, val := range cm.ClusterIDMapping {
			mappedClusterID := util.GetMappedID(key, val, existingClusterID)
			if mappedClusterID == "" {
				continue
			}

			log.DebugLog(ctx,
				"found new clusterID mapping %s for existing clusterID %s", mappedClusterID, existingClusterID)

			// Add mapped clusterID to Identifier
			mcsiID.ClusterID = mappedClusterID
			for _, pools := range cm.RBDpoolIDMappingInfo {
				for poolKey, poolVal := range pools {
					mappedPoolID := util.GetMappedID(poolKey, poolVal, existingPoolID)
					if mappedPoolID == "" {
						continue
					}
					log.DebugLog(ctx,
						"found new poolID mapping %s for existing poolID %s", mappedPoolID, existingPoolID)

					mPID, err := strconv.ParseInt(mappedPoolID, 10, 64)
					if err != nil {
						// Wrap so the caller can see which mapped ID was
						// malformed; %w keeps the underlying error inspectable.
						return fmt.Errorf("failed to parse mapped pool id %q: %w", mappedPoolID, err)
					}

					// Add mapped poolID to Identifier
					mcsiID.LocationID = mPID
					err = cvg.generateVolumeGroup(mcsiID)
					if util.ShouldRetryVolumeGeneration(err) {
						// This candidate did not work out; try the next
						// pool mapping.
						continue
					}

					return err
				}
			}
		}
	}

	return util.ErrPoolNotFound
}

func (cvg *commonVolumeGroup) initCommonVolumeGroup(
ctx context.Context,
id string,
csiDriver string,
creds *util.Credentials,
) error {
csiID := util.CSIIdentifier{}

err := csiID.DecomposeCSIID(id)
if err != nil {
return fmt.Errorf("failed to get pool for volume group id %q: %w", id, err)
return fmt.Errorf("failed to decompose volume group id %q: %w", id, err)
}

cvg.csiDriver = csiDriver
cvg.credentials = creds
cvg.id = id
cvg.clusterID = csiID.ClusterID
cvg.objectUUID = csiID.ObjectUUID
cvg.monitors = mons
cvg.pool = pool
cvg.namespace = namespace
// cvg.monitors, cvg.namespace, cvg.pool are set in generateVolumeGroup

err = cvg.generateVolumeGroup(csiID)
if !util.ShouldRetryVolumeGeneration(err) {
return err
}

if util.ShouldRetryVolumeGeneration(err) {
mapping, err := util.GetClusterMappingInfo(csiID.ClusterID)
if err != nil {
return err
}
if mapping != nil {
err = cvg.generateVolumeGroupFromMapping(ctx, csiID, mapping)
if err != nil {
return err
}
}
}

log.DebugLog(ctx, "object for volume group %q has been initialized", cvg.id)

Expand Down
6 changes: 3 additions & 3 deletions internal/rbd/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volu
volume, err := GenVolFromVolID(ctx, id, creds, mgr.secrets)
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
case errors.Is(err, util.ErrImageNotFound):
err = fmt.Errorf("volume %s not found: %w", id, err)

return nil, err
Expand All @@ -199,7 +199,7 @@ func (mgr *rbdManager) GetSnapshotByID(ctx context.Context, id string) (types.Sn
snapshot, err := genSnapFromSnapID(ctx, id, creds, mgr.secrets)
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
case errors.Is(err, util.ErrImageNotFound):
err = fmt.Errorf("volume %s not found: %w", id, err)

return nil, err
Expand Down Expand Up @@ -467,7 +467,7 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot(

return vgs, nil
}
} else if err != nil && !errors.Is(ErrImageNotFound, err) {
} else if err != nil && !errors.Is(err, util.ErrImageNotFound) {
// ErrImageNotFound can be returned if the VolumeGroupSnapshot
// could not be found. It is expected that it does not exist
// yet, in which case it will be created below.
Expand Down
4 changes: 2 additions & 2 deletions internal/rbd/rbd_journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func checkSnapCloneExists(
// Fetch on-disk image attributes
err = vol.getImageInfo()
if err != nil {
if errors.Is(err, ErrImageNotFound) {
if errors.Is(err, util.ErrImageNotFound) {
err = parentVol.deleteSnapshot(ctx, rbdSnap)
if err != nil {
if !errors.Is(err, ErrSnapNotFound) {
Expand Down Expand Up @@ -298,7 +298,7 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
// Fetch on-disk image attributes and compare against request
err = rv.getImageInfo()
if err != nil {
if errors.Is(err, ErrImageNotFound) {
if errors.Is(err, util.ErrImageNotFound) {
// Need to check cloned info here not on createvolume,
if parentVol != nil {
found, cErr := rv.checkCloneImage(ctx, parentVol)
Expand Down
Loading
Loading