diff --git a/receiver/statsdreceiver/config_test.go b/receiver/statsdreceiver/config_test.go index ecfb38ecc0e6..b9d726e0baca 100644 --- a/receiver/statsdreceiver/config_test.go +++ b/receiver/statsdreceiver/config_test.go @@ -93,6 +93,7 @@ func TestValidate(t *testing.T) { noObjectNameErr = "must specify object id for all TimerHistogramMappings" statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s" observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s" + invalidHistogramErr = "histogram configuration requires observer_type: histogram" ) tests := []test{ @@ -160,7 +161,7 @@ func TestValidate(t *testing.T) { }, }, }, - expectedErr: "histogram configuration requires observer_type: histogram", + expectedErr: invalidHistogramErr, }, { name: "negativeAggregationInterval", @@ -170,7 +171,7 @@ func TestValidate(t *testing.T) { {StatsdType: "timing", ObserverType: "gauge"}, }, }, - expectedErr: "aggregation_interval must be a positive duration", + expectedErr: negativeAggregationIntervalErr, }, } diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index 710aecc5e491..8b9fd7f06ba3 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -7,33 +7,25 @@ import ( "fmt" "io" "net" + "strings" ) // StatsD defines the properties of a StatsD connection. type StatsD struct { - Host string - Port int - Conn io.Writer + transport string + address string + conn io.Writer } -// Transport is an enum to select the type of transport. -type Transport int - -const ( - // TCP Transport - TCP Transport = iota - // UDP Transport - UDP -) - // NewStatsD creates a new StatsD instance to support the need for testing // the statsdreceiver package and is not intended/tested to be used in production. -func NewStatsD(transport Transport, host string, port int) (*StatsD, error) { +func NewStatsD(transport string, address string) (*StatsD, error) { statsd := &StatsD{ - Host: host, - Port: port, + transport: transport, + address: address, } - err := statsd.connect(transport) + + err := statsd.connect() if err != nil { return nil, err } @@ -41,56 +33,46 @@ func NewStatsD(transport Transport, host string, port int) (*StatsD, error) { return statsd, nil } -// connect populates the StatsD.Conn -func (s *StatsD) connect(transport Transport) error { - if cl, ok := s.Conn.(io.Closer); ok { - err := cl.Close() +// connect populates the StatsD.conn +func (s *StatsD) connect() error { + switch s.transport { + case "udp": + udpAddr, err := net.ResolveUDPAddr(s.transport, s.address) if err != nil { return err } - } - - address := fmt.Sprintf("%s:%d", s.Host, s.Port) - - var err error - switch transport { - case TCP: - s.Conn, err = net.Dial("tcp", address) - if err != nil { - return err - } - case UDP: - var udpAddr *net.UDPAddr - udpAddr, err = net.ResolveUDPAddr("udp", address) + s.conn, err = net.DialUDP(s.transport, nil, udpAddr) if err != nil { return err } - s.Conn, err = net.DialUDP("udp", nil, udpAddr) + case "tcp": + var err error + s.conn, err = net.Dial(s.transport, s.address) if err != nil { return err } default: - return fmt.Errorf("unknown transport: %d", transport) + return fmt.Errorf("unknown/unsupported transport: %s", s.transport) } - return err + return nil } -// Disconnect closes the StatsD.Conn. +// Disconnect closes the StatsD.conn. func (s *StatsD) Disconnect() error { var err error - if cl, ok := s.Conn.(io.Closer); ok { + if cl, ok := s.conn.(io.Closer); ok { err = cl.Close() } - s.Conn = nil + s.conn = nil return err } // SendMetric sends the input metric to the StatsD connection. func (s *StatsD) SendMetric(metric Metric) error { - _, err := fmt.Fprint(s.Conn, metric.String()) + _, err := io.Copy(s.conn, strings.NewReader(metric.String())) if err != nil { - return err + return fmt.Errorf("send metric on test client: %w", err) } return nil } diff --git a/receiver/statsdreceiver/internal/transport/server.go b/receiver/statsdreceiver/internal/transport/server.go index 5cc0474bf9de..7e1855fa0d9a 100644 --- a/receiver/statsdreceiver/internal/transport/server.go +++ b/receiver/statsdreceiver/internal/transport/server.go @@ -9,8 +9,6 @@ import ( "net" "go.opentelemetry.io/collector/consumer" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" ) var errNilListenAndServeParameters = errors.New("no parameter of ListenAndServe can be nil") @@ -22,7 +20,6 @@ type Server interface { // on the specific transport, and prepares the message to be processed by // the Parser and passed to the next consumer. ListenAndServe( - p protocol.Parser, mc consumer.Metrics, r Reporter, transferChan chan<- Metric, diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index c92f4b373397..a774c0fd4154 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -4,9 +4,9 @@ package transport import ( + "io" "net" "runtime" - "strconv" "sync" "testing" "time" @@ -16,65 +16,43 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client" ) func Test_Server_ListenAndServe(t *testing.T) { tests := []struct { - name string - transport string - buildServerFn func(addr string) (Server, error) - buildClientFn func(host string, port int) (*client.StatsD, error) + name string + transport Transport + buildServerFn func(transport Transport, addr string) (Server, error) + getFreeEndpointFn func(t testing.TB, transport string) string + buildClientFn func(transport string, address string) (*client.StatsD, error) }{ { - name: "udp", - transport: "udp", - buildServerFn: NewUDPServer, - buildClientFn: func(host string, port int) (*client.StatsD, error) { - return client.NewStatsD(client.UDP, host, port) - }, + name: "udp", + transport: UDP, + getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress, + buildServerFn: NewUDPServer, + buildClientFn: client.NewStatsD, }, { - name: "tcp", - transport: "tcp", - buildServerFn: NewTCPServer, - buildClientFn: func(host string, port int) (*client.StatsD, error) { - return client.NewStatsD(client.TCP, host, port) - }, + name: "tcp", + transport: TCP, + getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress, + buildServerFn: NewTCPServer, + buildClientFn: client.NewStatsD, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - addr := testutil.GetAvailableLocalNetworkAddress(t, tt.transport) - - if tt.transport == "udp" { - // Endpoint should be free. - ln0, err := net.ListenPacket("udp", addr) - require.NoError(t, err) - require.NotNil(t, ln0) - - // Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail. - ln1, err := net.ListenPacket("udp", addr) - require.Error(t, err) - require.Nil(t, ln1) + addr := tt.getFreeEndpointFn(t, tt.name) + testFreeEndpoint(t, tt.name, addr) - // Unbind the local address so the mock UDP service can use it - err = ln0.Close() - require.NoError(t, err) - } - - srv, err := tt.buildServerFn(addr) + srv, err := tt.buildServerFn(tt.transport, addr) require.NoError(t, err) require.NotNil(t, srv) - host, portStr, err := net.SplitHostPort(addr) - require.NoError(t, err) - port, err := strconv.Atoi(portStr) - require.NoError(t, err) - mc := new(consumertest.MetricsSink) - p := &protocol.StatsDParser{} require.NoError(t, err) mr := NewMockReporter(1) transferChan := make(chan Metric, 10) @@ -83,12 +61,12 @@ func Test_Server_ListenAndServe(t *testing.T) { wgListenAndServe.Add(1) go func() { defer wgListenAndServe.Done() - assert.Error(t, srv.ListenAndServe(p, mc, mr, transferChan)) + assert.Error(t, srv.ListenAndServe(mc, mr, transferChan)) }() runtime.Gosched() - gc, err := tt.buildClientFn(host, port) + gc, err := tt.buildClientFn(tt.transport.String(), addr) require.NoError(t, err) require.NotNil(t, gc) err = gc.SendMetric(client.Metric{ @@ -115,3 +93,36 @@ func Test_Server_ListenAndServe(t *testing.T) { }) } } + +func testFreeEndpoint(t *testing.T, transport string, address string) { + t.Helper() + + var ln0, ln1 io.Closer + var err0, err1 error + + trans := NewTransport(transport) + require.NotEqual(t, trans, Transport("")) + + if trans.IsPacketTransport() { + // Endpoint should be free. + ln0, err0 = net.ListenPacket(transport, address) + ln1, err1 = net.ListenPacket(transport, address) + } + + if trans.IsStreamTransport() { + // Endpoint should be free. + ln0, err0 = net.Listen(transport, address) + ln1, err1 = net.Listen(transport, address) + } + + // Endpoint should be free. + require.NoError(t, err0) + require.NotNil(t, ln0) + + // Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail. + require.Error(t, err1) + require.Nil(t, ln1) + + // Unbind the local address so the mock UDP service can use it + require.NoError(t, ln0.Close()) +} diff --git a/receiver/statsdreceiver/internal/transport/tcp_server.go b/receiver/statsdreceiver/internal/transport/tcp_server.go index a464b1927c88..f776bcd88f04 100644 --- a/receiver/statsdreceiver/internal/transport/tcp_server.go +++ b/receiver/statsdreceiver/internal/transport/tcp_server.go @@ -6,43 +6,50 @@ package transport // import "github.com/open-telemetry/opentelemetry-collector-c import ( "bytes" "errors" + "fmt" "io" "net" "strings" "sync" "go.opentelemetry.io/collector/consumer" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" ) var errTCPServerDone = errors.New("server stopped") type tcpServer struct { - listener net.Listener - reporter Reporter - wg sync.WaitGroup - stopChan chan struct{} + listener net.Listener + reporter Reporter + wg sync.WaitGroup + transport Transport + stopChan chan struct{} } +// Ensure that Server is implemented on TCP Server. var _ Server = (*tcpServer)(nil) // NewTCPServer creates a transport.Server using TCP as its transport. -func NewTCPServer(addr string) (Server, error) { - l, err := net.Listen("tcp", addr) - if err != nil { - return nil, err +func NewTCPServer(transport Transport, address string) (Server, error) { + var tsrv tcpServer + var err error + + if !transport.IsStreamTransport() { + return nil, fmt.Errorf("NewTCPServer with %s: %w", transport.String(), ErrUnsupportedStreamTransport) } - t := tcpServer{ - listener: l, - stopChan: make(chan struct{}), + tsrv.transport = transport + tsrv.listener, err = net.Listen(transport.String(), address) + if err != nil { + return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err) } - return &t, nil + + tsrv.stopChan = make(chan struct{}) + return &tsrv, nil } -func (t *tcpServer) ListenAndServe(parser protocol.Parser, nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric) error { - if parser == nil || nextConsumer == nil || reporter == nil { +// ListenAndServe starts the server ready to receive metrics. +func (t *tcpServer) ListenAndServe(nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric) error { + if nextConsumer == nil || reporter == nil { return errNilListenAndServeParameters } @@ -71,6 +78,7 @@ LOOP: return errTCPServerDone } +// handleConn is helper that parses the buffer and split it line by line to be parsed upstream. func (t *tcpServer) handleConn(c net.Conn, transferChan chan<- Metric) { payload := make([]byte, 4096) var remainder []byte @@ -98,6 +106,7 @@ func (t *tcpServer) handleConn(c net.Conn, transferChan chan<- Metric) { } } +// Close closes the server. func (t *tcpServer) Close() error { close(t.stopChan) t.wg.Wait() diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go new file mode 100644 index 000000000000..c065e30c746f --- /dev/null +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" + +import "errors" + +// Transport is a set of constants of the transport supported by this receiver. +type Transport string + +var ( + ErrUnsupportedTransport = errors.New("unsupported transport") + ErrUnsupportedPacketTransport = errors.New("unsupported Packet transport") + ErrUnsupportedStreamTransport = errors.New("unsupported Stream transport") +) + +const ( + UDP Transport = "udp" + UDP4 Transport = "udp4" + UDP6 Transport = "udp6" + TCP Transport = "tcp" + TCP4 Transport = "tcp4" + TCP6 Transport = "tcp6" +) + +// NewTransport creates a Transport based on the transport string or returns an empty Transport. +func NewTransport(ts string) Transport { + trans := Transport(ts) + switch trans { + case UDP, UDP4, UDP6: + return trans + case TCP, TCP4, TCP6: + return trans + } + return Transport("") +} + +// String casts the transport to a String if the Transport is supported. Return an empty Transport overwise. +func (trans Transport) String() string { + switch trans { + case UDP, UDP4, UDP6, TCP, TCP4, TCP6: + return string(trans) + } + return "" +} + +// IsPacketTransport returns true if the transport is packet based. +func (trans Transport) IsPacketTransport() bool { + switch trans { + case UDP, UDP4, UDP6: + return true + } + return false +} + +// IsStreamTransport returns true if the transport is stream based. +func (trans Transport) IsStreamTransport() bool { + switch trans { + case TCP, TCP4, TCP6: + return true + } + return false +} diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go index 7a483ce61649..19ad0803d8fe 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/udp_server.go @@ -6,47 +6,49 @@ package transport // import "github.com/open-telemetry/opentelemetry-collector-c import ( "bytes" "errors" + "fmt" "io" "net" "strings" "go.opentelemetry.io/collector/consumer" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" ) type udpServer struct { packetConn net.PacketConn - reporter Reporter + transport Transport } +// Ensure that Server is implemented on UDP Server. var _ (Server) = (*udpServer)(nil) // NewUDPServer creates a transport.Server using UDP as its transport. -func NewUDPServer(addr string) (Server, error) { - packetConn, err := net.ListenPacket("udp", addr) - if err != nil { - return nil, err +func NewUDPServer(transport Transport, address string) (Server, error) { + if !transport.IsPacketTransport() { + return nil, fmt.Errorf("NewUDPServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport) } - u := udpServer{ - packetConn: packetConn, + conn, err := net.ListenPacket(transport.String(), address) + if err != nil { + return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err) } - return &u, nil + + return &udpServer{ + packetConn: conn, + transport: transport, + }, nil } +// ListenAndServe starts the server ready to receive metrics. func (u *udpServer) ListenAndServe( - parser protocol.Parser, nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric, ) error { - if parser == nil || nextConsumer == nil || reporter == nil { + if nextConsumer == nil || reporter == nil { return errNilListenAndServeParameters } - u.reporter = reporter - buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) for { n, addr, err := u.packetConn.ReadFrom(buf) @@ -56,7 +58,8 @@ func (u *udpServer) ListenAndServe( u.handlePacket(bufCopy, addr, transferChan) } if err != nil { - u.reporter.OnDebugf("UDP Transport (%s) - ReadFrom error: %v", + reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", + u.transport, u.packetConn.LocalAddr(), err) var netErr net.Error @@ -70,10 +73,12 @@ func (u *udpServer) ListenAndServe( } } +// Close closes the server. func (u *udpServer) Close() error { return u.packetConn.Close() } +// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream. func (u *udpServer) handlePacket( data []byte, addr net.Addr, diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 74d4354e4a62..d22520593dae 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -68,12 +68,13 @@ func newReceiver( } func buildTransportServer(config Config) (transport.Server, error) { - // TODO: Add TCP/unix socket transport implementations - switch strings.ToLower(config.NetAddr.Transport) { - case "", "udp": - return transport.NewUDPServer(config.NetAddr.Endpoint) - case "tcp": - return transport.NewTCPServer(config.NetAddr.Endpoint) + // TODO: Add unix socket transport implementations + trans := transport.NewTransport(strings.ToLower(config.NetAddr.Transport)) + switch trans { + case transport.UDP, transport.UDP4, transport.UDP6: + return transport.NewUDPServer(trans, config.NetAddr.Endpoint) + case transport.TCP, transport.TCP4, transport.TCP6: + return transport.NewTCPServer(trans, config.NetAddr.Endpoint) } return nil, fmt.Errorf("unsupported transport %q", config.NetAddr.Transport) @@ -98,7 +99,7 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error { return err } go func() { - if err := r.server.ListenAndServe(r.parser, r.nextConsumer, r.reporter, transferChan); err != nil { + if err := r.server.ListenAndServe(r.nextConsumer, r.reporter, transferChan); err != nil { if !errors.Is(err, net.ErrClosed) { host.ReportFatalError(err) } diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index 77f49c49008e..ec2b5cb90942 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -6,8 +6,6 @@ package statsdreceiver import ( "context" "errors" - "net" - "strconv" "testing" "time" @@ -113,19 +111,15 @@ func TestStatsdReceiver_Flush(t *testing.T) { } func Test_statsdreceiver_EndToEnd(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - host, portStr, err := net.SplitHostPort(addr) - require.NoError(t, err) - port, err := strconv.Atoi(portStr) - require.NoError(t, err) - tests := []struct { name string + addr string configFn func() *Config - clientFn func(t *testing.T) *client.StatsD + clientFn func(t *testing.T, addr string) *client.StatsD }{ { name: "default_config with 4s interval", + addr: testutil.GetAvailableLocalNetworkAddress(t, "udp"), configFn: func() *Config { return &Config{ NetAddr: confignet.NetAddr{ @@ -135,8 +129,8 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { AggregationInterval: 4 * time.Second, } }, - clientFn: func(t *testing.T) *client.StatsD { - c, err := client.NewStatsD(client.UDP, host, port) + clientFn: func(t *testing.T, addr string) *client.StatsD { + c, err := client.NewStatsD("udp", addr) require.NoError(t, err) return c }, @@ -145,7 +139,7 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cfg := tt.configFn() - cfg.NetAddr.Endpoint = addr + cfg.NetAddr.Endpoint = tt.addr sink := new(consumertest.MetricsSink) rcv, err := newReceiver(receivertest.NewNopCreateSettings(), *cfg, sink) require.NoError(t, err) @@ -159,7 +153,7 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { assert.NoError(t, r.Shutdown(context.Background())) }() - statsdClient := tt.clientFn(t) + statsdClient := tt.clientFn(t, tt.addr) statsdMetric := client.Metric{ Name: "test.metric",