diff --git a/CHANGELOG.md b/CHANGELOG.md index 449f61bef8e..c583cd2b8a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,6 +73,7 @@ Here is an overview of all new **experimental** features: ### Improvements - **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352)) +- **Elasticsearch Scaler**: Support Query at the Elasticsearch scaler ([#6216](https://github.com/kedacore/keda/issues/6216)) - **Etcd Scaler**: Add username and password support for etcd ([#6199](https://github.com/kedacore/keda/pull/6199)) - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index 44a27e8e463..70e2030d9bf 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/go-logr/logr" "github.com/tidwall/gjson" v2 "k8s.io/api/autoscaling/v2" @@ -34,7 +35,8 @@ type elasticsearchMetadata struct { CloudID string `keda:"name=cloudID, order=authParams;triggerMetadata, optional"` APIKey string `keda:"name=apiKey, order=authParams;triggerMetadata, optional"` Index []string `keda:"name=index, order=authParams;triggerMetadata, separator=;"` - SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata"` + SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata, optional"` + Query string `keda:"name=query, order=authParams;triggerMetadata, optional"` Parameters []string `keda:"name=parameters, order=triggerMetadata, optional, separator=;"` ValueLocation string `keda:"name=valueLocation, order=authParams;triggerMetadata"` TargetValue float64 `keda:"name=targetValue, order=authParams;triggerMetadata"` @@ -57,6 +59,13 @@ func (m *elasticsearchMetadata) Validate() error { if len(m.Addresses) > 0 && (m.Username == "" || m.Password == "") { return fmt.Errorf("both username and password must be provided when addresses is used") } + if m.SearchTemplateName == "" && m.Query == "" { + return fmt.Errorf("either searchTemplateName or query must be provided") + } + if m.SearchTemplateName != "" && m.Query != "" { + return fmt.Errorf("cannot provide both searchTemplateName and query") + } + return nil } @@ -93,7 +102,12 @@ func parseElasticsearchMetadata(config *scalersconfig.ScalerConfig) (elasticsear return meta, err } - meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.SearchTemplateName))) + if meta.SearchTemplateName != "" { + meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.SearchTemplateName))) + } else { + meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, "elasticsearch-query") + } + meta.TriggerIndex = config.TriggerIndex return meta, nil @@ -137,17 +151,29 @@ func (s *elasticsearchScaler) Close(_ context.Context) error { // getQueryResult returns result of the scaler query func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (float64, error) { // Build the request body. - var body bytes.Buffer - if err := json.NewEncoder(&body).Encode(buildQuery(&s.metadata)); err != nil { - s.logger.Error(err, "Error encoding query: %s", err) + var res *esapi.Response + var err error + + if s.metadata.SearchTemplateName != "" { + // Using SearchTemplateName + var body bytes.Buffer + if err := json.NewEncoder(&body).Encode(buildQuery(&s.metadata)); err != nil { + s.logger.Error(err, "Error encoding query: %s", err) + } + res, err = s.esClient.SearchTemplate( + &body, + s.esClient.SearchTemplate.WithIndex(s.metadata.Index...), + s.esClient.SearchTemplate.WithContext(ctx), + ) + } else { + // Using Query + res, err = s.esClient.Search( + s.esClient.Search.WithIndex(s.metadata.Index...), + s.esClient.Search.WithBody(strings.NewReader(s.metadata.Query)), + s.esClient.Search.WithContext(ctx), + ) } - // Run the templated search - res, err := s.esClient.SearchTemplate( - &body, - s.esClient.SearchTemplate.WithIndex(s.metadata.Index...), - s.esClient.SearchTemplate.WithContext(ctx), - ) if err != nil { s.logger.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err)) return 0, err diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go index 95725065703..2e6966f0422 100644 --- a/pkg/scalers/elasticsearch_scaler_test.go +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -73,13 +73,34 @@ var testCases = []parseElasticsearchMetadataTestData{ expectedError: fmt.Errorf("missing required parameter \"index\""), }, { - name: "no searchTemplateName given", + name: "query and searchTemplateName provided", metadata: map[string]string{ - "addresses": "http://localhost:9200", - "index": "index1", + "addresses": "http://localhost:9200", + "index": "index1", + "query": `{"match": {"field": "value"}}`, + "searchTemplateName": "myTemplate", + "valueLocation": "hits.total.value", + "targetValue": "12", }, - authParams: map[string]string{"username": "admin"}, - expectedError: fmt.Errorf("missing required parameter \"searchTemplateName\""), + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedError: fmt.Errorf("cannot provide both searchTemplateName and query"), + }, + { + name: "neither query nor searchTemplateName provided", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "valueLocation": "hits.total.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedError: fmt.Errorf("either searchTemplateName or query must be provided"), }, { name: "no valueLocation given", @@ -306,6 +327,31 @@ var testCases = []parseElasticsearchMetadataTestData{ }, expectedError: nil, }, + { + name: "valid query parameter", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "query": `{"match": {"field": "value"}}`, + "valueLocation": "hits.total.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + Addresses: []string{"http://localhost:9200"}, + Index: []string{"index1"}, + Username: "admin", + Password: "password", + Query: `{"match": {"field": "value"}}`, + ValueLocation: "hits.total.value", + TargetValue: 12, + MetricName: "s0-elasticsearch-query", + }, + expectedError: nil, + }, } func TestParseElasticsearchMetadata(t *testing.T) { diff --git a/tests/scalers/elasticsearch/elasticsearch_test.go b/tests/scalers/elasticsearch/elasticsearch_test.go index de2d997066c..ea83f315298 100644 --- a/tests/scalers/elasticsearch/elasticsearch_test.go +++ b/tests/scalers/elasticsearch/elasticsearch_test.go @@ -202,7 +202,7 @@ spec: name: elasticsearch ` - scaledObjectTemplate = ` + scaledObjectTemplateSearchTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: @@ -232,6 +232,54 @@ spec: name: keda-trigger-auth-elasticsearch-secret ` + scaledObjectTemplateQuery = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + minReplicaCount: 0 + maxReplicaCount: 2 + pollingInterval: 3 + cooldownPeriod: 5 + triggers: + - type: elasticsearch + metadata: + addresses: "http://{{.DeploymentName}}.{{.TestNamespace}}.svc:9200" + username: "elastic" + index: {{.IndexName}} + query: | + { + "query": { + "bool": { + "must": [ + { + "range": { + "@timestamp": { + "gte": "now-1m", + "lte": "now" + } + } + }, + { + "match_all": {} + } + ] + } + } + } + valueLocation: "hits.total.value" + targetValue: "1" + activationTargetValue: "4" + authenticationRef: + name: keda-trigger-auth-elasticsearch-secret +` + elasticsearchCreateIndex = ` { "mappings": { @@ -297,9 +345,6 @@ spec: func TestElasticsearchScaler(t *testing.T) { kc := GetKubernetesClient(t) data, templates := getTemplateData() - t.Cleanup(func() { - DeleteKubernetesResources(t, testNamespace, data, templates) - }) // Create kubernetes resources CreateKubernetesResources(t, kc, testNamespace, data, templates) @@ -307,13 +352,32 @@ func TestElasticsearchScaler(t *testing.T) { // setup elastic setupElasticsearch(t, kc) - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), - "replica count should be %d after 3 minutes", minReplicaCount) + t.Run("test with searchTemplateName", func(t *testing.T) { + t.Log("--- testing with searchTemplateName ---") + + // Create ScaledObject with searchTemplateName + KubectlApplyWithTemplate(t, data, "scaledObjectTemplateSearchTemplate", scaledObjectTemplateSearchTemplate) - // test scaling - testActivation(t, kc) - testScaleOut(t, kc) - testScaleIn(t, kc) + testElasticsearchScaler(t, kc) + + // Delete ScaledObject + KubectlDeleteWithTemplate(t, data, "scaledObjectTemplateSearchTemplate", scaledObjectTemplateSearchTemplate) + }) + + t.Run("test with query", func(t *testing.T) { + t.Log("--- testing with query ---") + + // Create ScaledObject with query + KubectlApplyWithTemplate(t, data, "scaledObjectTemplateQuery", scaledObjectTemplateQuery) + + testElasticsearchScaler(t, kc) + + // Delete ScaledObject + KubectlDeleteWithTemplate(t, data, "scaledObjectTemplateQuery", scaledObjectTemplateQuery) + }) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) } func setupElasticsearch(t *testing.T, kc *kubernetes.Clientset) { @@ -326,22 +390,18 @@ func setupElasticsearch(t *testing.T, kc *kubernetes.Clientset) { require.NoErrorf(t, err, "cannot execute command - %s", err) } -func testActivation(t *testing.T, kc *kubernetes.Clientset) { +func testElasticsearchScaler(t *testing.T, kc *kubernetes.Clientset) { t.Log("--- testing activation ---") addElements(t, 3) AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60) -} -func testScaleOut(t *testing.T, kc *kubernetes.Clientset) { t.Log("--- testing scale out ---") addElements(t, 5) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), "replica count should be %d after 3 minutes", maxReplicaCount) -} -func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { t.Log("--- testing scale in ---") assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), @@ -383,6 +443,5 @@ func getTemplateData() (templateData, []Template) { {Name: "serviceTemplate", Config: serviceTemplate}, {Name: "elasticsearchDeploymentTemplate", Config: elasticsearchDeploymentTemplate}, {Name: "deploymentTemplate", Config: deploymentTemplate}, - {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, } }