diff --git a/pkg/scalers/gitlab_runner_scaler.go b/pkg/scalers/gitlab_runner_scaler.go new file mode 100644 index 00000000000..11f475d7637 --- /dev/null +++ b/pkg/scalers/gitlab_runner_scaler.go @@ -0,0 +1,183 @@ +package scalers + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + + "github.com/go-logr/logr" + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/metrics/pkg/apis/external_metrics" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +const ( + // pipelineWaitingForResourceStatus is the status of the pipelines that are waiting for resources. + pipelineWaitingForResourceStatus = "waiting_for_resource" + + // maxGitlabAPIPageCount is the maximum number of pages to query for pipelines. + maxGitlabAPIPageCount = 50 + // gitlabAPIPerPage is the number of pipelines to query per page. + gitlabAPIPerPage = "200" +) + +type gitlabRunnerScaler struct { + metricType v2.MetricTargetType + metadata *gitlabRunnerMetadata + httpClient *http.Client + logger logr.Logger +} + +type gitlabRunnerMetadata struct { + GitLabAPIURL *url.URL `keda:"name=gitlabAPIURL, order=triggerMetadata, default=https://gitlab.com, optional"` + PersonalAccessToken string `keda:"name=personalAccessToken, order=authParams"` + ProjectID string `keda:"name=projectID, order=triggerMetadata"` + + TargetPipelineQueueLength int64 `keda:"name=targetPipelineQueueLength, order=triggerMetadata, default=1, optional"` + TriggerIndex int +} + +// NewGitLabRunnerScaler creates a new GitLab Runner Scaler +func NewGitLabRunnerScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) + + metricType, err := GetMetricTargetType(config) + if err != nil { + return nil, fmt.Errorf("error getting scaler metric type: %w", err) + } + + meta, err := parseGitLabRunnerMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing GitLab Runner metadata: %w", err) + } + + return &gitlabRunnerScaler{ + metricType: metricType, + metadata: meta, + httpClient: httpClient, + logger: InitializeLogger(config, "gitlab_runner_scaler"), + }, nil +} + +func parseGitLabRunnerMetadata(config *scalersconfig.ScalerConfig) (*gitlabRunnerMetadata, error) { + meta := gitlabRunnerMetadata{} + + meta.TriggerIndex = config.TriggerIndex + if err := config.TypedConfig(&meta); err != nil { + return nil, fmt.Errorf("error parsing gitlabRunner metadata: %w", err) + } + + uri := constructGitlabAPIPipelinesURL(*meta.GitLabAPIURL, meta.ProjectID, pipelineWaitingForResourceStatus) + + meta.GitLabAPIURL = &uri + + return &meta, nil +} + +func (s *gitlabRunnerScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + queueLen, err := s.getPipelineQueueLength(ctx) + + if err != nil { + s.logger.Error(err, "error getting workflow queue length") + return []external_metrics.ExternalMetricValue{}, false, err + } + + metric := GenerateMetricInMili(metricName, float64(queueLen)) + + return []external_metrics.ExternalMetricValue{metric}, queueLen >= s.metadata.TargetPipelineQueueLength, nil +} + +func (s *gitlabRunnerScaler) GetMetricSpecForScaling(_ context.Context) []v2.MetricSpec { + externalMetric := &v2.ExternalMetricSource{ + Metric: v2.MetricIdentifier{ + Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, kedautil.NormalizeString(fmt.Sprintf("gitlab-runner-%s", s.metadata.ProjectID))), + }, + Target: GetMetricTarget(s.metricType, s.metadata.TargetPipelineQueueLength), + } + metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2.MetricSpec{metricSpec} +} + +func (s *gitlabRunnerScaler) Close(_ context.Context) error { + if s.httpClient != nil { + s.httpClient.CloseIdleConnections() + } + return nil +} +func constructGitlabAPIPipelinesURL(baseURL url.URL, projectID string, status string) url.URL { + baseURL.Path = "/api/v4/projects/" + projectID + "/pipelines" + + qParams := baseURL.Query() + qParams.Set("status", status) + qParams.Set("per_page", gitlabAPIPerPage) + + baseURL.RawQuery = qParams.Encode() + + return baseURL +} + +// getPipelineCount returns the number of pipelines in the GitLab project (per the page set in url) +func (s *gitlabRunnerScaler) getPipelineCount(ctx context.Context, uri string) (int64, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, uri, nil) + if err != nil { + return 0, fmt.Errorf("creating request: %w", err) + } + + req.Header.Set("Accept", "application/json") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("PRIVATE-TOKEN", s.metadata.PersonalAccessToken) + + res, err := s.httpClient.Do(req) + if err != nil { + return 0, fmt.Errorf("doing request: %w", err) + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return 0, fmt.Errorf("unexpected status code: %d", res.StatusCode) + } + + gitlabPipelines := make([]struct{}, 0) + if err := json.NewDecoder(res.Body).Decode(&gitlabPipelines); err != nil { + return 0, fmt.Errorf("decoding response: %w", err) + } + + return int64(len(gitlabPipelines)), nil +} + +// getPipelineQueueLength returns the number of pipelines in the +// GitLab project that are waiting for resources. +func (s *gitlabRunnerScaler) getPipelineQueueLength(ctx context.Context) (int64, error) { + var count int64 + + page := 1 + for ; page < maxGitlabAPIPageCount; page++ { + pagedURL := pagedURL(*s.metadata.GitLabAPIURL, fmt.Sprint(page)) + + gitlabPipelinesLen, err := s.getPipelineCount(ctx, pagedURL.String()) + if err != nil { + return 0, err + } + + if gitlabPipelinesLen == 0 { + break + } + + count += gitlabPipelinesLen + } + + return count, nil +} + +func pagedURL(uri url.URL, page string) url.URL { + qParams := uri.Query() + qParams.Set("page", fmt.Sprint(page)) + + uri.RawQuery = qParams.Encode() + + return uri +} diff --git a/pkg/scalers/gitlab_runner_scaler_test.go b/pkg/scalers/gitlab_runner_scaler_test.go new file mode 100644 index 00000000000..e9bb49dac7e --- /dev/null +++ b/pkg/scalers/gitlab_runner_scaler_test.go @@ -0,0 +1,424 @@ +package scalers + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + "time" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + v2 "k8s.io/api/autoscaling/v2" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" +) + +func TestParseGitLabRunnerMetadata(t *testing.T) { + // Create a properly initialized ScalerConfig with valid metadata. + config := &scalersconfig.ScalerConfig{ + TriggerMetadata: map[string]string{ + "gitlabAPIURL": "https://gitlab.com", + "projectID": "12345", + "targetPipelineQueueLength": "5", + }, + AuthParams: map[string]string{ + "personalAccessToken": "fake-token", + }, + GlobalHTTPTimeout: 10 * time.Second, + TriggerIndex: 0, + } + + // Attempt to parse the metadata. + meta, err := parseGitLabRunnerMetadata(config) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + // Validate the parsed metadata + if meta.GitLabAPIURL.String() != "https://gitlab.com/api/v4/projects/12345/pipelines?per_page=200&status=waiting_for_resource" { + t.Errorf("Expected URL to be correctly formed, got %v", meta.GitLabAPIURL.String()) + } + + if meta.ProjectID != "12345" { + t.Errorf("Expected projectID to be '12345', got %v", meta.ProjectID) + } + + if meta.TargetPipelineQueueLength != 5 { + t.Errorf("Expected targetPipelineQueueLength to be 5, got %v", meta.TargetPipelineQueueLength) + } + + if meta.PersonalAccessToken != "fake-token" { + t.Errorf("Expected personalAccessToken to be 'fake-token', got %v", meta.PersonalAccessToken) + } +} + +func mustParseURL(rawURL string) *url.URL { + parsed, err := url.Parse(rawURL) + if err != nil { + panic(err) + } + return parsed +} + +func TestGitLabRunnerScaler_GetPipelineCount(t *testing.T) { + testCases := []struct { + name string + responseStatus int + responseBody interface{} + expectedCount int64 + expectError bool + }{ + { + name: "Valid response with pipelines", + responseStatus: http.StatusOK, + responseBody: []map[string]interface{}{ + {"id": 1}, + {"id": 2}, + {"id": 3}, + }, + expectedCount: 3, + expectError: false, + }, + { + name: "Valid response with no pipelines", + responseStatus: http.StatusOK, + responseBody: []map[string]interface{}{}, + expectedCount: 0, + expectError: false, + }, + { + name: "Unauthorized response", + responseStatus: http.StatusUnauthorized, + responseBody: map[string]string{"message": "401 Unauthorized"}, + expectedCount: 0, + expectError: true, + }, + { + name: "Invalid JSON response", + responseStatus: http.StatusOK, + responseBody: "invalid-json", + expectedCount: 0, + expectError: true, + }, + { + name: "Internal server error", + responseStatus: http.StatusInternalServerError, + responseBody: map[string]string{"message": "500 Internal Server Error"}, + expectedCount: 0, + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tc.responseStatus) + if err := json.NewEncoder(w).Encode(tc.responseBody); err != nil { + t.Fatalf("failed to write response: %v", err) + } + })) + defer server.Close() + + meta := &gitlabRunnerMetadata{ + GitLabAPIURL: mustParseURL(server.URL), + PersonalAccessToken: "test-token", + } + + scaler := gitlabRunnerScaler{ + metadata: meta, + httpClient: http.DefaultClient, + logger: logr.Discard(), + } + + count, err := scaler.getPipelineCount(context.Background(), server.URL) + if tc.expectError { + assert.Error(t, err) + assert.Equal(t, tc.expectedCount, count) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectedCount, count) + } + }) + } +} + +func TestGitLabRunnerScaler_GetPipelineQueueLength(t *testing.T) { + totalPipelines := 450 // More than one page + perPage := 200 + + // Create fake pipelines + createPipelines := func(count int) []map[string]interface{} { + pipelines := make([]map[string]interface{}, count) + for i := 0; i < count; i++ { + pipelines[i] = map[string]interface{}{ + "id": i + 1, + } + } + return pipelines + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + pageStr := r.URL.Query().Get("page") + page, _ := strconv.Atoi(pageStr) + start := (page - 1) * perPage + end := start + perPage + + if start >= totalPipelines { + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode([]map[string]interface{}{}) + return + } + + if end > totalPipelines { + end = totalPipelines + } + + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(createPipelines(end - start)) + })) + defer server.Close() + + meta := &gitlabRunnerMetadata{ + GitLabAPIURL: mustParseURL(server.URL), + PersonalAccessToken: "test-token", + } + + scaler := gitlabRunnerScaler{ + metadata: meta, + httpClient: http.DefaultClient, + logger: logr.Discard(), + } + + count, err := scaler.getPipelineQueueLength(context.Background()) + assert.NoError(t, err) + assert.Equal(t, int64(totalPipelines), count) +} + +func TestGitLabRunnerScaler_GetMetricsAndActivity(t *testing.T) { + testCases := []struct { + name string + pipelineQueueLength int64 + targetPipelineQueueLength int64 + expectedMetricValue int64 + expectedActive bool + expectError bool + }{ + { + name: "Queue length below target", + pipelineQueueLength: 2, + targetPipelineQueueLength: 5, + expectedMetricValue: 2, + expectedActive: false, + expectError: false, + }, + { + name: "Queue length equal to target", + pipelineQueueLength: 5, + targetPipelineQueueLength: 5, + expectedMetricValue: 5, + expectedActive: true, + expectError: false, + }, + { + name: "Queue length above target", + pipelineQueueLength: 10, + targetPipelineQueueLength: 5, + expectedMetricValue: 10, + expectedActive: true, + expectError: false, + }, + { + name: "Error retrieving queue length", + pipelineQueueLength: 0, + targetPipelineQueueLength: 5, + expectedMetricValue: 0, + expectedActive: false, + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Setup mock server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tc.expectError { + w.WriteHeader(http.StatusInternalServerError) + return + } + + page := r.URL.Query().Get("page") + + w.WriteHeader(http.StatusOK) + + pipelines := make([]map[string]interface{}, 0, tc.pipelineQueueLength) + if page == "1" { + for i := int64(0); i < tc.pipelineQueueLength; i++ { + pipelines = append(pipelines, map[string]interface{}{ + "id": i + 1, + }) + } + } + + _ = json.NewEncoder(w).Encode(pipelines) + })) + defer server.Close() + + meta := &gitlabRunnerMetadata{ + GitLabAPIURL: mustParseURL(server.URL), + PersonalAccessToken: "test-token", + TargetPipelineQueueLength: tc.targetPipelineQueueLength, + ProjectID: "12345", + } + + scaler := gitlabRunnerScaler{ + metadata: meta, + httpClient: http.DefaultClient, + logger: logr.Discard(), + } + + metrics, active, err := scaler.GetMetricsAndActivity(context.Background(), "gitlab-runner-queue-length") + if tc.expectError { + assert.Error(t, err) + assert.Empty(t, metrics, "Expected no metrics") + assert.False(t, active, "Expected not active") + } else { + assert.NoError(t, err) + assert.Len(t, metrics, 1, "Expected one metric") + assert.Equal(t, float64(tc.expectedMetricValue), metrics[0].Value.AsApproximateFloat64(), "Expected metric value") + assert.Equal(t, tc.expectedActive, active, "Expected active") + } + }) + } +} + +func TestGitLabRunnerScaler_GetMetricSpecForScaling(t *testing.T) { + meta := &gitlabRunnerMetadata{ + ProjectID: "12345", + TargetPipelineQueueLength: 5, + TriggerIndex: 0, + } + + scaler := gitlabRunnerScaler{ + metadata: meta, + metricType: v2.AverageValueMetricType, + } + + metricSpecs := scaler.GetMetricSpecForScaling(context.Background()) + assert.Len(t, metricSpecs, 1) + + metricSpec := metricSpecs[0] + assert.Equal(t, v2.ExternalMetricSourceType, metricSpec.Type) + assert.Equal(t, "s0-gitlab-runner-12345", metricSpec.External.Metric.Name) + assert.Equal(t, int64(5), metricSpec.External.Target.AverageValue.Value()) +} + +func TestGitLabRunnerScaler_Close(t *testing.T) { + meta := &gitlabRunnerMetadata{} + scaler := gitlabRunnerScaler{ + metadata: meta, + httpClient: http.DefaultClient, + } + + err := scaler.Close(context.Background()) + assert.NoError(t, err) +} + +func TestConstructGitlabAPIPipelinesURL(t *testing.T) { + baseURL := mustParseURL("https://gitlab.example.com") + projectID := "12345" + status := "waiting_for_resource" + + expectedURL := "https://gitlab.example.com/api/v4/projects/12345/pipelines?per_page=200&status=waiting_for_resource" + + resultURL := constructGitlabAPIPipelinesURL(*baseURL, projectID, status) + assert.Equal(t, expectedURL, resultURL.String()) +} + +func TestPagedURL(t *testing.T) { + baseURL := mustParseURL("https://gitlab.example.com/api/v4/projects/12345/pipelines?per_page=200&status=waiting_for_resource") + page := "2" + + expectedURL := "https://gitlab.example.com/api/v4/projects/12345/pipelines?page=2&per_page=200&status=waiting_for_resource" + + resultURL := pagedURL(*baseURL, page) + assert.Equal(t, expectedURL, resultURL.String()) +} + +func TestGetPipelineCount_RequestError(t *testing.T) { + meta := &gitlabRunnerMetadata{ + GitLabAPIURL: mustParseURL("http://invalid-url"), + PersonalAccessToken: "test-token", + } + + scaler := gitlabRunnerScaler{ + metadata: meta, + httpClient: http.DefaultClient, + logger: logr.Discard(), + } + + _, err := scaler.getPipelineCount(context.Background(), "http://invalid-url") + assert.Error(t, err) +} + +func TestGetPipelineQueueLength_MaxPagesExceeded(t *testing.T) { + serverCallCount := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + serverCallCount++ + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode([]map[string]interface{}{ + {"id": 1}, + }) + })) + defer server.Close() + + meta := &gitlabRunnerMetadata{ + GitLabAPIURL: mustParseURL(server.URL), + PersonalAccessToken: "test-token", + } + + scaler := gitlabRunnerScaler{ + metadata: meta, + httpClient: http.DefaultClient, + logger: logr.Discard(), + } + + count, err := scaler.getPipelineQueueLength(context.Background()) + assert.NoError(t, err) + assert.Equal(t, int64(maxGitlabAPIPageCount), int64(serverCallCount)) + assert.Equal(t, int64(maxGitlabAPIPageCount), count) +} + +func TestGetPipelineQueueLength_RequestError(t *testing.T) { + meta := &gitlabRunnerMetadata{ + GitLabAPIURL: mustParseURL("http://invalid-url"), + PersonalAccessToken: "test-token", + } + + scaler := gitlabRunnerScaler{ + metadata: meta, + httpClient: http.DefaultClient, + logger: logr.Discard(), + } + + _, err := scaler.getPipelineQueueLength(context.Background()) + assert.Error(t, err) +} + +func TestNewGitLabRunnerScaler_InvalidMetricType(t *testing.T) { + config := &scalersconfig.ScalerConfig{ + TriggerMetadata: map[string]string{ + "projectID": "12345", + }, + AuthParams: map[string]string{ + "personalAccessToken": "test-token", + }, + MetricType: "InvalidType", + } + + _, err := NewGitLabRunnerScaler(config) + assert.Error(t, err) +} diff --git a/pkg/scalers/scalersconfig/typed_config.go b/pkg/scalers/scalersconfig/typed_config.go index ac485fe91f2..90ce16bee18 100644 --- a/pkg/scalers/scalersconfig/typed_config.go +++ b/pkg/scalers/scalersconfig/typed_config.go @@ -295,6 +295,22 @@ func setConfigValueURLParams(params Params, valFromConfig string, field reflect. return nil } +// setConfigValueURL is a function that sets the value of the url.URL field +func setConfigValueURL(valFromConfig string, field reflect.Value) error { + u, err := url.Parse(valFromConfig) + if err != nil { + return fmt.Errorf("expected url.URL or *url.URL, unable to parse %q: %w", valFromConfig, err) + } + + // If the field type is a pointer to url.URL (`*url.URL`), set the value directly + if field.Kind() == reflect.Ptr && field.Type().Elem() == reflect.TypeOf(url.URL{}) { + field.Set(reflect.ValueOf(u)) + return nil + } + + return nil +} + // setConfigValueMap is a function that sets the value of the map field func setConfigValueMap(params Params, valFromConfig string, field reflect.Value) error { field.Set(reflect.MakeMap(reflect.MapOf(field.Type().Key(), field.Type().Elem()))) @@ -404,6 +420,9 @@ func setConfigValueHelper(params Params, valFromConfig string, field reflect.Val if field.Type() == reflect.TypeOf(url.Values{}) { return setConfigValueURLParams(params, valFromConfig, field) } + if field.Type() == reflect.TypeOf(&url.URL{}) { + return setConfigValueURL(valFromConfig, field) + } if field.Kind() == reflect.Map { return setConfigValueMap(params, valFromConfig, field) } diff --git a/pkg/scalers/scalersconfig/typed_config_test.go b/pkg/scalers/scalersconfig/typed_config_test.go index c17952d9908..231c57cc336 100644 --- a/pkg/scalers/scalersconfig/typed_config_test.go +++ b/pkg/scalers/scalersconfig/typed_config_test.go @@ -333,6 +333,29 @@ func TestURLValues(t *testing.T) { Expect(ts.EndpointParams["key2"]).To(ConsistOf("value2")) } +func TestSetConfigValueURL(t *testing.T) { + RegisterTestingT(t) + + sc := &ScalerConfig{ + AuthParams: map[string]string{ + "endpoint": "https://example.com/path?query=1", + }, + } + + type testStruct struct { + Endpoint *url.URL `keda:"name=endpoint, order=authParams"` + } + + ts := testStruct{} + err := sc.TypedConfig(&ts) + Expect(err).To(BeNil()) + Expect(ts.Endpoint).ToNot(BeNil()) + Expect(ts.Endpoint.Scheme).To(Equal("https")) + Expect(ts.Endpoint.Host).To(Equal("example.com")) + Expect(ts.Endpoint.Path).To(Equal("/path")) + Expect(ts.Endpoint.RawQuery).To(Equal("query=1")) +} + // TestGenericMap tests the generic map type that is structurally similar to url.Values func TestGenericMap(t *testing.T) { RegisterTestingT(t) diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 11d294bbec3..4242f58d660 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -190,6 +190,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewGcsScaler(config) case "github-runner": return scalers.NewGitHubRunnerScaler(config) + case "gitlab-runner": + return scalers.NewGitLabRunnerScaler(config) case "graphite": return scalers.NewGraphiteScaler(config) case "huawei-cloudeye":