Skip to content

Commit

Permalink
[chore][receiver/statsd] Refactor of hardcoded tests (#28896)
Browse files Browse the repository at this point in the history
**Description:** 
This is the first step to adding UDS support to the StatsD receiver.

As I started to develop it, I saw that all tests were hardcoded to UDP
and using networking and there was no possibility to add a socket
communication.

PR started to get huge so I split it into two, one to refactor tests and
another one that adds UDS support:
 * Removed all unused references.
* Made a `Transport` helper that allows centralizing all supported
protocols and constants in its package.
* Removed all hardcoded UDP protocols and generalized testing so new
protocols are easy to add.

If you need a rationale about why the changes are like this, this is the
next PR I am going to submit after this one is merged:
kilokang#2

That is the PR that is going to add UDS support properly.

**Link to tracking Issue:**
 - #21385

**Previous closed PR:**
 - #24832
  • Loading branch information
kilokang authored Dec 15, 2023
1 parent 3e81d24 commit 48aa0dd
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 143 deletions.
5 changes: 3 additions & 2 deletions receiver/statsdreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func TestValidate(t *testing.T) {
noObjectNameErr = "must specify object id for all TimerHistogramMappings"
statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s"
observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s"
invalidHistogramErr = "histogram configuration requires observer_type: histogram"
)

tests := []test{
Expand Down Expand Up @@ -160,7 +161,7 @@ func TestValidate(t *testing.T) {
},
},
},
expectedErr: "histogram configuration requires observer_type: histogram",
expectedErr: invalidHistogramErr,
},
{
name: "negativeAggregationInterval",
Expand All @@ -170,7 +171,7 @@ func TestValidate(t *testing.T) {
{StatsdType: "timing", ObserverType: "gauge"},
},
},
expectedErr: "aggregation_interval must be a positive duration",
expectedErr: negativeAggregationIntervalErr,
},
}

Expand Down
68 changes: 25 additions & 43 deletions receiver/statsdreceiver/internal/transport/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,90 +7,72 @@ import (
"fmt"
"io"
"net"
"strings"
)

// StatsD defines the properties of a StatsD connection.
type StatsD struct {
Host string
Port int
Conn io.Writer
transport string
address string
conn io.Writer
}

// Transport is an enum to select the type of transport.
type Transport int

const (
// TCP Transport
TCP Transport = iota
// UDP Transport
UDP
)

// NewStatsD creates a new StatsD instance to support the need for testing
// the statsdreceiver package and is not intended/tested to be used in production.
func NewStatsD(transport Transport, host string, port int) (*StatsD, error) {
func NewStatsD(transport string, address string) (*StatsD, error) {
statsd := &StatsD{
Host: host,
Port: port,
transport: transport,
address: address,
}
err := statsd.connect(transport)

err := statsd.connect()
if err != nil {
return nil, err
}

return statsd, nil
}

// connect populates the StatsD.Conn
func (s *StatsD) connect(transport Transport) error {
if cl, ok := s.Conn.(io.Closer); ok {
err := cl.Close()
// connect populates the StatsD.conn
func (s *StatsD) connect() error {
switch s.transport {
case "udp":
udpAddr, err := net.ResolveUDPAddr(s.transport, s.address)
if err != nil {
return err
}
}

address := fmt.Sprintf("%s:%d", s.Host, s.Port)

var err error
switch transport {
case TCP:
s.Conn, err = net.Dial("tcp", address)
if err != nil {
return err
}
case UDP:
var udpAddr *net.UDPAddr
udpAddr, err = net.ResolveUDPAddr("udp", address)
s.conn, err = net.DialUDP(s.transport, nil, udpAddr)
if err != nil {
return err
}
s.Conn, err = net.DialUDP("udp", nil, udpAddr)
case "tcp":
var err error
s.conn, err = net.Dial(s.transport, s.address)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown transport: %d", transport)
return fmt.Errorf("unknown/unsupported transport: %s", s.transport)
}

return err
return nil
}

// Disconnect closes the StatsD.Conn.
// Disconnect closes the StatsD.conn.
func (s *StatsD) Disconnect() error {
var err error
if cl, ok := s.Conn.(io.Closer); ok {
if cl, ok := s.conn.(io.Closer); ok {
err = cl.Close()
}
s.Conn = nil
s.conn = nil
return err
}

// SendMetric sends the input metric to the StatsD connection.
func (s *StatsD) SendMetric(metric Metric) error {
_, err := fmt.Fprint(s.Conn, metric.String())
_, err := io.Copy(s.conn, strings.NewReader(metric.String()))
if err != nil {
return err
return fmt.Errorf("send metric on test client: %w", err)
}
return nil
}
Expand Down
3 changes: 0 additions & 3 deletions receiver/statsdreceiver/internal/transport/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"net"

"go.opentelemetry.io/collector/consumer"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
)

var errNilListenAndServeParameters = errors.New("no parameter of ListenAndServe can be nil")
Expand All @@ -21,7 +19,6 @@ type Server interface {
// on the specific transport, and prepares the message to be processed by
// the Parser and passed to the next consumer.
ListenAndServe(
p protocol.Parser,
mc consumer.Metrics,
r Reporter,
transferChan chan<- Metric,
Expand Down
99 changes: 55 additions & 44 deletions receiver/statsdreceiver/internal/transport/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
package transport

import (
"io"
"net"
"runtime"
"strconv"
"sync"
"testing"
"time"
Expand All @@ -16,65 +16,43 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client"
)

func Test_Server_ListenAndServe(t *testing.T) {
tests := []struct {
name string
transport string
buildServerFn func(addr string) (Server, error)
buildClientFn func(host string, port int) (*client.StatsD, error)
name string
transport Transport
buildServerFn func(transport Transport, addr string) (Server, error)
getFreeEndpointFn func(t testing.TB, transport string) string
buildClientFn func(transport string, address string) (*client.StatsD, error)
}{
{
name: "udp",
transport: "udp",
buildServerFn: NewUDPServer,
buildClientFn: func(host string, port int) (*client.StatsD, error) {
return client.NewStatsD(client.UDP, host, port)
},
name: "udp",
transport: UDP,
getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress,
buildServerFn: NewUDPServer,
buildClientFn: client.NewStatsD,
},
{
name: "tcp",
transport: "tcp",
buildServerFn: NewTCPServer,
buildClientFn: func(host string, port int) (*client.StatsD, error) {
return client.NewStatsD(client.TCP, host, port)
},
name: "tcp",
transport: TCP,
getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress,
buildServerFn: NewTCPServer,
buildClientFn: client.NewStatsD,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
addr := testutil.GetAvailableLocalNetworkAddress(t, tt.transport)

if tt.transport == "udp" {
// Endpoint should be free.
ln0, err := net.ListenPacket("udp", addr)
require.NoError(t, err)
require.NotNil(t, ln0)

// Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail.
ln1, err := net.ListenPacket("udp", addr)
require.Error(t, err)
require.Nil(t, ln1)
addr := tt.getFreeEndpointFn(t, tt.name)
testFreeEndpoint(t, tt.name, addr)

// Unbind the local address so the mock UDP service can use it
err = ln0.Close()
require.NoError(t, err)
}

srv, err := tt.buildServerFn(addr)
srv, err := tt.buildServerFn(tt.transport, addr)
require.NoError(t, err)
require.NotNil(t, srv)

host, portStr, err := net.SplitHostPort(addr)
require.NoError(t, err)
port, err := strconv.Atoi(portStr)
require.NoError(t, err)

mc := new(consumertest.MetricsSink)
p := &protocol.StatsDParser{}
require.NoError(t, err)
mr := NewMockReporter(1)
transferChan := make(chan Metric, 10)
Expand All @@ -83,12 +61,12 @@ func Test_Server_ListenAndServe(t *testing.T) {
wgListenAndServe.Add(1)
go func() {
defer wgListenAndServe.Done()
assert.Error(t, srv.ListenAndServe(p, mc, mr, transferChan))
assert.Error(t, srv.ListenAndServe(mc, mr, transferChan))
}()

runtime.Gosched()

gc, err := tt.buildClientFn(host, port)
gc, err := tt.buildClientFn(tt.transport.String(), addr)
require.NoError(t, err)
require.NotNil(t, gc)
err = gc.SendMetric(client.Metric{
Expand All @@ -115,3 +93,36 @@ func Test_Server_ListenAndServe(t *testing.T) {
})
}
}

func testFreeEndpoint(t *testing.T, transport string, address string) {
t.Helper()

var ln0, ln1 io.Closer
var err0, err1 error

trans := NewTransport(transport)
require.NotEqual(t, trans, Transport(""))

if trans.IsPacketTransport() {
// Endpoint should be free.
ln0, err0 = net.ListenPacket(transport, address)
ln1, err1 = net.ListenPacket(transport, address)
}

if trans.IsStreamTransport() {
// Endpoint should be free.
ln0, err0 = net.Listen(transport, address)
ln1, err1 = net.Listen(transport, address)
}

// Endpoint should be free.
require.NoError(t, err0)
require.NotNil(t, ln0)

// Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail.
require.Error(t, err1)
require.Nil(t, ln1)

// Unbind the local address so the mock UDP service can use it
require.NoError(t, ln0.Close())
}
41 changes: 25 additions & 16 deletions receiver/statsdreceiver/internal/transport/tcp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,50 @@ package transport // import "github.com/open-telemetry/opentelemetry-collector-c
import (
"bytes"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"

"go.opentelemetry.io/collector/consumer"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
)

var errTCPServerDone = errors.New("server stopped")

type tcpServer struct {
listener net.Listener
reporter Reporter
wg sync.WaitGroup
stopChan chan struct{}
listener net.Listener
reporter Reporter
wg sync.WaitGroup
transport Transport
stopChan chan struct{}
}

// Ensure that Server is implemented on TCP Server.
var _ Server = (*tcpServer)(nil)

// NewTCPServer creates a transport.Server using TCP as its transport.
func NewTCPServer(addr string) (Server, error) {
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
func NewTCPServer(transport Transport, address string) (Server, error) {
var tsrv tcpServer
var err error

if !transport.IsStreamTransport() {
return nil, fmt.Errorf("NewTCPServer with %s: %w", transport.String(), ErrUnsupportedStreamTransport)
}

t := tcpServer{
listener: l,
stopChan: make(chan struct{}),
tsrv.transport = transport
tsrv.listener, err = net.Listen(transport.String(), address)
if err != nil {
return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err)
}
return &t, nil

tsrv.stopChan = make(chan struct{})
return &tsrv, nil
}

func (t *tcpServer) ListenAndServe(parser protocol.Parser, nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric) error {
if parser == nil || nextConsumer == nil || reporter == nil {
// ListenAndServe starts the server ready to receive metrics.
func (t *tcpServer) ListenAndServe(nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric) error {
if nextConsumer == nil || reporter == nil {
return errNilListenAndServeParameters
}

Expand Down Expand Up @@ -71,6 +78,7 @@ LOOP:
return errTCPServerDone
}

// handleConn is helper that parses the buffer and split it line by line to be parsed upstream.
func (t *tcpServer) handleConn(c net.Conn, transferChan chan<- Metric) {
payload := make([]byte, 4096)
var remainder []byte
Expand Down Expand Up @@ -98,6 +106,7 @@ func (t *tcpServer) handleConn(c net.Conn, transferChan chan<- Metric) {
}
}

// Close closes the server.
func (t *tcpServer) Close() error {
close(t.stopChan)
t.wg.Wait()
Expand Down
Loading

0 comments on commit 48aa0dd

Please sign in to comment.