Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][receiver/statsd] Refactor of hardcoded tests #28896

5 changes: 3 additions & 2 deletions receiver/statsdreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func TestValidate(t *testing.T) {
noObjectNameErr = "must specify object id for all TimerHistogramMappings"
statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s"
observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s"
invalidHistogramErr = "histogram configuration requires observer_type: histogram"
)

tests := []test{
Expand Down Expand Up @@ -160,7 +161,7 @@ func TestValidate(t *testing.T) {
},
},
},
expectedErr: "histogram configuration requires observer_type: histogram",
expectedErr: invalidHistogramErr,
},
{
name: "negativeAggregationInterval",
Expand All @@ -170,7 +171,7 @@ func TestValidate(t *testing.T) {
{StatsdType: "timing", ObserverType: "gauge"},
},
},
expectedErr: "aggregation_interval must be a positive duration",
expectedErr: negativeAggregationIntervalErr,
},
}

Expand Down
68 changes: 25 additions & 43 deletions receiver/statsdreceiver/internal/transport/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,90 +7,72 @@ import (
"fmt"
"io"
"net"
"strings"
)

// StatsD defines the properties of a StatsD connection.
type StatsD struct {
Host string
Port int
Conn io.Writer
transport string
address string
conn io.Writer
}

// Transport is an enum to select the type of transport.
type Transport int

const (
// TCP Transport
TCP Transport = iota
// UDP Transport
UDP
)

// NewStatsD creates a new StatsD instance to support the need for testing
// the statsdreceiver package and is not intended/tested to be used in production.
func NewStatsD(transport Transport, host string, port int) (*StatsD, error) {
func NewStatsD(transport string, address string) (*StatsD, error) {
statsd := &StatsD{
Host: host,
Port: port,
transport: transport,
address: address,
}
err := statsd.connect(transport)

err := statsd.connect()
if err != nil {
return nil, err
}

return statsd, nil
}

// connect populates the StatsD.Conn
func (s *StatsD) connect(transport Transport) error {
if cl, ok := s.Conn.(io.Closer); ok {
err := cl.Close()
// connect populates the StatsD.conn
func (s *StatsD) connect() error {
switch s.transport {
case "udp":
udpAddr, err := net.ResolveUDPAddr(s.transport, s.address)
if err != nil {
return err
}
}

address := fmt.Sprintf("%s:%d", s.Host, s.Port)

var err error
switch transport {
case TCP:
s.Conn, err = net.Dial("tcp", address)
if err != nil {
return err
}
case UDP:
var udpAddr *net.UDPAddr
udpAddr, err = net.ResolveUDPAddr("udp", address)
s.conn, err = net.DialUDP(s.transport, nil, udpAddr)
if err != nil {
return err
}
s.Conn, err = net.DialUDP("udp", nil, udpAddr)
case "tcp":
var err error
s.conn, err = net.Dial(s.transport, s.address)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown transport: %d", transport)
return fmt.Errorf("unknown/unsupported transport: %s", s.transport)
}

return err
return nil
}

// Disconnect closes the StatsD.Conn.
// Disconnect closes the StatsD.conn.
func (s *StatsD) Disconnect() error {
var err error
if cl, ok := s.Conn.(io.Closer); ok {
if cl, ok := s.conn.(io.Closer); ok {
err = cl.Close()
}
s.Conn = nil
s.conn = nil
return err
}

// SendMetric sends the input metric to the StatsD connection.
func (s *StatsD) SendMetric(metric Metric) error {
_, err := fmt.Fprint(s.Conn, metric.String())
_, err := io.Copy(s.conn, strings.NewReader(metric.String()))
if err != nil {
return err
return fmt.Errorf("send metric on test client: %w", err)
}
return nil
}
Expand Down
3 changes: 0 additions & 3 deletions receiver/statsdreceiver/internal/transport/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"net"

"go.opentelemetry.io/collector/consumer"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
)

var errNilListenAndServeParameters = errors.New("no parameter of ListenAndServe can be nil")
Expand All @@ -22,7 +20,6 @@ type Server interface {
// on the specific transport, and prepares the message to be processed by
// the Parser and passed to the next consumer.
ListenAndServe(
p protocol.Parser,
mc consumer.Metrics,
r Reporter,
transferChan chan<- Metric,
Expand Down
99 changes: 55 additions & 44 deletions receiver/statsdreceiver/internal/transport/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
package transport

import (
"io"
"net"
"runtime"
"strconv"
"sync"
"testing"
"time"
Expand All @@ -16,65 +16,43 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client"
)

func Test_Server_ListenAndServe(t *testing.T) {
tests := []struct {
name string
transport string
buildServerFn func(addr string) (Server, error)
buildClientFn func(host string, port int) (*client.StatsD, error)
name string
transport Transport
buildServerFn func(transport Transport, addr string) (Server, error)
getFreeEndpointFn func(t testing.TB, transport string) string
buildClientFn func(transport string, address string) (*client.StatsD, error)
}{
{
name: "udp",
transport: "udp",
buildServerFn: NewUDPServer,
buildClientFn: func(host string, port int) (*client.StatsD, error) {
return client.NewStatsD(client.UDP, host, port)
},
name: "udp",
transport: UDP,
getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress,
buildServerFn: NewUDPServer,
buildClientFn: client.NewStatsD,
},
{
name: "tcp",
transport: "tcp",
buildServerFn: NewTCPServer,
buildClientFn: func(host string, port int) (*client.StatsD, error) {
return client.NewStatsD(client.TCP, host, port)
},
name: "tcp",
transport: TCP,
getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress,
buildServerFn: NewTCPServer,
buildClientFn: client.NewStatsD,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
addr := testutil.GetAvailableLocalNetworkAddress(t, tt.transport)

if tt.transport == "udp" {
// Endpoint should be free.
ln0, err := net.ListenPacket("udp", addr)
require.NoError(t, err)
require.NotNil(t, ln0)

// Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail.
ln1, err := net.ListenPacket("udp", addr)
require.Error(t, err)
require.Nil(t, ln1)
addr := tt.getFreeEndpointFn(t, tt.name)
testFreeEndpoint(t, tt.name, addr)

// Unbind the local address so the mock UDP service can use it
err = ln0.Close()
require.NoError(t, err)
}

srv, err := tt.buildServerFn(addr)
srv, err := tt.buildServerFn(tt.transport, addr)
require.NoError(t, err)
require.NotNil(t, srv)

host, portStr, err := net.SplitHostPort(addr)
require.NoError(t, err)
port, err := strconv.Atoi(portStr)
require.NoError(t, err)

mc := new(consumertest.MetricsSink)
p := &protocol.StatsDParser{}
require.NoError(t, err)
mr := NewMockReporter(1)
transferChan := make(chan Metric, 10)
Expand All @@ -83,12 +61,12 @@ func Test_Server_ListenAndServe(t *testing.T) {
wgListenAndServe.Add(1)
go func() {
defer wgListenAndServe.Done()
assert.Error(t, srv.ListenAndServe(p, mc, mr, transferChan))
assert.Error(t, srv.ListenAndServe(mc, mr, transferChan))
}()

runtime.Gosched()

gc, err := tt.buildClientFn(host, port)
gc, err := tt.buildClientFn(tt.transport.String(), addr)
require.NoError(t, err)
require.NotNil(t, gc)
err = gc.SendMetric(client.Metric{
Expand All @@ -115,3 +93,36 @@ func Test_Server_ListenAndServe(t *testing.T) {
})
}
}

func testFreeEndpoint(t *testing.T, transport string, address string) {
t.Helper()

var ln0, ln1 io.Closer
var err0, err1 error

trans := NewTransport(transport)
require.NotEqual(t, trans, Transport(""))

if trans.IsPacketTransport() {
// Endpoint should be free.
ln0, err0 = net.ListenPacket(transport, address)
ln1, err1 = net.ListenPacket(transport, address)
}

if trans.IsStreamTransport() {
// Endpoint should be free.
ln0, err0 = net.Listen(transport, address)
ln1, err1 = net.Listen(transport, address)
}

// Endpoint should be free.
require.NoError(t, err0)
require.NotNil(t, ln0)

// Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail.
require.Error(t, err1)
require.Nil(t, ln1)

// Unbind the local address so the mock UDP service can use it
require.NoError(t, ln0.Close())
}
41 changes: 25 additions & 16 deletions receiver/statsdreceiver/internal/transport/tcp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,50 @@ package transport // import "github.com/open-telemetry/opentelemetry-collector-c
import (
"bytes"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"

"go.opentelemetry.io/collector/consumer"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
)

var errTCPServerDone = errors.New("server stopped")

type tcpServer struct {
listener net.Listener
reporter Reporter
wg sync.WaitGroup
stopChan chan struct{}
listener net.Listener
reporter Reporter
wg sync.WaitGroup
transport Transport
stopChan chan struct{}
}

// Ensure that Server is implemented on TCP Server.
var _ Server = (*tcpServer)(nil)

// NewTCPServer creates a transport.Server using TCP as its transport.
func NewTCPServer(addr string) (Server, error) {
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
func NewTCPServer(transport Transport, address string) (Server, error) {
var tsrv tcpServer
var err error

if !transport.IsStreamTransport() {
return nil, fmt.Errorf("NewTCPServer with %s: %w", transport.String(), ErrUnsupportedStreamTransport)
}

t := tcpServer{
listener: l,
stopChan: make(chan struct{}),
tsrv.transport = transport
tsrv.listener, err = net.Listen(transport.String(), address)
if err != nil {
return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err)
}
return &t, nil

tsrv.stopChan = make(chan struct{})
return &tsrv, nil
}

func (t *tcpServer) ListenAndServe(parser protocol.Parser, nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric) error {
if parser == nil || nextConsumer == nil || reporter == nil {
// ListenAndServe starts the server ready to receive metrics.
func (t *tcpServer) ListenAndServe(nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric) error {
if nextConsumer == nil || reporter == nil {
return errNilListenAndServeParameters
}

Expand Down Expand Up @@ -71,6 +78,7 @@ LOOP:
return errTCPServerDone
}

// handleConn is helper that parses the buffer and split it line by line to be parsed upstream.
func (t *tcpServer) handleConn(c net.Conn, transferChan chan<- Metric) {
payload := make([]byte, 4096)
var remainder []byte
Expand Down Expand Up @@ -98,6 +106,7 @@ func (t *tcpServer) handleConn(c net.Conn, transferChan chan<- Metric) {
}
}

// Close closes the server.
func (t *tcpServer) Close() error {
close(t.stopChan)
t.wg.Wait()
Expand Down
Loading