diff --git a/.chloggen/receiver_statsdreceiver_add-uds-support-part-2.yaml b/.chloggen/receiver_statsdreceiver_add-uds-support-part-2.yaml new file mode 100755 index 000000000000..72d57d2fc47e --- /dev/null +++ b/.chloggen/receiver_statsdreceiver_add-uds-support-part-2.yaml @@ -0,0 +1,21 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/statsd + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add UDS support + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: + - 21385 + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: \ No newline at end of file diff --git a/receiver/statsdreceiver/config_test.go b/receiver/statsdreceiver/config_test.go index 73cad547990b..8c4f17e09951 100644 --- a/receiver/statsdreceiver/config_test.go +++ b/receiver/statsdreceiver/config_test.go @@ -86,6 +86,7 @@ func TestValidate(t *testing.T) { noObjectNameErr = "must specify object id for all TimerHistogramMappings" statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s" observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s" + invalidHistogramErr = "histogram configuration requires observer_type: histogram" ) tests := []test{ @@ -153,7 +154,7 @@ func TestValidate(t *testing.T) { }, }, }, - expectedErr: "histogram configuration requires observer_type: histogram", + expectedErr: invalidHistogramErr, }, { name: "negativeAggregationInterval", @@ -163,7 +164,7 @@ func TestValidate(t *testing.T) { {StatsdType: "timing", ObserverType: "gauge"}, }, }, - expectedErr: "aggregation_interval must be a positive duration", + expectedErr: negativeAggregationIntervalErr, }, } diff --git a/receiver/statsdreceiver/internal/testutil/client.go b/receiver/statsdreceiver/internal/testutil/client.go new file mode 100644 index 000000000000..0c3ec6126966 --- /dev/null +++ b/receiver/statsdreceiver/internal/testutil/client.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/testutil" + +import ( + "fmt" + "io" + "net" + "strings" +) + +// StatsDTestClient defines the properties of a StatsD connection. +type StatsDTestClient struct { + transport string + address string + conn io.Writer +} + +// NewStatsDTestClient creates a new StatsDTestClient instance to support the need for testing +// the statsdreceiver package and is not intended/tested to be used in production. +func NewStatsDTestClient(transport string, address string) (*StatsDTestClient, error) { + statsd := &StatsDTestClient{ + transport: transport, + address: address, + } + + err := statsd.connect() + if err != nil { + return nil, err + } + + return statsd, nil +} + +// connect populates the StatsDTestClient.conn +func (s *StatsDTestClient) connect() error { + switch s.transport { + case "udp": + udpAddr, err := net.ResolveUDPAddr(s.transport, s.address) + if err != nil { + return err + } + s.conn, err = net.DialUDP(s.transport, nil, udpAddr) + if err != nil { + return err + } + case "unixgram": + unixAddr, err := net.ResolveUnixAddr(s.transport, s.address) + if err != nil { + return err + } + s.conn, err = net.DialUnix(s.transport, nil, unixAddr) + if err != nil { + return err + } + default: + return fmt.Errorf("unknown/unsupported transport: %s", s.transport) + } + + return nil +} + +// Disconnect closes the StatsDTestClient.conn. +func (s *StatsDTestClient) Disconnect() error { + var err error + if cl, ok := s.conn.(io.Closer); ok { + err = cl.Close() + } + s.conn = nil + return err +} + +// SendMetric sends the input metric to the StatsDTestClient connection. +func (s *StatsDTestClient) SendMetric(metric Metric) error { + _, err := io.Copy(s.conn, strings.NewReader(metric.String())) + if err != nil { + return fmt.Errorf("send metric on test client: %w", err) + } + return nil +} diff --git a/receiver/statsdreceiver/internal/testutil/metric.go b/receiver/statsdreceiver/internal/testutil/metric.go new file mode 100644 index 000000000000..3ef3aff16829 --- /dev/null +++ b/receiver/statsdreceiver/internal/testutil/metric.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/testutil" + +import ( + "fmt" +) + +// Metric contains the metric fields for a StatsDTestClient message. +type Metric struct { + Name string + Value string + Type string +} + +// String formats a Metric into a StatsDTestClient message. +func (m Metric) String() string { + return fmt.Sprintf("%s:%s|%s", m.Name, m.Value, m.Type) +} diff --git a/receiver/statsdreceiver/internal/testutil/temporary_socket.go b/receiver/statsdreceiver/internal/testutil/temporary_socket.go new file mode 100644 index 000000000000..5844a5b89919 --- /dev/null +++ b/receiver/statsdreceiver/internal/testutil/temporary_socket.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/testutil" + +import ( + "crypto/rand" + "fmt" + "testing" +) + +func CreateTemporarySocket(t testing.TB, _ string) string { + b := make([]byte, 10) + rand.Read(b) + return fmt.Sprintf("%s/%s", t.TempDir(), fmt.Sprintf("%x", b)) +} diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go deleted file mode 100644 index ba40dbcf1deb..000000000000 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package client // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client" - -import ( - "fmt" - "io" - "net" -) - -// StatsD defines the properties of a StatsD connection. -type StatsD struct { - Host string - Port int - Conn io.Writer -} - -// Transport is an enum to select the type of transport. -type Transport int - -const ( - // TCP Transport - TCP Transport = iota - // UDP Transport - UDP -) - -// NewStatsD creates a new StatsD instance to support the need for testing -// the statsdreceiver package and is not intended/tested to be used in production. -func NewStatsD(transport Transport, host string, port int) (*StatsD, error) { - statsd := &StatsD{ - Host: host, - Port: port, - } - err := statsd.connect(transport) - if err != nil { - return nil, err - } - - return statsd, nil -} - -// connect populates the StatsD.Conn -func (s *StatsD) connect(transport Transport) error { - if cl, ok := s.Conn.(io.Closer); ok { - cl.Close() - } - - address := fmt.Sprintf("%s:%d", s.Host, s.Port) - - var err error - switch transport { - case TCP: - // TODO: implement TCP support - return fmt.Errorf("TCP unsupported") - case UDP: - var udpAddr *net.UDPAddr - udpAddr, err = net.ResolveUDPAddr("udp", address) - if err != nil { - return err - } - s.Conn, err = net.DialUDP("udp", nil, udpAddr) - if err != nil { - return err - } - default: - return fmt.Errorf("unknown transport: %d", transport) - } - - return err -} - -// Disconnect closes the StatsD.Conn. -func (s *StatsD) Disconnect() error { - var err error - if cl, ok := s.Conn.(io.Closer); ok { - err = cl.Close() - } - s.Conn = nil - return err -} - -// SendMetric sends the input metric to the StatsD connection. -func (s *StatsD) SendMetric(metric Metric) error { - _, err := fmt.Fprint(s.Conn, metric.String()) - if err != nil { - return err - } - return nil -} - -// Metric contains the metric fields for a StatsD message. -type Metric struct { - Name string - Value string - Type string -} - -// String formats a Metric into a StatsD message. -func (m Metric) String() string { - return fmt.Sprintf("%s:%s|%s", m.Name, m.Value, m.Type) -} diff --git a/receiver/statsdreceiver/internal/transport/packet_server.go b/receiver/statsdreceiver/internal/transport/packet_server.go new file mode 100644 index 000000000000..78f709c3d6ed --- /dev/null +++ b/receiver/statsdreceiver/internal/transport/packet_server.go @@ -0,0 +1,138 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" + +import ( + "bytes" + "errors" + "fmt" + "io" + "net" + "os" + "strings" + + "go.opentelemetry.io/collector/consumer" + "go.uber.org/multierr" +) + +type packetServer struct { + packetConn net.PacketConn + transport Transport + address string +} + +var ( + // Ensure that Server is implemented on UDP Server. + _ (Server) = (*packetServer)(nil) + + ErrUnsupportedPacketTransport = errors.New("unsupported Packet transport") +) + +// NewPacketServer creates a transport.Server using transports based on packets. +func NewPacketServer(transport Transport, address string) (Server, error) { + if !transport.IsPacketTransport() { + return nil, ErrUnsupportedPacketTransport + } + + if transport.IsUnixTransport() { + err := os.Remove(address) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("removing socket failed: %w", err) + } + } + + conn, err := net.ListenPacket(transport.String(), address) + if err != nil { + return nil, fmt.Errorf("starting to listen %s: %w", transport.String(), err) + } + + return &packetServer{ + address: address, + packetConn: conn, + transport: transport, + }, nil +} + +// ListenAndServe starts the server ready to receive metrics. +func (psrv *packetServer) ListenAndServe( + nextConsumer consumer.Metrics, + reporter Reporter, + transferChan chan<- Metric, +) error { + if nextConsumer == nil || reporter == nil { + return errNilListenAndServeParameters + } + + buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) + for { + n, addr, err := psrv.packetConn.ReadFrom(buf) + + if psrv.transport.IsUnixTransport() && addr == nil { + addr = &net.UnixAddr{ + Net: "unixgram", + Name: "UDS", + } + } + + if n > 0 { + bufCopy := make([]byte, n) + copy(bufCopy, buf) + psrv.handlePacket(bufCopy, addr, transferChan) + } + if err != nil { + reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", + psrv.transport, + psrv.packetConn.LocalAddr(), + err) + var netErr net.Error + if errors.As(err, &netErr) { + if netErr.Timeout() { + continue + } + } + return err + } + } +} + +// Close closes the server. +func (psrv *packetServer) Close() error { + var errs error + + if psrv.transport.IsUnixTransport() { + err := os.Remove(psrv.address) + if err != nil && !errors.Is(err, os.ErrNotExist) { + errs = multierr.Append(errs, fmt.Errorf("removing socket failed: %w", err)) + } + } + + err := psrv.packetConn.Close() + if err != nil { + errs = multierr.Append(errs, err) + } + + return errs +} + +// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream. +func (psrv *packetServer) handlePacket( + data []byte, + addr net.Addr, + transferChan chan<- Metric, +) { + buf := bytes.NewBuffer(data) + for { + bytes, err := buf.ReadBytes((byte)('\n')) + if errors.Is(err, io.EOF) { + if len(bytes) == 0 { + // Completed without errors. + break + } + } + line := strings.TrimSpace(string(bytes)) + if line != "" { + transferChan <- Metric{line, addr} + } + } +} diff --git a/receiver/statsdreceiver/internal/transport/server.go b/receiver/statsdreceiver/internal/transport/server.go index 7b3cdab6006f..c551e4312cd9 100644 --- a/receiver/statsdreceiver/internal/transport/server.go +++ b/receiver/statsdreceiver/internal/transport/server.go @@ -9,8 +9,6 @@ import ( "net" "go.opentelemetry.io/collector/consumer" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" ) var errNilListenAndServeParameters = errors.New("no parameter of ListenAndServe can be nil") @@ -22,7 +20,6 @@ type Server interface { // on the specific transport, and prepares the message to be processed by // the Parser and passed to the next consumer. ListenAndServe( - p protocol.Parser, mc consumer.Metrics, r Reporter, transferChan chan<- Metric, diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index 5a7b545a9a75..1391e160bb4b 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -4,9 +4,9 @@ package transport import ( + "io" "net" "runtime" - "strconv" "sync" "testing" "time" @@ -16,52 +16,47 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client" + statsdtestutil "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/testutil" ) func Test_Server_ListenAndServe(t *testing.T) { tests := []struct { - name string - buildServerFn func(addr string) (Server, error) - buildClientFn func(host string, port int) (*client.StatsD, error) + name string + buildServerFn func(transport Transport, addr string) (Server, error) + getFreeEndpointFn func(t testing.TB, transport string) string + buildClientFn func(transport string, address string) (*statsdtestutil.StatsDTestClient, error) + testSkip bool }{ { - name: "udp", - buildServerFn: NewUDPServer, - buildClientFn: func(host string, port int) (*client.StatsD, error) { - return client.NewStatsD(client.UDP, host, port) - }, + name: "udp", + getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress, + buildServerFn: NewPacketServer, + buildClientFn: statsdtestutil.NewStatsDTestClient, + }, + { + name: "unixgram", + getFreeEndpointFn: statsdtestutil.CreateTemporarySocket, + buildServerFn: NewPacketServer, + buildClientFn: statsdtestutil.NewStatsDTestClient, + // Tests on Mac/Windows give a "bind: invalid argument" error as unix sockets are not supported. + testSkip: runtime.GOOS != "linux", }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - addr := testutil.GetAvailableLocalNetworkAddress(t, "udp") - - // Endpoint should be free. - ln0, err := net.ListenPacket("udp", addr) - require.NoError(t, err) - require.NotNil(t, ln0) + if tt.testSkip { + continue + } - // Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail. - ln1, err := net.ListenPacket("udp", addr) - require.Error(t, err) - require.Nil(t, ln1) - - // Unbind the local address so the mock UDP service can use it - ln0.Close() + t.Run(tt.name, func(t *testing.T) { + trans := Transport(tt.name) + addr := tt.getFreeEndpointFn(t, tt.name) + testFreeEndpoint(t, tt.name, addr) - srv, err := tt.buildServerFn(addr) + srv, err := tt.buildServerFn(trans, addr) require.NoError(t, err) require.NotNil(t, srv) - host, portStr, err := net.SplitHostPort(addr) - require.NoError(t, err) - port, err := strconv.Atoi(portStr) - require.NoError(t, err) - mc := new(consumertest.MetricsSink) - p := &protocol.StatsDParser{} require.NoError(t, err) mr := NewMockReporter(1) transferChan := make(chan Metric, 10) @@ -70,15 +65,15 @@ func Test_Server_ListenAndServe(t *testing.T) { wgListenAndServe.Add(1) go func() { defer wgListenAndServe.Done() - assert.Error(t, srv.ListenAndServe(p, mc, mr, transferChan)) + assert.Error(t, srv.ListenAndServe(mc, mr, transferChan)) }() runtime.Gosched() - gc, err := tt.buildClientFn(host, port) + gc, err := tt.buildClientFn(tt.name, addr) require.NoError(t, err) require.NotNil(t, gc) - err = gc.SendMetric(client.Metric{ + err = gc.SendMetric(statsdtestutil.Metric{ Name: "test.metric", Value: "42", Type: "c", @@ -102,3 +97,35 @@ func Test_Server_ListenAndServe(t *testing.T) { }) } } + +// testFreeEndpoint is a helper to check if the port we are going to use for testing is deterministic and free for us. +func testFreeEndpoint(t *testing.T, transport string, address string) { + t.Helper() + + var ln0, ln1 io.Closer + var err0, err1 error + + trans := NewTransport(transport) + require.NotEqual(t, trans, Transport("")) + + if trans.IsUnixTransport() { + // unix sockets rely on tempfiles so they are always free + return + } + if trans.IsPacketTransport() { + // Endpoint should be free. + ln0, err0 = net.ListenPacket(transport, address) + ln1, err1 = net.ListenPacket(transport, address) + } + + // Endpoint should be free. + require.NoError(t, err0) + require.NotNil(t, ln0) + + // Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail. + require.Error(t, err1) + require.Nil(t, ln1) + + // Unbind the local address so the mock UDP service can use it + require.NoError(t, ln0.Close()) +} diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go new file mode 100644 index 000000000000..9addfaa92d5e --- /dev/null +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" + +// Transport is a set of constants of the transport supported by this receiver. +type Transport string + +const ( + UDP Transport = "udp" + UDP4 Transport = "udp4" + UDP6 Transport = "udp6" + UnixGram Transport = "unixgram" +) + +// NewTransport creates a Transport based on the transport string or returns an empty Transport. +func NewTransport(ts string) Transport { + trans := Transport(ts) + switch trans { + case UDP, UDP4, UDP6, UnixGram: + return trans + } + return Transport("") +} + +// String casts the transport to a String if the Transport is supported. Return an empty Transport overwise. +func (trans Transport) String() string { + switch trans { + case UDP, UDP4, UDP6, UnixGram: + return string(trans) + } + return "" +} + +// IsPacketTransport returns true if the transport is packet based. +func (trans Transport) IsPacketTransport() bool { + switch trans { + case UDP, UDP4, UDP6: + return true + } + return false +} + +// Returns true if the transport is unix socket based. +func (trans Transport) IsUnixTransport() bool { + switch trans { + case UnixGram: + return true + } + return false +} diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go deleted file mode 100644 index 7a483ce61649..000000000000 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" - -import ( - "bytes" - "errors" - "io" - "net" - "strings" - - "go.opentelemetry.io/collector/consumer" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" -) - -type udpServer struct { - packetConn net.PacketConn - reporter Reporter -} - -var _ (Server) = (*udpServer)(nil) - -// NewUDPServer creates a transport.Server using UDP as its transport. -func NewUDPServer(addr string) (Server, error) { - packetConn, err := net.ListenPacket("udp", addr) - if err != nil { - return nil, err - } - - u := udpServer{ - packetConn: packetConn, - } - return &u, nil -} - -func (u *udpServer) ListenAndServe( - parser protocol.Parser, - nextConsumer consumer.Metrics, - reporter Reporter, - transferChan chan<- Metric, -) error { - if parser == nil || nextConsumer == nil || reporter == nil { - return errNilListenAndServeParameters - } - - u.reporter = reporter - - buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) - for { - n, addr, err := u.packetConn.ReadFrom(buf) - if n > 0 { - bufCopy := make([]byte, n) - copy(bufCopy, buf) - u.handlePacket(bufCopy, addr, transferChan) - } - if err != nil { - u.reporter.OnDebugf("UDP Transport (%s) - ReadFrom error: %v", - u.packetConn.LocalAddr(), - err) - var netErr net.Error - if errors.As(err, &netErr) { - if netErr.Timeout() { - continue - } - } - return err - } - } -} - -func (u *udpServer) Close() error { - return u.packetConn.Close() -} - -func (u *udpServer) handlePacket( - data []byte, - addr net.Addr, - transferChan chan<- Metric, -) { - buf := bytes.NewBuffer(data) - for { - bytes, err := buf.ReadBytes((byte)('\n')) - if errors.Is(err, io.EOF) { - if len(bytes) == 0 { - // Completed without errors. - break - } - } - line := strings.TrimSpace(string(bytes)) - if line != "" { - transferChan <- Metric{line, addr} - } - } -} diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index feac277f7716..960b9628d254 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -68,12 +68,11 @@ func New( } func buildTransportServer(config Config) (transport.Server, error) { - // TODO: Add TCP/unix socket transport implementations - switch strings.ToLower(config.NetAddr.Transport) { - case "", "udp": - return transport.NewUDPServer(config.NetAddr.Endpoint) + trans := transport.NewTransport(strings.ToLower(config.NetAddr.Transport)) + // TODO: Add TCP transport implementation + if trans.IsPacketTransport() { + return transport.NewPacketServer(trans, config.NetAddr.Endpoint) } - return nil, fmt.Errorf("unsupported transport %q", config.NetAddr.Transport) } @@ -96,7 +95,7 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error { return err } go func() { - if err := r.server.ListenAndServe(r.parser, r.nextConsumer, r.reporter, transferChan); err != nil { + if err := r.server.ListenAndServe(r.nextConsumer, r.reporter, transferChan); err != nil { if !errors.Is(err, net.ErrClosed) { host.ReportFatalError(err) } diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index 2332740adb72..3ae7a19dda5e 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -6,8 +6,7 @@ package statsdreceiver import ( "context" "errors" - "net" - "strconv" + "runtime" "testing" "time" @@ -22,8 +21,8 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + statsdtestutil "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/testutil" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client" ) func Test_statsdreceiver_New(t *testing.T) { @@ -113,19 +112,16 @@ func TestStatsdReceiver_Flush(t *testing.T) { } func Test_statsdreceiver_EndToEnd(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - host, portStr, err := net.SplitHostPort(addr) - require.NoError(t, err) - port, err := strconv.Atoi(portStr) - require.NoError(t, err) - tests := []struct { name string + addr string configFn func() *Config - clientFn func(t *testing.T) *client.StatsD + clientFn func(t *testing.T, addr string) *statsdtestutil.StatsDTestClient + testSkip bool }{ { name: "default_config with 4s interval", + addr: testutil.GetAvailableLocalNetworkAddress(t, "udp"), configFn: func() *Config { return &Config{ NetAddr: confignet.NetAddr{ @@ -135,17 +131,40 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { AggregationInterval: 4 * time.Second, } }, - clientFn: func(t *testing.T) *client.StatsD { - c, err := client.NewStatsD(client.UDP, host, port) + clientFn: func(t *testing.T, addr string) *statsdtestutil.StatsDTestClient { + c, err := statsdtestutil.NewStatsDTestClient("udp", addr) + require.NoError(t, err) + return c + }, + }, + { + name: "default_config with UDS listener (unixgram)", + addr: statsdtestutil.CreateTemporarySocket(t, ""), + configFn: func() *Config { + return &Config{ + NetAddr: confignet.NetAddr{ + Transport: "unixgram", + }, + AggregationInterval: 4 * time.Second, + } + }, + clientFn: func(t *testing.T, addr string) *statsdtestutil.StatsDTestClient { + c, err := statsdtestutil.NewStatsDTestClient("unixgram", addr) require.NoError(t, err) return c }, + // Tests on Mac/Windows give a "bind: invalid argument" error as unix sockets are not supported. + testSkip: runtime.GOOS != "linux", }, } for _, tt := range tests { + if tt.testSkip { + continue + } + t.Run(tt.name, func(t *testing.T) { cfg := tt.configFn() - cfg.NetAddr.Endpoint = addr + cfg.NetAddr.Endpoint = tt.addr sink := new(consumertest.MetricsSink) rcv, err := New(receivertest.NewNopCreateSettings(), *cfg, sink) require.NoError(t, err) @@ -159,9 +178,9 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { assert.NoError(t, r.Shutdown(context.Background())) }() - statsdClient := tt.clientFn(t) + statsdClient := tt.clientFn(t, tt.addr) - statsdMetric := client.Metric{ + statsdMetric := statsdtestutil.Metric{ Name: "test.metric", Value: "42", Type: "c",