Changing how core pods watch the cluster to use Informer() rather than a manual loop

kuzmik committed Feb 6, 2024
1 parent 4aa2d26 commit a2bf992
Showing 10 changed files with 581 additions and 124 deletions.
6 changes: 5 additions & 1 deletion .golangci.yml
@@ -46,4 +46,8 @@ linters:
   # - testifylint
   # - wrapcheck
   - wsl
-  - whitespace
+  - whitespace
+
+linters-settings:
+  goconst:
+    min-occurrences: 5
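The new goconst setting tightens when the linter complains about repeated string literals: with min-occurrences: 5, a literal has to appear five times before goconst suggests extracting a named constant, up from the linter's lower default. A hypothetical snippet (not from this repo) that would trip the linter at this threshold:

package example

import "log/slog"

// With goconst's min-occurrences set to 5, the five "proxysql-core" literals
// below would be reported as a candidate for a named constant; at four
// occurrences the linter would stay quiet.
func logStartup() {
    slog.Info("starting", "component", "proxysql-core")
    slog.Info("connecting", "component", "proxysql-core")
    slog.Info("syncing", "component", "proxysql-core")
    slog.Info("serving", "component", "proxysql-core")
    slog.Info("ready", "component", "proxysql-core")
}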
8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -1,9 +1,15 @@
 ## UNRELEASED
 
+- More Go module updates
+- Switched to using an `Informer()` to notify the agent when core pods enter/leave the cluster, rather than running get commands every X seconds; it's now push rather than pull, in other words
+- Improved some of the tests
+
 ## 1.0.0
 
 - Go module updates, fixed a breaking change that Viper introduced
 - Followed some guides on go project layouts to redo the project layout
 - Enabled some extra golangci checks and addressed as many of the findings as possible
-- fix: add a configurable namespace to the core pod selector. We have several proxysql namespaces,
+- fix: add a configurable namespace to the core pod selector. We have several proxysql namespaces,
+  so being able to configure it at runtime is important
 - Add goreleaser config and workflow
+- Added a config flag for logging format, defaults to json structured logs
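The `Informer()` bullet is the substance of this commit: instead of a timer loop that lists pods every interval, the agent registers callbacks and lets client-go push events to it. A minimal, self-contained sketch of that pattern (illustrative only, not code from this commit; the real wiring, with label selectors and namespace filtering, is in internal/proxysql/core.go below):

package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
)

func main() {
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }

    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // The informer maintains a local cache of pods and fires callbacks on
    // changes; there is no polling loop on our side.
    factory := informers.NewSharedInformerFactory(clientset, 1*time.Second)
    podInformer := factory.Core().V1().Pods().Informer()

    _, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            if pod, ok := obj.(*v1.Pod); ok {
                fmt.Println("pod added:", pod.Name)
            }
        },
    })
    if err != nil {
        panic(err)
    }

    stopper := make(chan struct{})
    defer close(stopper)

    factory.Start(stopper)
    cache.WaitForCacheSync(stopper, podInformer.HasSynced)
    <-stopper
}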
4 changes: 2 additions & 2 deletions Makefile
@@ -24,7 +24,7 @@ lint:
 
 test:
     @mkdir -p coverage
-    @go test ./... -v -shuffle=on -coverprofile coverage/coverage.out
+    @go test ./... --shuffle=on --coverprofile coverage/coverage.out
 
 coverage: test
     @go tool cover -html=coverage/coverage.out
@@ -33,7 +33,7 @@ run: build
     @./$(TARGET)
 
 docker: clean lint
-    @docker build -f build/dev.Dockerfile . -t persona-id/proxysql-agent:latest
+    @docker build -f build/dev.Dockerfile -t persona-id/proxysql-agent:latest -t persona-id/proxysql-agent:1.1.0 .
 
 snapshot: clean lint
     @goreleaser --snapshot --clean
1 change: 1 addition & 0 deletions go.mod
@@ -21,6 +21,7 @@ require (
     github.com/gogo/protobuf v1.3.2 // indirect
     github.com/golang/protobuf v1.5.3 // indirect
     github.com/google/gnostic-models v0.6.8 // indirect
+    github.com/google/go-cmp v0.6.0 // indirect
     github.com/google/gofuzz v1.2.0 // indirect
     github.com/google/uuid v1.4.0 // indirect
     github.com/hashicorp/hcl v1.0.0 // indirect
275 changes: 181 additions & 94 deletions internal/proxysql/core.go
@@ -1,160 +1,247 @@
 package proxysql
 
 import (
-    "context"
     "fmt"
     "log/slog"
     "os"
-    "sort"
-    "strings"
     "time"
 
-    "github.com/persona-id/proxysql-agent/internal/configuration"
     // fuck outta here.
     _ "github.com/go-sql-driver/mysql"
+    v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/util/runtime"
+    "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
+    "k8s.io/client-go/tools/cache"
 )
 
-// ProxySQL core functions.
-//
-// Core mode specific settings
+// The core pods need to run certain commands when specific pods join or leave the
+// cluster, so this function sets up an informer that watches the k8s pods and runs
+// functions when pods change.
+//
+// Joining:
+//
+// When a new core pod joins the cluster, one of two things happens:
+//   - if it's the first core pod, it uses the podAdded callback to add itself to the proxysql_servers table
+//   - if other core pods are already running, one of them will add the new pod via the podUpdated function
+//
+// When a new satellite pod joins the cluster, the core pods all run the "LOAD X TO RUNTIME" commands, which
+// accept the new pod and distribute the configuration to it.
+//
+// Leaving:
+//
+//   - When a satellite pod leaves the cluster, nothing needs to be done.
+//   - When a core pod leaves the cluster, the remaining core pods all delete that pod from the proxysql_servers
+//     table and run all of the LOAD X TO RUNTIME commands.
+func (p *ProxySQL) Core() {
+    if p.clientset == nil {
+        config, err := rest.InClusterConfig()
+        if err != nil {
+            slog.Error("error", slog.Any("err", err))
+            panic(err)
+        }
 
-type PodInfo struct {
-    PodIP    string
-    Hostname string
-    UID      string
-}
+        clientset, err := kubernetes.NewForConfig(config)
+        if err != nil {
+            slog.Error("error", slog.Any("err", err))
+            panic(err)
+        }
 
-// Define a custom type to implement the Sort interface.
-type ByPodIP []PodInfo
+        p.clientset = clientset
+    }
 
-func (a ByPodIP) Len() int           { return len(a) }
-func (a ByPodIP) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-func (a ByPodIP) Less(i, j int) bool { return a[i].PodIP < a[j].PodIP }
+    // stop signal for the informer
+    stopper := make(chan struct{})
+    defer close(stopper)
 
-func (p *ProxySQL) Core() {
-    interval := p.settings.Core.Interval
+    app := p.settings.Core.PodSelector.App
+    namespace := p.settings.Core.PodSelector.Namespace
+
+    labelSelector := labels.Set(map[string]string{
+        "app": app,
+    }).AsSelector()
+
+    factory := informers.NewSharedInformerFactoryWithOptions(
+        p.clientset,
+        1*time.Second,
+        informers.WithNamespace(namespace),
+        informers.WithTweakListOptions(func(options *metav1.ListOptions) {
+            options.LabelSelector = labelSelector.String()
+        }),
+    )
+
+    podInformer := factory.Core().V1().Pods().Informer()
 
-    slog.Info("Core mode initialized, running loop", slog.Int("interval", interval))
+    defer runtime.HandleCrash()
 
-    for {
-        p.coreLoop()
+    go factory.Start(stopper)
 
-        time.Sleep(time.Duration(interval) * time.Second)
+    if !cache.WaitForCacheSync(stopper, podInformer.HasSynced) {
+        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
+        return
     }
-}
 
-func (p *ProxySQL) coreLoop() {
-    pods, err := GetCorePods(p.settings)
+    _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc:    p.podAdded,
+        UpdateFunc: p.podUpdated,
+    })
     if err != nil {
-        slog.Error("Failed to get pod info", slog.Any("error", err))
+        slog.Error("Error creating Informer", slog.Any("err", err))
     }
 
+    // block the main go routine from exiting
+    <-stopper
+}
+
+// This function is needed to do bootstrapping. At first I was using podUpdated to do adds, but we would never
+// get the first pod to come up. This function will only be useful on the first core pod to come up; the rest will
+// be handled via podUpdated.
+//
+// This feels a bit clumsy.
+func (p *ProxySQL) podAdded(object interface{}) {
+    pod, ok := object.(*v1.Pod)
+    if !ok {
+        return
+    }
+
-    if len(pods) == 0 {
-        slog.Error("No pods returned")
+    // if the new pod is not THIS pod, bail out of this function. the rest of this function should only apply
+    // to the first core pod to come up in the cluster.
+    if hostname, _ := os.Hostname(); pod.Name != hostname {
         return
     }
 
-    checksumFile := "/tmp/pods-cs.txt"
-    digest := calculateChecksum(pods)
+    // check if pod is already in the proxysql_servers table; this can happen when core pods add
+    // other core pods.
+    var count int
 
-    // Read the previous checksum from the file
-    old, err := os.ReadFile(checksumFile)
+    err := p.conn.QueryRow("SELECT count(*) FROM proxysql_servers WHERE hostname = ?", pod.Status.PodIP).Scan(&count)
     if err != nil {
-        old = []byte("")
+        slog.Error("Error in podAdded()", slog.Any("err", err))
     }
 
-    // If nothing changes, we still run LOAD PROXYSQL SERVERS TO RUNTIME in order to accept any
-    // new pods that have joined the cluster
-    if string(old) == digest {
-        command := "LOAD PROXYSQL SERVERS TO RUNTIME"
-
-        _, err = p.conn.Exec(command)
-        if err != nil {
-            slog.Error("Command failed to execute", slog.String("command", command), slog.Any("error", err))
-        }
-
-        return
-    }
+    if count > 0 {
+        return
+    }
 
-    commands := createCommands(pods)
-    for _, command := range commands {
-        _, err = p.conn.Exec(command)
-        if err != nil {
-            slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
-        }
-    }
+    err = p.addPodToCluster(pod)
+    if err != nil {
+        slog.Error("Error in podAdded()", slog.Any("err", err))
+    }
+}
 
-    // Write the new checksum to the file for the next run
-    err = os.WriteFile(checksumFile, []byte(digest), 0o600)
-    if err != nil {
-        slog.Error("Failed to write to checksum file", slog.String("file", checksumFile), slog.Any("error", err))
-    }
-
-    slog.Info("Commands ran", slog.String("commands", strings.Join(commands, "; ")))
-}
-
-func GetCorePods(settings *configuration.Config) ([]PodInfo, error) {
-    app := settings.Core.PodSelector.App
-    component := settings.Core.PodSelector.Component
-    namespace := settings.Core.PodSelector.Namespace
-
-    config, err := rest.InClusterConfig()
-    if err != nil {
-        return nil, err
-    }
-
-    clientset, err := kubernetes.NewForConfig(config)
-    if err != nil {
-        return nil, err
-    }
-
-    pods, _ := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
-        LabelSelector: fmt.Sprintf("app=%s,component=%s", app, component),
-    })
-
-    var corePods []PodInfo
-    for _, pod := range pods.Items {
-        corePods = append(corePods, PodInfo{PodIP: pod.Status.PodIP, Hostname: pod.Name, UID: string(pod.GetUID())})
-    }
-
-    return corePods, err
-}
-
-func calculateChecksum(pods []PodInfo) string {
-    data := []string{}
-
-    for _, pod := range pods {
-        data = append(data, fmt.Sprintf("%s:%s:%s", pod.PodIP, pod.Hostname, pod.UID))
-    }
-
-    sort.Strings(data)
-
-    return fmt.Sprintf("%x", data)
-}
-
-func createCommands(pods []PodInfo) []string {
-    sort.Sort(ByPodIP(pods))
-
-    commands := []string{"DELETE FROM proxysql_servers"}
-
-    for _, pod := range pods {
-        commands = append(commands,
-            fmt.Sprintf("INSERT INTO proxysql_servers VALUES ('%s', 6032, 0, '%s')", pod.PodIP, pod.Hostname),
-        )
-    }
-
-    commands = append(commands,
-        "LOAD PROXYSQL SERVERS TO RUNTIME",
-        "LOAD ADMIN VARIABLES TO RUNTIME",
-        "LOAD MYSQL VARIABLES TO RUNTIME",
-        "LOAD MYSQL SERVERS TO RUNTIME",
-        "LOAD MYSQL USERS TO RUNTIME",
-        "LOAD MYSQL QUERY RULES TO RUNTIME",
-    )
-
-    return commands
-}
+// We aren't using podAdded here when other core pods exist because that function doesn't always get the PodIP,
+// and this one does. Using this function doesn't work when bootstrapping a cluster, because the pod has started
+// before the informer has started. In other words, the pod starts before the pod can detect itself joining the
+// cluster.
+//
+// Example pod (scaled up core-1, then scaled it back down):
+//
+//    OLD POD NAME     OLD POD IP       OLD STATUS    NEW POD NAME     NEW POD IP       NEW STATUS
+//    proxysql-core-1                   Pending       proxysql-core-1  192.168.194.102  Running
+//    proxysql-core-1  192.168.194.102  Running       proxysql-core-1                   Failed
+func (p *ProxySQL) podUpdated(oldobject interface{}, newobject interface{}) {
+    // cast both objects into Pods, and if that fails leave the function
+    oldpod, ok := oldobject.(*v1.Pod)
+    if !ok {
+        return
+    }
+
+    newpod, ok := newobject.(*v1.Pod)
+    if !ok {
+        return
+    }
+
+    // Pod is new and transitioned to running, so we add that to the proxysql_servers table.
+    if oldpod.Status.Phase == "Pending" && newpod.Status.Phase == "Running" {
+        err := p.addPodToCluster(newpod)
+        if err != nil {
+            slog.Error("Error in addPod()", slog.Any("err", err))
+        }
+    }
+
+    // Pod is shutting down. Only run this for core pods, as satellites don't need special considerations when
+    // they leave the cluster.
+    if oldpod.Status.Phase == "Running" && newpod.Status.Phase == "Failed" {
+        err := p.removePodFromCluster(oldpod)
+        if err != nil {
+            slog.Error("Error in removePod()", slog.Any("err", err))
+        }
+    }
+}
+
+// Add the new pod to the cluster.
+//   - If it's a core pod, add it to the proxysql_servers table
+//   - if it's a satellite pod, run the commands to accept it into the cluster
+func (p *ProxySQL) addPodToCluster(pod *v1.Pod) error {
+    slog.Info("Pod joined the cluster", slog.String("name", pod.Name), slog.String("ip", pod.Status.PodIP))
+
+    commands := [][]any{}
+
+    // If the new pod is a core pod, delete the default entries in the proxysql_server list and add the new pod to it.
+    if pod.Labels["component"] == "core" {
+        // TODO: maybe make this configurable, not everyone will name the service this.
+        commands = append(commands, []any{"DELETE FROM proxysql_servers WHERE hostname = 'proxysql-core'"})
+        commands = append(commands, []any{"INSERT INTO proxysql_servers VALUES (?, 6032, 0, ?)", pod.Status.PodIP, pod.Name})
+    }
+
+    commands = append(commands,
+        []any{"LOAD PROXYSQL SERVERS TO RUNTIME"},
+        []any{"LOAD ADMIN VARIABLES TO RUNTIME"},
+        []any{"LOAD MYSQL VARIABLES TO RUNTIME"},
+        []any{"LOAD MYSQL SERVERS TO RUNTIME"},
+        []any{"LOAD MYSQL USERS TO RUNTIME"},
+        []any{"LOAD MYSQL QUERY RULES TO RUNTIME"},
+    )
+
+    for _, command := range commands {
+        _, err := p.conn.Exec(command[0].(string), command[1:]...)
+        if err != nil {
+            // FIXME: wrap error with extra info and return
+            slog.Error("Commands failed", slog.Any("commands", command), slog.Any("error", err))
+            return err
+        }
+    }
+
+    slog.Debug("Ran commands", slog.Any("commands", commands))
+
+    return nil
+}
+
+// Remove a core pod from the cluster when it leaves. This function just deletes the pod from
+// proxysql_servers based on the hostname (PodIP here, technically). The function then runs all the
+// LOAD TO RUNTIME commands required to sync state to the rest of the cluster.
+func (p *ProxySQL) removePodFromCluster(pod *v1.Pod) error {
+    slog.Info("Pod left the cluster", slog.String("name", pod.Name), slog.String("ip", pod.Status.PodIP))
+
+    commands := [][]any{}
+
+    if pod.Labels["component"] == "core" {
+        commands = append(commands, []any{"DELETE FROM proxysql_servers WHERE hostname = ?", pod.Status.PodIP})
+    }
+
+    commands = append(commands,
+        []any{"LOAD PROXYSQL SERVERS TO RUNTIME"},
+        []any{"LOAD ADMIN VARIABLES TO RUNTIME"},
+        []any{"LOAD MYSQL VARIABLES TO RUNTIME"},
+        []any{"LOAD MYSQL SERVERS TO RUNTIME"},
+        []any{"LOAD MYSQL USERS TO RUNTIME"},
+        []any{"LOAD MYSQL QUERY RULES TO RUNTIME"},
+    )
+
+    for _, command := range commands {
+        _, err := p.conn.Exec(command[0].(string), command[1:]...)
+        if err != nil {
+            slog.Error("Commands failed", slog.Any("commands", command), slog.Any("error", err))
+            return err
+        }
+    }
+
+    slog.Debug("Ran commands", slog.Any("commands", commands))
+
+    return nil
 }
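A note on podUpdated above: membership changes are detected purely as pod phase transitions (Pending -> Running means join, Running -> Failed means leave). A hypothetical table test, not part of this commit, that pins down that mapping using the v1.PodPhase constants (the commit compares against the equivalent string literals):

package proxysql

import (
    "testing"

    v1 "k8s.io/api/core/v1"
)

// transitionAction is a hypothetical helper that names the action a phase
// transition should trigger, mirroring the checks inside podUpdated.
func transitionAction(from, to v1.PodPhase) string {
    switch {
    case from == v1.PodPending && to == v1.PodRunning:
        return "add" // new pod came up: addPodToCluster
    case from == v1.PodRunning && to == v1.PodFailed:
        return "remove" // pod went away: removePodFromCluster
    default:
        return "ignore" // every other transition needs no action
    }
}

func TestTransitionAction(t *testing.T) {
    cases := []struct {
        from, to v1.PodPhase
        want     string
    }{
        {v1.PodPending, v1.PodRunning, "add"},
        {v1.PodRunning, v1.PodFailed, "remove"},
        {v1.PodRunning, v1.PodRunning, "ignore"},
        {v1.PodPending, v1.PodFailed, "ignore"},
    }

    for _, c := range cases {
        if got := transitionAction(c.from, c.to); got != c.want {
            t.Errorf("transitionAction(%s, %s) = %q, want %q", c.from, c.to, got, c.want)
        }
    }
}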