From 86d66ccce78562db131646b944f4b515c9d94d5b Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Tue, 1 Aug 2023 15:25:20 +0200 Subject: [PATCH 01/11] refactor of the tests so they are not hardcoded --- receiver/statsdreceiver/config_test.go | 14 ++- .../internal/transport/client/client.go | 63 ++++------- .../internal/transport/server.go | 3 - .../internal/transport/server_test.go | 105 ++++++++++-------- .../internal/transport/tcp_server.go | 37 +++--- .../internal/transport/transport.go | 60 ++++++++++ .../internal/transport/udp_server.go | 37 +++--- receiver/statsdreceiver/receiver.go | 15 +-- receiver/statsdreceiver/receiver_test.go | 29 +++-- 9 files changed, 213 insertions(+), 150 deletions(-) create mode 100644 receiver/statsdreceiver/internal/transport/transport.go diff --git a/receiver/statsdreceiver/config_test.go b/receiver/statsdreceiver/config_test.go index ecfb38ecc0e6..9f4a8dc4e4c4 100644 --- a/receiver/statsdreceiver/config_test.go +++ b/receiver/statsdreceiver/config_test.go @@ -89,10 +89,12 @@ func TestValidate(t *testing.T) { } const ( - negativeAggregationIntervalErr = "aggregation_interval must be a positive duration" - noObjectNameErr = "must specify object id for all TimerHistogramMappings" - statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s" - observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s" + negativeAggregationIntervalErr = "aggregation_interval must be a positive duration" + noObjectNameErr = "must specify object id for all TimerHistogramMappings" + statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s" + observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s" + invalidHistogramErr = "histogram configuration requires observer_type: histogram" + socketConfigurationWithUDSServerErr = "socket configuration is unsupported if the transport is not a unix domain socket" ) tests := []test{ @@ -160,7 +162,7 @@ func TestValidate(t *testing.T) { }, }, }, - expectedErr: "histogram configuration requires observer_type: histogram", + expectedErr: invalidHistogramErr, }, { name: "negativeAggregationInterval", @@ -170,7 +172,7 @@ func TestValidate(t *testing.T) { {StatsdType: "timing", ObserverType: "gauge"}, }, }, - expectedErr: "aggregation_interval must be a positive duration", + expectedErr: negativeAggregationIntervalErr, }, } diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index 710aecc5e491..65ec1dce28b5 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -7,33 +7,25 @@ import ( "fmt" "io" "net" + "strings" ) // StatsD defines the properties of a StatsD connection. type StatsD struct { - Host string - Port int - Conn io.Writer + transport string + address string + Conn io.Writer } -// Transport is an enum to select the type of transport. -type Transport int - -const ( - // TCP Transport - TCP Transport = iota - // UDP Transport - UDP -) - // NewStatsD creates a new StatsD instance to support the need for testing // the statsdreceiver package and is not intended/tested to be used in production. -func NewStatsD(transport Transport, host string, port int) (*StatsD, error) { +func NewStatsD(transport string, address string) (*StatsD, error) { statsd := &StatsD{ - Host: host, - Port: port, + transport: transport, + address: address, } - err := statsd.connect(transport) + + err := statsd.connect() if err != nil { return nil, err } @@ -42,38 +34,28 @@ func NewStatsD(transport Transport, host string, port int) (*StatsD, error) { } // connect populates the StatsD.Conn -func (s *StatsD) connect(transport Transport) error { - if cl, ok := s.Conn.(io.Closer); ok { - err := cl.Close() - if err != nil { - return err - } - } - - address := fmt.Sprintf("%s:%d", s.Host, s.Port) - - var err error - switch transport { - case TCP: - s.Conn, err = net.Dial("tcp", address) +func (s *StatsD) connect() error { + switch s.transport { + case "udp": + udpAddr, err := net.ResolveUDPAddr(s.transport, s.address) if err != nil { return err } - case UDP: - var udpAddr *net.UDPAddr - udpAddr, err = net.ResolveUDPAddr("udp", address) + s.Conn, err = net.DialUDP(s.transport, nil, udpAddr) if err != nil { return err } - s.Conn, err = net.DialUDP("udp", nil, udpAddr) + case "tcp": + var err error + s.Conn, err = net.Dial("tcp", s.address) if err != nil { return err } default: - return fmt.Errorf("unknown transport: %d", transport) + return fmt.Errorf("unknown/unsupported transport: %s", s.transport) } - return err + return nil } // Disconnect closes the StatsD.Conn. @@ -88,11 +70,8 @@ func (s *StatsD) Disconnect() error { // SendMetric sends the input metric to the StatsD connection. func (s *StatsD) SendMetric(metric Metric) error { - _, err := fmt.Fprint(s.Conn, metric.String()) - if err != nil { - return err - } - return nil + _, err := io.Copy(s.Conn, strings.NewReader(metric.String())) + return err } // Metric contains the metric fields for a StatsD message. diff --git a/receiver/statsdreceiver/internal/transport/server.go b/receiver/statsdreceiver/internal/transport/server.go index 5cc0474bf9de..7e1855fa0d9a 100644 --- a/receiver/statsdreceiver/internal/transport/server.go +++ b/receiver/statsdreceiver/internal/transport/server.go @@ -9,8 +9,6 @@ import ( "net" "go.opentelemetry.io/collector/consumer" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" ) var errNilListenAndServeParameters = errors.New("no parameter of ListenAndServe can be nil") @@ -22,7 +20,6 @@ type Server interface { // on the specific transport, and prepares the message to be processed by // the Parser and passed to the next consumer. ListenAndServe( - p protocol.Parser, mc consumer.Metrics, r Reporter, transferChan chan<- Metric, diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index c92f4b373397..8664bf42857c 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -4,9 +4,9 @@ package transport import ( + "io" "net" "runtime" - "strconv" "sync" "testing" "time" @@ -16,65 +16,49 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client" ) func Test_Server_ListenAndServe(t *testing.T) { tests := []struct { - name string - transport string - buildServerFn func(addr string) (Server, error) - buildClientFn func(host string, port int) (*client.StatsD, error) + name string + transport Transport + buildServerFn func(transport Transport, addr string) (Server, error) + getFreeEndpointFn func(t testing.TB, transport string) string + buildClientFn func(transport string, address string) (*client.StatsD, error) + testSkip bool }{ { - name: "udp", - transport: "udp", - buildServerFn: NewUDPServer, - buildClientFn: func(host string, port int) (*client.StatsD, error) { - return client.NewStatsD(client.UDP, host, port) - }, + name: "udp", + transport: UDP, + getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress, + buildServerFn: NewUDPServer, + buildClientFn: client.NewStatsD, }, { - name: "tcp", - transport: "tcp", - buildServerFn: NewTCPServer, - buildClientFn: func(host string, port int) (*client.StatsD, error) { - return client.NewStatsD(client.TCP, host, port) - }, + name: "tcp", + transport: TCP, + getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress, + buildServerFn: NewTCPServer, + buildClientFn: client.NewStatsD, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - addr := testutil.GetAvailableLocalNetworkAddress(t, tt.transport) - - if tt.transport == "udp" { - // Endpoint should be free. - ln0, err := net.ListenPacket("udp", addr) - require.NoError(t, err) - require.NotNil(t, ln0) - // Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail. - ln1, err := net.ListenPacket("udp", addr) - require.Error(t, err) - require.Nil(t, ln1) + for _, tt := range tests { + if tt.testSkip { + continue + } - // Unbind the local address so the mock UDP service can use it - err = ln0.Close() - require.NoError(t, err) - } + t.Run(tt.name, func(t *testing.T) { + trans := Transport(tt.name) + addr := tt.getFreeEndpointFn(t, tt.name) + testFreeEndpoint(t, tt.name, addr) - srv, err := tt.buildServerFn(addr) + srv, err := tt.buildServerFn(trans, addr) require.NoError(t, err) require.NotNil(t, srv) - host, portStr, err := net.SplitHostPort(addr) - require.NoError(t, err) - port, err := strconv.Atoi(portStr) - require.NoError(t, err) - mc := new(consumertest.MetricsSink) - p := &protocol.StatsDParser{} require.NoError(t, err) mr := NewMockReporter(1) transferChan := make(chan Metric, 10) @@ -83,12 +67,12 @@ func Test_Server_ListenAndServe(t *testing.T) { wgListenAndServe.Add(1) go func() { defer wgListenAndServe.Done() - assert.Error(t, srv.ListenAndServe(p, mc, mr, transferChan)) + assert.Error(t, srv.ListenAndServe(mc, mr, transferChan)) }() runtime.Gosched() - gc, err := tt.buildClientFn(host, port) + gc, err := tt.buildClientFn(tt.name, addr) require.NoError(t, err) require.NotNil(t, gc) err = gc.SendMetric(client.Metric{ @@ -115,3 +99,36 @@ func Test_Server_ListenAndServe(t *testing.T) { }) } } + +func testFreeEndpoint(t *testing.T, transport string, address string) { + t.Helper() + + var ln0, ln1 io.Closer + var err0, err1 error + + trans, err := NewTransport(transport) + require.NoError(t, err) + + if trans.IsPacketTransport() { + // Endpoint should be free. + ln0, err0 = net.ListenPacket(transport, address) + ln1, err1 = net.ListenPacket(transport, address) + } + + if trans.IsStreamTransport() { + // Endpoint should be free. + ln0, err0 = net.Listen(transport, address) + ln1, err1 = net.Listen(transport, address) + } + + // Endpoint should be free. + require.NoError(t, err0) + require.NotNil(t, ln0) + + // Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail. + require.Error(t, err1) + require.Nil(t, ln1) + + // Unbind the local address so the mock UDP service can use it + require.NoError(t, ln0.Close()) +} diff --git a/receiver/statsdreceiver/internal/transport/tcp_server.go b/receiver/statsdreceiver/internal/transport/tcp_server.go index a464b1927c88..513532b04d8f 100644 --- a/receiver/statsdreceiver/internal/transport/tcp_server.go +++ b/receiver/statsdreceiver/internal/transport/tcp_server.go @@ -6,43 +6,48 @@ package transport // import "github.com/open-telemetry/opentelemetry-collector-c import ( "bytes" "errors" + "fmt" "io" "net" "strings" "sync" "go.opentelemetry.io/collector/consumer" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" ) var errTCPServerDone = errors.New("server stopped") type tcpServer struct { - listener net.Listener - reporter Reporter - wg sync.WaitGroup - stopChan chan struct{} + listener net.Listener + reporter Reporter + wg sync.WaitGroup + transport Transport + stopChan chan struct{} } var _ Server = (*tcpServer)(nil) // NewTCPServer creates a transport.Server using TCP as its transport. -func NewTCPServer(addr string) (Server, error) { - l, err := net.Listen("tcp", addr) - if err != nil { - return nil, err +func NewTCPServer(transport Transport, address string) (Server, error) { + var tsrv tcpServer + var err error + + if !transport.IsStreamTransport() { + return nil, fmt.Errorf("NewTCPServer with %s: %w", transport.String(), ErrUnsupportedStreamTransport) } - t := tcpServer{ - listener: l, - stopChan: make(chan struct{}), + tsrv.transport = transport + tsrv.listener, err = net.Listen(transport.String(), address) + if err != nil { + return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err) } - return &t, nil + + tsrv.stopChan = make(chan struct{}) + return &tsrv, nil } -func (t *tcpServer) ListenAndServe(parser protocol.Parser, nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric) error { - if parser == nil || nextConsumer == nil || reporter == nil { +func (t *tcpServer) ListenAndServe(nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric) error { + if nextConsumer == nil || reporter == nil { return errNilListenAndServeParameters } diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go new file mode 100644 index 000000000000..a769f2649277 --- /dev/null +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -0,0 +1,60 @@ +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" + +import "errors" + +// Transport is a set of constants of the transport supported by this receiver. +type Transport string + +var ( + ErrUnsupportedTransport = errors.New("unsupported transport") + ErrUnsupportedPacketTransport = errors.New("unsupported Packet transport") + ErrUnsupportedStreamTransport = errors.New("unsupported Stream transport") +) + +const ( + UDP Transport = "udp" + UDP4 Transport = "udp4" + UDP6 Transport = "udp6" + TCP Transport = "tcp" + TCP4 Transport = "tcp4" + TCP6 Transport = "tcp6" +) + +// Create a Transport based on the transport string or return error if it is not supported. +func NewTransport(ts string) (Transport, error) { + trans := Transport(ts) + switch trans { + case UDP, UDP4, UDP6: + return trans, nil + case TCP, TCP4, TCP6: + return trans, nil + } + return Transport(""), ErrUnsupportedTransport +} + +// Returns the string of this transport. +func (trans Transport) String() string { + switch trans { + case UDP, UDP4, UDP6, TCP, TCP4, TCP6: + return string(trans) + } + return "" +} + +// Returns true if the transport is packet based. +func (trans Transport) IsPacketTransport() bool { + switch trans { + case UDP, UDP4, UDP6: + return true + } + return false +} + +// Returns true if the transport is stream based. +func (trans Transport) IsStreamTransport() bool { + switch trans { + case TCP, TCP4, TCP6: + return true + } + return false +} diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go index 7a483ce61649..4b02e0d69dec 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/udp_server.go @@ -6,47 +6,50 @@ package transport // import "github.com/open-telemetry/opentelemetry-collector-c import ( "bytes" "errors" + "fmt" "io" "net" "strings" "go.opentelemetry.io/collector/consumer" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" ) type udpServer struct { packetConn net.PacketConn - reporter Reporter + transport Transport } +// Ensure that Server is implemented on UDP Server. var _ (Server) = (*udpServer)(nil) // NewUDPServer creates a transport.Server using UDP as its transport. -func NewUDPServer(addr string) (Server, error) { - packetConn, err := net.ListenPacket("udp", addr) - if err != nil { - return nil, err +func NewUDPServer(transport Transport, address string) (Server, error) { + var usrv udpServer + var err error + + if !transport.IsPacketTransport() { + return nil, fmt.Errorf("NewUDPServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport) } - u := udpServer{ - packetConn: packetConn, + usrv.transport = transport + usrv.packetConn, err = net.ListenPacket(transport.String(), address) + if err != nil { + return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err) } - return &u, nil + + return &usrv, nil } +// Start the server ready to recevie metrics. func (u *udpServer) ListenAndServe( - parser protocol.Parser, nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric, ) error { - if parser == nil || nextConsumer == nil || reporter == nil { + if nextConsumer == nil || reporter == nil { return errNilListenAndServeParameters } - u.reporter = reporter - buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) for { n, addr, err := u.packetConn.ReadFrom(buf) @@ -56,10 +59,8 @@ func (u *udpServer) ListenAndServe( u.handlePacket(bufCopy, addr, transferChan) } if err != nil { - u.reporter.OnDebugf("UDP Transport (%s) - ReadFrom error: %v", - u.packetConn.LocalAddr(), - err) var netErr net.Error + reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", u.transport, u.packetConn.LocalAddr(), err) if errors.As(err, &netErr) { if netErr.Timeout() { continue @@ -70,10 +71,12 @@ func (u *udpServer) ListenAndServe( } } +// Closes the server func (u *udpServer) Close() error { return u.packetConn.Close() } +// This helper parses the buffer and split it line bye line to be parsed upstream. func (u *udpServer) handlePacket( data []byte, addr net.Addr, diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 74d4354e4a62..19ebd6becc6b 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -69,14 +69,15 @@ func newReceiver( func buildTransportServer(config Config) (transport.Server, error) { // TODO: Add TCP/unix socket transport implementations - switch strings.ToLower(config.NetAddr.Transport) { - case "", "udp": - return transport.NewUDPServer(config.NetAddr.Endpoint) - case "tcp": - return transport.NewTCPServer(config.NetAddr.Endpoint) + trans, err := transport.NewTransport(strings.ToLower(config.NetAddr.Transport)) + switch trans { + case transport.UDP, transport.UDP4, transport.UDP6: + return transport.NewUDPServer(trans, config.NetAddr.Endpoint) + case transport.TCP, transport.TCP4, transport.TCP6: + return transport.NewTCPServer(trans, config.NetAddr.Endpoint) } - return nil, fmt.Errorf("unsupported transport %q", config.NetAddr.Transport) + return nil, fmt.Errorf("error building server with transport %q: %w", config.NetAddr.Transport, err) } // Start starts a UDP server that can process StatsD messages. @@ -98,7 +99,7 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error { return err } go func() { - if err := r.server.ListenAndServe(r.parser, r.nextConsumer, r.reporter, transferChan); err != nil { + if err := r.server.ListenAndServe(r.nextConsumer, r.reporter, transferChan); err != nil { if !errors.Is(err, net.ErrClosed) { host.ReportFatalError(err) } diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index 77f49c49008e..a5db7d3358e9 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -6,8 +6,6 @@ package statsdreceiver import ( "context" "errors" - "net" - "strconv" "testing" "time" @@ -74,7 +72,7 @@ func Test_statsdreceiver_Start(t *testing.T) { }, nextConsumer: consumertest.NewNop(), }, - wantErr: errors.New("unsupported transport \"unknown\""), + wantErr: errors.New("error building server with transport \"unknown\": unsupported transport"), }, } for _, tt := range tests { @@ -82,7 +80,7 @@ func Test_statsdreceiver_Start(t *testing.T) { receiver, err := newReceiver(receivertest.NewNopCreateSettings(), tt.args.config, tt.args.nextConsumer) require.NoError(t, err) err = receiver.Start(context.Background(), componenttest.NewNopHost()) - assert.Equal(t, tt.wantErr, err) + assert.Equal(t, tt.wantErr.Error(), err.Error()) assert.NoError(t, receiver.Shutdown(context.Background())) }) @@ -113,19 +111,16 @@ func TestStatsdReceiver_Flush(t *testing.T) { } func Test_statsdreceiver_EndToEnd(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - host, portStr, err := net.SplitHostPort(addr) - require.NoError(t, err) - port, err := strconv.Atoi(portStr) - require.NoError(t, err) - tests := []struct { name string + addr string configFn func() *Config - clientFn func(t *testing.T) *client.StatsD + clientFn func(t *testing.T, addr string) *client.StatsD + testSkip bool }{ { name: "default_config with 4s interval", + addr: testutil.GetAvailableLocalNetworkAddress(t, "udp"), configFn: func() *Config { return &Config{ NetAddr: confignet.NetAddr{ @@ -135,17 +130,21 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { AggregationInterval: 4 * time.Second, } }, - clientFn: func(t *testing.T) *client.StatsD { - c, err := client.NewStatsD(client.UDP, host, port) + clientFn: func(t *testing.T, addr string) *client.StatsD { + c, err := client.NewStatsD("udp", addr) require.NoError(t, err) return c }, }, } for _, tt := range tests { + if tt.testSkip { + continue + } + t.Run(tt.name, func(t *testing.T) { cfg := tt.configFn() - cfg.NetAddr.Endpoint = addr + cfg.NetAddr.Endpoint = tt.addr sink := new(consumertest.MetricsSink) rcv, err := newReceiver(receivertest.NewNopCreateSettings(), *cfg, sink) require.NoError(t, err) @@ -159,7 +158,7 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { assert.NoError(t, r.Shutdown(context.Background())) }() - statsdClient := tt.clientFn(t) + statsdClient := tt.clientFn(t, tt.addr) statsdMetric := client.Metric{ Name: "test.metric", From fafdb82f8c58a5a3f53a34b219242722046d3bdb Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 10:34:57 +0200 Subject: [PATCH 02/11] address christian comments --- .../statsdreceiver/internal/transport/server_test.go | 4 ++-- receiver/statsdreceiver/internal/transport/transport.go | 8 ++++---- receiver/statsdreceiver/internal/transport/udp_server.go | 5 ++++- receiver/statsdreceiver/receiver.go | 6 +++--- receiver/statsdreceiver/receiver_test.go | 9 ++------- 5 files changed, 15 insertions(+), 17 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index 8664bf42857c..22efd4bf66a0 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -106,8 +106,8 @@ func testFreeEndpoint(t *testing.T, transport string, address string) { var ln0, ln1 io.Closer var err0, err1 error - trans, err := NewTransport(transport) - require.NoError(t, err) + trans := NewTransport(transport) + require.NotEqual(t, trans, Transport("")) if trans.IsPacketTransport() { // Endpoint should be free. diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go index a769f2649277..5a6b28733793 100644 --- a/receiver/statsdreceiver/internal/transport/transport.go +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -21,15 +21,15 @@ const ( ) // Create a Transport based on the transport string or return error if it is not supported. -func NewTransport(ts string) (Transport, error) { +func NewTransport(ts string) Transport { trans := Transport(ts) switch trans { case UDP, UDP4, UDP6: - return trans, nil + return trans case TCP, TCP4, TCP6: - return trans, nil + return trans } - return Transport(""), ErrUnsupportedTransport + return Transport("") } // Returns the string of this transport. diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go index 4b02e0d69dec..1a99368154f5 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/udp_server.go @@ -59,8 +59,11 @@ func (u *udpServer) ListenAndServe( u.handlePacket(bufCopy, addr, transferChan) } if err != nil { + reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", + u.transport, + u.packetConn.LocalAddr(), + err) var netErr net.Error - reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", u.transport, u.packetConn.LocalAddr(), err) if errors.As(err, &netErr) { if netErr.Timeout() { continue diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 19ebd6becc6b..d22520593dae 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -68,8 +68,8 @@ func newReceiver( } func buildTransportServer(config Config) (transport.Server, error) { - // TODO: Add TCP/unix socket transport implementations - trans, err := transport.NewTransport(strings.ToLower(config.NetAddr.Transport)) + // TODO: Add unix socket transport implementations + trans := transport.NewTransport(strings.ToLower(config.NetAddr.Transport)) switch trans { case transport.UDP, transport.UDP4, transport.UDP6: return transport.NewUDPServer(trans, config.NetAddr.Endpoint) @@ -77,7 +77,7 @@ func buildTransportServer(config Config) (transport.Server, error) { return transport.NewTCPServer(trans, config.NetAddr.Endpoint) } - return nil, fmt.Errorf("error building server with transport %q: %w", config.NetAddr.Transport, err) + return nil, fmt.Errorf("unsupported transport %q", config.NetAddr.Transport) } // Start starts a UDP server that can process StatsD messages. diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index a5db7d3358e9..ec2b5cb90942 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -72,7 +72,7 @@ func Test_statsdreceiver_Start(t *testing.T) { }, nextConsumer: consumertest.NewNop(), }, - wantErr: errors.New("error building server with transport \"unknown\": unsupported transport"), + wantErr: errors.New("unsupported transport \"unknown\""), }, } for _, tt := range tests { @@ -80,7 +80,7 @@ func Test_statsdreceiver_Start(t *testing.T) { receiver, err := newReceiver(receivertest.NewNopCreateSettings(), tt.args.config, tt.args.nextConsumer) require.NoError(t, err) err = receiver.Start(context.Background(), componenttest.NewNopHost()) - assert.Equal(t, tt.wantErr.Error(), err.Error()) + assert.Equal(t, tt.wantErr, err) assert.NoError(t, receiver.Shutdown(context.Background())) }) @@ -116,7 +116,6 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { addr string configFn func() *Config clientFn func(t *testing.T, addr string) *client.StatsD - testSkip bool }{ { name: "default_config with 4s interval", @@ -138,10 +137,6 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { }, } for _, tt := range tests { - if tt.testSkip { - continue - } - t.Run(tt.name, func(t *testing.T) { cfg := tt.configFn() cfg.NetAddr.Endpoint = tt.addr From dabf42a3ff36e9d8a804b47899716d96900424a0 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 10:41:30 +0200 Subject: [PATCH 03/11] address marc comments --- receiver/statsdreceiver/internal/transport/transport.go | 8 ++++---- receiver/statsdreceiver/internal/transport/udp_server.go | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go index 5a6b28733793..98c822138161 100644 --- a/receiver/statsdreceiver/internal/transport/transport.go +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -20,7 +20,7 @@ const ( TCP6 Transport = "tcp6" ) -// Create a Transport based on the transport string or return error if it is not supported. +// NewTransport creates a Transport based on the transport string or returns an empty Transport. func NewTransport(ts string) Transport { trans := Transport(ts) switch trans { @@ -32,7 +32,7 @@ func NewTransport(ts string) Transport { return Transport("") } -// Returns the string of this transport. +// String casts the transport to a String if the Transport is supported. Return an empty Transport overwise. func (trans Transport) String() string { switch trans { case UDP, UDP4, UDP6, TCP, TCP4, TCP6: @@ -41,7 +41,7 @@ func (trans Transport) String() string { return "" } -// Returns true if the transport is packet based. +// IsPacketTransport returns true if the transport is packet based. func (trans Transport) IsPacketTransport() bool { switch trans { case UDP, UDP4, UDP6: @@ -50,7 +50,7 @@ func (trans Transport) IsPacketTransport() bool { return false } -// Returns true if the transport is stream based. +// IsStreamTransport returns true if the transport is stream based. func (trans Transport) IsStreamTransport() bool { switch trans { case TCP, TCP4, TCP6: diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go index 1a99368154f5..a939cce63848 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/udp_server.go @@ -40,7 +40,7 @@ func NewUDPServer(transport Transport, address string) (Server, error) { return &usrv, nil } -// Start the server ready to recevie metrics. +// ListenAndServe starts the server ready to receive metrics. func (u *udpServer) ListenAndServe( nextConsumer consumer.Metrics, reporter Reporter, @@ -74,12 +74,12 @@ func (u *udpServer) ListenAndServe( } } -// Closes the server +// Close closes the server. func (u *udpServer) Close() error { return u.packetConn.Close() } -// This helper parses the buffer and split it line bye line to be parsed upstream. +// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream. func (u *udpServer) handlePacket( data []byte, addr net.Addr, From df709ae03c4443d5af41a9c46b5c926987e6d260 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 10:53:49 +0200 Subject: [PATCH 04/11] address paolo comments --- .../internal/transport/client/client.go | 16 ++++++++-------- .../internal/transport/udp_server.go | 11 +++++------ 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index 65ec1dce28b5..32bcc974d3ba 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -14,7 +14,7 @@ import ( type StatsD struct { transport string address string - Conn io.Writer + conn io.Writer } // NewStatsD creates a new StatsD instance to support the need for testing @@ -33,7 +33,7 @@ func NewStatsD(transport string, address string) (*StatsD, error) { return statsd, nil } -// connect populates the StatsD.Conn +// connect populates the StatsD.conn func (s *StatsD) connect() error { switch s.transport { case "udp": @@ -41,7 +41,7 @@ func (s *StatsD) connect() error { if err != nil { return err } - s.Conn, err = net.DialUDP(s.transport, nil, udpAddr) + s.conn, err = net.DialUDP(s.transport, nil, udpAddr) if err != nil { return err } @@ -58,20 +58,20 @@ func (s *StatsD) connect() error { return nil } -// Disconnect closes the StatsD.Conn. +// Disconnect closes the StatsD.conn. func (s *StatsD) Disconnect() error { var err error - if cl, ok := s.Conn.(io.Closer); ok { + if cl, ok := s.conn.(io.Closer); ok { err = cl.Close() } - s.Conn = nil + s.conn = nil return err } // SendMetric sends the input metric to the StatsD connection. func (s *StatsD) SendMetric(metric Metric) error { - _, err := io.Copy(s.Conn, strings.NewReader(metric.String())) - return err + _, err := io.Copy(s.conn, strings.NewReader(metric.String())) + return fmt.Errorf("send metric on test client: %w", err) } // Metric contains the metric fields for a StatsD message. diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go index a939cce63848..19ad0803d8fe 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/udp_server.go @@ -24,20 +24,19 @@ var _ (Server) = (*udpServer)(nil) // NewUDPServer creates a transport.Server using UDP as its transport. func NewUDPServer(transport Transport, address string) (Server, error) { - var usrv udpServer - var err error - if !transport.IsPacketTransport() { return nil, fmt.Errorf("NewUDPServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport) } - usrv.transport = transport - usrv.packetConn, err = net.ListenPacket(transport.String(), address) + conn, err := net.ListenPacket(transport.String(), address) if err != nil { return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err) } - return &usrv, nil + return &udpServer{ + packetConn: conn, + transport: transport, + }, nil } // ListenAndServe starts the server ready to receive metrics. From 72b3494f19ad8e71559d3f11160d01ff351c2e84 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 11:09:47 +0200 Subject: [PATCH 05/11] add changelog --- ...ceiver_statsdreceiver_add-uds-support.yaml | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100755 .chloggen/receiver_statsdreceiver_add-uds-support.yaml diff --git a/.chloggen/receiver_statsdreceiver_add-uds-support.yaml b/.chloggen/receiver_statsdreceiver_add-uds-support.yaml new file mode 100755 index 000000000000..ef1c79e17d57 --- /dev/null +++ b/.chloggen/receiver_statsdreceiver_add-uds-support.yaml @@ -0,0 +1,21 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "bug_fix" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/statsd + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: refactor of hardcoded tests for UDP Transport + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: + - 21385 + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: From 9725ce07508b8bec4fd1c23bfd4268be67de5ae5 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 11:13:01 +0200 Subject: [PATCH 06/11] remove leftovers --- receiver/statsdreceiver/config_test.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/receiver/statsdreceiver/config_test.go b/receiver/statsdreceiver/config_test.go index 9f4a8dc4e4c4..b9d726e0baca 100644 --- a/receiver/statsdreceiver/config_test.go +++ b/receiver/statsdreceiver/config_test.go @@ -89,12 +89,11 @@ func TestValidate(t *testing.T) { } const ( - negativeAggregationIntervalErr = "aggregation_interval must be a positive duration" - noObjectNameErr = "must specify object id for all TimerHistogramMappings" - statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s" - observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s" - invalidHistogramErr = "histogram configuration requires observer_type: histogram" - socketConfigurationWithUDSServerErr = "socket configuration is unsupported if the transport is not a unix domain socket" + negativeAggregationIntervalErr = "aggregation_interval must be a positive duration" + noObjectNameErr = "must specify object id for all TimerHistogramMappings" + statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s" + observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s" + invalidHistogramErr = "histogram configuration requires observer_type: histogram" ) tests := []test{ From ac5c4a5a2b5ec96793845ec97088530dae5bff77 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 12:37:01 +0200 Subject: [PATCH 07/11] return err only if it is an error --- receiver/statsdreceiver/internal/transport/client/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index 32bcc974d3ba..fad9df45e0af 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -71,7 +71,10 @@ func (s *StatsD) Disconnect() error { // SendMetric sends the input metric to the StatsD connection. func (s *StatsD) SendMetric(metric Metric) error { _, err := io.Copy(s.conn, strings.NewReader(metric.String())) - return fmt.Errorf("send metric on test client: %w", err) + if err != nil { + return fmt.Errorf("send metric on test client: %w", err) + } + return nil } // Metric contains the metric fields for a StatsD message. From 47e59e7761dfde66f51fca57db118736983b1692 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 13:02:37 +0200 Subject: [PATCH 08/11] fix linter license --- receiver/statsdreceiver/internal/transport/transport.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go index 98c822138161..c065e30c746f 100644 --- a/receiver/statsdreceiver/internal/transport/transport.go +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" import "errors" From e3661d2368e67f7a52af1595538a5bf5840985c8 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 13:30:55 +0200 Subject: [PATCH 09/11] remove more leftovers --- receiver/statsdreceiver/internal/transport/server_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index 22efd4bf66a0..d59934cb630d 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -26,7 +26,6 @@ func Test_Server_ListenAndServe(t *testing.T) { buildServerFn func(transport Transport, addr string) (Server, error) getFreeEndpointFn func(t testing.TB, transport string) string buildClientFn func(transport string, address string) (*client.StatsD, error) - testSkip bool }{ { name: "udp", @@ -45,10 +44,6 @@ func Test_Server_ListenAndServe(t *testing.T) { } for _, tt := range tests { - if tt.testSkip { - continue - } - t.Run(tt.name, func(t *testing.T) { trans := Transport(tt.name) addr := tt.getFreeEndpointFn(t, tt.name) From 331323ed4df7bf698508a2d4e96ebe882c9677de Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Mon, 21 Aug 2023 12:52:01 +0200 Subject: [PATCH 10/11] remove changelog --- ...ceiver_statsdreceiver_add-uds-support.yaml | 21 ------------------- 1 file changed, 21 deletions(-) delete mode 100755 .chloggen/receiver_statsdreceiver_add-uds-support.yaml diff --git a/.chloggen/receiver_statsdreceiver_add-uds-support.yaml b/.chloggen/receiver_statsdreceiver_add-uds-support.yaml deleted file mode 100755 index ef1c79e17d57..000000000000 --- a/.chloggen/receiver_statsdreceiver_add-uds-support.yaml +++ /dev/null @@ -1,21 +0,0 @@ -# Use this changelog template to create an entry for release notes. -# If your change doesn't affect end users, such as a test fix or a tooling change, -# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. - -# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: "bug_fix" - -# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) -component: receiver/statsd - -# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: refactor of hardcoded tests for UDP Transport - -# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: - - 21385 - -# (Optional) One or more lines of additional information to render under the primary note. -# These lines will be padded with 2 spaces and then inserted directly into the document. -# Use pipe (|) for multiline entries. -subtext: From 51be5d32a083da282f0e85202314736350bf8aff Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Mon, 30 Oct 2023 11:35:00 +0100 Subject: [PATCH 11/11] fix rebase shenanigans --- receiver/statsdreceiver/internal/transport/client/client.go | 2 +- receiver/statsdreceiver/internal/transport/server_test.go | 5 ++--- receiver/statsdreceiver/internal/transport/tcp_server.go | 4 ++++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index fad9df45e0af..8b9fd7f06ba3 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -47,7 +47,7 @@ func (s *StatsD) connect() error { } case "tcp": var err error - s.Conn, err = net.Dial("tcp", s.address) + s.conn, err = net.Dial(s.transport, s.address) if err != nil { return err } diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index d59934cb630d..a774c0fd4154 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -45,11 +45,10 @@ func Test_Server_ListenAndServe(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - trans := Transport(tt.name) addr := tt.getFreeEndpointFn(t, tt.name) testFreeEndpoint(t, tt.name, addr) - srv, err := tt.buildServerFn(trans, addr) + srv, err := tt.buildServerFn(tt.transport, addr) require.NoError(t, err) require.NotNil(t, srv) @@ -67,7 +66,7 @@ func Test_Server_ListenAndServe(t *testing.T) { runtime.Gosched() - gc, err := tt.buildClientFn(tt.name, addr) + gc, err := tt.buildClientFn(tt.transport.String(), addr) require.NoError(t, err) require.NotNil(t, gc) err = gc.SendMetric(client.Metric{ diff --git a/receiver/statsdreceiver/internal/transport/tcp_server.go b/receiver/statsdreceiver/internal/transport/tcp_server.go index 513532b04d8f..f776bcd88f04 100644 --- a/receiver/statsdreceiver/internal/transport/tcp_server.go +++ b/receiver/statsdreceiver/internal/transport/tcp_server.go @@ -25,6 +25,7 @@ type tcpServer struct { stopChan chan struct{} } +// Ensure that Server is implemented on TCP Server. var _ Server = (*tcpServer)(nil) // NewTCPServer creates a transport.Server using TCP as its transport. @@ -46,6 +47,7 @@ func NewTCPServer(transport Transport, address string) (Server, error) { return &tsrv, nil } +// ListenAndServe starts the server ready to receive metrics. func (t *tcpServer) ListenAndServe(nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric) error { if nextConsumer == nil || reporter == nil { return errNilListenAndServeParameters @@ -76,6 +78,7 @@ LOOP: return errTCPServerDone } +// handleConn is helper that parses the buffer and split it line by line to be parsed upstream. func (t *tcpServer) handleConn(c net.Conn, transferChan chan<- Metric) { payload := make([]byte, 4096) var remainder []byte @@ -103,6 +106,7 @@ func (t *tcpServer) handleConn(c net.Conn, transferChan chan<- Metric) { } } +// Close closes the server. func (t *tcpServer) Close() error { close(t.stopChan) t.wg.Wait()