diff --git a/CHANGELOG.md b/CHANGELOG.md index df2a1987770..be694eb88b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,7 +58,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **General**: Introduce new NSQ scaler ([#3281](https://github.com/kedacore/keda/issues/3281)) #### Experimental diff --git a/pkg/scalers/nsq_scaler.go b/pkg/scalers/nsq_scaler.go new file mode 100644 index 00000000000..651f1947d92 --- /dev/null +++ b/pkg/scalers/nsq_scaler.go @@ -0,0 +1,369 @@ +package scalers + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strconv" + "sync" + + "github.com/go-logr/logr" + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/metrics/pkg/apis/external_metrics" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +type nsqScaler struct { + metricType v2.MetricTargetType + metadata nsqMetadata + httpClient *http.Client + scheme string + logger logr.Logger +} + +type nsqMetadata struct { + NSQLookupdHTTPAddresses []string `keda:"name=nsqLookupdHTTPAddresses, order=triggerMetadata;resolvedEnv"` + Topic string `keda:"name=topic, order=triggerMetadata;resolvedEnv"` + Channel string `keda:"name=channel, order=triggerMetadata;resolvedEnv"` + DepthThreshold int64 `keda:"name=depthThreshold, order=triggerMetadata;resolvedEnv, default=10"` + ActivationDepthThreshold int64 `keda:"name=activationDepthThreshold, order=triggerMetadata;resolvedEnv, default=0"` + UseHTTPS bool `keda:"name=useHttps, order=triggerMetadata;resolvedEnv, default=false"` + UnsafeSSL bool `keda:"name=unsafeSsl, order=triggerMetadata;resolvedEnv, default=false"` + + triggerIndex int +} + +const ( + nsqMetricType = "External" +) + +func NewNSQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { + metricType, err := GetMetricTargetType(config) + if err != nil { + return nil, fmt.Errorf("error getting scaler metric type: %w", err) + } + + logger := InitializeLogger(config, "nsq_scaler") + + nsqMetadata, err := parseNSQMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing NSQ metadata: %w", err) + } + + scheme := "http" + if nsqMetadata.UseHTTPS { + scheme = "https" + } + + return &nsqScaler{ + metricType: metricType, + metadata: nsqMetadata, + httpClient: kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, nsqMetadata.UnsafeSSL), + scheme: scheme, + logger: logger, + }, nil +} + +func (m nsqMetadata) Validate() error { + if len(m.NSQLookupdHTTPAddresses) == 0 { + return fmt.Errorf("no nsqLookupdHTTPAddresses given") + } + + if m.DepthThreshold <= 0 { + return fmt.Errorf("depthThreshold must be a positive integer") + } + + if m.ActivationDepthThreshold < 0 { + return fmt.Errorf("activationDepthThreshold must be greater than or equal to 0") + } + + return nil +} + +func parseNSQMetadata(config *scalersconfig.ScalerConfig) (nsqMetadata, error) { + meta := nsqMetadata{triggerIndex: config.TriggerIndex} + if err := config.TypedConfig(&meta); err != nil { + return meta, fmt.Errorf("error parsing nsq metadata: %w", err) + } + + return meta, nil +} + +func (s nsqScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + depth, err := s.getTopicChannelDepth(ctx) + + if err != nil { + return []external_metrics.ExternalMetricValue{}, false, err + } + + s.logger.V(1).Info("GetMetricsAndActivity", "metricName", metricName, "depth", depth) + + metric := GenerateMetricInMili(metricName, float64(depth)) + + return []external_metrics.ExternalMetricValue{metric}, depth > s.metadata.ActivationDepthThreshold, nil +} + +func (s nsqScaler) getTopicChannelDepth(ctx context.Context) (int64, error) { + nsqdHosts, err := s.getTopicProducers(ctx, s.metadata.Topic) + if err != nil { + return -1, fmt.Errorf("error getting nsqd hosts: %w", err) + } + + if len(nsqdHosts) == 0 { + s.logger.V(1).Info("no nsqd hosts found for topic", "topic", s.metadata.Topic) + return 0, nil + } + + depth, err := s.aggregateDepth(ctx, nsqdHosts, s.metadata.Topic, s.metadata.Channel) + if err != nil { + return -1, fmt.Errorf("error getting topic/channel depth: %w", err) + } + + return depth, nil +} + +func (s nsqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + metricName := fmt.Sprintf("nsq-%s-%s", s.metadata.Topic, s.metadata.Channel) + + externalMetric := &v2.ExternalMetricSource{ + Metric: v2.MetricIdentifier{ + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(metricName)), + }, + Target: GetMetricTarget(s.metricType, s.metadata.DepthThreshold), + } + metricSpec := v2.MetricSpec{External: externalMetric, Type: nsqMetricType} + return []v2.MetricSpec{metricSpec} +} + +func (s nsqScaler) Close(context.Context) error { + if s.httpClient != nil { + s.httpClient.CloseIdleConnections() + } + return nil +} + +type lookupResponse struct { + Producers []struct { + HTTPPort int `json:"http_port"` + BroadcastAddress string `json:"broadcast_address"` + } +} + +type lookupResult struct { + host string + lookupResponse *lookupResponse + err error +} + +func (s *nsqScaler) getTopicProducers(ctx context.Context, topic string) ([]string, error) { + var wg sync.WaitGroup + resultCh := make(chan lookupResult, len(s.metadata.NSQLookupdHTTPAddresses)) + + for _, host := range s.metadata.NSQLookupdHTTPAddresses { + wg.Add(1) + go func(host string, topic string) { + defer wg.Done() + resp, err := s.getLookup(ctx, host, topic) + resultCh <- lookupResult{host, resp, err} + }(host, topic) + } + + wg.Wait() + close(resultCh) + + var nsqdHostMap = make(map[string]bool) + for result := range resultCh { + if result.err != nil { + return nil, fmt.Errorf("error getting lookup from host '%s': %w", result.host, result.err) + } + + if result.lookupResponse == nil { + // topic is not found on a single nsqlookupd host, it may exist on another + continue + } + + for _, producer := range result.lookupResponse.Producers { + nsqdHost := net.JoinHostPort(producer.BroadcastAddress, strconv.Itoa(producer.HTTPPort)) + nsqdHostMap[nsqdHost] = true + } + } + + var nsqdHosts []string + for nsqdHost := range nsqdHostMap { + nsqdHosts = append(nsqdHosts, nsqdHost) + } + + return nsqdHosts, nil +} + +func (s *nsqScaler) getLookup(ctx context.Context, host string, topic string) (*lookupResponse, error) { + lookupURL := url.URL{ + Scheme: s.scheme, + Host: host, + Path: "lookup", + } + req, err := http.NewRequestWithContext(ctx, "GET", lookupURL.String(), nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/json; charset=utf-8") + + params := url.Values{"topic": {topic}} + req.URL.RawQuery = params.Encode() + + resp, err := s.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound { + return nil, nil + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code '%s'", resp.Status) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var lookupResponse lookupResponse + err = json.Unmarshal(body, &lookupResponse) + if err != nil { + return nil, err + } + + return &lookupResponse, nil +} + +type statsResponse struct { + Topics []struct { + TopicName string `json:"topic_name"` + Depth int64 `json:"depth"` + Channels []struct { + ChannelName string `json:"channel_name"` + Depth int64 `json:"depth"` // num messages in the queue (mem + disk) + Paused bool `json:"paused"` // if paused, consumers will not receive messages + } + } +} + +type statsResult struct { + host string + statsResponse *statsResponse + err error +} + +func (s *nsqScaler) aggregateDepth(ctx context.Context, nsqdHosts []string, topic string, channel string) (int64, error) { + wg := sync.WaitGroup{} + resultCh := make(chan statsResult, len(nsqdHosts)) + + for _, host := range nsqdHosts { + wg.Add(1) + go func(host string, topic string) { + defer wg.Done() + resp, err := s.getStats(ctx, host, topic) + resultCh <- statsResult{host, resp, err} + }(host, topic) + } + + wg.Wait() + close(resultCh) + + var depth int64 + for result := range resultCh { + if result.err != nil { + return -1, fmt.Errorf("error getting stats from host '%s': %w", result.host, result.err) + } + + for _, t := range result.statsResponse.Topics { + if t.TopicName != topic { + // this should never happen as we make the /stats call with the "topic" param + continue + } + + if len(t.Channels) == 0 { + // topic exists with no channels, but there are messages in the topic -> we should still scale to bootstrap + s.logger.V(1).Info("no channels exist for topic", "topic", topic, "channel", channel, "host", result.host) + depth += t.Depth + continue + } + + channelExists := false + for _, ch := range t.Channels { + if ch.ChannelName != channel { + continue + } + channelExists = true + if ch.Paused { + // if it's paused on a single nsqd host, it's depth should not go into the aggregate + // meaning if paused on all nsqd hosts => depth == 0 + s.logger.V(1).Info("channel is paused", "topic", topic, "channel", channel, "host", result.host) + continue + } + depth += ch.Depth + } + if !channelExists { + // topic exists with channels, but not the one in question - fallback to topic depth + s.logger.V(1).Info("channel does not exist for topic", "topic", topic, "channel", channel, "host", result.host) + depth += t.Depth + } + } + } + + return depth, nil +} + +func (s *nsqScaler) getStats(ctx context.Context, host string, topic string) (*statsResponse, error) { + statsURL := url.URL{ + Scheme: s.scheme, + Host: host, + Path: "stats", + } + req, err := http.NewRequestWithContext(ctx, "GET", statsURL.String(), nil) + if err != nil { + return nil, err + } + + // "channel" is a query param as well, but if used and the channel does not exist + // we do not receive any stats for the existing topic + params := url.Values{ + "format": {"json"}, + "include_clients": {"false"}, + "include_mem": {"false"}, + "topic": {topic}, + } + req.URL.RawQuery = params.Encode() + + resp, err := s.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code '%s'", resp.Status) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var statsResponse statsResponse + err = json.Unmarshal(body, &statsResponse) + if err != nil { + return nil, err + } + + return &statsResponse, nil +} diff --git a/pkg/scalers/nsq_scaler_test.go b/pkg/scalers/nsq_scaler_test.go new file mode 100644 index 00000000000..956f9f1621e --- /dev/null +++ b/pkg/scalers/nsq_scaler_test.go @@ -0,0 +1,664 @@ +package scalers + +import ( + "context" + "fmt" + "net" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" + v2 "k8s.io/api/autoscaling/v2" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" +) + +type nsqMetadataTestData struct { + metadata map[string]string + numNSQLookupdHTTPAddresses int + nsqLookupdHTTPAddresses []string + topic string + channel string + depthThreshold int64 + activationDepthThreshold int64 + useHTTPS bool + unsafeSsl bool + isError bool + description string +} + +type nsqMetricIdentifier struct { + metadataTestData *nsqMetadataTestData + triggerIndex int + name string + metricType string +} + +var parseNSQMetadataTestDataset = []nsqMetadataTestData{ + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "topic": "topic", "channel": "channel"}, + numNSQLookupdHTTPAddresses: 1, + nsqLookupdHTTPAddresses: []string{"nsqlookupd-0:4161"}, + topic: "topic", + channel: "channel", + depthThreshold: 10, + activationDepthThreshold: 0, + isError: false, + description: "Success", + }, + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161,nsqlookupd-1:4161", "topic": "topic", "channel": "channel"}, + numNSQLookupdHTTPAddresses: 2, + nsqLookupdHTTPAddresses: []string{"nsqlookupd-0:4161", "nsqlookupd-1:4161"}, + topic: "topic", + channel: "channel", + depthThreshold: 10, + activationDepthThreshold: 0, + isError: false, + description: "Success, multiple nsqlookupd addresses", + }, + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "topic": "topic", "channel": "channel", "depthThreshold": "100", "activationDepthThreshold": "1", "useHttps": "true", "unsafeSsl": "true"}, + numNSQLookupdHTTPAddresses: 1, + nsqLookupdHTTPAddresses: []string{"nsqlookupd-0:4161"}, + topic: "topic", + channel: "channel", + depthThreshold: 100, + activationDepthThreshold: 1, + useHTTPS: true, + unsafeSsl: true, + isError: false, + description: "Success - setting optional fields", + }, + { + metadata: map[string]string{"topic": "topic", "channel": "channel"}, + isError: true, + description: "Error, no nsqlookupd addresses", + }, + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "channel": "channel"}, + isError: true, + description: "Error, no topic", + }, + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "topic": "topic"}, + isError: true, + description: "Error, no channel", + }, + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "topic": "topic", "channel": "channel", "depthThreshold": "0"}, + isError: true, + description: "Error, depthThreshold is <=0", + }, + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "topic": "topic", "channel": "channel", "activationDepthThreshold": "-1"}, + isError: true, + description: "Error, activationDepthThreshold is <0", + }, +} + +var nsqMetricIdentifiers = []nsqMetricIdentifier{ + {&parseNSQMetadataTestDataset[0], 0, "s0-nsq-topic-channel", "Value"}, + {&parseNSQMetadataTestDataset[0], 1, "s1-nsq-topic-channel", "AverageValue"}, +} + +func TestNSQParseMetadata(t *testing.T) { + for _, testData := range parseNSQMetadataTestDataset { + config := scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata} + + meta, err := parseNSQMetadata(&config) + if err != nil { + if testData.isError { + continue + } + t.Error("Expected success, got error", err, testData.description) + } + if err == nil && testData.isError { + t.Error("Expected error, got success", testData.description) + } + + assert.Equal(t, testData.numNSQLookupdHTTPAddresses, len(meta.NSQLookupdHTTPAddresses), testData.description) + assert.Equal(t, testData.nsqLookupdHTTPAddresses, meta.NSQLookupdHTTPAddresses, testData.description) + assert.Equal(t, testData.topic, meta.Topic, testData.description) + assert.Equal(t, testData.channel, meta.Channel, testData.description) + assert.Equal(t, testData.depthThreshold, meta.DepthThreshold, testData.description) + assert.Equal(t, testData.activationDepthThreshold, meta.ActivationDepthThreshold, testData.description) + assert.Equal(t, testData.useHTTPS, meta.UseHTTPS, testData.description) + assert.Equal(t, testData.unsafeSsl, meta.UnsafeSSL, testData.description) + } +} + +func TestNSQGetMetricsAndActivity(t *testing.T) { + type testCase struct { + lookupError bool + statsError bool + expectedDepth int64 + expectedActive bool + activationdDepthThreshold int64 + } + testCases := []testCase{ + { + lookupError: true, + }, + { + statsError: true, + }, + { + expectedDepth: 100, + expectedActive: true, + }, + { + expectedDepth: 0, + expectedActive: false, + }, + { + expectedDepth: 9, + activationdDepthThreshold: 10, + expectedActive: false, + }, + } + for _, tc := range testCases { + mockNSQdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprintf(w, `{"topics":[{"topic_name":"topic","channels":[{"channel_name":"channel","depth":%d}]}]}`, tc.expectedDepth) + })) + defer mockNSQdServer.Close() + + parsedNSQdURL, err := url.Parse(mockNSQdServer.URL) + assert.Nil(t, err) + + mockNSQLookupdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprintf(w, `{"producers":[{"broadcast_address":"%s","http_port":%s}]}`, parsedNSQdURL.Hostname(), parsedNSQdURL.Port()) + })) + defer mockNSQLookupdServer.Close() + + parsedNSQLookupdURL, err := url.Parse(mockNSQLookupdServer.URL) + assert.Nil(t, err) + + nsqlookupdHost := net.JoinHostPort(parsedNSQLookupdURL.Hostname(), parsedNSQLookupdURL.Port()) + + config := scalersconfig.ScalerConfig{TriggerMetadata: map[string]string{ + "nsqLookupdHTTPAddresses": nsqlookupdHost, + "topic": "topic", + "channel": "channel", + "activationDepthThreshold": fmt.Sprintf("%d", tc.activationdDepthThreshold), + }} + meta, err := parseNSQMetadata(&config) + assert.Nil(t, err) + + s := nsqScaler{v2.AverageValueMetricType, meta, http.DefaultClient, "http", logr.Discard()} + + metricName := "s0-nsq-topic-channel" + metrics, activity, err := s.GetMetricsAndActivity(context.Background(), metricName) + + if err != nil && (tc.lookupError || tc.statsError) { + assert.Equal(t, 0, len(metrics)) + assert.False(t, activity) + continue + } + + assert.Nil(t, err) + assert.Equal(t, 1, len(metrics)) + assert.Equal(t, metricName, metrics[0].MetricName) + assert.Equal(t, tc.expectedDepth, metrics[0].Value.Value()) + if tc.expectedActive { + assert.True(t, activity) + } else { + assert.False(t, activity) + } + } +} + +func TestNSQGetMetricSpecForScaling(t *testing.T) { + for _, testData := range nsqMetricIdentifiers { + meta, err := parseNSQMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, TriggerIndex: testData.triggerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + + metricType := v2.MetricTargetType(testData.metricType) + mockNSQScaler := nsqScaler{metricType, meta, nil, "http", logr.Discard()} + + metricSpec := mockNSQScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + assert.Equal(t, testData.name, metricName) + assert.Equal(t, 1, len(metricSpec)) + assert.Equal(t, metricType, metricSpec[0].External.Target.Type) + depthThreshold := meta.DepthThreshold + if metricType == v2.AverageValueMetricType { + assert.Equal(t, depthThreshold, metricSpec[0].External.Target.AverageValue.Value()) + } else { + assert.Equal(t, depthThreshold, metricSpec[0].External.Target.Value.Value()) + } + } +} + +func TestNSQGetTopicChannelDepth(t *testing.T) { + type testCase struct { + lookupError bool + topicNotExist bool + producersNotExist bool + statsError bool + channelPaused bool + expectedDepth int64 + description string + } + testCases := []testCase{ + { + lookupError: true, + description: "nsqlookupd call failed", + }, + { + topicNotExist: true, + expectedDepth: 0, + description: "Topic does not exist", + }, + { + producersNotExist: true, + expectedDepth: 0, + description: "No producers for topic", + }, + { + statsError: true, + description: "nsqd call failed", + }, + { + channelPaused: true, + expectedDepth: 0, + description: "Channel is paused", + }, + { + expectedDepth: 100, + description: "successfully retrieved depth", + }, + } + + for _, tc := range testCases { + mockNSQdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tc.statsError { + w.WriteHeader(http.StatusInternalServerError) + return + } + if tc.channelPaused { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100, "paused":true}]}]}`) + return + } + + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100}]}]}`) + })) + defer mockNSQdServer.Close() + + parsedNSQdURL, err := url.Parse(mockNSQdServer.URL) + assert.Nil(t, err) + + mockNSQLookupdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tc.lookupError { + w.WriteHeader(http.StatusInternalServerError) + return + } + if tc.topicNotExist { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"message": "TOPIC_NOT_FOUND"}`) + return + } + if tc.producersNotExist { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"producers":[]}`) + return + } + + w.WriteHeader(http.StatusOK) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprintf(w, `{"producers":[{"broadcast_address":"%s","http_port":%s}]}`, parsedNSQdURL.Hostname(), parsedNSQdURL.Port()) + })) + defer mockNSQLookupdServer.Close() + + parsedNSQLookupdURL, err := url.Parse(mockNSQLookupdServer.URL) + assert.Nil(t, err) + + nsqLookupdHosts := []string{net.JoinHostPort(parsedNSQLookupdURL.Hostname(), parsedNSQLookupdURL.Port())} + + s := nsqScaler{httpClient: http.DefaultClient, scheme: "http", metadata: nsqMetadata{NSQLookupdHTTPAddresses: nsqLookupdHosts, Topic: "topic", Channel: "channel"}} + + depth, err := s.getTopicChannelDepth(context.Background()) + + if err != nil && (tc.lookupError || tc.statsError) { + continue + } + + assert.Nil(t, err) + assert.Equal(t, tc.expectedDepth, depth) + } +} + +func TestNSQGetTopicProducers(t *testing.T) { + type statusAndResponse struct { + status int + response string + } + type testCase struct { + statusAndResponses []statusAndResponse + expectedNSQdHosts []string + isError bool + description string + } + testCases := []testCase{ + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"producers":[], "channels":[]}`}, + }, + expectedNSQdHosts: []string{}, + description: "No producers or channels", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}]}`}, + }, + expectedNSQdHosts: []string{"nsqd-0:4161"}, + description: "Single nsqd host", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}, {"broadcast_address":"nsqd-1","http_port":4161}]}`}, + {http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-2","http_port":8161}]}`}, + }, + expectedNSQdHosts: []string{"nsqd-0:4161", "nsqd-1:4161", "nsqd-2:8161"}, + description: "Multiple nsqd hosts", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}]}`}, + {http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}]}`}, + }, + expectedNSQdHosts: []string{"nsqd-0:4161"}, + description: "De-dupe nsqd hosts", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}]}`}, + {http.StatusInternalServerError, ""}, + }, + isError: true, + description: "At least one host responded with error", + }, + } + + for _, tc := range testCases { + callCount := atomic.NewInt32(-1) + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount.Inc() + w.WriteHeader(tc.statusAndResponses[callCount.Load()].status) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprint(w, tc.statusAndResponses[callCount.Load()].response) + })) + defer mockServer.Close() + + parsedURL, err := url.Parse(mockServer.URL) + assert.Nil(t, err) + + var nsqLookupdHosts []string + nsqLookupdHost := net.JoinHostPort(parsedURL.Hostname(), parsedURL.Port()) + for i := 0; i < len(tc.statusAndResponses); i++ { + nsqLookupdHosts = append(nsqLookupdHosts, nsqLookupdHost) + } + + s := nsqScaler{httpClient: http.DefaultClient, scheme: "http", metadata: nsqMetadata{NSQLookupdHTTPAddresses: nsqLookupdHosts}} + + nsqdHosts, err := s.getTopicProducers(context.Background(), "topic") + + if err != nil && tc.isError { + continue + } + + assert.Nil(t, err) + assert.ElementsMatch(t, tc.expectedNSQdHosts, nsqdHosts) + } +} + +func TestNSQGetLookup(t *testing.T) { + type testCase struct { + serverStatus int + serverResponse string + isError bool + description string + } + testCases := []testCase{ + { + serverStatus: http.StatusNotFound, + serverResponse: `{"message": "TOPIC_NOT_FOUND"}`, + isError: false, + description: "Topic does not exist", + }, + { + serverStatus: http.StatusOK, + serverResponse: `{"producers":[{"broadcast_address":"nsqd-0","http_port":4151}], "channels":[]}`, + isError: false, + description: "Channel does not exist", + }, + { + serverStatus: http.StatusNotFound, + serverResponse: `{"producers":[], "channels":["channel"]}`, + isError: false, + description: "No nsqd producers exist", + }, + { + serverStatus: http.StatusOK, + serverResponse: `{"producers":[{"broadcast_address":"nsqd-0", "http_port":4151}], "channels":["channel"]}`, + isError: false, + description: "Topic and channel exist with nsqd producers", + }, + { + serverStatus: http.StatusInternalServerError, + isError: true, + description: "Host responds with error", + }, + } + + s := nsqScaler{httpClient: http.DefaultClient, scheme: "http"} + for _, tc := range testCases { + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tc.serverStatus) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprint(w, tc.serverResponse) + })) + defer mockServer.Close() + + parsedURL, err := url.Parse(mockServer.URL) + assert.Nil(t, err) + + host := net.JoinHostPort(parsedURL.Hostname(), parsedURL.Port()) + + resp, err := s.getLookup(context.Background(), host, "topic") + + if err != nil && tc.isError { + continue + } + + assert.Nil(t, err, tc.description) + + if tc.serverStatus != http.StatusNotFound { + assert.NotNil(t, resp, tc.description) + } else { + assert.Nil(t, resp, tc.description) + } + } +} + +func TestNSQAggregateDepth(t *testing.T) { + type statusAndResponse struct { + status int + response string + } + type testCase struct { + statusAndResponses []statusAndResponse + expectedDepth int64 + isError bool + description string + } + testCases := []testCase{ + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":null}`}, + }, + expectedDepth: 0, + isError: false, + description: "Topic does not exist", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[]}]}`}, + }, + expectedDepth: 250, + isError: false, + description: "Topic exists with no channels", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"other_channel", "depth":100}]}]}`}, + }, + expectedDepth: 250, + isError: false, + description: "Topic exists with different channels", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100}]}]}`}, + }, + expectedDepth: 100, + isError: false, + description: "Topic and channel exist", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100, "paused":true}]}]}`}, + }, + expectedDepth: 0, + isError: false, + description: "Channel is paused", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100}]}]}`}, + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":50}]}]}`}, + }, + expectedDepth: 150, + isError: false, + description: "Sum multiple depth values", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":500, "channels":[]}]}`}, + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":400, "channels":[{"channel_name":"other_channel", "depth":300}]}]}`}, + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":200, "channels":[{"channel_name":"channel", "depth":100}]}]}`}, + }, + expectedDepth: 1000, + isError: false, + description: "Channel doesn't exist on all nsqd hosts", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100}]}]}`}, + {http.StatusInternalServerError, ""}, + }, + expectedDepth: -1, + isError: true, + description: "At least one host responded with error", + }, + } + + s := nsqScaler{httpClient: http.DefaultClient, scheme: "http"} + for _, tc := range testCases { + callCount := atomic.NewInt32(-1) + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount.Inc() + w.WriteHeader(tc.statusAndResponses[callCount.Load()].status) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprint(w, tc.statusAndResponses[callCount.Load()].response) + })) + defer mockServer.Close() + + parsedURL, err := url.Parse(mockServer.URL) + assert.Nil(t, err) + + var nsqdHosts []string + nsqdHost := net.JoinHostPort(parsedURL.Hostname(), parsedURL.Port()) + for i := 0; i < len(tc.statusAndResponses); i++ { + nsqdHosts = append(nsqdHosts, nsqdHost) + } + + depth, err := s.aggregateDepth(context.Background(), nsqdHosts, "topic", "channel") + + if err != nil && tc.isError { + continue + } + + assert.Nil(t, err, tc.description) + assert.Equal(t, tc.expectedDepth, depth, tc.description) + } +} + +func TestNSQGetStats(t *testing.T) { + type testCase struct { + serverStatus int + serverResponse string + isError bool + description string + } + testCases := []testCase{ + { + serverStatus: http.StatusOK, + serverResponse: `{"topics":null}`, + isError: false, + description: "Topic does not exist", + }, + { + serverStatus: http.StatusOK, + serverResponse: `{"topics":[{"topic_name":"topic", "depth":250, "channels":[]}]}`, + isError: false, + description: "Channel does not exist", + }, + { + serverStatus: http.StatusOK, + serverResponse: `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":250}]}]}`, + isError: false, + description: "Topic and channel exist", + }, + { + serverStatus: http.StatusInternalServerError, + isError: true, + description: "Host responds with error", + }, + } + + s := nsqScaler{httpClient: http.DefaultClient, scheme: "http"} + for _, tc := range testCases { + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tc.serverStatus) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprint(w, tc.serverResponse) + })) + defer mockServer.Close() + + parsedURL, err := url.Parse(mockServer.URL) + assert.Nil(t, err) + + host := net.JoinHostPort(parsedURL.Hostname(), parsedURL.Port()) + resp, err := s.getStats(context.Background(), host, "topic") + + if err != nil && tc.isError { + continue + } + + assert.Nil(t, err, tc.description) + assert.NotNil(t, resp, tc.description) + } +} diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 80bfb40658f..9c44ac5004e 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -217,6 +217,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewNATSJetStreamScaler(config) case "new-relic": return scalers.NewNewRelicScaler(config) + case "nsq": + return scalers.NewNSQScaler(config) case "openstack-metric": return scalers.NewOpenstackMetricScaler(ctx, config) case "openstack-swift": diff --git a/tests/scalers/nsq/nsq_test.go b/tests/scalers/nsq/nsq_test.go new file mode 100644 index 00000000000..cb81c506bdd --- /dev/null +++ b/tests/scalers/nsq/nsq_test.go @@ -0,0 +1,224 @@ +//go:build e2e +// +build e2e + +package nsq_test + +import ( + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" +) + +var _ = godotenv.Load("../../.env") + +const ( + testName = "nsq-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-consumer-deployment", testName) + jobName = fmt.Sprintf("%s-producer-job", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + nsqNamespace = "nsq" + nsqHelmRepoURL = "https://nsqio.github.io/helm-chart" + minReplicaCount = 0 + maxReplicaCount = 2 + topicName = "test_topic" + channelName = "test_channel" + depthThreshold = 1 + activationDepthThreshold = 5 +) + +const ( + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - image: ghcr.io/kedacore/tests-nsq:latest + name: {{.DeploymentName}} + args: + - "--mode=consumer" + - "--topic={{.TopicName}}" + - "--channel={{.ChannelName}}" + - "--sleep-duration=1s" + - "--nsqlookupd-http-address=nsq-nsqlookupd.{{.NSQNamespace}}.svc.cluster.local:4161" + imagePullPolicy: Always +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + pollingInterval: 5 + cooldownPeriod: 10 + maxReplicaCount: {{.MaxReplicaCount}} + minReplicaCount: {{.MinReplicaCount}} + scaleTargetRef: + apiVersion: "apps/v1" + kind: "Deployment" + name: {{.DeploymentName}} + triggers: + - type: nsq + metricType: "AverageValue" + metadata: + nsqLookupdHTTPAddresses: "nsq-nsqlookupd.{{.NSQNamespace}}.svc.cluster.local:4161" + topic: "{{.TopicName}}" + channel: "{{.ChannelName}}" + depthThreshold: "{{.DepthThreshold}}" + activationDepthThreshold: "{{.ActivationDepthThreshold}}" +` + + jobTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: {{.JobName}} + namespace: {{.TestNamespace}} +spec: + template: + spec: + containers: + - image: ghcr.io/kedacore/tests-nsq:latest + name: {{.JobName}} + args: + - "--mode=producer" + - "--topic={{.TopicName}}" + - "--nsqd-tcp-address=nsq-nsqd.{{.NSQNamespace}}.svc.cluster.local:4150" + - "--message-count={{.MessageCount}}" + imagePullPolicy: Always + restartPolicy: Never +` +) + +type templateData struct { + TestNamespace string + NSQNamespace string + DeploymentName string + ScaledObjectName string + JobName string + MinReplicaCount int + MaxReplicaCount int + TopicName string + ChannelName string + DepthThreshold int + ActivationDepthThreshold int + MessageCount int +} + +func TestNSQScaler(t *testing.T) { + kc := GetKubernetesClient(t) + + t.Cleanup(func() { + data, templates := getTemplateData() + uninstallNSQ(t) + KubectlDeleteWithTemplate(t, data, "jobTemplate", jobTemplate) + DeleteKubernetesResources(t, testNamespace, data, templates) + }) + + installNSQ(t, kc) + + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + require.True(t, WaitForPodsTerminated(t, kc, fmt.Sprintf("app=%s", deploymentName), testNamespace, 60, 1), + "Replica count should start out as 0") + + testActivation(t, kc, data) + testScaleOut(t, kc, data) + testScaleIn(t, kc) +} + +func installNSQ(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- installing NSQ ---") + CreateNamespace(t, kc, nsqNamespace) + + _, err := ExecuteCommand("which helm") + require.NoErrorf(t, err, "nsq test requires helm - %s", err) + + _, err = ExecuteCommand(fmt.Sprintf("helm repo add nsqio %s", nsqHelmRepoURL)) + require.NoErrorf(t, err, "error while adding nsqio helm repo - %s", err) + + _, err = ExecuteCommand(fmt.Sprintf("helm install nsq nsqio/nsq --namespace %s --set nsqd.replicaCount=1 --set nsqlookupd.replicaCount=1 --set nsqadmin.enabled=false --wait", nsqNamespace)) + require.NoErrorf(t, err, "error while installing nsq - %s", err) +} + +func uninstallNSQ(t *testing.T) { + t.Log("--- uninstalling NSQ ---") + _, err := ExecuteCommand(fmt.Sprintf("helm uninstall nsq --namespace %s", nsqNamespace)) + require.NoErrorf(t, err, "error while uninstalling nsq - %s", err) + DeleteNamespace(t, nsqNamespace) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + NSQNamespace: nsqNamespace, + DeploymentName: deploymentName, + JobName: jobName, + ScaledObjectName: scaledObjectName, + MinReplicaCount: minReplicaCount, + MaxReplicaCount: maxReplicaCount, + TopicName: topicName, + ChannelName: channelName, + DepthThreshold: depthThreshold, + ActivationDepthThreshold: activationDepthThreshold, + }, []Template{ + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing activation ---") + + data.MessageCount = activationDepthThreshold + KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 20) + + data.MessageCount = 1 + KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) + require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 1), + "replica count should reach 1 in under 1 minute") +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale out ---") + + data.MessageCount = 80 + KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) + + require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 1), + "replica count should reach 2 in under 1 minute") +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale in ---") + + require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") +}