Changing how core pods watch the cluster to use Informer() rather than a manual loop
kuzmik committed Jan 30, 2024
1 parent 4aa2d26 commit 204dc17
Showing 9 changed files with 536 additions and 115 deletions.
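For orientation before the per-file diffs: below is a minimal, self-contained sketch of the client-go shared-informer pattern this commit moves to in place of the old polling loop. The namespace, label selector, and handler bodies are illustrative placeholders rather than values taken from the diff; the real wiring and callbacks live in internal/proxysql/core.go.

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// In-cluster config and clientset; error handling trimmed for brevity.
	config, _ := rest.InClusterConfig()
	clientset, _ := kubernetes.NewForConfig(config)

	// Factory scoped to one namespace and one label selector, resyncing every second.
	factory := informers.NewSharedInformerFactoryWithOptions(
		clientset,
		1*time.Second,
		informers.WithNamespace("proxysql"), // illustrative namespace, not from the diff
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.LabelSelector = "app=proxysql" // illustrative selector, not from the diff
		}),
	)

	podInformer := factory.Core().V1().Pods().Informer()

	// The informer invokes these callbacks for every pod add/update it observes.
	_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if pod, ok := obj.(*v1.Pod); ok {
				fmt.Println("pod added:", pod.Name)
			}
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			if pod, ok := newObj.(*v1.Pod); ok {
				fmt.Println("pod updated:", pod.Name, pod.Status.Phase)
			}
		},
	})
	if err != nil {
		fmt.Println(err)
	}

	stopper := make(chan struct{})
	defer close(stopper)

	factory.Start(stopper)

	// Wait for the initial list to populate the cache before trusting events.
	if !cache.WaitForCacheSync(stopper, podInformer.HasSynced) {
		return
	}

	// Block forever; the event handlers do all of the work from here on.
	<-stopper
}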
6 changes: 5 additions & 1 deletion .golangci.yml
@@ -46,4 +46,8 @@ linters:
# - testifylint
# - wrapcheck
- wsl
- whitespace
- whitespace

linters-settings:
goconst:
min-occurrences: 5
4 changes: 2 additions & 2 deletions Makefile
@@ -24,7 +24,7 @@ lint:

test:
@mkdir -p coverage
@go test ./... -v -shuffle=on -coverprofile coverage/coverage.out
@go test ./... --shuffle=on --coverprofile coverage/coverage.out

coverage: test
@go tool cover -html=coverage/coverage.out
@@ -33,7 +33,7 @@ run: build
@./$(TARGET)

docker: clean lint
@docker build -f build/dev.Dockerfile . -t persona-id/proxysql-agent:latest
@docker build -f build/dev.Dockerfile -t persona-id/proxysql-agent:latest -t persona-id/proxysql-agent:1.1.0 .

snapshot: clean lint
@goreleaser --snapshot --clean
1 change: 1 addition & 0 deletions go.mod
@@ -21,6 +21,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
263 changes: 177 additions & 86 deletions internal/proxysql/core.go
@@ -1,150 +1,231 @@
package proxysql

import (
"context"
"fmt"
"log/slog"
"os"
"sort"
"strings"
"time"

"github.com/persona-id/proxysql-agent/internal/configuration"
// Blank import so the MySQL driver registers itself with database/sql.
_ "github.com/go-sql-driver/mysql"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

// ProxySQL core functions.
//
// Core mode specific settings
// The core pods need to run certain commands when specific pods join or leave the
// cluster, so this function sets up an informer that watches the k8s pods and runs
// functions when pods change.
//
// Joining:
//
// When a new core pod joins the cluster, one of two things happens:
// - if it's the first core pod, it uses the podAdded callback to add itself to the proxysql_servers table
// - if other core pods are already running, one of them adds the new pod via the podUpdated function
//
// When a new satellite pod joins the cluster, the core pods all run the "LOAD X TO RUNTIME" commands, which
// accepts the new pod and distributes the configuration to it.
//
// Leaving:
//
// - When a satellite pod leaves the cluster, nothing needs to be done.
// - When a core pod leaves the cluster, the remaining core pods all delete that pod from the proxysql_servers
// table and run all of the LOAD X TO RUNTIME commands.
func (p *ProxySQL) Core() {
if p.clientset == nil {
config, err := rest.InClusterConfig()
if err != nil {
slog.Error("error", slog.Any("err", err))
}

type PodInfo struct {
PodIP string
Hostname string
UID string
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
slog.Error("error", slog.Any("err", err))
}

// Define a custom type to implement the Sort interface.
type ByPodIP []PodInfo
p.clientset = clientset
}

func (a ByPodIP) Len() int { return len(a) }
func (a ByPodIP) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByPodIP) Less(i, j int) bool { return a[i].PodIP < a[j].PodIP }
// stop signal for the informer
stopper := make(chan struct{})
defer close(stopper)

func (p *ProxySQL) Core() {
interval := p.settings.Core.Interval
app := p.settings.Core.PodSelector.App
namespace := p.settings.Core.PodSelector.Namespace

slog.Info("Core mode initialized, running loop", slog.Int("interval", interval))
labelSelector := labels.Set(map[string]string{
"app": app,
}).AsSelector()

factory := informers.NewSharedInformerFactoryWithOptions(
p.clientset,
1*time.Second,
informers.WithNamespace(namespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector.String()
}),
)

for {
p.coreLoop()
podInformer := factory.Core().V1().Pods().Informer()

time.Sleep(time.Duration(interval) * time.Second)
defer runtime.HandleCrash()

go factory.Start(stopper)

if !cache.WaitForCacheSync(stopper, podInformer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
}

func (p *ProxySQL) coreLoop() {
pods, err := GetCorePods(p.settings)
_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: p.podAdded,
UpdateFunc: p.podUpdated,
})
if err != nil {
slog.Error("Failed to get pod info", slog.Any("error", err))
fmt.Println(err)
}

// block the main go routine from exiting
<-stopper
}

// This function is needed for bootstrapping. At first podUpdated handled the adds as well, but the first core pod
// would never get added that way. This function is only really useful for the first core pod to come up; the rest
// are handled via podUpdated.
//
// This feels a bit clumsy.
func (p *ProxySQL) podAdded(object interface{}) {
pod, ok := object.(*v1.Pod)
if !ok {
return
}

if len(pods) == 0 {
slog.Error("No pods returned")

// if the new pod is not THIS pod, bail out of this function. the rest of this function should only apply
// to the first core pod to come up in the cluster.
if hostname, _ := os.Hostname(); pod.Name != hostname {
return
}

checksumFile := "/tmp/pods-cs.txt"
digest := calculateChecksum(pods)
// check if pod is already in the proxysql_servers table; this can happen when core pods add
// other core pods.
query := fmt.Sprintf("SELECT count(*) FROM proxysql_servers WHERE hostname = '%s'", pod.Status.PodIP)

var count int

// Read the previous checksum from the file
old, err := os.ReadFile(checksumFile)
err := p.conn.QueryRow(query).Scan(&count)
if err != nil {
old = []byte("")
slog.Error("Error in podAdded()", slog.Any("err", err))
}

// If nothing changes, we still run LOAD PROXYSQL SERVERS TO RUNTIME in order to accept any
// new pods that have joined the cluster
if string(old) == digest {
command := "LOAD PROXYSQL SERVERS TO RUNTIME"
if count > 0 {
return
}

_, err = p.conn.Exec(command)
if err != nil {
slog.Error("Command failed to execute", slog.String("command", command), slog.Any("error", err))
}
err = p.addPodToCluster(pod)
if err != nil {
slog.Error("Error in podAdded()", slog.Any("err", err))
}
}

// We aren't using podAdded here when other core pods exist because that function doesn't always get the PodIP,
// and this one does. Using this function doesn't work when bootstrapping a cluster, because the pod has started
// before the informer has started. In other words, the pod starts before the pod can detect itself joining the
// cluster.
//
// Example pod (scaled up core-1, then scaled it back down):
//
// OLD POD NAME OLD POD IP OLD STATUS NEW POD NAME NEW POD IP NEW STATUS
// proxysql-core-1 Pending proxysql-core-1 192.168.194.102 Running
// proxysql-core-1 192.168.194.102 Running proxysql-core-1 Failed
func (p *ProxySQL) podUpdated(oldobject interface{}, newobject interface{}) {
// cast both objects into Pods, and if that fails leave the function
oldpod, ok := oldobject.(*v1.Pod)
if !ok {
return
}

commands := createCommands(pods)
for _, command := range commands {
_, err = p.conn.Exec(command)
newpod, ok := newobject.(*v1.Pod)
if !ok {
return
}

// Pod is new and transitioned to running, so we add it to the proxysql_servers table.
if oldpod.Status.Phase == "Pending" && newpod.Status.Phase == "Running" {
err := p.addPodToCluster(newpod)
if err != nil {
slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
slog.Error("Error in addPod()", slog.Any("err", err))
}
}

// Write the new checksum to the file for the next run
err = os.WriteFile(checksumFile, []byte(digest), 0o600)
if err != nil {
slog.Error("Failed to write to checksum file", slog.String("file", checksumFile), slog.Any("error", err))
// Pod is shutting down. Only run this for core pods, as satellites don't need special considerations when
// they leave the cluster.
if oldpod.Status.Phase == "Running" && newpod.Status.Phase == "Failed" {
err := p.removePodFromCluster(oldpod)
if err != nil {
slog.Error("Error in removePod()", slog.Any("err", err))
}
}

slog.Info("Commands ran", slog.String("commands", strings.Join(commands, "; ")))
}

func GetCorePods(settings *configuration.Config) ([]PodInfo, error) {
app := settings.Core.PodSelector.App
component := settings.Core.PodSelector.Component
namespace := settings.Core.PodSelector.Namespace

config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
// Add the new pod to the cluster.
// - If it's a core pod, add it to the proxysql_servers table
// - If it's a satellite pod, run the commands to accept it into the cluster
func (p *ProxySQL) addPodToCluster(pod *v1.Pod) error {
slog.Info("Pod joined the cluster", slog.String("name", pod.Name), slog.String("ip", pod.Status.PodIP))

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
commands := []string{}

pods, _ := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("app=%s,component=%s", app, component),
})
// If the new pod is a core pod, delete the default entries in the proxysql_servers table and add the new pod to it.
if pod.Labels["component"] == "core" {
// TODO: maybe make this configurable, not everyone will name the service this.
commands = append(commands, "DELETE FROM proxysql_servers WHERE hostname = 'proxysql-core'")

var corePods []PodInfo
for _, pod := range pods.Items {
corePods = append(corePods, PodInfo{PodIP: pod.Status.PodIP, Hostname: pod.Name, UID: string(pod.GetUID())})
cmd := fmt.Sprintf("INSERT INTO proxysql_servers VALUES ('%s', 6032, 0, '%s')", pod.Status.PodIP, pod.Name)
commands = append(commands, cmd)
}

return corePods, err
}

func calculateChecksum(pods []PodInfo) string {
data := []string{}
commands = append(commands,
"LOAD PROXYSQL SERVERS TO RUNTIME",
"LOAD ADMIN VARIABLES TO RUNTIME",
"LOAD MYSQL VARIABLES TO RUNTIME",
"LOAD MYSQL SERVERS TO RUNTIME",
"LOAD MYSQL USERS TO RUNTIME",
"LOAD MYSQL QUERY RULES TO RUNTIME",
)

for _, pod := range pods {
data = append(data, fmt.Sprintf("%s:%s:%s", pod.PodIP, pod.Hostname, pod.UID))
for _, command := range commands {
_, err := p.conn.Exec(command)
if err != nil {
// FIXME: wrap error with extra info and return
slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
return err
}
}

sort.Strings(data)
slog.Debug("Ran commands", slog.String("commands", strings.Join(commands, "; ")))

return fmt.Sprintf("%x", data)
return nil
}

func createCommands(pods []PodInfo) []string {
sort.Sort(ByPodIP(pods))
// Remove a core pod from the cluster when it leaves. This function just deletes the pod from
// proxysql_servers based on the hostname (PodIP here, technically). The function then runs all the
// LOAD TO RUNTIME commands required to sync state to the rest of the cluster.
func (p *ProxySQL) removePodFromCluster(pod *v1.Pod) error {
slog.Info("Pod left the cluster", slog.String("name", pod.Name), slog.String("ip", pod.Status.PodIP))

commands := []string{"DELETE FROM proxysql_servers"}
commands := []string{}

for _, pod := range pods {
commands = append(commands,
fmt.Sprintf("INSERT INTO proxysql_servers VALUES ('%s', 6032, 0, '%s')", pod.PodIP, pod.Hostname),
)
if pod.Labels["component"] == "core" {
cmd := fmt.Sprintf("DELETE FROM proxysql_servers WHERE hostname = '%s'", pod.Status.PodIP)
commands = append(commands, cmd)
}

commands = append(commands,
@@ -156,5 +237,15 @@ func createCommands(pods []PodInfo) []string {
"LOAD MYSQL QUERY RULES TO RUNTIME",
)

return commands
for _, command := range commands {
_, err := p.conn.Exec(command)
if err != nil {
slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
return err
}
}

slog.Debug("Ran commands", slog.String("commands", strings.Join(commands, "; ")))

return nil
}