Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/statsd] Refactor of hardcoded tests #24832

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions receiver/statsdreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func TestValidate(t *testing.T) {
noObjectNameErr = "must specify object id for all TimerHistogramMappings"
statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s"
observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s"
invalidHistogramErr = "histogram configuration requires observer_type: histogram"
)

tests := []test{
Expand Down Expand Up @@ -153,7 +154,7 @@ func TestValidate(t *testing.T) {
},
},
},
expectedErr: "histogram configuration requires observer_type: histogram",
expectedErr: invalidHistogramErr,
},
{
name: "negativeAggregationInterval",
Expand All @@ -163,7 +164,7 @@ func TestValidate(t *testing.T) {
{StatsdType: "timing", ObserverType: "gauge"},
},
},
expectedErr: "aggregation_interval must be a positive duration",
expectedErr: negativeAggregationIntervalErr,
},
}

Expand Down
63 changes: 22 additions & 41 deletions receiver/statsdreceiver/internal/transport/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,85 +7,66 @@ import (
"fmt"
"io"
"net"
"strings"
)

// StatsD defines the properties of a StatsD connection.
type StatsD struct {
Host string
Port int
Conn io.Writer
transport string
address string
conn io.Writer
}

// Transport is an enum to select the type of transport.
type Transport int

const (
// TCP Transport
TCP Transport = iota
// UDP Transport
UDP
)

// NewStatsD creates a new StatsD instance to support the need for testing
// the statsdreceiver package and is not intended/tested to be used in production.
func NewStatsD(transport Transport, host string, port int) (*StatsD, error) {
func NewStatsD(transport string, address string) (*StatsD, error) {
statsd := &StatsD{
Host: host,
Port: port,
transport: transport,
address: address,
}
err := statsd.connect(transport)

err := statsd.connect()
if err != nil {
return nil, err
}

return statsd, nil
}

// connect populates the StatsD.Conn
func (s *StatsD) connect(transport Transport) error {
if cl, ok := s.Conn.(io.Closer); ok {
cl.Close()
}

address := fmt.Sprintf("%s:%d", s.Host, s.Port)

var err error
switch transport {
case TCP:
// TODO: implement TCP support
return fmt.Errorf("TCP unsupported")
case UDP:
var udpAddr *net.UDPAddr
udpAddr, err = net.ResolveUDPAddr("udp", address)
// connect populates the StatsD.conn
func (s *StatsD) connect() error {
switch s.transport {
case "udp":
udpAddr, err := net.ResolveUDPAddr(s.transport, s.address)
if err != nil {
return err
}
s.Conn, err = net.DialUDP("udp", nil, udpAddr)
s.conn, err = net.DialUDP(s.transport, nil, udpAddr)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown transport: %d", transport)
return fmt.Errorf("unknown/unsupported transport: %s", s.transport)
}

return err
return nil
}

// Disconnect closes the StatsD.Conn.
// Disconnect closes the StatsD.conn.
func (s *StatsD) Disconnect() error {
var err error
if cl, ok := s.Conn.(io.Closer); ok {
if cl, ok := s.conn.(io.Closer); ok {
err = cl.Close()
}
s.Conn = nil
s.conn = nil
return err
}

// SendMetric sends the input metric to the StatsD connection.
func (s *StatsD) SendMetric(metric Metric) error {
_, err := fmt.Fprint(s.Conn, metric.String())
_, err := io.Copy(s.conn, strings.NewReader(metric.String()))
if err != nil {
return err
return fmt.Errorf("send metric on test client: %w", err)
}
return nil
}
Expand Down
3 changes: 0 additions & 3 deletions receiver/statsdreceiver/internal/transport/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"net"

"go.opentelemetry.io/collector/consumer"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
)

var errNilListenAndServeParameters = errors.New("no parameter of ListenAndServe can be nil")
Expand All @@ -22,7 +20,6 @@ type Server interface {
// on the specific transport, and prepares the message to be processed by
// the Parser and passed to the next consumer.
ListenAndServe(
p protocol.Parser,
mc consumer.Metrics,
r Reporter,
transferChan chan<- Metric,
Expand Down
75 changes: 42 additions & 33 deletions receiver/statsdreceiver/internal/transport/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
package transport

import (
"io"
"net"
"runtime"
"strconv"
"sync"
"testing"
"time"
Expand All @@ -16,52 +16,34 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client"
)

func Test_Server_ListenAndServe(t *testing.T) {
tests := []struct {
name string
buildServerFn func(addr string) (Server, error)
buildClientFn func(host string, port int) (*client.StatsD, error)
name string
buildServerFn func(transport Transport, addr string) (Server, error)
getFreeEndpointFn func(t testing.TB, transport string) string
buildClientFn func(transport string, address string) (*client.StatsD, error)
}{
{
name: "udp",
buildServerFn: NewUDPServer,
buildClientFn: func(host string, port int) (*client.StatsD, error) {
return client.NewStatsD(client.UDP, host, port)
},
name: "udp",
getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress,
buildServerFn: NewUDPServer,
buildClientFn: client.NewStatsD,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
addr := testutil.GetAvailableLocalNetworkAddress(t, "udp")
trans := Transport(tt.name)
addr := tt.getFreeEndpointFn(t, tt.name)
testFreeEndpoint(t, tt.name, addr)

// Endpoint should be free.
ln0, err := net.ListenPacket("udp", addr)
require.NoError(t, err)
require.NotNil(t, ln0)

// Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail.
ln1, err := net.ListenPacket("udp", addr)
require.Error(t, err)
require.Nil(t, ln1)

// Unbind the local address so the mock UDP service can use it
ln0.Close()

srv, err := tt.buildServerFn(addr)
srv, err := tt.buildServerFn(trans, addr)
require.NoError(t, err)
require.NotNil(t, srv)

host, portStr, err := net.SplitHostPort(addr)
require.NoError(t, err)
port, err := strconv.Atoi(portStr)
require.NoError(t, err)

mc := new(consumertest.MetricsSink)
p := &protocol.StatsDParser{}
require.NoError(t, err)
mr := NewMockReporter(1)
transferChan := make(chan Metric, 10)
Expand All @@ -70,12 +52,12 @@ func Test_Server_ListenAndServe(t *testing.T) {
wgListenAndServe.Add(1)
go func() {
defer wgListenAndServe.Done()
assert.Error(t, srv.ListenAndServe(p, mc, mr, transferChan))
assert.Error(t, srv.ListenAndServe(mc, mr, transferChan))
}()

runtime.Gosched()

gc, err := tt.buildClientFn(host, port)
gc, err := tt.buildClientFn(tt.name, addr)
require.NoError(t, err)
require.NotNil(t, gc)
err = gc.SendMetric(client.Metric{
Expand All @@ -102,3 +84,30 @@ func Test_Server_ListenAndServe(t *testing.T) {
})
}
}

func testFreeEndpoint(t *testing.T, transport string, address string) {
t.Helper()

var ln0, ln1 io.Closer
var err0, err1 error

trans := NewTransport(transport)
require.NotEqual(t, trans, Transport(""))

if trans.IsPacketTransport() {
// Endpoint should be free.
ln0, err0 = net.ListenPacket(transport, address)
ln1, err1 = net.ListenPacket(transport, address)
}

// Endpoint should be free.
require.NoError(t, err0)
require.NotNil(t, ln0)

// Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail.
require.Error(t, err1)
require.Nil(t, ln1)

// Unbind the local address so the mock UDP service can use it
require.NoError(t, ln0.Close())
}
41 changes: 41 additions & 0 deletions receiver/statsdreceiver/internal/transport/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

// Transport is a set of constants of the transport supported by this receiver.
type Transport string

const (
UDP Transport = "udp"
UDP4 Transport = "udp4"
UDP6 Transport = "udp6"
)

// NewTransport creates a Transport based on the transport string or returns an empty Transport.
func NewTransport(ts string) Transport {
trans := Transport(ts)
switch trans {
case UDP, UDP4, UDP6:
return trans
}
return Transport("")
}

// String casts the transport to a String if the Transport is supported. Return an empty Transport overwise.
func (trans Transport) String() string {
switch trans {
case UDP, UDP4, UDP6:
return string(trans)
}
return ""
}

// IsPacketTransport returns true if the transport is packet based.
func (trans Transport) IsPacketTransport() bool {
switch trans {
case UDP, UDP4, UDP6:
return true
}
return false
}
Loading