diff --git a/.golangci.yml b/.golangci.yml
index c898265..44dc4c1 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -46,4 +46,8 @@ linters:
     # - testifylint
     # - wrapcheck
     - wsl
-    - whitespace
\ No newline at end of file
+    - whitespace
+
+linters-settings:
+  goconst:
+    min-occurrences: 5
diff --git a/Makefile b/Makefile
index a45ad44..e25f089 100644
--- a/Makefile
+++ b/Makefile
@@ -24,7 +24,7 @@ lint:
 
 test:
 	@mkdir -p coverage
-	@go test ./... -v -shuffle=on -coverprofile coverage/coverage.out
+	@go test ./... --shuffle=on --coverprofile coverage/coverage.out
 
 coverage: test
 	@go tool cover -html=coverage/coverage.out
@@ -33,7 +33,7 @@ run: build
 	@./$(TARGET)
 
 docker: clean lint
-	@docker build -f build/dev.Dockerfile . -t persona-id/proxysql-agent:latest
+	@docker build -f build/dev.Dockerfile -t persona-id/proxysql-agent:latest -t persona-id/proxysql-agent:1.1.0 .
 
 snapshot: clean lint
 	@goreleaser --snapshot --clean
diff --git a/go.mod b/go.mod
index b066fac..02d2bb7 100644
--- a/go.mod
+++ b/go.mod
@@ -21,6 +21,7 @@ require (
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
 	github.com/google/gnostic-models v0.6.8 // indirect
+	github.com/google/go-cmp v0.6.0 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/google/uuid v1.4.0 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
diff --git a/internal/proxysql/core.go b/internal/proxysql/core.go
index 2fbabba..cd29e42 100644
--- a/internal/proxysql/core.go
+++ b/internal/proxysql/core.go
@@ -1,150 +1,235 @@
 package proxysql
 
 import (
-	"context"
 	"fmt"
 	"log/slog"
 	"os"
-	"sort"
 	"strings"
 	"time"
 
-	"github.com/persona-id/proxysql-agent/internal/configuration"
+	// Import the mysql driver functionality.
+	_ "github.com/go-sql-driver/mysql"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 )
 
+// ProxySQL core functions.
 //
-// Core mode specific settings
 //
+// The core pods need to run certain commands when pods join or leave the
+// cluster, so this function sets up an informer that watches the k8s pods and runs
+// callbacks when pods change.
+//
+// Joining:
+//
+// When a new core pod joins the cluster, one of two things happens:
+// - if it's the first core pod, it uses the podAdded callback to add itself to the proxysql_servers table
+// - if other core pods are already running, one of them will add the new pod via the podUpdated function
+//
+// When a new satellite pod joins the cluster, the core pods all run the "LOAD X TO RUNTIME" commands, which
+// accept the new pod and distribute the configuration to it.
+//
+// Leaving:
+//
+// - When a satellite pod leaves the cluster, nothing needs to be done.
+// - When a core pod leaves the cluster, the remaining core pods all delete that pod from the proxysql_servers
+//   table and run all of the LOAD X TO RUNTIME commands.
+func (p *ProxySQL) Core() {
+	if p.clientset == nil {
+		config, err := rest.InClusterConfig()
+		if err != nil {
+			slog.Error("Failed to load the in-cluster kube config", slog.Any("err", err))
+			return
+		}
 
-type PodInfo struct {
-	PodIP    string
-	Hostname string
-	UID      string
-}
+		clientset, err := kubernetes.NewForConfig(config)
+		if err != nil {
+			slog.Error("Failed to create the kube clientset", slog.Any("err", err))
+			return
+		}
 
-// Define a custom type to implement the Sort interface.
-type ByPodIP []PodInfo
+		p.clientset = clientset
+	}
 
-func (a ByPodIP) Len() int           { return len(a) }
-func (a ByPodIP) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-func (a ByPodIP) Less(i, j int) bool { return a[i].PodIP < a[j].PodIP }
+	// stop signal for the informer
+	stopper := make(chan struct{})
+	defer close(stopper)
 
-func (p *ProxySQL) Core() {
-	interval := p.settings.Core.Interval
+	app := p.settings.Core.PodSelector.App
+	namespace := p.settings.Core.PodSelector.Namespace
 
-	slog.Info("Core mode initialized, running loop", slog.Int("interval", interval))
+	labelSelector := labels.Set(map[string]string{
+		"app": app,
+	}).AsSelector()
+
+	factory := informers.NewSharedInformerFactoryWithOptions(
+		p.clientset,
+		1*time.Second,
+		informers.WithNamespace(namespace),
+		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
+			options.LabelSelector = labelSelector.String()
+		}),
+	)
 
-	for {
-		p.coreLoop()
+	podInformer := factory.Core().V1().Pods().Informer()
 
-		time.Sleep(time.Duration(interval) * time.Second)
+	defer runtime.HandleCrash()
+
+	go factory.Start(stopper)
+
+	if !cache.WaitForCacheSync(stopper, podInformer.HasSynced) {
+		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+		return
 	}
-}
 
-func (p *ProxySQL) coreLoop() {
-	pods, err := GetCorePods(p.settings)
+	_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    p.podAdded,
+		UpdateFunc: p.podUpdated,
+	})
 	if err != nil {
-		slog.Error("Failed to get pod info", slog.Any("error", err))
+		slog.Error("Failed to register the pod event handlers", slog.Any("err", err))
+	}
+
+	// block the main goroutine from exiting
+	<-stopper
+}
+
+// This function is needed to do bootstrapping. At first I was using podUpdated to do adds, but we would never
+// catch the first pod coming up. This function is really only useful for the first core pod to come up; the
+// rest are handled via podUpdated.
+//
+// This feels a bit clumsy.
+func (p *ProxySQL) podAdded(object interface{}) {
+	pod, ok := object.(*v1.Pod)
+	if !ok {
 		return
 	}
 
-	if len(pods) == 0 {
-		slog.Error("No pods returned")
-
+	// if the new pod is not THIS pod, bail out of this function; the rest of this function should only apply
+	// to the first core pod to come up in the cluster.
+	if hostname, _ := os.Hostname(); pod.Name != hostname {
 		return
 	}
 
-	checksumFile := "/tmp/pods-cs.txt"
-	digest := calculateChecksum(pods)
+	// check if the pod is already in the proxysql_servers table; this can happen when core pods add
+	// other core pods.
+	query := fmt.Sprintf("SELECT count(*) FROM proxysql_servers WHERE hostname = '%s'", pod.Status.PodIP)
+
+	var count int
 
-	// Read the previous checksum from the file
-	old, err := os.ReadFile(checksumFile)
+	err := p.conn.QueryRow(query).Scan(&count)
 	if err != nil {
-		old = []byte("")
+		slog.Error("Error in podAdded()", slog.Any("err", err))
 	}
 
-	// If nothing changes, we still run LOAD PROXYSQL SERVERS TO RUNTIME in order to accept any
-	// new pods that have joined the cluster
-	if string(old) == digest {
-		command := "LOAD PROXYSQL SERVERS TO RUNTIME"
+	if count > 0 {
+		return
+	}
 
-		_, err = p.conn.Exec(command)
-		if err != nil {
-			slog.Error("Command failed to execute", slog.String("command", command), slog.Any("error", err))
-		}
+	err = p.addPodToCluster(pod)
+	if err != nil {
+		slog.Error("Error in podAdded()", slog.Any("err", err))
+	}
+}
+
+// We aren't using podAdded here when other core pods exist, because that function doesn't always get the PodIP
+// and this one does. Using this function doesn't work when bootstrapping a cluster, though, because the pod has started
+// before the informer has started. In other words, the pod starts before it can detect itself joining the
+// cluster.
+//
+// Example pod transitions (scaled up core-1, then scaled it back down):
+//
+//	OLD POD NAME     OLD POD IP       OLD STATUS    NEW POD NAME     NEW POD IP       NEW STATUS
+//	proxysql-core-1                   Pending       proxysql-core-1  192.168.194.102  Running
+//	proxysql-core-1  192.168.194.102  Running       proxysql-core-1                   Failed
+func (p *ProxySQL) podUpdated(oldobject interface{}, newobject interface{}) {
+	// cast both objects into Pods, and if that fails leave the function
+	oldpod, ok := oldobject.(*v1.Pod)
+	if !ok {
 		return
 	}
 
-	commands := createCommands(pods)
-	for _, command := range commands {
-		_, err = p.conn.Exec(command)
+	newpod, ok := newobject.(*v1.Pod)
+	if !ok {
+		return
+	}
+
+	// Pod is new and transitioned to running, so we add it to the proxysql_servers table.
+	if oldpod.Status.Phase == "Pending" && newpod.Status.Phase == "Running" {
+		err := p.addPodToCluster(newpod)
 		if err != nil {
-			slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
+			slog.Error("Error in addPodToCluster()", slog.Any("err", err))
 		}
 	}
 
-	// Write the new checksum to the file for the next run
-	err = os.WriteFile(checksumFile, []byte(digest), 0o600)
-	if err != nil {
-		slog.Error("Failed to write to checksum file", slog.String("file", checksumFile), slog.Any("error", err))
+	// Pod is shutting down. Only run this for core pods, as satellites don't need special consideration when
+	// they leave the cluster.
+	if oldpod.Status.Phase == "Running" && newpod.Status.Phase == "Failed" {
+		err := p.removePodFromCluster(oldpod)
+		if err != nil {
+			slog.Error("Error in removePodFromCluster()", slog.Any("err", err))
+		}
 	}
-
-	slog.Info("Commands ran", slog.String("commands", strings.Join(commands, "; ")))
 }
 
-func GetCorePods(settings *configuration.Config) ([]PodInfo, error) {
-	app := settings.Core.PodSelector.App
-	component := settings.Core.PodSelector.Component
-	namespace := settings.Core.PodSelector.Namespace
-
-	config, err := rest.InClusterConfig()
-	if err != nil {
-		return nil, err
-	}
+// Add the new pod to the cluster.
+// - If it's a core pod, add it to the proxysql_servers table
+// - If it's a satellite pod, run the commands to accept it into the cluster
+func (p *ProxySQL) addPodToCluster(pod *v1.Pod) error {
+	slog.Info("Pod joined the cluster", slog.String("name", pod.Name), slog.String("ip", pod.Status.PodIP))
 
-	clientset, err := kubernetes.NewForConfig(config)
-	if err != nil {
-		return nil, err
-	}
+	commands := []string{}
 
-	pods, _ := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
-		LabelSelector: fmt.Sprintf("app=%s,component=%s", app, component),
-	})
+	// If the new pod is a core pod, delete the default entries in the proxysql_servers list and add the new pod to it.
+	if pod.Labels["component"] == "core" {
+		// TODO: maybe make this configurable; not everyone will name the service this.
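+		// Presumably the bootstrap config seeds proxysql_servers with the 'proxysql-core' service hostname;
+		// that placeholder row is dropped here before the real pod IPs are inserted below.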
+		commands = append(commands, "DELETE FROM proxysql_servers WHERE hostname = 'proxysql-core'")
 
-	var corePods []PodInfo
-	for _, pod := range pods.Items {
-		corePods = append(corePods, PodInfo{PodIP: pod.Status.PodIP, Hostname: pod.Name, UID: string(pod.GetUID())})
+		cmd := fmt.Sprintf("INSERT INTO proxysql_servers VALUES ('%s', 6032, 0, '%s')", pod.Status.PodIP, pod.Name)
+		commands = append(commands, cmd)
 	}
 
-	return corePods, err
-}
-
-func calculateChecksum(pods []PodInfo) string {
-	data := []string{}
+	commands = append(commands,
+		"LOAD PROXYSQL SERVERS TO RUNTIME",
+		"LOAD ADMIN VARIABLES TO RUNTIME",
+		"LOAD MYSQL VARIABLES TO RUNTIME",
+		"LOAD MYSQL SERVERS TO RUNTIME",
+		"LOAD MYSQL USERS TO RUNTIME",
+		"LOAD MYSQL QUERY RULES TO RUNTIME",
+	)
 
-	for _, pod := range pods {
-		data = append(data, fmt.Sprintf("%s:%s:%s", pod.PodIP, pod.Hostname, pod.UID))
+	for _, command := range commands {
+		_, err := p.conn.Exec(command)
+		if err != nil {
+			// FIXME: wrap error with extra info and return
+			slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
+			return err
+		}
 	}
 
-	sort.Strings(data)
+	slog.Debug("Ran commands", slog.String("commands", strings.Join(commands, "; ")))
 
-	return fmt.Sprintf("%x", data)
+	return nil
 }
 
-func createCommands(pods []PodInfo) []string {
-	sort.Sort(ByPodIP(pods))
+// Remove a core pod from the cluster when it leaves. This function just deletes the pod from
+// proxysql_servers based on the hostname (PodIP here, technically). The function then runs all the
+// LOAD TO RUNTIME commands required to sync state to the rest of the cluster.
+func (p *ProxySQL) removePodFromCluster(pod *v1.Pod) error {
+	slog.Info("Pod left the cluster", slog.String("name", pod.Name), slog.String("ip", pod.Status.PodIP))
 
-	commands := []string{"DELETE FROM proxysql_servers"}
+	commands := []string{}
 
-	for _, pod := range pods {
-		commands = append(commands,
-			fmt.Sprintf("INSERT INTO proxysql_servers VALUES ('%s', 6032, 0, '%s')", pod.PodIP, pod.Hostname),
-		)
+	if pod.Labels["component"] == "core" {
+		cmd := fmt.Sprintf("DELETE FROM proxysql_servers WHERE hostname = '%s'", pod.Status.PodIP)
+		commands = append(commands, cmd)
 	}
 
 	commands = append(commands,
@@ -156,5 +241,15 @@ func createCommands(pods []PodInfo) []string {
 		"LOAD MYSQL QUERY RULES TO RUNTIME",
 	)
 
-	return commands
+	for _, command := range commands {
+		_, err := p.conn.Exec(command)
+		if err != nil {
+			slog.Error("Commands failed", slog.String("commands", command), slog.Any("error", err))
+			return err
+		}
+	}
+
+	slog.Debug("Ran commands", slog.String("commands", strings.Join(commands, "; ")))
+
+	return nil
 }
diff --git a/internal/proxysql/core_test.go b/internal/proxysql/core_test.go
index f3568d9..eb3dd98 100644
--- a/internal/proxysql/core_test.go
+++ b/internal/proxysql/core_test.go
@@ -1,48 +1,253 @@
 package proxysql
 
 import (
+	"fmt"
+	"os"
+	"regexp"
 	"testing"
 
+	_ "github.com/go-sql-driver/mysql"
 	"github.com/stretchr/testify/assert"
+	"gopkg.in/DATA-DOG/go-sqlmock.v2"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-func TestCreateCommands(t *testing.T) {
-	t.Run("one pod", func(t *testing.T) {
-		singlePod := []PodInfo{{PodIP: "192.168.0.1", Hostname: "host1", UID: "testuid1"}}
-		singleCommands := createCommands(singlePod)
+func TestCore(t *testing.T) {
+	t.Run("TODO", func(t *testing.T) {
+		t.Skip("TODO: write tests for Core()")
+	})
+}
+
+func TestPodUpdated(t *testing.T) {
+	db, mock, err := sqlmock.New()
+	if err != nil {
+		t.Fatalf("Failed to create mock database connection: %v", err)
+	}
+	defer db.Close()
+
+	mock.MatchExpectationsInOrder(true)
+
+	p := &ProxySQL{db, tmpConfig, nil}
+
+	oldpod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "old-pod",
+			Namespace: "test-ns",
+			Labels: map[string]string{
+				"component": "core",
+			},
+		},
+		Status: v1.PodStatus{
+			PodIP: "old-pod-ip",
+		},
+	}
+
+	newpod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "new-pod",
+			Namespace: "test-ns",
+			Labels: map[string]string{
+				"component": "core",
+			},
+		},
+		Status: v1.PodStatus{
+			PodIP: "new-pod-ip",
+		},
+	}
+
+	t.Run("pod started", func(t *testing.T) {
+		oldpod.Status.Phase = "Pending"
+		newpod.Status.Phase = "Running"
+
+		for _, cmd := range []string{
+			"DELETE FROM proxysql_servers WHERE hostname = 'proxysql-core'",
+			regexp.QuoteMeta("INSERT INTO proxysql_servers VALUES ('new-pod-ip', 6032, 0, 'new-pod')"),
+			"LOAD PROXYSQL SERVERS TO RUNTIME",
+			"LOAD ADMIN VARIABLES TO RUNTIME",
+			"LOAD MYSQL VARIABLES TO RUNTIME",
+			"LOAD MYSQL SERVERS TO RUNTIME",
+			"LOAD MYSQL USERS TO RUNTIME",
+			"LOAD MYSQL QUERY RULES TO RUNTIME",
+		} {
+			mock.ExpectExec(cmd).WillReturnResult(sqlmock.NewResult(0, 1))
+		}
+
+		p.podUpdated(oldpod, newpod)
+	})
+
+	t.Run("pod stopped", func(t *testing.T) {
+		oldpod.Status.Phase = "Running"
+		newpod.Status.Phase = "Failed"
+
+		for _, cmd := range []string{
+			"DELETE FROM proxysql_servers WHERE hostname = 'old-pod-ip'",
+			"LOAD PROXYSQL SERVERS TO RUNTIME",
+			"LOAD ADMIN VARIABLES TO RUNTIME",
+			"LOAD MYSQL VARIABLES TO RUNTIME",
+			"LOAD MYSQL SERVERS TO RUNTIME",
+			"LOAD MYSQL USERS TO RUNTIME",
+			"LOAD MYSQL QUERY RULES TO RUNTIME",
+		} {
+			mock.ExpectExec(cmd).WillReturnResult(sqlmock.NewResult(0, 1))
+		}
+
+		p.podUpdated(oldpod, newpod)
+	})
+
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("Unfulfilled expectations: %s", err)
+	}
+
+	assert.NoError(t, err)
+}
+
+func TestPodAdded(t *testing.T) {
+	db, mock, err := sqlmock.New()
+	if err != nil {
+		t.Fatalf("Failed to create mock database connection: %v", err)
+	}
+	defer db.Close()
+
+	mock.MatchExpectationsInOrder(true)
 
-		expectedSingleCommands := []string{
-			"DELETE FROM proxysql_servers",
-			"INSERT INTO proxysql_servers VALUES ('192.168.0.1', 6032, 0, 'host1')",
+	p := &ProxySQL{db, tmpConfig, nil}
+
+	// we have to do a little hostname trickery for this test, as podAdded will immediately return for any pod
+	// that isn't the pod it's currently running on.
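+	// (podAdded compares pod.Name against os.Hostname(), so the test pod below is named after the machine running the tests.)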
+	hostname, _ := os.Hostname()
+
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      hostname,
+			Namespace: "test-ns",
+			Labels: map[string]string{
+				"component": "core",
+			},
+		},
+		Status: v1.PodStatus{
+			PodIP: "pod-ip",
+		},
+	}
+
+	t.Run("core pod already exists in cluster", func(t *testing.T) {
+		// Expect the query and return the row set
+		mock.ExpectQuery(
+			regexp.QuoteMeta("SELECT count(*) FROM proxysql_servers WHERE hostname = 'pod-ip'"),
+		).WillReturnRows(
+			sqlmock.NewRows([]string{"count"}).AddRow(1),
+		)
+
+		p.podAdded(pod)
+	})
+
+	t.Run("core pod does not exist in cluster", func(t *testing.T) {
+		// Expect the query and return the row set
+		mock.ExpectQuery(
+			regexp.QuoteMeta("SELECT count(*) FROM proxysql_servers WHERE hostname = 'pod-ip'"),
+		).WillReturnRows(
+			sqlmock.NewRows([]string{"count"}).AddRow(0),
+		)
+
+		for _, cmd := range []string{
+			"DELETE FROM proxysql_servers WHERE hostname = 'proxysql-core'",
+			regexp.QuoteMeta(fmt.Sprintf("INSERT INTO proxysql_servers VALUES ('pod-ip', 6032, 0, '%s')", hostname)),
 			"LOAD PROXYSQL SERVERS TO RUNTIME",
 			"LOAD ADMIN VARIABLES TO RUNTIME",
 			"LOAD MYSQL VARIABLES TO RUNTIME",
 			"LOAD MYSQL SERVERS TO RUNTIME",
 			"LOAD MYSQL USERS TO RUNTIME",
 			"LOAD MYSQL QUERY RULES TO RUNTIME",
+		} {
+			mock.ExpectExec(cmd).WillReturnResult(sqlmock.NewResult(0, 1))
 		}
-		assert.Equal(t, expectedSingleCommands, singleCommands, "Single pod should generate expected commands")
+
+		p.podAdded(pod)
 	})
 
-	t.Run("several pods", func(t *testing.T) {
-		// making these out of order, to test the sort functions
-		multiplePods := []PodInfo{
-			{PodIP: "192.168.0.2", Hostname: "host2", UID: "testuid2"},
-			{PodIP: "192.168.0.1", Hostname: "host1", UID: "testuid1"},
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("Unfulfilled expectations: %s", err)
+	}
+
+	assert.NoError(t, err)
+}
+
+func TestRemovePodFromCluster(t *testing.T) {
+	db, mock, err := sqlmock.New()
+	if err != nil {
+		t.Fatalf("Failed to create mock database connection: %v", err)
+	}
+	defer db.Close()
+
+	mock.MatchExpectationsInOrder(true)
+
+	p := &ProxySQL{db, tmpConfig, nil}
+
+	t.Run("core pod", func(t *testing.T) {
+		mock.ExpectExec(fmt.Sprintf("DELETE FROM proxysql_servers WHERE hostname = '%s'", "pod-ip")).WillReturnResult(sqlmock.NewResult(0, 1))
+
+		for _, cmd := range []string{
+			"LOAD PROXYSQL SERVERS TO RUNTIME",
+			"LOAD ADMIN VARIABLES TO RUNTIME",
+			"LOAD MYSQL VARIABLES TO RUNTIME",
+			"LOAD MYSQL SERVERS TO RUNTIME",
+			"LOAD MYSQL USERS TO RUNTIME",
+			"LOAD MYSQL QUERY RULES TO RUNTIME",
+		} {
+			mock.ExpectExec(cmd).WillReturnResult(sqlmock.NewResult(0, 1))
+		}
+
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test-pod",
+				Namespace: "test-ns",
+				Labels: map[string]string{
+					"component": "core",
+				},
+			},
+			Status: v1.PodStatus{
+				PodIP: "pod-ip",
+			},
+		}
+
+		err = p.removePodFromCluster(pod)
+
+		if err := mock.ExpectationsWereMet(); err != nil {
+			t.Errorf("Unfulfilled expectations: %s", err)
 		}
-		multipleCommands := createCommands(multiplePods)
 
-		expectedMultipleCommands := []string{
-			"DELETE FROM proxysql_servers",
-			"INSERT INTO proxysql_servers VALUES ('192.168.0.1', 6032, 0, 'host1')",
-			"INSERT INTO proxysql_servers VALUES ('192.168.0.2', 6032, 0, 'host2')",
+		assert.NoError(t, err)
+	})
+
+	t.Run("satellite pod", func(t *testing.T) {
+		for _, cmd := range []string{
 			"LOAD PROXYSQL SERVERS TO RUNTIME",
 			"LOAD ADMIN VARIABLES TO RUNTIME",
 			"LOAD MYSQL VARIABLES TO RUNTIME",
 			"LOAD MYSQL SERVERS TO RUNTIME",
 			"LOAD MYSQL USERS TO RUNTIME",
 			"LOAD MYSQL QUERY RULES TO RUNTIME",
+		} {
+			mock.ExpectExec(cmd).WillReturnResult(sqlmock.NewResult(0, 1))
+		}
+
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test-pod",
+				Namespace: "default",
+				Labels: map[string]string{
+					"component": "satellite",
+				},
+			},
 		}
-		assert.Equal(t, expectedMultipleCommands, multipleCommands, "Multiple pods should generate expected commands")
+
+		err = p.removePodFromCluster(pod)
+
+		if err := mock.ExpectationsWereMet(); err != nil {
+			t.Errorf("Unfulfilled expectations: %s", err)
+		}
+
+		assert.NoError(t, err)
 	})
 }
diff --git a/internal/proxysql/proxysql.go b/internal/proxysql/proxysql.go
index 67de16d..8d5c731 100644
--- a/internal/proxysql/proxysql.go
+++ b/internal/proxysql/proxysql.go
@@ -8,14 +8,16 @@ import (
 	"os"
 
 	"github.com/persona-id/proxysql-agent/internal/configuration"
+	"k8s.io/client-go/kubernetes"
 
 	// Import the mysql driver functionality.
 	_ "github.com/go-sql-driver/mysql"
 )
 
 type ProxySQL struct {
-	conn     *sql.DB
-	settings *configuration.Config
+	conn      *sql.DB
+	settings  *configuration.Config
+	clientset kubernetes.Interface
 }
 
 func (p *ProxySQL) New(configs *configuration.Config) (*ProxySQL, error) {
@@ -38,7 +40,7 @@ func (p *ProxySQL) New(configs *configuration.Config) (*ProxySQL, error) {
 
 	slog.Info("Connected to ProxySQL admin", slog.String("Host", address))
 
-	return &ProxySQL{conn, settings}, nil
+	return &ProxySQL{conn, settings, nil}, nil
 }
 
 func (p *ProxySQL) Conn() *sql.DB {
diff --git a/internal/proxysql/proxysql_test.go b/internal/proxysql/proxysql_test.go
index af57fac..281f114 100644
--- a/internal/proxysql/proxysql_test.go
+++ b/internal/proxysql/proxysql_test.go
@@ -46,7 +46,7 @@ func TestPing(t *testing.T) {
 	// FIXME: this doesn't exist now apparently, idk.
 	// mock.ExpectPing()
 
-	proxy := &ProxySQL{db, tmpConfig}
+	proxy := &ProxySQL{db, tmpConfig, nil}
 
 	err = proxy.Ping()
 	assert.NoError(t, err, "Ping() should not return an error")
@@ -60,7 +60,7 @@ func TestGetBackends(t *testing.T) {
 
 	defer db.Close()
 
-	proxy := &ProxySQL{db, tmpConfig}
+	proxy := &ProxySQL{db, tmpConfig, nil}
 
 	t.Run("no error", func(t *testing.T) {
 		expectedRows := sqlmock.NewRows([]string{"hostgroup_id", "hostname", "port"}).
diff --git a/internal/proxysql/satellite.go b/internal/proxysql/satellite.go
index bf5cda5..de7247d 100644
--- a/internal/proxysql/satellite.go
+++ b/internal/proxysql/satellite.go
@@ -11,7 +11,7 @@ import (
 )
 
 //
-// Satellite mode specific functons
+// Satellite mode specific functions
 //
 
 func (p *ProxySQL) Satellite() {
diff --git a/internal/proxysql/satellite_test.go b/internal/proxysql/satellite_test.go
index 257d902..dbd5a0c 100644
--- a/internal/proxysql/satellite_test.go
+++ b/internal/proxysql/satellite_test.go
@@ -1,8 +1,11 @@
 package proxysql
 
 import (
+	"bufio"
 	"errors"
+	"os"
 	"regexp"
+	"strings"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -17,7 +20,7 @@ func TestGetMissingCorePods(t *testing.T) {
 
 	query := regexp.QuoteMeta("SELECT COUNT(hostname) FROM stats_proxysql_servers_metrics WHERE last_check_ms > 30000 AND hostname != 'proxysql-core' AND Uptime_s > 0")
 
-	proxy := &ProxySQL{db, tmpConfig}
+	proxy := &ProxySQL{db, tmpConfig, nil}
 
 	t.Run("no error", func(t *testing.T) {
 		expectedCount := 1
@@ -41,3 +44,121 @@ func TestGetMissingCorePods(t *testing.T) {
 		assert.NoError(t, mock.ExpectationsWereMet(), "SQL expectations were not met")
 	})
 }
+
+func TestSatelliteResync(t *testing.T) {
+	// Mock database connection
+	db, mock, err := sqlmock.New()
+	if err != nil {
+		t.Fatalf("An error '%s' was not expected when opening a mock database connection", err)
+	}
+	defer db.Close()
+
+	mock.MatchExpectationsInOrder(true)
+
+	p := &ProxySQL{conn: db}
+
+	query := regexp.QuoteMeta("SELECT COUNT(hostname) FROM stats_proxysql_servers_metrics WHERE last_check_ms > 30000 AND hostname != 'proxysql-core' AND Uptime_s > 0")
+	mock.ExpectQuery(query).WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
+
+	commands := []string{
+		"DELETE FROM proxysql_servers",
+		"LOAD PROXYSQL SERVERS FROM CONFIG",
+		"LOAD PROXYSQL SERVERS TO RUNTIME;",
+	}
+	for _, command := range commands {
+		mock.ExpectExec(command).WillReturnResult(sqlmock.NewResult(1, 1))
+	}
+
+	err = p.SatelliteResync()
+
+	if err != nil {
+		t.Errorf("Expected no error, but got %s", err)
+	}
+
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("There were unfulfilled expectations: %s", err)
+	}
+}
+
+func TestDumpQueryRuleStats(t *testing.T) {
+	db, mock, err := sqlmock.New()
+	if err != nil {
+		t.Fatalf("an error '%s' was not expected when opening a stub database connection", err)
+	}
+	defer db.Close()
+
+	tmpdir := os.TempDir()
+
+	p := &ProxySQL{conn: db}
+
+	// No stats in table, nothing is done.
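+	// Only the COUNT query should hit the mock database in that case; DumpQueryRuleStats presumably skips
+	// the dump (and the file write) when the stats table is empty.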
+ t.Run("no stats", func(t *testing.T) { + rows := sqlmock.NewRows([]string{"count"}).AddRow(0) + mock.ExpectQuery( + regexp.QuoteMeta("SELECT COUNT(*) FROM stats_mysql_query_rules"), + ).WillReturnRows(rows) + + _, err := p.DumpQueryRuleStats(tmpdir) + if err != nil { + t.Errorf("Expected no error, but got %s instead", err) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } + }) + + // Has stats in table, so the file should have data + t.Run("has stats", func(t *testing.T) { + rows := sqlmock.NewRows([]string{"count"}).AddRow(1) + mock.ExpectQuery( + regexp.QuoteMeta("SELECT COUNT(*) FROM stats_mysql_query_rules"), + ).WillReturnRows(rows) + + rows = sqlmock.NewRows([]string{"rule_id", "hits"}).AddRow(1, 100).AddRow(2, 200) + mock.ExpectQuery( + regexp.QuoteMeta("SELECT * FROM stats_mysql_query_rules"), + ).WillReturnRows(rows) + + filePath, err := p.DumpQueryRuleStats(tmpdir) + if err != nil { + t.Errorf("Expected no error, but got %s instead", err) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled expectations: %s", err) + } + + // verify the file content + file, err := os.Open(filePath) + if err != nil { + t.Errorf("Expected file to be created, but got %s", err) + } + defer file.Close() + + scanner := bufio.NewScanner(file) + header := "rule_id,hits" + + if scanner.Scan() { + if strings.TrimSpace(scanner.Text()) != header { + t.Errorf("Expected file header to be '%s', got '%s'", header, scanner.Text()) + } + } + + if scanner.Scan() { + line := "1,100" + if strings.TrimSpace(scanner.Text()) != line { + t.Errorf("Expected first line to be '%s', got '%s'", line, scanner.Text()) + } + } + + if scanner.Scan() { + line := "2,200" + if strings.TrimSpace(scanner.Text()) != line { + t.Errorf("Expected last line to be '%s', got '%s'", line, scanner.Text()) + } + } + }) +}