Skip to content

Commit

Permalink
Changing how core pods watch the cluster to use Informer() rather than a manual loop
Browse files Browse the repository at this point in the history
  • Loading branch information
kuzmik committed Dec 20, 2023
1 parent 127bf5a commit f105f26
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 134 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ require (
k8s.io/client-go v0.29.0
)

require github.com/google/go-cmp v0.6.0 // indirect

require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
Expand Down Expand Up @@ -56,7 +58,7 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.29.0 // indirect
k8s.io/api v0.29.0
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231206194836-bf4651e18aa8 // indirect
k8s.io/utils v0.0.0-20231127182322-b307cd553661 // indirect
Expand Down
231 changes: 142 additions & 89 deletions internal/proxysql/core.go
Original file line number Diff line number Diff line change
@@ -1,150 +1,193 @@
package proxysql

import (
"context"
"fmt"
"log/slog"
"os"
"sort"
"strings"
"time"

"github.com/persona-id/proxysql-agent/internal/configuration"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

//
// Core mode specific settings
//

// PodInfo holds the IP address, hostname and UID recorded for a single pod.
type PodInfo struct {
PodIP string
Hostname string
UID string
}
// Core runs core mode: it builds an in-cluster Kubernetes client, creates a pod informer limited to the
// configured namespace and "app" label, registers podAdded and podUpdated as event handlers, and then
// blocks on the stopper channel.
//
// NOTE(review): this listing is an unmarked diff, so lines of the removed polling implementation
// (ByPodIP, the second Core header, the interval loop, coreLoop) are interleaved with the informer code.
func (p *ProxySQL) Core() {
// NOTE(review): failures here and in NewForConfig below are only logged; there is no return, so
// execution continues with whatever config/clientset value was handed back.
config, err := rest.InClusterConfig()
if err != nil {
slog.Error("error", slog.Any("err", err))
}

// Define a custom type to implement the Sort interface.
type ByPodIP []PodInfo
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
slog.Error("error", slog.Any("err", err))
}

func (a ByPodIP) Len() int { return len(a) }
func (a ByPodIP) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByPodIP) Less(i, j int) bool { return a[i].PodIP < a[j].PodIP }
// Stop signal for the informer; closed by the deferred call when Core returns.
stopper := make(chan struct{})
defer close(stopper)

app := p.settings.Core.PodSelector.App
// component := p.settings.Core.PodSelector.Component
namespace := p.settings.Core.PodSelector.Namespace

// Build the label selector string used to filter the pods the informer lists and watches.
// Only the "app" label is used; the "component" label is commented out.
labelSelector := labels.Set(map[string]string{
"app": app,
// "component": component,
}).AsSelector().String()

// Create a shared informer factory restricted to the namespace and label selector above.
// The 1*time.Second argument is presumably the resync period (per the original comment) - TODO confirm.
factory := informers.NewSharedInformerFactoryWithOptions(
clientset,
1*time.Second,
informers.WithNamespace(namespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector
}),
)

// NOTE(review): second Core header; this line and the interval/loop/sleep lines below appear to belong
// to the removed polling implementation.
func (p *ProxySQL) Core() {
interval := p.settings.Core.Interval
podInformer := factory.Core().V1().Pods().Informer()

slog.Info("Core mode initialized, running loop", slog.Int("interval", interval))
defer runtime.HandleCrash()

for {
p.coreLoop()
// Start the factory's informers in a separate goroutine, passing stopper as their stop channel.
go factory.Start(stopper)

time.Sleep(time.Duration(interval) * time.Second)
// Give up (and return, which closes stopper) if the pod cache does not report as synced.
if !cache.WaitForCacheSync(stopper, podInformer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
}

func (p *ProxySQL) coreLoop() {
pods, err := GetCorePods(p.settings)
// Handlers are registered after the cache sync above; only add and update events are handled,
// no DeleteFunc is set.
_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: p.podAdded,
UpdateFunc: p.podUpdated,
})
if err != nil {
slog.Error("Failed to get pod info", slog.Any("error", err))
fmt.Println(err)
}

// Block this goroutine from exiting. Nothing in this function sends on stopper, and it is only
// closed by the deferred call, so this receive is not released from within Core itself.
<-stopper
}

// podAdded is the informer AddFunc handler and exists for bootstrapping: using podUpdated alone for adds
// meant the first pod to come up was never registered.
//
// It ignores objects that are not *v1.Pod and pods whose name differs from this process's hostname.
// For its own pod it counts the rows in proxysql_servers whose hostname equals the pod IP and calls
// addPod only when that count is zero.
//
// NOTE(review): this listing is an unmarked diff, so lines of the removed coreLoop body (pods length
// check, checksum file handling) are interleaved with the handler code.
func (p *ProxySQL) podAdded(object interface{}) {
pod, ok := object.(*v1.Pod)
if !ok {
return
}

if len(pods) == 0 {
slog.Error("No pods returned")

// If the new pod is not THIS pod, bail out of this function. This logic only applies to the current
// pod, when it starts up.
// NOTE(review): the os.Hostname error is discarded; on failure hostname is compared as returned.
if hostname, _ := os.Hostname(); pod.Name != hostname {
return
}

checksumFile := "/tmp/pods-cs.txt"
digest := calculateChecksum(pods)
fmt.Println("I am here", pod.Name, pod.Status.PodIP, pod.Status.Phase)

// Check whether the pod is already in the proxysql_servers table; this can happen when core pods add
// other core pods.
// NOTE(review): the query is built with Sprintf from the pod IP rather than a bound parameter.
query := fmt.Sprintf("SELECT count(*) FROM proxysql_servers WHERE hostname = '%s'", pod.Status.PodIP)

var count int

// Read the previous checksum from the file
old, err := os.ReadFile(checksumFile)
err := p.conn.QueryRow(query).Scan(&count)
if err != nil {
old = []byte("")
// NOTE(review): the message says addPod() but the failing call is the count query above. There is
// no return here, so execution continues to the count check (count is presumably still 0).
slog.Error("Error in addPod()", slog.Any("err", err))
}

// If nothing changes, we still run LOAD PROXYSQL SERVERS TO RUNTIME in order to accept any
// new pods that have joined the cluster
if string(old) == digest {
command := "LOAD PROXYSQL SERVERS TO RUNTIME"
// The pod IP already has a row: nothing to add.
if count > 0 {
return
}

_, err = p.conn.Exec(command)
if err != nil {
slog.Error("Command failed to execute", slog.String("command", command), slog.Any("error", err))
}
// Register this pod; a failure is logged and not propagated (the handler has no return value).
err = p.addPod(pod)
if err != nil {
slog.Error("Error in addPod()", slog.Any("err", err))
}
}

// podUpdated is the informer UpdateFunc handler. It reacts to exactly two phase transitions:
//
//   - Pending -> Running: the new pod object is passed to addPod.
//   - Running -> Failed:  the old pod object is passed to removePod.
//
// Every other update is ignored. Errors from addPod/removePod are logged and not propagated.
//
// Example output (scaled up core-1, then scaled it back down):
//
// POD MODIFIED proxysql-core-1 Pending proxysql-core-1 192.168.194.102 Running
// POD MODIFIED proxysql-core-1 192.168.194.102 Running proxysql-core-1 Failed
//
// NOTE(review): this listing is an unmarked diff, so lines of the removed coreLoop tail (command loop,
// checksum file write, "Commands ran" log) are interleaved with the handler code.
func (p *ProxySQL) podUpdated(oldobject interface{}, newobject interface{}) {
// Cast both objects into Pods; if either cast fails, leave the function.
oldpod, ok := oldobject.(*v1.Pod)
if !ok {
return
}

newpod, ok := newobject.(*v1.Pod)
if !ok {
return
}

commands := createCommands(pods)
for _, command := range commands {
_, err = p.conn.Exec(command)
// The pod has just started running: register it.
if oldpod.Status.Phase == "Pending" && newpod.Status.Phase == "Running" {
err := p.addPod(newpod)
if err != nil {
slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
slog.Error("Error in addPod()", slog.Any("err", err))
}
}

// Write the new checksum to the file for the next run
err = os.WriteFile(checksumFile, []byte(digest), 0o600)
if err != nil {
slog.Error("Failed to write to checksum file", slog.String("file", checksumFile), slog.Any("error", err))
// The pod has failed: deregister it. The OLD object is passed, presumably because the failed pod no
// longer reports an IP (see the example output above) - TODO confirm.
if oldpod.Status.Phase == "Running" && newpod.Status.Phase == "Failed" {
err := p.removePod(oldpod)
if err != nil {
slog.Error("Error in removePod()", slog.Any("err", err))
}
}

slog.Info("Commands ran", slog.String("commands", strings.Join(commands, "; ")))
}

// NOTE(review): this listing is an unmarked diff, so lines of the removed GetCorePods and
// calculateChecksum functions are interleaved with addPod below.
func GetCorePods(settings *configuration.Config) ([]PodInfo, error) {
app := settings.Core.PodSelector.App
component := settings.Core.PodSelector.Component
namespace := settings.Core.PodSelector.Namespace
// addPod registers a pod that has entered the cluster.
//
// For a pod labelled component=core it queues a DELETE of the placeholder row whose hostname is
// 'proxysql-core' and an INSERT of a row for the pod's IP and name. For every pod it then queues six
// LOAD ... TO RUNTIME commands. The commands are executed in order on p.conn; the first failure is
// logged and returned, and the remaining commands are not run. Returns nil when all commands succeed.
func (p *ProxySQL) addPod(pod *v1.Pod) error {
slog.Info("Pod entered the cluster", slog.String("name", pod.Name), slog.String("ip", pod.Status.PodIP))

config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
commands := []string{}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

pods, _ := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("app=%s,component=%s", app, component),
})
// If the new pod is a core pod, delete the default entry from the proxysql_servers table and add the
// new pod to it.
if pod.Labels["component"] == "core" {
// TODO: maybe make this configurable, not everyone will name the service this.
commands = append(commands, "DELETE FROM proxysql_servers WHERE hostname = 'proxysql-core'")

var corePods []PodInfo
for _, pod := range pods.Items {
corePods = append(corePods, PodInfo{PodIP: pod.Status.PodIP, Hostname: pod.Name, UID: string(pod.GetUID())})
// Values are presumably (hostname, port, weight, comment) - TODO confirm against the table schema.
// NOTE(review): the statement is built with Sprintf from pod fields rather than bound parameters.
cmd := fmt.Sprintf("INSERT INTO proxysql_servers VALUES ('%s', 6032, 0, '%s')", pod.Status.PodIP, pod.Name)
commands = append(commands, cmd)
}

return corePods, err
}

func calculateChecksum(pods []PodInfo) string {
data := []string{}
// Queued for every pod, whether or not it carries the component=core label.
commands = append(commands,
"LOAD PROXYSQL SERVERS TO RUNTIME",
"LOAD ADMIN VARIABLES TO RUNTIME",
"LOAD MYSQL VARIABLES TO RUNTIME",
"LOAD MYSQL SERVERS TO RUNTIME",
"LOAD MYSQL USERS TO RUNTIME",
"LOAD MYSQL QUERY RULES TO RUNTIME",
)

for _, pod := range pods {
data = append(data, fmt.Sprintf("%s:%s:%s", pod.PodIP, pod.Hostname, pod.UID))
// Run the commands one at a time, stopping at the first error.
for _, command := range commands {
_, err := p.conn.Exec(command)
if err != nil {
// FIXME: wrap error with extra info and return
// NOTE(review): the error is both logged here and returned; callers log it again.
slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
return err
}
}

sort.Strings(data)
slog.Debug("Ran commands", slog.String("commands", strings.Join(commands, "; ")))

return fmt.Sprintf("%x", data)
return nil
}

// NOTE(review): this listing is an unmarked diff, so lines of the removed createCommands function are
// interleaved with removePod below, and part of the body is hidden behind a collapsed diff hunk.
func createCommands(pods []PodInfo) []string {
sort.Sort(ByPodIP(pods))
// removePod deregisters a pod that has exited the cluster.
//
// For a pod labelled component=core it queues a DELETE of the proxysql_servers row whose hostname equals
// the pod's IP, then appends LOAD ... TO RUNTIME commands. The commands are executed in order on p.conn;
// the first failure is logged and returned, and the remaining commands are not run. Returns nil when all
// commands succeed.
func (p *ProxySQL) removePod(pod *v1.Pod) error {
slog.Info("Pod exited the cluster", slog.String("name", pod.Name), slog.String("ip", pod.Status.PodIP))

commands := []string{"DELETE FROM proxysql_servers"}
commands := []string{}

for _, pod := range pods {
commands = append(commands,
fmt.Sprintf("INSERT INTO proxysql_servers VALUES ('%s', 6032, 0, '%s')", pod.PodIP, pod.Hostname),
)
// If the removed pod is a core pod, delete its row (matched by pod IP) from the proxysql_servers table.
// NOTE(review): the statement is built with Sprintf from the pod IP rather than a bound parameter.
if pod.Labels["component"] == "core" {
cmd := fmt.Sprintf("DELETE FROM proxysql_servers WHERE hostname = '%s'", pod.Status.PodIP)
commands = append(commands, cmd)
}

// NOTE(review): the marker line below hides part of this append; the hidden entries are presumably the
// same LOAD ... TO RUNTIME commands that addPod queues - TODO confirm against the full file.
commands = append(commands,
Expand All @@ -156,5 +199,15 @@ func createCommands(pods []PodInfo) []string {
"LOAD MYSQL QUERY RULES TO RUNTIME",
)

return commands
// Run the commands one at a time, stopping at the first error.
for _, command := range commands {
_, err := p.conn.Exec(command)
if err != nil {
slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
return err
}
}

slog.Debug("Ran commands", slog.String("commands", strings.Join(commands, "; ")))

return nil
}
Loading

0 comments on commit f105f26

Please sign in to comment.