Skip to content

Commit

Permalink
recovery from node failure working somewhat
Browse files Browse the repository at this point in the history
  • Loading branch information
rbo54 committed Dec 27, 2024
1 parent 7fe3d88 commit 32332ae
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 16 deletions.
4 changes: 2 additions & 2 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
package provider

import (
"github.com/dell/csi-vxflexos/v2/service"
nfs "github.com/dell/csi-nfs/pkg"
"github.com/dell/csi-md/md"
nfs "github.com/dell/csi-nfs/nfs"
"github.com/dell/csi-vxflexos/v2/service"
"github.com/dell/gocsi"
logrus "github.com/sirupsen/logrus"
)
Expand Down
28 changes: 27 additions & 1 deletion service/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/dell/csi-nfs/nfs"
"github.com/dell/csi-vxflexos/v2/k8sutils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -184,6 +185,9 @@ func (s *service) CreateVolume(
if md.IsMDStorageClass(params) {
return mdsvc.CreateVolume(ctx, req)
}
if nfs.IsNFSStorageClass(params) {
return nfssvc.CreateVolume(ctx, req)
}

systemID, err := s.getSystemIDFromParameters(params)
if err != nil {
Expand Down Expand Up @@ -998,6 +1002,9 @@ func (s *service) DeleteVolume(
if md.IsMDVolumeID(csiVolID) {
return mdsvc.DeleteVolume(ctx, req)
}
if nfs.IsNFSVolumeID(csiVolID) {
csiVolID = nfs.NFSToArrayVolumeID(csiVolID)
}

isNFS := strings.Contains(csiVolID, "/")
// ensure no ambiguity if legacy vol
Expand Down Expand Up @@ -1270,7 +1277,7 @@ func (s *service) ControllerPublishVolume(
if md.IsMDVolumeID(csiVolID) {
return mdsvc.ControllerPublishVolume(ctx, req)
}
if req.VolumeContext[CsiNfsParameter] == "RWX" {
if nfs.IsNFSVolumeID(csiVolID) {
Log.Infof("csi-nfs: RWX calling nfssvc.ControllerPublishVolume")
return nfssvc.ControllerPublishVolume(ctx, req)
}
Expand Down Expand Up @@ -1609,6 +1616,10 @@ func (s *service) ControllerUnpublishVolume(
if md.IsMDVolumeID(req.GetVolumeId()) {
return mdsvc.ControllerUnpublishVolume(ctx, req)
}
if nfs.IsNFSVolumeID(req.GetVolumeId()) {
Log.Info("csi-nfs: calling nfssrv.Controller.UnpublishVolume")
return nfssvc.ControllerUnpublishVolume(ctx, req)
}

// get systemID from req
systemID := s.getSystemIDFromCsiVolumeID(req.GetVolumeId())
Expand Down Expand Up @@ -1736,6 +1747,9 @@ func (s *service) ValidateVolumeCapabilities(
req *csi.ValidateVolumeCapabilitiesRequest) (
*csi.ValidateVolumeCapabilitiesResponse, error,
) {
if nfs.IsNFSVolumeID(req.GetVolumeId()) {
req.VolumeId = nfs.NFSToArrayVolumeID(req.GetVolumeId())
}
csiVolID := req.GetVolumeId()
if csiVolID == "" {
return nil, status.Error(codes.InvalidArgument,
Expand Down Expand Up @@ -2589,6 +2603,9 @@ func (s *service) CreateSnapshot(
*csi.CreateSnapshotResponse, error,
) {
// Validate snapshot volume
if nfs.IsNFSSnapshotID(req.GetSourceVolumeId()) {
req.SourceVolumeId = nfs.NFSToArrayVolumeID(req.GetSourceVolumeId())
}
csiVolID := req.GetSourceVolumeId()
if csiVolID == "" {
return nil, status.Errorf(codes.InvalidArgument, "CSI volume ID to be snapped is required")
Expand Down Expand Up @@ -2825,6 +2842,9 @@ func (s *service) DeleteSnapshot(
req *csi.DeleteSnapshotRequest) (
*csi.DeleteSnapshotResponse, error,
) {
if nfs.IsNFSSnapshotID(req.GetSnapshotId()) {
req.SnapshotId = nfs.NFSToArrayVolumeID(req.GetSnapshotId())
}
// Display any secrets passed in
secrets := req.GetSecrets()
for k, v := range secrets {
Expand Down Expand Up @@ -2996,6 +3016,9 @@ func (s *service) ControllerExpandVolume(ctx context.Context, req *csi.Controlle
}
}

if nfs.IsNFSVolumeID(req.GetVolumeId()) {
req.VolumeId = nfs.NFSToArrayVolumeID(req.GetVolumeId())
}
csiVolID := req.GetVolumeId()
if csiVolID == "" {
return nil, status.Error(codes.InvalidArgument,
Expand Down Expand Up @@ -3306,6 +3329,9 @@ func (s *service) Clone(req *csi.CreateVolumeRequest,
// returns volume condition if found else returns not found
func (s *service) ControllerGetVolume(_ context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
abnormal := false
if nfs.IsNFSVolumeID(req.GetVolumeId()) {
req.VolumeId = nfs.NFSToArrayVolumeID(req.GetVolumeId())
}
csiVolID := req.GetVolumeId()
if csiVolID == "" {
return nil, status.Error(codes.InvalidArgument,
Expand Down
55 changes: 42 additions & 13 deletions service/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

csi "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/dell/csi-md/md"
"github.com/dell/csi-nfs/nfs"
"github.com/dell/gofsutil"
"github.com/dell/goscaleio"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -59,6 +60,9 @@ func (s *service) NodeStageVolume(
if md.IsMDVolumeID(req.GetVolumeId()) {
return mdsvc.NodeStageVolume(ctx, req)
}
if nfs.IsNFSVolumeID(req.GetVolumeId()) {
req.VolumeId = nfs.NFSToArrayVolumeID(req.GetVolumeId())
}

return nil, status.Error(codes.Unimplemented, "")
}
Expand All @@ -74,6 +78,9 @@ func (s *service) NodeUnstageVolume(
if md.IsMDVolumeID(req.GetVolumeId()) {
return mdsvc.NodeUnstageVolume(ctx, req)
}
if nfs.IsNFSVolumeID(req.GetVolumeId()) {
req.VolumeId = nfs.NFSToArrayVolumeID(req.GetVolumeId())
}

var reqID string
headers, ok := metadata.FromIncomingContext(ctx)
Expand Down Expand Up @@ -127,6 +134,10 @@ func (s *service) NodePublishVolume(
req *csi.NodePublishVolumeRequest) (
*csi.NodePublishVolumeResponse, error) {

if nfs.IsNFSVolumeID(req.GetVolumeId()) {
return nfssvc.NodePublishVolume(ctx, req)
}

Log := getLogger(ctx)
if md.IsMDVolumeID(req.GetVolumeId()) {
// md requires a call to NodeStageVolume before node publish. The current csi-powerflex driver does not
Expand Down Expand Up @@ -158,18 +169,16 @@ func (s *service) NodePublishVolume(
Log.Infof("Error storing meta-data: %s", metadataerror)
}
volumeContext := req.GetVolumeContext()
if nfs.IsNFSVolumeID(req.GetVolumeId()) {
req.VolumeId = nfs.NFSToArrayVolumeID(req.GetVolumeId())
}
if volumeContext != nil {
Log.Info("VolumeContext:")
for key, value := range volumeContext {
Log.WithFields(logrus.Fields{key: value}).Info("found in VolumeContext")
}
}

if req.VolumeContext["csi-nfs"] == "RWX" {
Log.Infof("csi-nfs: RWX calling nfssvc.NodeStageVolume")
return nfssvc.NodePublishVolume(ctx, req)
}

ephemeral, ok := req.VolumeContext["csi.storage.k8s.io/ephemeral"]
if ok && strings.ToLower(ephemeral) == "true" {
resp, err := s.ephemeralNodePublish(ctx, req)
Expand Down Expand Up @@ -271,6 +280,11 @@ func (s *service) NodeUnpublishVolume(
req *csi.NodeUnpublishVolumeRequest) (
*csi.NodeUnpublishVolumeResponse, error) {

var err error
if nfs.IsNFSVolumeID(req.GetVolumeId()) {
resp, err := nfssvc.NodeUnpublishVolume(ctx, req)
return resp, err
}
Log := getLogger(ctx)

var reqID string
Expand Down Expand Up @@ -301,13 +315,6 @@ func (s *service) NodeUnpublishVolume(
return resp, err
}

// temporary need to filter this somehow
// assume it's a csi-nfs volume unless rejected
resp, err := nfssvc.NodeUnpublishVolume(ctx, req)
if err == nil {
return resp, err
}

s.logStatistics()

csiVolID := req.GetVolumeId()
Expand Down Expand Up @@ -1116,12 +1123,14 @@ func getNodeUID(ctx context.Context, s *service) (string, error) {
// MountVolume finds a volume exported to the node by VolumeId, mounts to a staging path,
// The fsType and nfsExport directory are optional arguments.
// and returns that staging path or an error. This is used by csinfs.
// Note the volumeId here is an NFS volume id prepended with nfs-.
func (s *service) MountVolume(ctx context.Context, volumeId, fsType, nfsExportDirectory string) (string, error) {
Log.Infof("MountVolume called volumeId %s", volumeId)
if volumeId == "" {
return "", fmt.Errorf("mountVolume: volumeId was empty")
}
systemId := s.getSystemIDFromCsiVolumeID(volumeId)
systemId = strings.Replace(systemId, "nfs-", "", 1)
volId := getVolumeIDFromCsiVolumeID(volumeId)
if systemId == "" {
return "", fmt.Errorf("mountVolume: could not determine systemId for volumeId %s", volumeId)
Expand All @@ -1130,7 +1139,7 @@ func (s *service) MountVolume(ctx context.Context, volumeId, fsType, nfsExportDi
if err := s.requireProbe(ctx, systemId); err != nil {
return "", err
}
Log.Infof("systemId %s", systemId)
Log.Infof("systemId %s volumeId", systemId, volId)
sdcMappedVol, err := s.getSDCMappedVol(volId, systemId, publishGetMappedVolMaxRetry)
if err != nil {
return "", fmt.Errorf("mountVolume: getSDCMappedVol returned error %s", err)
Expand All @@ -1155,3 +1164,23 @@ func (s *service) MountVolume(ctx context.Context, volumeId, fsType, nfsExportDi

return target, nil
}

func (s *service) UnmountVolume(ctx context.Context, volumeId, nfsExportDirectory string) error {
if nfsExportDirectory == "" {
nfsExportDirectory = "/nfs/exports"
}
target := nfsExportDirectory + "/" + volumeId
Log.Infof("calling gofsutil.Unmount %s", target)
err := gofsutil.Unmount(ctx, target)
if err != nil &&
!strings.Contains(err.Error(), "no such file or directory") &&
!strings.Contains(err.Error(), "invalid argument") &&
!strings.Contains(err.Error(), "no mount point specified") {
return fmt.Errorf("unmountVolume: gofsutil.Unmount %s failed: %s", target, err)
}
err = os.Remove(target)
if err != nil && !strings.Contains(err.Error(), "no such file or directory") {
Log.Infof("UnmountVolume could not remove directory %s: %s", target, err)
}
return nil
}
1 change: 1 addition & 0 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type Service interface {
ProcessMapSecretChange() error
VolumeIDToArrayID(string) string
MountVolume(context.Context, string, string, string) (string, error)
UnmountVolume(context.Context, string, string) error
}

type NetworkInterface interface {
Expand Down

0 comments on commit 32332ae

Please sign in to comment.