Skip to content

Commit

Permalink
Fix panic runtime error predictkube scaler test
Browse files Browse the repository at this point in the history
Signed-off-by: rickbrouwer <[email protected]>
  • Loading branch information
rickbrouwer committed Dec 2, 2024
1 parent 2adbc81 commit 0cb209d
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 78 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ require (
github.com/onsi/ginkgo/v2 v2.21.0
github.com/onsi/gomega v1.35.1
github.com/open-policy-agent/cert-controller v0.12.0
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2206,8 +2206,6 @@ github.com/otiai10/mint v1.3.3/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 h1:Ii+DKncOVM8Cu1Hc+ETb5K+23HdAMvESYE3ZJ5b5cMI=
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
Expand Down
207 changes: 133 additions & 74 deletions pkg/scalers/predictkube_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,30 @@ package scalers

import (
"context"
"crypto/rand"
"fmt"
"log"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"

libsSrv "github.com/dysnix/predictkube-libs/external/grpc/server"
pb "github.com/dysnix/predictkube-proto/external/proto/services"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

const (
defaultTestPort = 50051
// nosemgrep: detected-jwt-token, detected-generic-api-key
testAPIKey = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJ0ZXN0IENyZWF0ZUNsaWVudCIsImV4cCI6MTY0NjkxNzI3Nywic3ViIjoiODM4NjY5ODAtM2UzNS0xMWVjLTlmMjQtYWNkZTQ4MDAxMTIyIn0.5QEuO6_ysdk2abGvk3Xp7Q25M4H4pIFXeqP2E7n9rKI"
)

type server struct {
grpcSrv *grpc.Server
listener net.Listener
Expand All @@ -33,56 +40,18 @@ func (s *server) GetPredictMetric(_ context.Context, _ *pb.ReqGetPredictMetric)
}, nil
}

func (s *server) start() <-chan error {
errCh := make(chan error, 1)

go func() {
defer close(errCh)

var (
err error
)

s.port, err = freeport.GetFreePort()
if err != nil {
log.Fatalf("Could not get free port for init mock grpc server: %s", err)
}

serverURL := fmt.Sprintf("0.0.0.0:%d", s.port)
if s.listener == nil {
var err error
s.listener, err = net.Listen("tcp4", serverURL)

if err != nil {
log.Println("starting grpc server with error")

errCh <- err
return
}
}

log.Printf("🚀 starting mock grpc server. On host 0.0.0.0, with port: %d", s.port)

if err := s.grpcSrv.Serve(s.listener); err != nil {
log.Println(err, "serving grpc server with error")

errCh <- err
return
}
}()

return errCh
}

func (s *server) stop() error {
s.grpcSrv.GracefulStop()
return libsSrv.CheckNetErrClosing(s.listener.Close())
}

func runMockGrpcPredictServer() (*server, *grpc.Server) {
// nosemgrep
grpcServer := grpc.NewServer()

mockGrpcServer := &server{grpcSrv: grpcServer}
mockGrpcServer := &server{
grpcSrv: grpcServer,
port: defaultTestPort,
}

defer func() {
if r := recover(); r != nil {
Expand All @@ -91,21 +60,28 @@ func runMockGrpcPredictServer() (*server, *grpc.Server) {
}
}()

pb.RegisterMlEngineServiceServer(grpcServer, mockGrpcServer)

serverURL := fmt.Sprintf("0.0.0.0:%d", mockGrpcServer.port)
listener, err := net.Listen("tcp4", serverURL)
if err != nil {
log.Printf("Failed to listen: %v", err)
return nil, nil
}
mockGrpcServer.listener = listener

go func() {
for errCh := range mockGrpcServer.start() {
if errCh != nil {
log.Printf("GRPC server listen error: %3v", errCh)
}
log.Printf("🚀 starting mock grpc server. On host 0.0.0.0, with port: %d", mockGrpcServer.port)
if err := grpcServer.Serve(listener); err != nil {
log.Printf("GRPC server listen error: %v", err)
}
}()

pb.RegisterMlEngineServiceServer(grpcServer, mockGrpcServer)
time.Sleep(time.Second)

return mockGrpcServer, grpcServer
}

const testAPIKey = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJ0ZXN0IENyZWF0ZUNsaWVudCIsImV4cCI6MTY0NjkxNzI3Nywic3ViIjoiODM4NjY5ODAtM2UzNS0xMWVjLTlmMjQtYWNkZTQ4MDAxMTIyIn0.5QEuO6_ysdk2abGvk3Xp7Q25M4H4pIFXeqP2E7n9rKI"

type predictKubeMetadataTestData struct {
metadata map[string]string
authParams map[string]string
Expand All @@ -126,13 +102,11 @@ var testPredictKubeMetadata = []predictKubeMetadataTestData{
// malformed threshold
{
map[string]string{"predictHorizon": "2h", "historyTimeWindow": "7d", "prometheusAddress": "http://localhost:9090", "queryStep": "2m", "threshold": "one", "query": "up"},

map[string]string{"apiKey": testAPIKey}, true,
},
// malformed activation threshold
{
map[string]string{"predictHorizon": "2h", "historyTimeWindow": "7d", "prometheusAddress": "http://localhost:9090", "queryStep": "2m", "threshold": "1", "activationThreshold": "one", "query": "up"},

map[string]string{"apiKey": testAPIKey}, true,
},
// missing query
Expand Down Expand Up @@ -167,18 +141,45 @@ var predictKubeMetricIdentifiers = []predictKubeMetricIdentifier{

func TestPredictKubeGetMetricSpecForScaling(t *testing.T) {
mockPredictServer, grpcServer := runMockGrpcPredictServer()
if mockPredictServer == nil || grpcServer == nil {
t.Fatal("Failed to start mock server")
}

defer func() {
_ = mockPredictServer.stop()
grpcServer.GracefulStop()
}()

err := waitForServer(fmt.Sprintf("0.0.0.0:%d", defaultTestPort), 5*time.Second)
if err != nil {
t.Fatalf("Server failed to start: %v", err)
}

// Mock Prometheus server
mockPrometheus := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`))
}))
defer mockPrometheus.Close()

mlEngineHost = "0.0.0.0"
mlEnginePort = mockPredictServer.port

for _, testData := range predictKubeMetricIdentifiers {
metadata := make(map[string]string)
for k, v := range testData.metadataTestData.metadata {
if k == "prometheusAddress" {
metadata[k] = mockPrometheus.URL
} else {
metadata[k] = v
}
}

mockPredictKubeScaler, err := NewPredictKubeScaler(
context.Background(), &scalersconfig.ScalerConfig{
TriggerMetadata: testData.metadataTestData.metadata,
context.Background(),
&scalersconfig.ScalerConfig{
TriggerMetadata: metadata,
AuthParams: testData.metadataTestData.authParams,
TriggerIndex: testData.triggerIndex,
},
Expand All @@ -191,39 +192,97 @@ func TestPredictKubeGetMetricSpecForScaling(t *testing.T) {
t.Error("Wrong External metric source name:", metricName)
return
}
}
}

t.Log(metricSpec)
func waitForServer(address string, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout waiting for server")
default:
conn, err := net.Dial("tcp", address)
if err == nil {
conn.Close()
return nil
}
time.Sleep(100 * time.Millisecond)
}
}
}

func TestPredictKubeGetMetrics(t *testing.T) {
grpcConf.Conn.Insecure = true

mockPredictServer, grpcServer := runMockGrpcPredictServer()
<-time.After(time.Second * 3)
if mockPredictServer == nil || grpcServer == nil {
t.Fatal("Failed to start mock server")
}

defer func() {
_ = mockPredictServer.stop()
grpcServer.GracefulStop()
}()

err := waitForServer(fmt.Sprintf("0.0.0.0:%d", defaultTestPort), 5*time.Second)
if err != nil {
t.Fatalf("Server failed to start: %v", err)
}

// Mock Prometheus server
mockPrometheus := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`))
}))
defer mockPrometheus.Close()

grpcConf.Conn.Insecure = true
mlEngineHost = "0.0.0.0"
mlEnginePort = mockPredictServer.port
mlEnginePort = defaultTestPort

for _, testData := range predictKubeMetricIdentifiers {
mockPredictKubeScaler, err := NewPredictKubeScaler(
context.Background(), &scalersconfig.ScalerConfig{
TriggerMetadata: testData.metadataTestData.metadata,
AuthParams: testData.metadataTestData.authParams,
TriggerIndex: testData.triggerIndex,
},
)
assert.NoError(t, err)
testData := testData
t.Run(fmt.Sprintf("trigger_index_%d", testData.triggerIndex), func(t *testing.T) {
metadata := make(map[string]string)
for k, v := range testData.metadataTestData.metadata {
if k == "prometheusAddress" {
metadata[k] = mockPrometheus.URL
} else {
metadata[k] = v
}
}

result, _, err := mockPredictKubeScaler.GetMetricsAndActivity(context.Background(), predictKubeMetricPrefix)
assert.NoError(t, err)
assert.Equal(t, len(result), 1)
assert.Equal(t, result[0].Value, *resource.NewMilliQuantity(mockPredictServer.val*1000, resource.DecimalSI))
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

mockPredictKubeScaler, err := NewPredictKubeScaler(
ctx,
&scalersconfig.ScalerConfig{
TriggerMetadata: metadata,
AuthParams: testData.metadataTestData.authParams,
TriggerIndex: testData.triggerIndex,
},
)
if err != nil {
t.Fatalf("Failed to create scaler: %v", err)
}

result, _, err := mockPredictKubeScaler.GetMetricsAndActivity(ctx, predictKubeMetricPrefix)
if err != nil {
t.Fatalf("Failed to get metrics: %v", err)
}

if len(result) == 0 {
t.Fatal("Expected non-empty result")
}

t.Logf("get: %v, want: %v, predictMetric: %d", result[0].Value, *resource.NewQuantity(mockPredictServer.val, resource.DecimalSI), mockPredictServer.val)
assert.Equal(t, result[0].Value, *resource.NewMilliQuantity(mockPredictServer.val*1000, resource.DecimalSI))
t.Logf("get: %v, want: %v, predictMetric: %d",
result[0].Value,
*resource.NewQuantity(mockPredictServer.val, resource.DecimalSI),
mockPredictServer.val)
})
}
}
1 change: 0 additions & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1204,7 +1204,6 @@ github.com/onsi/gomega/types
# github.com/open-policy-agent/cert-controller v0.12.0
## explicit; go 1.22.0
github.com/open-policy-agent/cert-controller/pkg/rotator
# github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
## explicit
github.com/phayes/freeport
# github.com/pierrec/lz4/v4 v4.1.21
Expand Down

0 comments on commit 0cb209d

Please sign in to comment.