Skip to content

Commit

Permalink
Changing how core pods watch the cluster to use Informer() rather tha…
Browse files Browse the repository at this point in the history
…n a manual loop
  • Loading branch information
kuzmik committed Jan 11, 2024
1 parent 127bf5a commit 01c1555
Show file tree
Hide file tree
Showing 8 changed files with 496 additions and 114 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ lint:

test:
@mkdir -p coverage
@go test ./... -v -shuffle=on -coverprofile coverage/coverage.out
@go test ./... --shuffle=on --coverprofile coverage/coverage.out

coverage: test
@go tool cover -html=coverage/coverage.out
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down Expand Up @@ -56,7 +57,7 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.29.0 // indirect
k8s.io/api v0.29.0
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231206194836-bf4651e18aa8 // indirect
k8s.io/utils v0.0.0-20231127182322-b307cd553661 // indirect
Expand Down
261 changes: 175 additions & 86 deletions internal/proxysql/core.go
Original file line number Diff line number Diff line change
@@ -1,150 +1,229 @@
package proxysql

import (
"context"
"fmt"
"log/slog"
"os"
"sort"
"strings"
"time"

"github.com/persona-id/proxysql-agent/internal/configuration"
// Blank import: loaded only for its side effect of registering the MySQL driver with database/sql.
_ "github.com/go-sql-driver/mysql"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

// ProxySQL core functions.
//
// Core mode specific settings
// The core pods need to run certain commands when specific pods join or leave the
// cluster, so this function sets up an informer that watches the k8s pods and runs
// functions when pods change.
//
// Joining:
//
// When a new core pod joins the cluster, one of two things happens:
// - if it's the first core pod, it uses the podAdded callback to add itself to the proxysql_servers table
// - if other core pods are already running, one of them will add the new pod via the podUpdated function
//
// When a new satellite pod joins the cluster, the core pods all run the "LOAD X TO RUNTIME" commands, which
// accept the new pod and distribute the configuration to it.
//
// Leaving:
//
// - When a satellite pod leaves the cluster, nothing needs to be done.
// - When a core pod leaves the cluster, the remaining core pods all delete that pod from the proxysql_servers
// table and run all of the LOAD X TO RUNTIME commands.
func (p *ProxySQL) Core() {
if p.clientset == nil {
config, err := rest.InClusterConfig()
if err != nil {
slog.Error("error", slog.Any("err", err))
}

type PodInfo struct {
PodIP string
Hostname string
UID string
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
slog.Error("error", slog.Any("err", err))
}

// Define a custom type to implement the Sort interface.
type ByPodIP []PodInfo
p.clientset = clientset
}

func (a ByPodIP) Len() int { return len(a) }
func (a ByPodIP) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByPodIP) Less(i, j int) bool { return a[i].PodIP < a[j].PodIP }
// stop signal for the informer
stopper := make(chan struct{})
defer close(stopper)

app := p.settings.Core.PodSelector.App
namespace := p.settings.Core.PodSelector.Namespace

// create shared informers for resources in all known API group versions with a reSync period and namespace
labelSelector := labels.Set(map[string]string{
"app": app,
}).AsSelector()

factory := informers.NewSharedInformerFactoryWithOptions(
p.clientset,
1*time.Second,
informers.WithNamespace(namespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector.String()
}),
)

func (p *ProxySQL) Core() {
interval := p.settings.Core.Interval
podInformer := factory.Core().V1().Pods().Informer()

slog.Info("Core mode initialized, running loop", slog.Int("interval", interval))
defer runtime.HandleCrash()

for {
p.coreLoop()
go factory.Start(stopper)

time.Sleep(time.Duration(interval) * time.Second)
if !cache.WaitForCacheSync(stopper, podInformer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
}

func (p *ProxySQL) coreLoop() {
pods, err := GetCorePods(p.settings)
_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: p.podAdded,
UpdateFunc: p.podUpdated,
})
if err != nil {
slog.Error("Failed to get pod info", slog.Any("error", err))
fmt.Println(err)
}

// block the main goroutine from exiting
<-stopper
}

// This function is needed to do bootstrapping. At first I was using podUpdated to do adds, but we would never
// get the first pod to come up.
func (p *ProxySQL) podAdded(object interface{}) {
pod, ok := object.(*v1.Pod)
if !ok {
return
}

if len(pods) == 0 {
slog.Error("No pods returned")

// if the new pod is not THIS pod, bail out of this function. this stuff only applies to the current
// pod, when it starts up.
if hostname, _ := os.Hostname(); pod.Name != hostname {
return
}

checksumFile := "/tmp/pods-cs.txt"
digest := calculateChecksum(pods)
// check if pod is already in the proxysql_servers table; this can happen when core pods add
// other core pods.
query := fmt.Sprintf("SELECT count(*) FROM proxysql_servers WHERE hostname = '%s'", pod.Status.PodIP)

var count int

// Read the previous checksum from the file
old, err := os.ReadFile(checksumFile)
err := p.conn.QueryRow(query).Scan(&count)
if err != nil {
old = []byte("")
slog.Error("Error in podAdded()", slog.Any("err", err))
}

// If nothing changes, we still run LOAD PROXYSQL SERVERS TO RUNTIME in order to accept any
// new pods that have joined the cluster
if string(old) == digest {
command := "LOAD PROXYSQL SERVERS TO RUNTIME"
if count > 0 {
return
}

_, err = p.conn.Exec(command)
if err != nil {
slog.Error("Command failed to execute", slog.String("command", command), slog.Any("error", err))
}
err = p.addPod(pod)
if err != nil {
slog.Error("Error in podAdded()", slog.Any("err", err))
}
}

// We aren't using podAdded here when other core pods exist because that function doesn't always get the PodIP,
// and this one does. Using this function doesn't work when bootstrapping a cluster, because the pod has started
// before the informer has started. In other words, the pod starts before the pod can detect itself joining the
// cluster.
//
// Example pod (scaled up core-1, then scaled it back down):
//
//   OLD POD NAME     OLD POD IP       OLD STATUS  NEW POD NAME     NEW POD IP       NEW STATUS
//   proxysql-core-1  (none)           Pending     proxysql-core-1  192.168.194.102  Running
//   proxysql-core-1  192.168.194.102  Running     proxysql-core-1  (none)           Failed
func (p *ProxySQL) podUpdated(oldobject interface{}, newobject interface{}) {
// cast both objects into Pods, and if that fails leave the function
oldpod, ok := oldobject.(*v1.Pod)
if !ok {
return
}

commands := createCommands(pods)
for _, command := range commands {
_, err = p.conn.Exec(command)
newpod, ok := newobject.(*v1.Pod)
if !ok {
return
}

// Pod is new and transitioned to running, so we add that to the proxysql_servers table.
if oldpod.Status.Phase == "Pending" && newpod.Status.Phase == "Running" {
err := p.addPod(newpod)
if err != nil {
slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
slog.Error("Error in addPod()", slog.Any("err", err))
}
}

// Write the new checksum to the file for the next run
err = os.WriteFile(checksumFile, []byte(digest), 0o600)
if err != nil {
slog.Error("Failed to write to checksum file", slog.String("file", checksumFile), slog.Any("error", err))
// Pod is shutting down. Only run this for core pods, as satellites don't need special considerations when
// they leave the cluster.
if oldpod.Status.Phase == "Running" && newpod.Status.Phase == "Failed" {
err := p.removePod(oldpod)
if err != nil {
slog.Error("Error in removePod()", slog.Any("err", err))
}
}

slog.Info("Commands ran", slog.String("commands", strings.Join(commands, "; ")))
}

func GetCorePods(settings *configuration.Config) ([]PodInfo, error) {
app := settings.Core.PodSelector.App
component := settings.Core.PodSelector.Component
namespace := settings.Core.PodSelector.Namespace
// Add the new pod to the cluster.
// - If it's a core pod, add it to the proxysql_servers table
// - If it's a satellite pod, run the commands to accept it into the cluster
func (p *ProxySQL) addPod(pod *v1.Pod) error {
slog.Info("Pod joined the cluster", slog.String("name", pod.Name), slog.String("ip", pod.Status.PodIP))

config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
commands := []string{}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

pods, _ := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("app=%s,component=%s", app, component),
})
// If the new pod is a core pod, delete the default entries in the proxysql_servers table and add the new pod to it.
if pod.Labels["component"] == "core" {
// TODO: maybe make this configurable, not everyone will name the service this.
commands = append(commands, "DELETE FROM proxysql_servers WHERE hostname = 'proxysql-core'")

var corePods []PodInfo
for _, pod := range pods.Items {
corePods = append(corePods, PodInfo{PodIP: pod.Status.PodIP, Hostname: pod.Name, UID: string(pod.GetUID())})
cmd := fmt.Sprintf("INSERT INTO proxysql_servers VALUES ('%s', 6032, 0, '%s')", pod.Status.PodIP, pod.Name)
commands = append(commands, cmd)
}

return corePods, err
}

func calculateChecksum(pods []PodInfo) string {
data := []string{}
commands = append(commands,
"LOAD PROXYSQL SERVERS TO RUNTIME",
"LOAD ADMIN VARIABLES TO RUNTIME",
"LOAD MYSQL VARIABLES TO RUNTIME",
"LOAD MYSQL SERVERS TO RUNTIME",
"LOAD MYSQL USERS TO RUNTIME",
"LOAD MYSQL QUERY RULES TO RUNTIME",
)

for _, pod := range pods {
data = append(data, fmt.Sprintf("%s:%s:%s", pod.PodIP, pod.Hostname, pod.UID))
for _, command := range commands {
_, err := p.conn.Exec(command)
if err != nil {
// FIXME: wrap error with extra info and return
slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
return err
}
}

sort.Strings(data)
slog.Debug("Ran commands", slog.String("commands", strings.Join(commands, "; ")))

return fmt.Sprintf("%x", data)
return nil
}

func createCommands(pods []PodInfo) []string {
sort.Sort(ByPodIP(pods))
// Remove a core pod from the cluster when it leaves. This function just deletes the pod from
// proxysql_servers based on the hostname (PodIP here, technically). The function then runs all the
// LOAD TO RUNTIME commands required to sync state to the rest of the cluster.
func (p *ProxySQL) removePod(pod *v1.Pod) error {
slog.Info("Pod left the cluster", slog.String("name", pod.Name), slog.String("ip", pod.Status.PodIP))

commands := []string{"DELETE FROM proxysql_servers"}
commands := []string{}

for _, pod := range pods {
commands = append(commands,
fmt.Sprintf("INSERT INTO proxysql_servers VALUES ('%s', 6032, 0, '%s')", pod.PodIP, pod.Hostname),
)
if pod.Labels["component"] == "core" {
cmd := fmt.Sprintf("DELETE FROM proxysql_servers WHERE hostname = '%s'", pod.Status.PodIP)
commands = append(commands, cmd)
}

commands = append(commands,
Expand All @@ -156,5 +235,15 @@ func createCommands(pods []PodInfo) []string {
"LOAD MYSQL QUERY RULES TO RUNTIME",
)

return commands
for _, command := range commands {
_, err := p.conn.Exec(command)
if err != nil {
slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
return err
}
}

slog.Debug("Ran commands", slog.String("commands", strings.Join(commands, "; ")))

return nil
}
Loading

0 comments on commit 01c1555

Please sign in to comment.