Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/splunk observability scaler #6192

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- **General**: Add Splunk Observability Cloud Scaler ([#6190](https://github.com/kedacore/keda/issues/6190))
- **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533))

#### Experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.14.0
controller-gen.kubebuilder.io/version: v0.15.0
name: clustercloudeventsources.eventing.keda.sh
spec:
group: eventing.keda.sh
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ require (
github.com/robfig/cron/v3 v3.0.1
github.com/segmentio/kafka-go v0.4.47
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.1.0
github.com/signalfx/signalflow-client-go/v2 v2.3.0
github.com/spf13/cast v1.6.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -119,6 +120,8 @@ require (
sigs.k8s.io/kustomize/kustomize/v5 v5.4.3
)

require github.com/signalfx/signalfx-go v1.34.0 // indirect
sschimper-splunk marked this conversation as resolved.
Show resolved Hide resolved

// Remove this when they merge the PR and cut a release https://github.com/open-policy-agent/cert-controller/pull/202
replace github.com/open-policy-agent/cert-controller => github.com/jorturfer/cert-controller v0.0.0-20240427003941-363ba56751d7

Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1638,6 +1638,10 @@ github.com/shurcooL/go v0.0.0-20200502201357-93f07166e636/go.mod h1:TDJrrUr11Vxr
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/signalfx/signalflow-client-go/v2 v2.3.0 h1:CMhvEfDDWbdPCfMNiQTAymRIRzVbgveGbTq5wr8OHuM=
github.com/signalfx/signalflow-client-go/v2 v2.3.0/go.mod h1:ir6CHksVkhh1vlslldjf6k5qD88QQxWW8WMG5PxSQco=
github.com/signalfx/signalfx-go v1.34.0 h1:OQ6tyMY4efWB57EPIQqrpWrAfcSdyfa+bLtmAe7GLfE=
github.com/signalfx/signalfx-go v1.34.0/go.mod h1:IpGZLPvCKNFyspAXoS480jB02mocTpo0KYd8jbl6/T8=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
Expand Down
2 changes: 1 addition & 1 deletion pkg/metricsservice/api/metrics.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/metricsservice/api/metrics_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/scalers/externalscaler/externalscaler.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/scalers/externalscaler/externalscaler_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/scalers/liiklus/LiiklusService.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/scalers/liiklus/LiiklusService_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

183 changes: 183 additions & 0 deletions pkg/scalers/splunk_observability_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package scalers

import (
"context"
"fmt"
"math"
"regexp"
"time"

"github.com/go-logr/logr"
"github.com/signalfx/signalflow-client-go/v2/signalflow"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

type splunkObservabilityMetadata struct {
sschimper-splunk marked this conversation as resolved.
Show resolved Hide resolved
TriggerIndex int

AccessToken string `keda:"name=accessToken, order=authParams"`
Realm string `keda:"name=realm, order=authParams"`
Query string `keda:"name=query, order=triggerMetadata"`
Duration int `keda:"name=duration, order=triggerMetadata"`
TargetValue float64 `keda:"name=targetValue, order=triggerMetadata"`
QueryAggregator string `keda:"name=queryAggregator, order=triggerMetadata"`
ActivationTargetValue float64 `keda:"name=activationTargetValue, order=triggerMetadata"`
}

type splunkObservabilityScaler struct {
metadata *splunkObservabilityMetadata
apiClient *signalflow.Client
logger logr.Logger
}

func parseSplunkObservabilityMetadata(config *scalersconfig.ScalerConfig) (*splunkObservabilityMetadata, error) {
meta := &splunkObservabilityMetadata{}
meta.TriggerIndex = config.TriggerIndex

if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing splunk observability metadata: %w", err)
}

return meta, nil
}

func newSplunkO11yConnection(meta *splunkObservabilityMetadata, logger logr.Logger) (*signalflow.Client, error) {
logger.Info(fmt.Sprintf("meta: %+v\n", meta))
sschimper-splunk marked this conversation as resolved.
Show resolved Hide resolved

if meta.Realm == "" || meta.AccessToken == "" {
return nil, fmt.Errorf("error: Could not find splunk access token or ream")
sschimper-splunk marked this conversation as resolved.
Show resolved Hide resolved
}

apiClient, err := signalflow.NewClient(
signalflow.StreamURLForRealm(meta.Realm),
signalflow.AccessToken(meta.AccessToken),
signalflow.OnError(func(err error) {
errorMsg := fmt.Sprintf("error in SignalFlow client: %v\n", err)
sschimper-splunk marked this conversation as resolved.
Show resolved Hide resolved
logger.Info(errorMsg)
}))
if err != nil {
return nil, fmt.Errorf("error creating SignalFlow client: %w", err)
}

return apiClient, nil
}

func NewSplunkObservabilityScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
logger := InitializeLogger(config, "splunk_observability_scaler")

meta, err := parseSplunkObservabilityMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing Splunk metadata: %w", err)
}

apiClient, err := newSplunkO11yConnection(meta, logger)
if err != nil {
return nil, fmt.Errorf("error establishing Splunk Observability Cloud connection: %w", err)
}

return &splunkObservabilityScaler{
metadata: meta,
apiClient: apiClient,
logger: logger,
}, nil
}

func (s *splunkObservabilityScaler) getQueryResult() (float64, error) {
comp, err := s.apiClient.Execute(context.Background(), &signalflow.ExecuteRequest{
Program: s.metadata.Query,
})
if err != nil {
return -1, fmt.Errorf("error: could not execute signalflow query: %w", err)
}

s.logger.Info("Started MTS stream.")
sschimper-splunk marked this conversation as resolved.
Show resolved Hide resolved

time.Sleep(time.Duration(s.metadata.Duration * int(time.Second)))
if err := comp.Stop(context.Background()); err != nil {
return -1, fmt.Errorf("error creating SignalFlow client: %w", err)
}

s.logger.Info("Closed MTS stream.")

max := math.Inf(-1)
min := math.Inf(1)
valueSum := 0.0
valueCount := 0
s.logger.Info("Now iterating over results.")
for msg := range comp.Data() {
if len(msg.Payloads) == 0 {
s.logger.Info("No data retreived.")
sschimper-splunk marked this conversation as resolved.
Show resolved Hide resolved
continue
}
for _, pl := range msg.Payloads {
value, ok := pl.Value().(float64)
if !ok {
return -1, fmt.Errorf("error: could not convert Splunk Observability metric value to float64")
sschimper-splunk marked this conversation as resolved.
Show resolved Hide resolved
}
s.logger.Info(fmt.Sprintf("Encountering value %.4f\n", value))
max = math.Max(max, value)
min = math.Min(min, value)
valueSum += value
valueCount++
}
}

if valueCount > 1 && s.metadata.QueryAggregator == "" {
return 0, fmt.Errorf("query returned more than 1 series; modify the query to return only 1 series or add a queryAggregator")
}

switch s.metadata.QueryAggregator {
case "max":
s.logger.Info(fmt.Sprintf("Returning max value: %.4f\n", max))
sschimper-splunk marked this conversation as resolved.
Show resolved Hide resolved
return max, nil
case "min":
s.logger.Info(fmt.Sprintf("Returning min value: %.4f\n", min))
return min, nil
case "avg":
avg := valueSum / float64(valueCount)
s.logger.Info(fmt.Sprintf("Returning avg value: %.4f\n", avg))
return avg, nil
default:
return max, nil
}
}

func (s *splunkObservabilityScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
num, err := s.getQueryResult()

if err != nil {
s.logger.Error(err, "error getting metrics from Splunk Observability Cloud.")
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error getting metrics from Splunk Observability Cloud: %w", err)
}
metric := GenerateMetricInMili(metricName, num)

return []external_metrics.ExternalMetricValue{metric}, num > s.metadata.ActivationTargetValue, nil
}

func (s *splunkObservabilityScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString("signalfx")
re := regexp.MustCompile(`data\('([^']*)'`)
sschimper-splunk marked this conversation as resolved.
Show resolved Hide resolved
match := re.FindStringSubmatch(s.metadata.Query)
if len(match) > 1 {
metricName = kedautil.NormalizeString(match[1])
}

externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: metricName,
},
Target: GetMetricTargetMili(v2.ValueMetricType, s.metadata.TargetValue),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2.MetricSpec{metricSpec}
}

func (s *splunkObservabilityScaler) Close(context.Context) error {
return nil
}
89 changes: 89 additions & 0 deletions pkg/scalers/splunk_observability_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package scalers

import (
"context"
"testing"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

type parseSplunkObservabilityMetadataTestData struct {
metadata map[string]string
authParams map[string]string
isError bool
}

type SplunkObservabilityMetricIdentifier struct {
metadataTestData *parseSplunkObservabilityMetadataTestData
triggerIndex int
metricName string
}

var validSplunkObservabilityAuthParams = map[string]string{
"accessToken": "my-suyper-secret-access-token",
"realm": "my-realm",
}

var validSplunkObservabilityMetadata = map[string]string{
"query": "data('demo.trans.latency').max().publish()",
"duration": "10",
"targetValue": "200.0",
"queryAggregator": "avg",
"ActivationTargetValue": "1.1",
}

var testSplunkObservabilityMetadata = []parseSplunkObservabilityMetadataTestData{
// Valid metadata and valid auth params, pass.
{validSplunkObservabilityMetadata, validSplunkObservabilityAuthParams, false},
// no params at all, fail
{map[string]string{}, map[string]string{}, true},
// No meta dada but valid auth, fail.
{map[string]string{}, validSplunkObservabilityAuthParams, true},
// Valid meta dada but no auth params, fail.
{validSplunkObservabilityMetadata, map[string]string{}, true},
// Missing 'query' field, fail
{map[string]string{"duration": "10", "targetValue": "200.0", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
// Missing 'duration' field, fail
{map[string]string{"query": "data('demo.trans.latency').max().publish()", "targetValue": "200.0", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
// Missing 'targetValue' field, fail
{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
// Missing 'queryAggregator' field, fail
{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "targetValue": "200.0", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
// Missing 'activationTargetValue' field, fail
{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "targetValue": "200.0", "queryAggregator": "avg"}, validSplunkObservabilityAuthParams, true},
}

var SplunkObservabilityMetricIdentifiers = []SplunkObservabilityMetricIdentifier{
{&testSplunkObservabilityMetadata[0], 0, "demo-trans-latency"},
{&testSplunkObservabilityMetadata[0], 1, "demo-trans-latency"},
}

func TestSplunkObservabilityParseMetadata(t *testing.T) {
for _, testData := range testSplunkObservabilityMetadata {
_, err := parseSplunkObservabilityMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
} else if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

func TestSplunkObservabilityGetMetricSpecForScaling(t *testing.T) {
for _, testData := range SplunkObservabilityMetricIdentifiers {
ctx := context.Background()
meta, err := parseSplunkObservabilityMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validSplunkObservabilityAuthParams, TriggerIndex: testData.triggerIndex})
if err != nil {
t.Fatal("Could not parse Splunk Observability metadata:", err)
}
mockSplunkObservabilityScaler := splunkObservabilityScaler{
metadata: meta,
}

metricSpec := mockSplunkObservabilityScaler.GetMetricSpecForScaling(ctx)
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.metricName {
t.Error("Wrong External metric source name:", metricName)
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scalers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
return scalers.NewSolrScaler(config)
case "splunk":
return scalers.NewSplunkScaler(config)
case "splunk-observability":
return scalers.NewSplunkObservabilityScaler(config)
case "stan":
return scalers.NewStanScaler(config)
default:
Expand Down
Loading
Loading