Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Nomad integration to CSI plugin #13

Merged
merged 1 commit into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
all:
docker build -t moosefs-csi --build-arg MOOSEFS_IMAGE=moosefs/client --build-arg MOOSEFS_TAG=latest --file docker/Dockerfile .
96 changes: 63 additions & 33 deletions driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
"github.com/Kunde21/moosefs-csi/driver/mfsexec"
"github.com/container-storage-interface/spec/lib/go/csi"

nomad "github.com/hashicorp/nomad/api"
k8serr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/informers"
v1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/utils/mount"

"google.golang.org/grpc"
Expand Down Expand Up @@ -51,14 +52,9 @@ type ControllerServer struct {
mountDir string
root string

test bool

mfscli moosefsCLI

stop chan struct{}
k8vs v1.PersistentVolumeInformer
k8nds v1.NodeInformer
mfsvs *metastore.Store
orch Orchestration
mfsvs *metastore.Store
}

type moosefsCLI interface {
Expand All @@ -67,7 +63,60 @@ type moosefsCLI interface {
GetAvailableCap(context.Context) (int64, error)
}

func NewControllerServer(k8cl kubernetes.Interface, d *mfsDriver, root, mountDir string) (*ControllerServer, error) {
type Orchestration interface {
VolumeExists(string) error
NodeExists(string) error
GetVolumeCapacity(string) (int64, error)
}

func InitK8sIntegration() (*k8sIntegration, error) {
conf, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
k8cl, err := kubernetes.NewForConfig(conf)
if err != nil {
return nil, err
}
inf := informers.NewSharedInformerFactory(k8cl, 10*time.Second)
nodes := inf.Core().V1().Nodes()
vols := inf.Core().V1().PersistentVolumes()
stop := make(chan struct{}, 1)
go vols.Informer().Run(stop)
go nodes.Informer().Run(stop)
return &k8sIntegration{
stop: stop,
vs: vols,
nds: nodes,
}, nil
}

func InitNomadIntegration() (*nomadIntegration, error) {
stop := make(chan struct{}, 1)
config := &nomad.Config{
Address: os.Getenv("NOMAD_ADDR"),
SecretID: os.Getenv("NOMAD_TOKEN"),
TLSConfig: &nomad.TLSConfig{
CACert: os.Getenv("NOMAD_CAPATH"),
ClientCert: os.Getenv("NOMAD_CLIENT_CERT"),
ClientKey: os.Getenv("NOMAD_CLIENT_KEY"),
},
}

client, err := nomad.NewClient(config)

if err != nil {
return nil, err
}

return &nomadIntegration{
client: client,
stop: stop,
}, nil
}

func NewControllerServer(orch Orchestration, d *mfsDriver, root, mountDir string) (*ControllerServer, error) {

root, mountDir = path.Clean(root), path.Clean(mountDir)
if root == "" {
root = "/"
Expand All @@ -92,12 +141,6 @@ func NewControllerServer(k8cl kubernetes.Interface, d *mfsDriver, root, mountDir
return nil, fmt.Errorf("failed to mount root directory: %w", err)
}

inf := informers.NewSharedInformerFactory(k8cl, 10*time.Second)
nodes := inf.Core().V1().Nodes()
vols := inf.Core().V1().PersistentVolumes()
stop := make(chan struct{}, 1)
go vols.Informer().Run(stop)
go nodes.Informer().Run(stop)
mfscli, err := mfsexec.New(mountDir, srv)
if err != nil {
return nil, err
Expand All @@ -113,11 +156,8 @@ func NewControllerServer(k8cl kubernetes.Interface, d *mfsDriver, root, mountDir
root: root,

mfscli: mfscli,

stop: stop,
k8vs: vols,
k8nds: nodes,
mfsvs: mvols,
mfsvs: mvols,
orch: orch,
}, nil
}

Expand Down Expand Up @@ -307,7 +347,7 @@ func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *cs
}
}

if _, err := cs.k8nds.Lister().Get(req.GetNodeId()); err != nil {
if err := cs.orch.NodeExists(req.GetNodeId()); err != nil {
if k8serr.IsNotFound(err) {
return nil, status.Error(codes.NotFound, "node does not exist")
}
Expand Down Expand Up @@ -345,17 +385,7 @@ func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *cs

// getVolumeCapacity as defined in k8s pv.
func (cs *ControllerServer) getVolumeCapacity(id string) (int64, error) {
vk8, err := cs.k8vs.Lister().Get(id)
if err != nil {
log.Println(err)
return 0, status.Error(codes.NotFound, "k8s volume does not exist")
}
c := vk8.Spec.Capacity.Storage()
cap, ok := c.AsInt64()
if !ok {
return 0, status.Error(codes.Internal, "failed to get capacity")
}
return cap, nil
return cs.orch.GetVolumeCapacity(id)
}

func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
Expand Down Expand Up @@ -396,7 +426,7 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req
defer cancel()
_, err := cs.mfsvs.ReadVol(ctx, req.GetVolumeId())
if status.Code(err) == codes.NotFound {
if _, err := cs.k8vs.Lister().Get(req.GetVolumeId()); status.Code(err) != codes.OK {
if err := cs.orch.VolumeExists(req.GetVolumeId()); status.Code(err) != codes.OK {
return nil, status.Error(codes.NotFound, "volume does not exist")
}
}
Expand Down
69 changes: 69 additions & 0 deletions driver/orchestration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package mfs

import (
"log"

nomad "github.com/hashicorp/nomad/api"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/client-go/informers/core/v1"
)

type k8sIntegration struct {
vs v1.PersistentVolumeInformer
nds v1.NodeInformer
stop chan struct{}
}

type nomadIntegration struct {
client *nomad.Client
stop chan struct{}
}

// Implement k8s interface
func (k8s *k8sIntegration) VolumeExists(id string) error {
_, err := k8s.vs.Lister().Get(id)
return err
}

func (k8s *k8sIntegration) NodeExists(id string) error {
_, err := k8s.nds.Lister().Get(id)
return err
}

func (k8s *k8sIntegration) GetVolumeCapacity(id string) (int64, error) {
vk8, err := k8s.vs.Lister().Get(id)
if err != nil {
log.Println(err)
return 0, status.Error(codes.NotFound, "k8s volume does not exist")
}
c := vk8.Spec.Capacity.Storage()
cap, ok := c.AsInt64()
if !ok {
return 0, status.Error(codes.Internal, "failed to get capacity")
}
return cap, nil
}

// Implement Nomad interface
func (n *nomadIntegration) VolumeExists(id string) error {
q := &nomad.QueryOptions{}
_, _, err := n.client.CSIVolumes().Info(id, q)
return err
}

func (n *nomadIntegration) NodeExists(id string) error {
q := &nomad.QueryOptions{}
_, _, err := n.client.Nodes().Info(id, q)
return err
}

func (n *nomadIntegration) GetVolumeCapacity(id string) (int64, error) {
q := &nomad.QueryOptions{}
vol, _, err := n.client.CSIVolumes().Info(id, q)
if err != nil {
log.Println(err)
return 0, status.Error(codes.NotFound, "k8s volume does not exist")
}
return vol.Capacity, nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ require (
k8s.io/client-go v0.19.6
k8s.io/klog v1.0.0 // indirect
k8s.io/utils v0.0.0-20201110183641-67b214c5f920
github.com/hashicorp/nomad/api v0.0.0-20210430020956-28b8767b278f
)
Loading