Skip to content

Commit

Permalink
Update health monitor to also use better node matching logic
Browse files Browse the repository at this point in the history
Issue #144
  • Loading branch information
disrani-px committed Sep 15, 2018
1 parent ada2f49 commit f464192
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 45 deletions.
44 changes: 43 additions & 1 deletion drivers/volume/volume.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package volume

import (
"strings"

snapshotVolume "github.com/kubernetes-incubator/external-storage/snapshot/pkg/volume"
"github.com/libopenstorage/stork/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -97,7 +99,7 @@ func Register(name string, d Driver) error {
return nil
}

// Get an external storage provider to be used with Stork.
// Get an external storage provider to be used with Stork
func Get(name string) (Driver, error) {
d, ok := volDrivers[name]
if ok {
Expand All @@ -109,3 +111,43 @@ func Get(name string) (Driver, error) {
Type: "VolumeDriver",
}
}

// IsNodeMatch There are a couple of things that need to be checked to see if the driver
// node matched the k8s node since different k8s installs set the node name,
// hostname and IPs differently
func IsNodeMatch(k8sNode *v1.Node, driverNode *NodeInfo) bool {
if driverNode == nil {
return false
}

if isHostnameMatch(driverNode.ID, k8sNode.Name) {
return true
}
for _, address := range k8sNode.Status.Addresses {
switch address.Type {
case v1.NodeHostName:
if isHostnameMatch(driverNode.Hostname, address.Address) {
return true
}
case v1.NodeInternalIP:
for _, ip := range driverNode.IPs {
if ip == address.Address {
return true
}
}
}
}
return false
}

// The driver might not return fully qualified hostnames, so check if the short
// hostname matches too
func isHostnameMatch(driverHostname string, k8sHostname string) bool {
if driverHostname == k8sHostname {
return true
}
if strings.HasPrefix(k8sHostname, driverHostname+".") {
return true
}
return false
}
46 changes: 3 additions & 43 deletions pkg/extender/extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,46 +102,6 @@ func (e *Extender) getHostname(node *v1.Node) string {
return ""
}

// There are a couple of things that need to be checked to see if the driver
// node matched the k8s node since different k8s installs set the node name,
// hostname and IPs differently
func (e *Extender) isNodeMatch(k8sNode *v1.Node, driverNode *volume.NodeInfo) bool {
if driverNode == nil {
return false
}

if e.isHostnameMatch(driverNode.ID, k8sNode.Name) {
return true
}
for _, address := range k8sNode.Status.Addresses {
switch address.Type {
case v1.NodeHostName:
if e.isHostnameMatch(driverNode.Hostname, address.Address) {
return true
}
case v1.NodeInternalIP:
for _, ip := range driverNode.IPs {
if ip == address.Address {
return true
}
}
}
}
return false
}

// The driver might not return fully qualified hostnames, so check if the short
// hostname matches too
func (e *Extender) isHostnameMatch(driverHostname string, k8sHostname string) bool {
if driverHostname == k8sHostname {
return true
}
if strings.HasPrefix(k8sHostname, driverHostname+".") {
return true
}
return false
}

func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request) {
decoder := json.NewDecoder(req.Body)
defer func() {
Expand Down Expand Up @@ -181,7 +141,7 @@ func (e *Extender) processFilterRequest(w http.ResponseWriter, req *http.Request
for _, driverNode := range driverNodes {
storklog.PodLog(pod).Debugf("nodeInfo: %v", driverNode)
if driverNode.Status == volume.NodeOnline &&
e.isNodeMatch(&node, driverNode) {
volume.IsNodeMatch(&node, driverNode) {
filteredNodes = append(filteredNodes, node)
break
}
Expand Down Expand Up @@ -240,7 +200,7 @@ func (e *Extender) getNodeScore(
for _, rack := range rackInfo.PreferredLocality {
if rack == nodeRack || nodeRack == "" {
for _, datanode := range volumeInfo.DataNodes {
if e.isNodeMatch(&node, idMap[datanode]) {
if volume.IsNodeMatch(&node, idMap[datanode]) {
return nodePriorityScore
}
}
Expand Down Expand Up @@ -326,7 +286,7 @@ func (e *Extender) processPrioritizeRequest(w http.ResponseWriter, req *http.Req
// Replace driver's hostname with the kubernetes hostname to make it
// easier to match nodes when calculating scores
for _, knode := range args.Nodes.Items {
if e.isNodeMatch(&knode, dnode) {
if volume.IsNodeMatch(&knode, dnode) {
dnode.Hostname = e.getHostname(&knode)
break
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ func (m *Monitor) Stop() error {
return nil
}

func (m *Monitor) isSameNode(k8sNodeName string, driverNode *volume.NodeInfo) bool {
if k8sNodeName == driverNode.Hostname {
return true
}
node, err := k8s.Instance().GetNodeByName(k8sNodeName)
if err != nil {
log.Errorf("Error getting node %v: %v", k8sNodeName, err)
return false
}
return volume.IsNodeMatch(node, driverNode)
}

func (m *Monitor) driverMonitor() {
defer close(m.done)
for {
Expand Down Expand Up @@ -102,7 +114,7 @@ func (m *Monitor) driverMonitor() {
continue
}

if pod.Spec.NodeName == node.Hostname &&
if m.isSameNode(pod.Spec.NodeName, node) &&
(pod.Status.Phase == v1.PodRunning || pod.Status.Phase == v1.PodFailed) {
storklog.PodLog(&pod).Infof("Deleting Pod from Node: %v", pod.Spec.NodeName)
err = k8s.Instance().DeletePods([]v1.Pod{pod}, true)
Expand Down

0 comments on commit f464192

Please sign in to comment.