Skip to content

Commit

Permalink
Disable prometheus port when not allowed (#1078) and add unit test fo…
Browse files Browse the repository at this point in the history
…r opentelemetry (#1088)
  • Loading branch information
tiationg-kho committed Nov 28, 2024
1 parent 66402db commit c6bd492
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 9 deletions.
31 changes: 22 additions & 9 deletions pkg/observability/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package observability
import (
"context"
"fmt"
"net"
"net/http"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -36,6 +38,7 @@ var (
labelNodeStatusKey = attribute.Key("node/status")
labelNodeNameKey = attribute.Key("node/name")
labelEventIDKey = attribute.Key("node/event-id")
metricsEndpoint = "/metrics"
)

// Metrics represents the stats for observability
Expand All @@ -62,13 +65,14 @@ func InitMetrics(enabled bool, port int) (Metrics, error) {

// Starts an async process to collect golang runtime stats
// go.opentelemetry.io/contrib/instrumentation/runtime
if err = runtime.Start(
runtime.WithMeterProvider(provider),
runtime.WithMinimumReadMemStatsInterval(1*time.Second)); err != nil {
err = runtime.Start(runtime.WithMeterProvider(provider), runtime.WithMinimumReadMemStatsInterval(1*time.Second))
if err != nil {
return Metrics{}, fmt.Errorf("failed to start Go runtime metrics collection: %w", err)
}

go serveMetrics(port)
if enabled {
serveMetrics(port)
}

return metrics, nil
}
Expand Down Expand Up @@ -135,10 +139,19 @@ func registerMetricsWith(provider *metric.MeterProvider) (Metrics, error) {
}, nil
}

func serveMetrics(port int) {
log.Info().Msgf("Starting to serve handler /metrics, port %d", port)
http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil {
log.Err(err).Msg("Failed to listen and serve http server")
func serveMetrics(port int) *http.Server {
http.Handle(metricsEndpoint, promhttp.Handler())

server := &http.Server{
Addr: net.JoinHostPort("", strconv.Itoa(port)),
}

go func() {
log.Info().Msgf("Starting to serve handler %s, port %d", metricsEndpoint, port)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Err(err).Msg("Failed to listen and serve http server")
}
}()

return server
}
238 changes: 238 additions & 0 deletions pkg/observability/opentelemetry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package observability

import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/prometheus"
api "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric"

h "github.com/aws/aws-node-termination-handler/pkg/test"
)

var (
mockNth = "aws.node.termination.handler"
mockErrorEvent = "mockErrorEvent"
mockAction = "cordon-and-drain"
mockNodeName1 = "nodeName1"
mockNodeName2 = "nodeName2"
mockNodeName3 = "nodeName3"
mockEventID1 = "eventID1"
mockEventID2 = "eventID2"
mockEventID3 = "eventID3"
successStatus = "success"
errorStatus = "error"
mockDefaultPort = 9092
mockClosedPort = 9093
)

func TestInitMetrics(t *testing.T) {
getMetrics(t)

rr := mockMetricsRequest()

validateStatus(t, rr)

metricsMap := getMetricsMap(rr.Body.String())

runtimeMetrics := []string{
"go_gc_gogc_percent",
"go_memstats_frees_total",
"go_goroutines",
}

for _, metricName := range runtimeMetrics {
_, exists := metricsMap[metricName]
h.Assert(t, exists, fmt.Sprintf("%v metric should be present", metricName))
}
}

func TestErrorEventsInc(t *testing.T) {
metrics := getMetrics(t)

metrics.ErrorEventsInc(mockErrorEvent)

rr := mockMetricsRequest()

validateStatus(t, rr)

metricsMap := getMetricsMap(rr.Body.String())

validateEventErrorTotal(t, metricsMap, 1)
validateActionTotalV2(t, metricsMap, 0, successStatus)
validateActionTotalV2(t, metricsMap, 0, errorStatus)
}

func TestNodeActionsInc(t *testing.T) {
metrics := getMetrics(t)

metrics.NodeActionsInc(mockAction, mockNodeName1, mockEventID1, nil)
metrics.NodeActionsInc(mockAction, mockNodeName2, mockEventID2, nil)
metrics.NodeActionsInc(mockAction, mockNodeName3, mockEventID3, errors.New("mockError"))

rr := mockMetricsRequest()

validateStatus(t, rr)

metricsMap := getMetricsMap(rr.Body.String())

validateEventErrorTotal(t, metricsMap, 0)
validateActionTotalV2(t, metricsMap, 2, successStatus)
validateActionTotalV2(t, metricsMap, 1, errorStatus)
}

func TestRegisterMetricsWith(t *testing.T) {
const errorEventMetricsTotal = 23
const successActionMetricsTotal = 31
const errorActionMetricsTotal = 97

metrics := getMetrics(t)

errorEventLables := []attribute.KeyValue{labelEventErrorWhereKey.String(mockErrorEvent)}
successActionLables := []attribute.KeyValue{labelNodeActionKey.String(mockAction), labelNodeStatusKey.String(successStatus)}
errorActionLables := []attribute.KeyValue{labelNodeActionKey.String(mockAction), labelNodeStatusKey.String(errorStatus)}

for i := 0; i < errorEventMetricsTotal; i++ {
metrics.errorEventsCounter.Add(context.Background(), 1, api.WithAttributes(errorEventLables...))
}
for i := 0; i < successActionMetricsTotal; i++ {
metrics.actionsCounterV2.Add(context.Background(), 1, api.WithAttributes(successActionLables...))
}
for i := 0; i < errorActionMetricsTotal; i++ {
metrics.actionsCounterV2.Add(context.Background(), 1, api.WithAttributes(errorActionLables...))
}

rr := mockMetricsRequest()

validateStatus(t, rr)

metricsMap := getMetricsMap(rr.Body.String())

validateEventErrorTotal(t, metricsMap, errorEventMetricsTotal)
validateActionTotalV2(t, metricsMap, successActionMetricsTotal, successStatus)
validateActionTotalV2(t, metricsMap, errorActionMetricsTotal, errorStatus)
}

func TestServeMetrics(t *testing.T) {
server := serveMetrics(mockDefaultPort)

defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
server.Shutdown(ctx)

Check failure on line 146 in pkg/observability/opentelemetry_test.go

View workflow job for this annotation

GitHub Actions / Lint Eastwood

Error return value of `server.Shutdown` is not checked (errcheck)
}()

time.Sleep(100 * time.Millisecond)

conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", mockDefaultPort), time.Second)
if err != nil {
t.Errorf("server not listening on port %d: %v", mockDefaultPort, err)
}
conn.Close()

conn, err = net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", mockClosedPort), time.Second)
if err == nil {
conn.Close()
t.Errorf("server should not listening on port %d: %v", mockClosedPort, err)
}
}

func getMetrics(t *testing.T) *Metrics {
exporter, err := prometheus.New()
if err != nil {
t.Errorf("failed to create Prometheus exporter: %v", err)
}
provider := metric.NewMeterProvider(metric.WithReader(exporter))
metrics, err := registerMetricsWith(provider)
if err != nil {
t.Errorf("failed to register metrics with Prometheus provider: %v", err)
}
metrics.enabled = true

t.Cleanup(func() {
if provider != nil {
provider.Shutdown(context.Background())

Check failure on line 178 in pkg/observability/opentelemetry_test.go

View workflow job for this annotation

GitHub Actions / Lint Eastwood

Error return value of `provider.Shutdown` is not checked (errcheck)
}
if exporter != nil {
exporter.Shutdown(context.Background())

Check failure on line 181 in pkg/observability/opentelemetry_test.go

View workflow job for this annotation

GitHub Actions / Lint Eastwood

Error return value of `exporter.Shutdown` is not checked (errcheck)
}
})

return &metrics
}

func mockMetricsRequest() *httptest.ResponseRecorder {
handler := promhttp.Handler()
req := httptest.NewRequest("GET", metricsEndpoint, nil)
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)
return rr
}

func validateStatus(t *testing.T, rr *httptest.ResponseRecorder) {
status := rr.Code
h.Equals(t, http.StatusOK, status)
}

func getMetricsMap(body string) map[string]string {
metricsMap := make(map[string]string)
lines := strings.Split(body, "\n")
for _, line := range lines {
if len(strings.TrimSpace(line)) == 0 {
continue
}
if strings.HasPrefix(strings.TrimSpace(line), "# ") {
continue
}
parts := strings.SplitN(line, " ", 2)
if len(parts) != 2 {
continue
}
key := parts[0]
value := parts[1]
metricsMap[key] = value
}
return metricsMap
}

func validateEventErrorTotal(t *testing.T, metricsMap map[string]string, expectedTotal int) {
eventErrorTotalKey := fmt.Sprintf("events_error_total{event_error_where=\"%v\",otel_scope_name=\"%v\",otel_scope_version=\"\"}", mockErrorEvent, mockNth)
actualValue, exists := metricsMap[eventErrorTotalKey]
if !exists {
actualValue = "0"
}
h.Equals(t, strconv.Itoa(expectedTotal), actualValue)
}

func validateActionTotalV2(t *testing.T, metricsMap map[string]string, expectedTotal int, nodeStatus string) {
actionTotalKey := fmt.Sprintf("actions_total{node_action=\"%v\",node_status=\"%v\",otel_scope_name=\"%v\",otel_scope_version=\"\"}", mockAction, nodeStatus, mockNth)
actualValue, exists := metricsMap[actionTotalKey]
if !exists {
actualValue = "0"
}
h.Equals(t, strconv.Itoa(expectedTotal), actualValue)
}

0 comments on commit c6bd492

Please sign in to comment.