[core] add support for context

kayrus committed Nov 28, 2024
1 parent d228854 commit b6c3784

Showing 52 changed files with 826 additions and 794 deletions.
7 changes: 4 additions & 3 deletions cmd/client-keystone-auth/main.go
@@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"fmt"
"io"
"net/http"
@@ -156,7 +157,7 @@ func main() {
Use: "client-keystone-auth",
Short: "Keystone client credential plugin for Kubernetes",
Run: func(cmd *cobra.Command, args []string) {
handle()
handle(context.Background())
},
Version: version.Version,
}
@@ -177,7 +178,7 @@ func main() {
os.Exit(code)
}

func handle() {
func handle(ctx context.Context) {
// Generate Gophercloud Auth Options based on input data from stdin
// if IsTerminal returns "true", or from env variables otherwise.
if !term.IsTerminal(int(os.Stdin.Fd())) {
@@ -214,7 +215,7 @@ func handle() {
options.ClientKeyPath = clientKeyPath
options.ClientCAPath = clientCAPath

token, err := keystone.GetToken(options)
token, err := keystone.GetToken(ctx, options)
if err != nil {
if gophercloud.ResponseCodeIs(err, http.StatusUnauthorized) {
fmt.Println(errRespTemplate)
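
With `handle()` now taking a `context.Context`, the caller decides the context's lifetime. The commit passes `context.Background()`; a signal-aware context is a natural next step. A minimal sketch, not from this commit — the `signal.NotifyContext` wiring and the stub `handle` are illustrative:

package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
)

// Stub standing in for the real handle(ctx context.Context) from the diff.
func handle(ctx context.Context) {
	<-ctx.Done() // in the real code, ctx is forwarded to keystone.GetToken
	fmt.Println("shutting down:", ctx.Err())
}

func main() {
	// ctx is cancelled automatically when SIGINT or SIGTERM arrives, so
	// in-flight Keystone requests can be aborted instead of leaking.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	handle(ctx)
}
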
3 changes: 2 additions & 1 deletion cmd/k8s-keystone-auth/main.go
@@ -15,6 +15,7 @@ limitations under the License.
package main

import (
"context"
"os"

"github.com/spf13/cobra"
@@ -38,7 +39,7 @@ func main() {
os.Exit(1)
}

keystoneAuth, err := keystone.NewKeystoneAuth(config)
keystoneAuth, err := keystone.NewKeystoneAuth(context.Background(), config)
if err != nil {
klog.Errorf("%v", err)
os.Exit(1)
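
`NewKeystoneAuth` now receives the context at construction time. A common Go convention, which the sketch below assumes (the `Auth` type and its fields are hypothetical, not this repo's code), is to let that ctx bound only the setup work rather than storing it in the returned struct:

package keystone // hypothetical package, for illustration only

import "context"

type Auth struct {
	endpoint string // stand-in for the real authenticator state
}

// NewAuth mirrors the NewKeystoneAuth(ctx, config) shape from the diff:
// ctx can cancel slow initialization (e.g. a first Keystone round-trip),
// but it is deliberately not kept inside the returned object; long-lived
// request handling should use per-request contexts instead.
func NewAuth(ctx context.Context, endpoint string) (*Auth, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // caller already gave up; don't start setup
	}
	return &Auth{endpoint: endpoint}, nil
}
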
8 changes: 5 additions & 3 deletions pkg/autohealing/cloudprovider/cloudprovider.go
@@ -17,6 +17,8 @@ limitations under the License.
package cloudprovider

import (
"context"

"k8s.io/client-go/kubernetes"
log "k8s.io/klog/v2"

@@ -34,17 +36,17 @@ type CloudProvider interface {
GetName() string

// Update cluster health status.
UpdateHealthStatus([]healthcheck.NodeInfo, []healthcheck.NodeInfo) error
UpdateHealthStatus(context.Context, []healthcheck.NodeInfo, []healthcheck.NodeInfo) error

// Repair triggers the node repair process in the cloud.
Repair([]healthcheck.NodeInfo) error
Repair(context.Context, []healthcheck.NodeInfo) error

// Enabled decides if the repair should be triggered.
// It's recommended that the `Enabled()` function of the cloud provider doesn't allow re-triggering while a repair
// is in progress, e.g. before the repair process is finished, `Enabled()` should return false so that subsequent
// checks won't re-trigger the repair process.
// This function also provides the cluster admin the capability to disable the cluster auto healing on the fly.
Enabled() bool
Enabled(context.Context) bool
}

type RegisterFunc func(config config.Config, client kubernetes.Interface) (CloudProvider, error)
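
Since `UpdateHealthStatus`, `Repair`, and `Enabled` now take a `context.Context` as their first argument, test doubles must too. A minimal sketch of a fake satisfying the updated interface — the `fakeProvider` name is illustrative, and the import path assumes the repo's usual module layout:

package cloudprovider

import (
	"context"

	"k8s.io/cloud-provider-openstack/pkg/autohealing/healthcheck"
)

// fakeProvider is a do-nothing implementation useful in unit tests.
type fakeProvider struct {
	enabled bool
}

// Compile-time check that fakeProvider satisfies the new interface.
var _ CloudProvider = fakeProvider{}

func (f fakeProvider) GetName() string { return "fake" }

// UpdateHealthStatus only honours cancellation: ctx.Err() is non-nil
// exactly when the caller's context was cancelled or timed out.
func (f fakeProvider) UpdateHealthStatus(ctx context.Context, masters, workers []healthcheck.NodeInfo) error {
	return ctx.Err()
}

func (f fakeProvider) Repair(ctx context.Context, nodes []healthcheck.NodeInfo) error {
	return ctx.Err()
}

func (f fakeProvider) Enabled(ctx context.Context) bool { return f.enabled }
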
58 changes: 29 additions & 29 deletions pkg/autohealing/cloudprovider/openstack/provider.go
@@ -95,8 +95,8 @@ func (provider CloudProvider) GetName() string {
}

// getStackName finds the name of a stack matching a given ID.
func (provider *CloudProvider) getStackName(stackID string) (string, error) {
stack, err := stacks.Find(context.TODO(), provider.Heat, stackID).Extract()
func (provider *CloudProvider) getStackName(ctx context.Context, stackID string) (string, error) {
stack, err := stacks.Find(ctx, provider.Heat, stackID).Extract()
if err != nil {
return "", err
}
@@ -108,14 +108,14 @@ func (provider *CloudProvider) getStackName(stackID string) (string, error) {
// masters and minions (workers). The key in the map is the server/instance ID
// in Nova and the value is the resource ID and name of the server, and the
// parent stack ID and name.
func (provider *CloudProvider) getAllStackResourceMapping(stackName, stackID string) (m map[string]ResourceStackRelationship, err error) {
func (provider *CloudProvider) getAllStackResourceMapping(ctx context.Context, stackName, stackID string) (m map[string]ResourceStackRelationship, err error) {
if provider.ResourceStackMapping != nil {
return provider.ResourceStackMapping, nil
}

mapping := make(map[string]ResourceStackRelationship)

serverPages, err := stackresources.List(provider.Heat, stackName, stackID, stackresources.ListOpts{Depth: 2}).AllPages(context.TODO())
serverPages, err := stackresources.List(provider.Heat, stackName, stackID, stackresources.ListOpts{Depth: 2}).AllPages(ctx)
if err != nil {
return m, err
}
@@ -266,7 +266,7 @@ func (provider CloudProvider) waitForServerDetachVolumes(serverID string, timeou
// will be kept as False, which means the node needs to be rebuilt to fix it; otherwise it means the node has been processed.
//
// The bool return value indicates whether the node has been processed from a first-time repair PoV
func (provider CloudProvider) firstTimeRepair(n healthcheck.NodeInfo, serverID string, firstTimeRebootNodes map[string]healthcheck.NodeInfo) (bool, error) {
func (provider CloudProvider) firstTimeRepair(ctx context.Context, n healthcheck.NodeInfo, serverID string, firstTimeRebootNodes map[string]healthcheck.NodeInfo) (bool, error) {
var firstTimeUnhealthy = true
for id := range unHealthyNodes {
log.V(5).Infof("comparing server ID %s with known broken ID %s", serverID, id)
@@ -281,7 +281,7 @@ func (provider CloudProvider) firstTimeRepair(n healthcheck.NodeInfo, serverID s
if firstTimeUnhealthy {
log.Infof("rebooting node %s to repair it", serverID)

if res := servers.Reboot(context.TODO(), provider.Nova, serverID, servers.RebootOpts{Type: servers.SoftReboot}); res.Err != nil {
if res := servers.Reboot(ctx, provider.Nova, serverID, servers.RebootOpts{Type: servers.SoftReboot}); res.Err != nil {
// Usually it means the node is being rebooted
log.Warningf("failed to reboot node %s, error: %v", serverID, res.Err)
if strings.Contains(res.Err.Error(), "reboot_started") {
@@ -351,7 +351,7 @@ func (provider CloudProvider) firstTimeRepair(n healthcheck.NodeInfo, serverID s
// - Heat stack ID and resource ID.
//
// For worker nodes: Call Magnum resize API directly.
func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
func (provider CloudProvider) Repair(ctx context.Context, nodes []healthcheck.NodeInfo) error {
if len(nodes) == 0 {
return nil
}
@@ -370,12 +370,12 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {

firstTimeRebootNodes := make(map[string]healthcheck.NodeInfo)

err := provider.UpdateHealthStatus(masters, workers)
err := provider.UpdateHealthStatus(ctx, masters, workers)
if err != nil {
return fmt.Errorf("failed to update the health status of cluster %s, error: %v", clusterName, err)
}

cluster, err := clusters.Get(context.TODO(), provider.Magnum, clusterName).Extract()
cluster, err := clusters.Get(ctx, provider.Magnum, clusterName).Extract()
if err != nil {
return fmt.Errorf("failed to get the cluster %s, error: %v", clusterName, err)
}
@@ -389,7 +389,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
continue
}

if processed, err := provider.firstTimeRepair(n, serverID, firstTimeRebootNodes); err != nil {
if processed, err := provider.firstTimeRepair(ctx, n, serverID, firstTimeRebootNodes); err != nil {
log.Warningf("Failed to process if the node %s is in first time repair , error: %v", serverID, err)
} else if processed {
log.Infof("Node %s has been processed", serverID)
@@ -405,7 +405,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
}

nodesToReplace.Insert(serverID)
ng, err := provider.getNodeGroup(clusterName, n)
ng, err := provider.getNodeGroup(ctx, clusterName, n)
ngName := "default-worker"
ngNodeCount := &cluster.NodeCount
if err == nil {
@@ -419,7 +419,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
NodesToRemove: nodesToReplace.List(),
}

clusters.Resize(context.TODO(), provider.Magnum, clusterName, opts)
clusters.Resize(ctx, provider.Magnum, clusterName, opts)
// Wait 10 seconds to make sure Magnum has already got the request
// to avoid sending all of the resize API calls at the same time.
time.Sleep(10 * time.Second)
@@ -432,14 +432,14 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
log.Infof("Cluster %s resized", clusterName)
}
} else {
clusterStackName, err := provider.getStackName(cluster.StackID)
clusterStackName, err := provider.getStackName(ctx, cluster.StackID)
if err != nil {
return fmt.Errorf("failed to get the Heat stack for cluster %s, error: %v", clusterName, err)
}

// In order to rebuild the nodes by Heat stack update, we need to know the parent stack ID of the resources and
// mark them unhealthy first.
allMapping, err := provider.getAllStackResourceMapping(clusterStackName, cluster.StackID)
allMapping, err := provider.getAllStackResourceMapping(ctx, clusterStackName, cluster.StackID)
if err != nil {
return fmt.Errorf("failed to get the resource stack mapping for cluster %s, error: %v", clusterName, err)
}
@@ -456,7 +456,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
continue
}

if processed, err := provider.firstTimeRepair(n, serverID, firstTimeRebootNodes); err != nil {
if processed, err := provider.firstTimeRepair(ctx, n, serverID, firstTimeRebootNodes); err != nil {
log.Warningf("Failed to process if the node %s is in first time repair , error: %v", serverID, err)
} else if processed {
log.Infof("Node %s has been processed", serverID)
Expand All @@ -468,7 +468,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
} else {
// Mark root volume as unhealthy
if rootVolumeID != "" {
err = stackresources.MarkUnhealthy(context.TODO(), provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, rootVolumeID, opts).ExtractErr()
err = stackresources.MarkUnhealthy(ctx, provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, rootVolumeID, opts).ExtractErr()
if err != nil {
log.Errorf("failed to mark resource %s unhealthy, error: %v", rootVolumeID, err)
}
@@ -479,7 +479,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
log.Warningf("Failed to shutdown the server %s, error: %v", serverID, err)
// If the server fails to shut down after 180s, force delete it to avoid
// a stack update failure later.
res := servers.ForceDelete(context.TODO(), provider.Nova, serverID)
res := servers.ForceDelete(ctx, provider.Nova, serverID)
if res.Err != nil {
log.Warningf("Failed to delete the server %s, error: %v", serverID, err)
}
@@ -488,15 +488,15 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
log.Infof("Marking Nova VM %s(Heat resource %s) unhealthy for Heat stack %s", serverID, allMapping[serverID].ResourceID, cluster.StackID)

// Mark VM as unhealthy
err = stackresources.MarkUnhealthy(context.TODO(), provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, allMapping[serverID].ResourceID, opts).ExtractErr()
err = stackresources.MarkUnhealthy(ctx, provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, allMapping[serverID].ResourceID, opts).ExtractErr()
if err != nil {
log.Errorf("failed to mark resource %s unhealthy, error: %v", serverID, err)
}

delete(unHealthyNodes, serverID)
}

if err := stacks.UpdatePatch(context.TODO(), provider.Heat, clusterStackName, cluster.StackID, stacks.UpdateOpts{}).ExtractErr(); err != nil {
if err := stacks.UpdatePatch(ctx, provider.Heat, clusterStackName, cluster.StackID, stacks.UpdateOpts{}).ExtractErr(); err != nil {
return fmt.Errorf("failed to update Heat stack to rebuild resources, error: %v", err)
}

@@ -514,26 +514,26 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
log.Infof("Skip node delete for %s because it's repaired by reboot", serverID)
continue
}
if err := provider.KubeClient.CoreV1().Nodes().Delete(context.TODO(), n.KubeNode.Name, metav1.DeleteOptions{}); err != nil {
if err := provider.KubeClient.CoreV1().Nodes().Delete(ctx, n.KubeNode.Name, metav1.DeleteOptions{}); err != nil {
log.Errorf("Failed to remove the node %s from cluster, error: %v", n.KubeNode.Name, err)
}
}

return nil
}

func (provider CloudProvider) getNodeGroup(clusterName string, node healthcheck.NodeInfo) (nodegroups.NodeGroup, error) {
func (provider CloudProvider) getNodeGroup(ctx context.Context, clusterName string, node healthcheck.NodeInfo) (nodegroups.NodeGroup, error) {
var ng nodegroups.NodeGroup

ngPages, err := nodegroups.List(provider.Magnum, clusterName, nodegroups.ListOpts{}).AllPages(context.TODO())
ngPages, err := nodegroups.List(provider.Magnum, clusterName, nodegroups.ListOpts{}).AllPages(ctx)
if err == nil {
ngs, err := nodegroups.ExtractNodeGroups(ngPages)
if err != nil {
log.Warningf("Failed to get node group for cluster %s, error: %v", clusterName, err)
return ng, err
}
for _, ng := range ngs {
ngInfo, err := nodegroups.Get(context.TODO(), provider.Magnum, clusterName, ng.UUID).Extract()
ngInfo, err := nodegroups.Get(ctx, provider.Magnum, clusterName, ng.UUID).Extract()
if err != nil {
log.Warningf("Failed to get node group for cluster %s, error: %v", clusterName, err)
return ng, err
@@ -555,7 +555,7 @@ func (provider CloudProvider) getNodeGroup(clusterName string, node healthcheck.

// UpdateHealthStatus can update the cluster health status to reflect the
// real-time health status of the k8s cluster.
func (provider CloudProvider) UpdateHealthStatus(masters []healthcheck.NodeInfo, workers []healthcheck.NodeInfo) error {
func (provider CloudProvider) UpdateHealthStatus(ctx context.Context, masters []healthcheck.NodeInfo, workers []healthcheck.NodeInfo) error {
log.Infof("start to update cluster health status.")
clusterName := provider.Config.ClusterName

@@ -600,7 +600,7 @@ func (provider CloudProvider) UpdateHealthStatus(masters []healthcheck.NodeInfo,
}

log.Infof("updating cluster health status as %s for reason %s.", healthStatus, healthStatusReason)
res := clusters.Update(context.TODO(), provider.Magnum, clusterName, updateOpts)
res := clusters.Update(ctx, provider.Magnum, clusterName, updateOpts)

if res.Err != nil {
return fmt.Errorf("failed to update the health status of cluster %s error: %v", clusterName, res.Err)
@@ -617,10 +617,10 @@ func (provider CloudProvider) UpdateHealthStatus(masters []healthcheck.NodeInfo,
// There are two conditions under which we disable the repair:
// - The cluster admin disables the auto healing via OpenStack API.
// - The Magnum cluster is not in stable status.
func (provider CloudProvider) Enabled() bool {
func (provider CloudProvider) Enabled(ctx context.Context) bool {
clusterName := provider.Config.ClusterName

cluster, err := clusters.Get(context.TODO(), provider.Magnum, clusterName).Extract()
cluster, err := clusters.Get(ctx, provider.Magnum, clusterName).Extract()
if err != nil {
log.Warningf("failed to get the cluster %s, error: %v", clusterName, err)
return false
@@ -644,12 +644,12 @@ func (provider CloudProvider) Enabled() bool {
return false
}

clusterStackName, err := provider.getStackName(cluster.StackID)
clusterStackName, err := provider.getStackName(ctx, cluster.StackID)
if err != nil {
log.Warningf("Failed to get the Heat stack ID for cluster %s, error: %v", clusterName, err)
return false
}
stack, err := stacks.Get(context.TODO(), provider.Heat, clusterStackName, cluster.StackID).Extract()
stack, err := stacks.Get(ctx, provider.Heat, clusterStackName, cluster.StackID).Extract()
if err != nil {
log.Warningf("Failed to get Heat stack %s for cluster %s, error: %v", cluster.StackID, clusterName, err)
return false
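
The practical payoff of replacing the internal `context.TODO()` calls: a caller can now bound an entire repair pass with one deadline that every Heat, Nova, and Magnum request inherits. A sketch against the interface above — the helper function and the 10-minute budget are illustrative, not this repo's code:

package main

import (
	"context"
	"time"

	log "k8s.io/klog/v2"

	"k8s.io/cloud-provider-openstack/pkg/autohealing/cloudprovider"
	"k8s.io/cloud-provider-openstack/pkg/autohealing/healthcheck"
)

// repairWithDeadline bounds one healing pass; every OpenStack call made
// inside Enabled/Repair now inherits the deadline through the threaded ctx.
func repairWithDeadline(p cloudprovider.CloudProvider, nodes []healthcheck.NodeInfo) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	if !p.Enabled(ctx) {
		return // auto healing disabled or cluster not in a stable status
	}
	if err := p.Repair(ctx, nodes); err != nil {
		// err may wrap context.DeadlineExceeded if the budget ran out.
		log.Errorf("repair aborted: %v", err)
	}
}
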
5 changes: 3 additions & 2 deletions pkg/autohealing/cmd/root.go
@@ -50,10 +50,11 @@ var rootCmd = &cobra.Command{
"OpenStack is supported by default.",

Run: func(cmd *cobra.Command, args []string) {
ctx := context.TODO()
autohealer := controller.NewController(conf)

if !conf.LeaderElect {
autohealer.Start(context.TODO())
autohealer.Start(ctx)
panic("unreachable")
}

@@ -63,7 +64,7 @@ var rootCmd = &cobra.Command{
}

// Try and become the leader and start autohealing loops
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: 20 * time.Second,
RenewDeadline: 15 * time.Second,
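
The part of the `LeaderElectionConfig` below the fold presumably wires the same ctx into the leader callbacks, which is what makes this change useful: `OnStartedLeading` receives a context that is cancelled when leadership is lost, so the healing loops can stop cleanly. A hedged sketch of that wiring (the `runElected` helper and callback bodies are illustrative, not the repo's code; the client-go API is real):

package main

import (
	"context"
	"time"

	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

// runElected shows how the ctx given to RunOrDie flows onward: the ctx
// passed to OnStartedLeading is cancelled on leadership loss or when the
// parent ctx is cancelled, whichever comes first.
func runElected(ctx context.Context, lock resourcelock.Interface, start func(context.Context)) {
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 20 * time.Second,
		RenewDeadline: 15 * time.Second,
		RetryPeriod:   5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: start, // e.g. autohealer.Start
			OnStoppedLeading: func() { klog.Info("no longer the leader") },
		},
	})
}
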