From 3389085aa9d08ce552e819028c1fb19439ec98f8 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Tue, 1 Aug 2023 15:25:20 +0200 Subject: [PATCH 01/17] refactor of the tests so they are not hardcoded --- receiver/statsdreceiver/config_test.go | 14 ++-- .../internal/transport/client/client.go | 58 +++++--------- .../internal/transport/server.go | 3 - .../internal/transport/server_test.go | 80 +++++++++++-------- .../internal/transport/transport.go | 45 +++++++++++ .../internal/transport/udp_server.go | 37 +++++---- receiver/statsdreceiver/receiver.go | 11 +-- receiver/statsdreceiver/receiver_test.go | 29 ++++--- 8 files changed, 158 insertions(+), 119 deletions(-) create mode 100644 receiver/statsdreceiver/internal/transport/transport.go diff --git a/receiver/statsdreceiver/config_test.go b/receiver/statsdreceiver/config_test.go index 73cad547990b..eefcda88b485 100644 --- a/receiver/statsdreceiver/config_test.go +++ b/receiver/statsdreceiver/config_test.go @@ -82,10 +82,12 @@ func TestValidate(t *testing.T) { } const ( - negativeAggregationIntervalErr = "aggregation_interval must be a positive duration" - noObjectNameErr = "must specify object id for all TimerHistogramMappings" - statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s" - observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s" + negativeAggregationIntervalErr = "aggregation_interval must be a positive duration" + noObjectNameErr = "must specify object id for all TimerHistogramMappings" + statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s" + observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s" + invalidHistogramErr = "histogram configuration requires observer_type: histogram" + socketConfigurationWithUDSServerErr = "socket configuration is unsupported if the transport is not a unix domain socket" ) tests := []test{ @@ -153,7 +155,7 @@ func TestValidate(t *testing.T) { }, }, }, - expectedErr: "histogram configuration requires observer_type: histogram", + expectedErr: invalidHistogramErr, }, { name: "negativeAggregationInterval", @@ -163,7 +165,7 @@ func TestValidate(t *testing.T) { {StatsdType: "timing", ObserverType: "gauge"}, }, }, - expectedErr: "aggregation_interval must be a positive duration", + expectedErr: negativeAggregationIntervalErr, }, } diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index ba40dbcf1deb..3e0121586e78 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -7,33 +7,25 @@ import ( "fmt" "io" "net" + "strings" ) // StatsD defines the properties of a StatsD connection. type StatsD struct { - Host string - Port int - Conn io.Writer + transport string + address string + Conn io.Writer } -// Transport is an enum to select the type of transport. -type Transport int - -const ( - // TCP Transport - TCP Transport = iota - // UDP Transport - UDP -) - // NewStatsD creates a new StatsD instance to support the need for testing // the statsdreceiver package and is not intended/tested to be used in production. -func NewStatsD(transport Transport, host string, port int) (*StatsD, error) { +func NewStatsD(transport string, address string) (*StatsD, error) { statsd := &StatsD{ - Host: host, - Port: port, + transport: transport, + address: address, } - err := statsd.connect(transport) + + err := statsd.connect() if err != nil { return nil, err } @@ -42,33 +34,22 @@ func NewStatsD(transport Transport, host string, port int) (*StatsD, error) { } // connect populates the StatsD.Conn -func (s *StatsD) connect(transport Transport) error { - if cl, ok := s.Conn.(io.Closer); ok { - cl.Close() - } - - address := fmt.Sprintf("%s:%d", s.Host, s.Port) - - var err error - switch transport { - case TCP: - // TODO: implement TCP support - return fmt.Errorf("TCP unsupported") - case UDP: - var udpAddr *net.UDPAddr - udpAddr, err = net.ResolveUDPAddr("udp", address) +func (s *StatsD) connect() error { + switch s.transport { + case "udp": + udpAddr, err := net.ResolveUDPAddr(s.transport, s.address) if err != nil { return err } - s.Conn, err = net.DialUDP("udp", nil, udpAddr) + s.Conn, err = net.DialUDP(s.transport, nil, udpAddr) if err != nil { return err } default: - return fmt.Errorf("unknown transport: %d", transport) + return fmt.Errorf("unknown/unsupported transport: %s", s.transport) } - return err + return nil } // Disconnect closes the StatsD.Conn. @@ -83,11 +64,8 @@ func (s *StatsD) Disconnect() error { // SendMetric sends the input metric to the StatsD connection. func (s *StatsD) SendMetric(metric Metric) error { - _, err := fmt.Fprint(s.Conn, metric.String()) - if err != nil { - return err - } - return nil + _, err := io.Copy(s.Conn, strings.NewReader(metric.String())) + return err } // Metric contains the metric fields for a StatsD message. diff --git a/receiver/statsdreceiver/internal/transport/server.go b/receiver/statsdreceiver/internal/transport/server.go index 7b3cdab6006f..c551e4312cd9 100644 --- a/receiver/statsdreceiver/internal/transport/server.go +++ b/receiver/statsdreceiver/internal/transport/server.go @@ -9,8 +9,6 @@ import ( "net" "go.opentelemetry.io/collector/consumer" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" ) var errNilListenAndServeParameters = errors.New("no parameter of ListenAndServe can be nil") @@ -22,7 +20,6 @@ type Server interface { // on the specific transport, and prepares the message to be processed by // the Parser and passed to the next consumer. ListenAndServe( - p protocol.Parser, mc consumer.Metrics, r Reporter, transferChan chan<- Metric, diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index 5a7b545a9a75..55b549e31a25 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -4,9 +4,9 @@ package transport import ( + "io" "net" "runtime" - "strconv" "sync" "testing" "time" @@ -16,52 +16,39 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client" ) func Test_Server_ListenAndServe(t *testing.T) { tests := []struct { - name string - buildServerFn func(addr string) (Server, error) - buildClientFn func(host string, port int) (*client.StatsD, error) + name string + buildServerFn func(transport Transport, addr string) (Server, error) + getFreeEndpointFn func(t testing.TB, transport string) string + buildClientFn func(transport string, address string) (*client.StatsD, error) + testSkip bool }{ { - name: "udp", - buildServerFn: NewUDPServer, - buildClientFn: func(host string, port int) (*client.StatsD, error) { - return client.NewStatsD(client.UDP, host, port) - }, + name: "udp", + getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress, + buildServerFn: NewUDPServer, + buildClientFn: client.NewStatsD, }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - addr := testutil.GetAvailableLocalNetworkAddress(t, "udp") - - // Endpoint should be free. - ln0, err := net.ListenPacket("udp", addr) - require.NoError(t, err) - require.NotNil(t, ln0) + if tt.testSkip { + continue + } - // Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail. - ln1, err := net.ListenPacket("udp", addr) - require.Error(t, err) - require.Nil(t, ln1) - - // Unbind the local address so the mock UDP service can use it - ln0.Close() + t.Run(tt.name, func(t *testing.T) { + trans := Transport(tt.name) + addr := tt.getFreeEndpointFn(t, tt.name) + testFreeEndpoint(t, tt.name, addr) - srv, err := tt.buildServerFn(addr) + srv, err := tt.buildServerFn(trans, addr) require.NoError(t, err) require.NotNil(t, srv) - host, portStr, err := net.SplitHostPort(addr) - require.NoError(t, err) - port, err := strconv.Atoi(portStr) - require.NoError(t, err) - mc := new(consumertest.MetricsSink) - p := &protocol.StatsDParser{} require.NoError(t, err) mr := NewMockReporter(1) transferChan := make(chan Metric, 10) @@ -70,12 +57,12 @@ func Test_Server_ListenAndServe(t *testing.T) { wgListenAndServe.Add(1) go func() { defer wgListenAndServe.Done() - assert.Error(t, srv.ListenAndServe(p, mc, mr, transferChan)) + assert.Error(t, srv.ListenAndServe(mc, mr, transferChan)) }() runtime.Gosched() - gc, err := tt.buildClientFn(host, port) + gc, err := tt.buildClientFn(tt.name, addr) require.NoError(t, err) require.NotNil(t, gc) err = gc.SendMetric(client.Metric{ @@ -102,3 +89,30 @@ func Test_Server_ListenAndServe(t *testing.T) { }) } } + +func testFreeEndpoint(t *testing.T, transport string, address string) { + t.Helper() + + var ln0, ln1 io.Closer + var err0, err1 error + + trans, err := NewTransport(transport) + require.NoError(t, err) + + if trans.IsPacketTransport() { + // Endpoint should be free. + ln0, err0 = net.ListenPacket(transport, address) + ln1, err1 = net.ListenPacket(transport, address) + } + + // Endpoint should be free. + require.NoError(t, err0) + require.NotNil(t, ln0) + + // Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail. + require.Error(t, err1) + require.Nil(t, ln1) + + // Unbind the local address so the mock UDP service can use it + require.NoError(t, ln0.Close()) +} diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go new file mode 100644 index 000000000000..39eee2fa4e6d --- /dev/null +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -0,0 +1,45 @@ +package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" + +import "errors" + +// Transport is a set of constants of the transport supported by this receiver. +type Transport string + +var ( + ErrUnsupportedTransport = errors.New("unsupported transport") + ErrUnsupportedPacketTransport = errors.New("unsupported Packet transport") +) + +const ( + UDP Transport = "udp" + UDP4 Transport = "udp4" + UDP6 Transport = "udp6" +) + +// Create a Transport based on the transport string or return error if it is not supported. +func NewTransport(ts string) (Transport, error) { + trans := Transport(ts) + switch trans { + case UDP, UDP4, UDP6: + return trans, nil + } + return Transport(""), ErrUnsupportedTransport +} + +// Returns the string of this transport. +func (trans Transport) String() string { + switch trans { + case UDP, UDP4, UDP6: + return string(trans) + } + return "" +} + +// Returns true if the transport us packet based. +func (trans Transport) IsPacketTransport() bool { + switch trans { + case UDP, UDP4, UDP6: + return true + } + return false +} diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go index 7a483ce61649..7245ac2aa564 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/udp_server.go @@ -6,47 +6,50 @@ package transport // import "github.com/open-telemetry/opentelemetry-collector-c import ( "bytes" "errors" + "fmt" "io" "net" "strings" "go.opentelemetry.io/collector/consumer" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol" ) type udpServer struct { packetConn net.PacketConn - reporter Reporter + transport Transport } +// Ensure that Server is implemented on UDP Server. var _ (Server) = (*udpServer)(nil) // NewUDPServer creates a transport.Server using UDP as its transport. -func NewUDPServer(addr string) (Server, error) { - packetConn, err := net.ListenPacket("udp", addr) - if err != nil { - return nil, err +func NewUDPServer(transport Transport, address string) (Server, error) { + var usrv udpServer + var err error + + if !transport.IsPacketTransport() { + return nil, ErrUnsupportedPacketTransport } - u := udpServer{ - packetConn: packetConn, + usrv.transport = transport + usrv.packetConn, err = net.ListenPacket(transport.String(), address) + if err != nil { + return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err) } - return &u, nil + + return &usrv, nil } +// Start the server ready to recevie metrics. func (u *udpServer) ListenAndServe( - parser protocol.Parser, nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric, ) error { - if parser == nil || nextConsumer == nil || reporter == nil { + if nextConsumer == nil || reporter == nil { return errNilListenAndServeParameters } - u.reporter = reporter - buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) for { n, addr, err := u.packetConn.ReadFrom(buf) @@ -56,10 +59,8 @@ func (u *udpServer) ListenAndServe( u.handlePacket(bufCopy, addr, transferChan) } if err != nil { - u.reporter.OnDebugf("UDP Transport (%s) - ReadFrom error: %v", - u.packetConn.LocalAddr(), - err) var netErr net.Error + reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", u.transport, u.packetConn.LocalAddr(), err) if errors.As(err, &netErr) { if netErr.Timeout() { continue @@ -70,10 +71,12 @@ func (u *udpServer) ListenAndServe( } } +// Closes the server func (u *udpServer) Close() error { return u.packetConn.Close() } +// This helper parses the buffer and split it line bye line to be parsed upstream. func (u *udpServer) handlePacket( data []byte, addr net.Addr, diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index feac277f7716..2602424da960 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -69,12 +69,13 @@ func New( func buildTransportServer(config Config) (transport.Server, error) { // TODO: Add TCP/unix socket transport implementations - switch strings.ToLower(config.NetAddr.Transport) { - case "", "udp": - return transport.NewUDPServer(config.NetAddr.Endpoint) + trans, err := transport.NewTransport(strings.ToLower(config.NetAddr.Transport)) + switch trans { + case transport.UDP, transport.UDP4, transport.UDP6: + return transport.NewUDPServer(trans, config.NetAddr.Endpoint) } - return nil, fmt.Errorf("unsupported transport %q", config.NetAddr.Transport) + return nil, fmt.Errorf("error building server with transport %q: %w", config.NetAddr.Transport, err) } // Start starts a UDP server that can process StatsD messages. @@ -96,7 +97,7 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error { return err } go func() { - if err := r.server.ListenAndServe(r.parser, r.nextConsumer, r.reporter, transferChan); err != nil { + if err := r.server.ListenAndServe(r.nextConsumer, r.reporter, transferChan); err != nil { if !errors.Is(err, net.ErrClosed) { host.ReportFatalError(err) } diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index 2332740adb72..ae4e0e47efe6 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -6,8 +6,6 @@ package statsdreceiver import ( "context" "errors" - "net" - "strconv" "testing" "time" @@ -74,7 +72,7 @@ func Test_statsdreceiver_Start(t *testing.T) { }, nextConsumer: consumertest.NewNop(), }, - wantErr: errors.New("unsupported transport \"unknown\""), + wantErr: errors.New("error building server with transport \"unknown\": unsupported transport"), }, } for _, tt := range tests { @@ -82,7 +80,7 @@ func Test_statsdreceiver_Start(t *testing.T) { receiver, err := New(receivertest.NewNopCreateSettings(), tt.args.config, tt.args.nextConsumer) require.NoError(t, err) err = receiver.Start(context.Background(), componenttest.NewNopHost()) - assert.Equal(t, tt.wantErr, err) + assert.Equal(t, tt.wantErr.Error(), err.Error()) assert.NoError(t, receiver.Shutdown(context.Background())) }) @@ -113,19 +111,16 @@ func TestStatsdReceiver_Flush(t *testing.T) { } func Test_statsdreceiver_EndToEnd(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - host, portStr, err := net.SplitHostPort(addr) - require.NoError(t, err) - port, err := strconv.Atoi(portStr) - require.NoError(t, err) - tests := []struct { name string + addr string configFn func() *Config - clientFn func(t *testing.T) *client.StatsD + clientFn func(t *testing.T, addr string) *client.StatsD + testSkip bool }{ { name: "default_config with 4s interval", + addr: testutil.GetAvailableLocalNetworkAddress(t, "udp"), configFn: func() *Config { return &Config{ NetAddr: confignet.NetAddr{ @@ -135,17 +130,21 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { AggregationInterval: 4 * time.Second, } }, - clientFn: func(t *testing.T) *client.StatsD { - c, err := client.NewStatsD(client.UDP, host, port) + clientFn: func(t *testing.T, addr string) *client.StatsD { + c, err := client.NewStatsD("udp", addr) require.NoError(t, err) return c }, }, } for _, tt := range tests { + if tt.testSkip { + continue + } + t.Run(tt.name, func(t *testing.T) { cfg := tt.configFn() - cfg.NetAddr.Endpoint = addr + cfg.NetAddr.Endpoint = tt.addr sink := new(consumertest.MetricsSink) rcv, err := New(receivertest.NewNopCreateSettings(), *cfg, sink) require.NoError(t, err) @@ -159,7 +158,7 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { assert.NoError(t, r.Shutdown(context.Background())) }() - statsdClient := tt.clientFn(t) + statsdClient := tt.clientFn(t, tt.addr) statsdMetric := client.Metric{ Name: "test.metric", From 448cdb05f07b92de2aa0f40c61232c572bb37174 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 10:34:57 +0200 Subject: [PATCH 02/17] address christian comments --- .../internal/transport/server_test.go | 4 ++-- .../statsdreceiver/internal/transport/transport.go | 13 +++---------- .../statsdreceiver/internal/transport/udp_server.go | 13 ++++++++++--- receiver/statsdreceiver/receiver.go | 7 +++---- receiver/statsdreceiver/receiver_test.go | 9 ++------- 5 files changed, 20 insertions(+), 26 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index 55b549e31a25..8734feac6858 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -96,8 +96,8 @@ func testFreeEndpoint(t *testing.T, transport string, address string) { var ln0, ln1 io.Closer var err0, err1 error - trans, err := NewTransport(transport) - require.NoError(t, err) + trans := NewTransport(transport) + require.NotEqual(t, trans, Transport("")) if trans.IsPacketTransport() { // Endpoint should be free. diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go index 39eee2fa4e6d..849ec456d7ac 100644 --- a/receiver/statsdreceiver/internal/transport/transport.go +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -1,15 +1,8 @@ package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" -import "errors" - // Transport is a set of constants of the transport supported by this receiver. type Transport string -var ( - ErrUnsupportedTransport = errors.New("unsupported transport") - ErrUnsupportedPacketTransport = errors.New("unsupported Packet transport") -) - const ( UDP Transport = "udp" UDP4 Transport = "udp4" @@ -17,13 +10,13 @@ const ( ) // Create a Transport based on the transport string or return error if it is not supported. -func NewTransport(ts string) (Transport, error) { +func NewTransport(ts string) Transport { trans := Transport(ts) switch trans { case UDP, UDP4, UDP6: - return trans, nil + return trans } - return Transport(""), ErrUnsupportedTransport + return Transport("") } // Returns the string of this transport. diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go index 7245ac2aa564..0f2014e7ec72 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/udp_server.go @@ -19,8 +19,12 @@ type udpServer struct { transport Transport } -// Ensure that Server is implemented on UDP Server. -var _ (Server) = (*udpServer)(nil) +var ( + // Ensure that Server is implemented on UDP Server. + _ (Server) = (*udpServer)(nil) + + ErrUnsupportedPacketTransport = errors.New("unsupported Packet transport") +) // NewUDPServer creates a transport.Server using UDP as its transport. func NewUDPServer(transport Transport, address string) (Server, error) { @@ -59,8 +63,11 @@ func (u *udpServer) ListenAndServe( u.handlePacket(bufCopy, addr, transferChan) } if err != nil { + reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", + u.transport, + u.packetConn.LocalAddr(), + err) var netErr net.Error - reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", u.transport, u.packetConn.LocalAddr(), err) if errors.As(err, &netErr) { if netErr.Timeout() { continue diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 2602424da960..ffef4396623c 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -68,14 +68,13 @@ func New( } func buildTransportServer(config Config) (transport.Server, error) { + trans := transport.NewTransport(strings.ToLower(config.NetAddr.Transport)) // TODO: Add TCP/unix socket transport implementations - trans, err := transport.NewTransport(strings.ToLower(config.NetAddr.Transport)) - switch trans { - case transport.UDP, transport.UDP4, transport.UDP6: + if trans.IsPacketTransport() { return transport.NewUDPServer(trans, config.NetAddr.Endpoint) } - return nil, fmt.Errorf("error building server with transport %q: %w", config.NetAddr.Transport, err) + return nil, fmt.Errorf("unsupported transport %q", config.NetAddr.Transport) } // Start starts a UDP server that can process StatsD messages. diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index ae4e0e47efe6..1c034bf5a858 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -72,7 +72,7 @@ func Test_statsdreceiver_Start(t *testing.T) { }, nextConsumer: consumertest.NewNop(), }, - wantErr: errors.New("error building server with transport \"unknown\": unsupported transport"), + wantErr: errors.New("unsupported transport \"unknown\""), }, } for _, tt := range tests { @@ -80,7 +80,7 @@ func Test_statsdreceiver_Start(t *testing.T) { receiver, err := New(receivertest.NewNopCreateSettings(), tt.args.config, tt.args.nextConsumer) require.NoError(t, err) err = receiver.Start(context.Background(), componenttest.NewNopHost()) - assert.Equal(t, tt.wantErr.Error(), err.Error()) + assert.Equal(t, tt.wantErr, err) assert.NoError(t, receiver.Shutdown(context.Background())) }) @@ -116,7 +116,6 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { addr string configFn func() *Config clientFn func(t *testing.T, addr string) *client.StatsD - testSkip bool }{ { name: "default_config with 4s interval", @@ -138,10 +137,6 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { }, } for _, tt := range tests { - if tt.testSkip { - continue - } - t.Run(tt.name, func(t *testing.T) { cfg := tt.configFn() cfg.NetAddr.Endpoint = tt.addr From 42833bbd2b62bd8085c14064d144a8e5037a959e Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 10:41:30 +0200 Subject: [PATCH 03/17] address marc comments --- receiver/statsdreceiver/internal/transport/transport.go | 6 +++--- receiver/statsdreceiver/internal/transport/udp_server.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go index 849ec456d7ac..87593f397e34 100644 --- a/receiver/statsdreceiver/internal/transport/transport.go +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -9,7 +9,7 @@ const ( UDP6 Transport = "udp6" ) -// Create a Transport based on the transport string or return error if it is not supported. +// NewTransport creates a Transport based on the transport string or returns an empty Transport. func NewTransport(ts string) Transport { trans := Transport(ts) switch trans { @@ -19,7 +19,7 @@ func NewTransport(ts string) Transport { return Transport("") } -// Returns the string of this transport. +// String casts the transport to a String if the Transport is supported. Return an empty Transport overwise. func (trans Transport) String() string { switch trans { case UDP, UDP4, UDP6: @@ -28,7 +28,7 @@ func (trans Transport) String() string { return "" } -// Returns true if the transport us packet based. +// IsPacketTransport returns true if the transport is packet based. func (trans Transport) IsPacketTransport() bool { switch trans { case UDP, UDP4, UDP6: diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go index 0f2014e7ec72..72abca77ce9c 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/udp_server.go @@ -44,7 +44,7 @@ func NewUDPServer(transport Transport, address string) (Server, error) { return &usrv, nil } -// Start the server ready to recevie metrics. +// ListenAndServe starts the server ready to receive metrics. func (u *udpServer) ListenAndServe( nextConsumer consumer.Metrics, reporter Reporter, @@ -78,12 +78,12 @@ func (u *udpServer) ListenAndServe( } } -// Closes the server +// Close closes the server. func (u *udpServer) Close() error { return u.packetConn.Close() } -// This helper parses the buffer and split it line bye line to be parsed upstream. +// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream. func (u *udpServer) handlePacket( data []byte, addr net.Addr, From d13cb5f7a1ada0793bc38178a7b61f7c3df36e31 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 10:53:49 +0200 Subject: [PATCH 04/17] address paolo comments --- .../internal/transport/client/client.go | 16 ++++++++-------- .../internal/transport/udp_server.go | 11 +++++------ 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index 3e0121586e78..4ef4189a9f64 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -14,7 +14,7 @@ import ( type StatsD struct { transport string address string - Conn io.Writer + conn io.Writer } // NewStatsD creates a new StatsD instance to support the need for testing @@ -33,7 +33,7 @@ func NewStatsD(transport string, address string) (*StatsD, error) { return statsd, nil } -// connect populates the StatsD.Conn +// connect populates the StatsD.conn func (s *StatsD) connect() error { switch s.transport { case "udp": @@ -41,7 +41,7 @@ func (s *StatsD) connect() error { if err != nil { return err } - s.Conn, err = net.DialUDP(s.transport, nil, udpAddr) + s.conn, err = net.DialUDP(s.transport, nil, udpAddr) if err != nil { return err } @@ -52,20 +52,20 @@ func (s *StatsD) connect() error { return nil } -// Disconnect closes the StatsD.Conn. +// Disconnect closes the StatsD.conn. func (s *StatsD) Disconnect() error { var err error - if cl, ok := s.Conn.(io.Closer); ok { + if cl, ok := s.conn.(io.Closer); ok { err = cl.Close() } - s.Conn = nil + s.conn = nil return err } // SendMetric sends the input metric to the StatsD connection. func (s *StatsD) SendMetric(metric Metric) error { - _, err := io.Copy(s.Conn, strings.NewReader(metric.String())) - return err + _, err := io.Copy(s.conn, strings.NewReader(metric.String())) + return fmt.Errorf("send metric on test client: %w", err) } // Metric contains the metric fields for a StatsD message. diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/udp_server.go index 72abca77ce9c..bc29d32dc997 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/udp_server.go @@ -28,20 +28,19 @@ var ( // NewUDPServer creates a transport.Server using UDP as its transport. func NewUDPServer(transport Transport, address string) (Server, error) { - var usrv udpServer - var err error - if !transport.IsPacketTransport() { return nil, ErrUnsupportedPacketTransport } - usrv.transport = transport - usrv.packetConn, err = net.ListenPacket(transport.String(), address) + conn, err := net.ListenPacket(transport.String(), address) if err != nil { return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err) } - return &usrv, nil + return &udpServer{ + packetConn: conn, + transport: transport, + }, nil } // ListenAndServe starts the server ready to receive metrics. From 3af53095400905bb1ba6280f02ef5da1da8db57c Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 11:09:47 +0200 Subject: [PATCH 05/17] add changelog --- ...ceiver_statsdreceiver_add-uds-support.yaml | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100755 .chloggen/receiver_statsdreceiver_add-uds-support.yaml diff --git a/.chloggen/receiver_statsdreceiver_add-uds-support.yaml b/.chloggen/receiver_statsdreceiver_add-uds-support.yaml new file mode 100755 index 000000000000..ef1c79e17d57 --- /dev/null +++ b/.chloggen/receiver_statsdreceiver_add-uds-support.yaml @@ -0,0 +1,21 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "bug_fix" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/statsd + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: refactor of hardcoded tests for UDP Transport + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: + - 21385 + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: From 1bf88d01cc593021ba627b436a39def5da49da0e Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 11:13:01 +0200 Subject: [PATCH 06/17] remove leftovers --- receiver/statsdreceiver/config_test.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/receiver/statsdreceiver/config_test.go b/receiver/statsdreceiver/config_test.go index eefcda88b485..8c4f17e09951 100644 --- a/receiver/statsdreceiver/config_test.go +++ b/receiver/statsdreceiver/config_test.go @@ -82,12 +82,11 @@ func TestValidate(t *testing.T) { } const ( - negativeAggregationIntervalErr = "aggregation_interval must be a positive duration" - noObjectNameErr = "must specify object id for all TimerHistogramMappings" - statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s" - observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s" - invalidHistogramErr = "histogram configuration requires observer_type: histogram" - socketConfigurationWithUDSServerErr = "socket configuration is unsupported if the transport is not a unix domain socket" + negativeAggregationIntervalErr = "aggregation_interval must be a positive duration" + noObjectNameErr = "must specify object id for all TimerHistogramMappings" + statsdTypeNotSupportErr = "statsd_type is not a supported mapping for histogram and timing metrics: %s" + observerTypeNotSupportErr = "observer_type is not supported for histogram and timing metrics: %s" + invalidHistogramErr = "histogram configuration requires observer_type: histogram" ) tests := []test{ From f1848f767c5f267b610439b06fdd5fe36be364fb Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 12:37:01 +0200 Subject: [PATCH 07/17] return err only if it is an error --- receiver/statsdreceiver/internal/transport/client/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index 4ef4189a9f64..3692b2406ac8 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -65,7 +65,10 @@ func (s *StatsD) Disconnect() error { // SendMetric sends the input metric to the StatsD connection. func (s *StatsD) SendMetric(metric Metric) error { _, err := io.Copy(s.conn, strings.NewReader(metric.String())) - return fmt.Errorf("send metric on test client: %w", err) + if err != nil { + return fmt.Errorf("send metric on test client: %w", err) + } + return nil } // Metric contains the metric fields for a StatsD message. From 33b1172a96e61bc4ba57df2c0a80381c76bfb5e7 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 13:02:37 +0200 Subject: [PATCH 08/17] fix linter license --- receiver/statsdreceiver/internal/transport/transport.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go index 87593f397e34..862e4d0666c5 100644 --- a/receiver/statsdreceiver/internal/transport/transport.go +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" // Transport is a set of constants of the transport supported by this receiver. From 013f14589d2f6fd79a78ce69c0d93f14147a79d7 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 13:30:55 +0200 Subject: [PATCH 09/17] remove more leftovers --- receiver/statsdreceiver/internal/transport/server_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index 8734feac6858..6ca437d1ef3c 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -25,7 +25,6 @@ func Test_Server_ListenAndServe(t *testing.T) { buildServerFn func(transport Transport, addr string) (Server, error) getFreeEndpointFn func(t testing.TB, transport string) string buildClientFn func(transport string, address string) (*client.StatsD, error) - testSkip bool }{ { name: "udp", @@ -35,10 +34,6 @@ func Test_Server_ListenAndServe(t *testing.T) { }, } for _, tt := range tests { - if tt.testSkip { - continue - } - t.Run(tt.name, func(t *testing.T) { trans := Transport(tt.name) addr := tt.getFreeEndpointFn(t, tt.name) From e4f823821934d81cffc2fbfdb7edfa3bdd6e25df Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Mon, 21 Aug 2023 12:52:01 +0200 Subject: [PATCH 10/17] remove changelog --- ...ceiver_statsdreceiver_add-uds-support.yaml | 21 ------------------- 1 file changed, 21 deletions(-) delete mode 100755 .chloggen/receiver_statsdreceiver_add-uds-support.yaml diff --git a/.chloggen/receiver_statsdreceiver_add-uds-support.yaml b/.chloggen/receiver_statsdreceiver_add-uds-support.yaml deleted file mode 100755 index ef1c79e17d57..000000000000 --- a/.chloggen/receiver_statsdreceiver_add-uds-support.yaml +++ /dev/null @@ -1,21 +0,0 @@ -# Use this changelog template to create an entry for release notes. -# If your change doesn't affect end users, such as a test fix or a tooling change, -# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. - -# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: "bug_fix" - -# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) -component: receiver/statsd - -# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: refactor of hardcoded tests for UDP Transport - -# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: - - 21385 - -# (Optional) One or more lines of additional information to render under the primary note. -# These lines will be padded with 2 spaces and then inserted directly into the document. -# Use pipe (|) for multiline entries. -subtext: From 01c8ed20439de6747f6af24f7e029c8e2ccbf3af Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Tue, 1 Aug 2023 15:25:20 +0200 Subject: [PATCH 11/17] change UDP server name to a generic one --- .../{udp_server.go => packet_server.go} | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) rename receiver/statsdreceiver/internal/transport/{udp_server.go => packet_server.go} (75%) diff --git a/receiver/statsdreceiver/internal/transport/udp_server.go b/receiver/statsdreceiver/internal/transport/packet_server.go similarity index 75% rename from receiver/statsdreceiver/internal/transport/udp_server.go rename to receiver/statsdreceiver/internal/transport/packet_server.go index bc29d32dc997..b5cc9b729a55 100644 --- a/receiver/statsdreceiver/internal/transport/udp_server.go +++ b/receiver/statsdreceiver/internal/transport/packet_server.go @@ -14,37 +14,37 @@ import ( "go.opentelemetry.io/collector/consumer" ) -type udpServer struct { +type packetServer struct { packetConn net.PacketConn transport Transport } var ( // Ensure that Server is implemented on UDP Server. - _ (Server) = (*udpServer)(nil) + _ (Server) = (*packetServer)(nil) ErrUnsupportedPacketTransport = errors.New("unsupported Packet transport") ) -// NewUDPServer creates a transport.Server using UDP as its transport. -func NewUDPServer(transport Transport, address string) (Server, error) { +// NewPacketServer creates a transport.Server using transports based on packets. +func NewPacketServer(transport Transport, address string) (Server, error) { if !transport.IsPacketTransport() { return nil, ErrUnsupportedPacketTransport } conn, err := net.ListenPacket(transport.String(), address) if err != nil { - return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err) + return nil, fmt.Errorf("starting to listen %s: %w", transport.String(), err) } - return &udpServer{ + return &packetServer{ packetConn: conn, transport: transport, }, nil } // ListenAndServe starts the server ready to receive metrics. -func (u *udpServer) ListenAndServe( +func (psrv *packetServer) ListenAndServe( nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric, @@ -55,16 +55,16 @@ func (u *udpServer) ListenAndServe( buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) for { - n, addr, err := u.packetConn.ReadFrom(buf) + n, addr, err := psrv.packetConn.ReadFrom(buf) if n > 0 { bufCopy := make([]byte, n) copy(bufCopy, buf) - u.handlePacket(bufCopy, addr, transferChan) + psrv.handlePacket(bufCopy, addr, transferChan) } if err != nil { reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v", - u.transport, - u.packetConn.LocalAddr(), + psrv.transport, + psrv.packetConn.LocalAddr(), err) var netErr net.Error if errors.As(err, &netErr) { @@ -78,12 +78,12 @@ func (u *udpServer) ListenAndServe( } // Close closes the server. -func (u *udpServer) Close() error { +func (psrv *packetServer) Close() error { return u.packetConn.Close() } // handlePacket is helper that parses the buffer and split it line by line to be parsed upstream. -func (u *udpServer) handlePacket( +func (psrv *packetServer) handlePacket( data []byte, addr net.Addr, transferChan chan<- Metric, From ed30a215b92f148ed16ea8d5f6326068c3cb2311 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Tue, 1 Aug 2023 16:17:29 +0200 Subject: [PATCH 12/17] Add UDS support to the receiver --- .../internal/testutil/temporary_socket.go | 13 +++++++ .../internal/transport/client/client.go | 9 +++++ .../internal/transport/packet_server.go | 35 ++++++++++++++++++- .../internal/transport/server_test.go | 15 +++++++- .../internal/transport/transport.go | 20 ++++++++--- receiver/statsdreceiver/receiver.go | 5 ++- receiver/statsdreceiver/receiver_test.go | 20 +++++++++++ 7 files changed, 107 insertions(+), 10 deletions(-) create mode 100644 receiver/statsdreceiver/internal/testutil/temporary_socket.go diff --git a/receiver/statsdreceiver/internal/testutil/temporary_socket.go b/receiver/statsdreceiver/internal/testutil/temporary_socket.go new file mode 100644 index 000000000000..c19152719ce1 --- /dev/null +++ b/receiver/statsdreceiver/internal/testutil/temporary_socket.go @@ -0,0 +1,13 @@ +package testutil + +import ( + "crypto/rand" + "fmt" + "testing" +) + +func CreateTemporarySocket(t testing.TB, _ string) string { + b := make([]byte, 10) + rand.Read(b) + return fmt.Sprintf("%s/%s", t.TempDir(), fmt.Sprintf("%x", b)) +} diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index 3692b2406ac8..8b6e38ac44f2 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -45,6 +45,15 @@ func (s *StatsD) connect() error { if err != nil { return err } + case "unixgram": + unixAddr, err := net.ResolveUnixAddr(s.transport, s.address) + if err != nil { + return err + } + s.Conn, err = net.DialUnix(s.transport, nil, unixAddr) + if err != nil { + return err + } default: return fmt.Errorf("unknown/unsupported transport: %s", s.transport) } diff --git a/receiver/statsdreceiver/internal/transport/packet_server.go b/receiver/statsdreceiver/internal/transport/packet_server.go index b5cc9b729a55..dbfec422080f 100644 --- a/receiver/statsdreceiver/internal/transport/packet_server.go +++ b/receiver/statsdreceiver/internal/transport/packet_server.go @@ -9,14 +9,17 @@ import ( "fmt" "io" "net" + "os" "strings" "go.opentelemetry.io/collector/consumer" + "go.uber.org/multierr" ) type packetServer struct { packetConn net.PacketConn transport Transport + address string } var ( @@ -32,12 +35,20 @@ func NewPacketServer(transport Transport, address string) (Server, error) { return nil, ErrUnsupportedPacketTransport } + if transport.IsUnixTransport() { + err := os.Remove(address) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("removing socket failed: %w", err) + } + } + conn, err := net.ListenPacket(transport.String(), address) if err != nil { return nil, fmt.Errorf("starting to listen %s: %w", transport.String(), err) } return &packetServer{ + address: address, packetConn: conn, transport: transport, }, nil @@ -56,6 +67,14 @@ func (psrv *packetServer) ListenAndServe( buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6) for { n, addr, err := psrv.packetConn.ReadFrom(buf) + + if _, ok := psrv.packetConn.(*net.UnixConn); ok && addr == nil { + addr = &net.UnixAddr{ + Net: "unixgram", + Name: "UDS", + } + } + if n > 0 { bufCopy := make([]byte, n) copy(bufCopy, buf) @@ -79,7 +98,21 @@ func (psrv *packetServer) ListenAndServe( // Close closes the server. func (psrv *packetServer) Close() error { - return u.packetConn.Close() + var errs error + + if psrv.transport.IsUnixTransport() { + err := os.Remove(psrv.address) + if err != nil && !errors.Is(err, os.ErrNotExist) { + errs = multierr.Append(errs, fmt.Errorf("removing socket failed: %w", err)) + } + } + + err := psrv.packetConn.Close() + if err != nil { + errs = multierr.Append(errs, err) + } + + return errs } // handlePacket is helper that parses the buffer and split it line by line to be parsed upstream. diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index 6ca437d1ef3c..e28f28ce2678 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + statsdtestutil "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/testutil" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client" ) @@ -29,9 +30,16 @@ func Test_Server_ListenAndServe(t *testing.T) { { name: "udp", getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress, - buildServerFn: NewUDPServer, + buildServerFn: NewPacketServer, buildClientFn: client.NewStatsD, }, + { + name: "unixgram", + getFreeEndpointFn: statsdtestutil.CreateTemporarySocket, + buildServerFn: NewPacketServer, + buildClientFn: client.NewStatsD, + testSkip: runtime.GOOS != "linux", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -85,6 +93,7 @@ func Test_Server_ListenAndServe(t *testing.T) { } } +// Util to see that the port we are going to use for testing is deterministic and free for us. func testFreeEndpoint(t *testing.T, transport string, address string) { t.Helper() @@ -99,6 +108,10 @@ func testFreeEndpoint(t *testing.T, transport string, address string) { ln0, err0 = net.ListenPacket(transport, address) ln1, err1 = net.ListenPacket(transport, address) } + if trans.IsUnixTransport() { + // unix sockets rely on tempfiles so they are always free + return + } // Endpoint should be free. require.NoError(t, err0) diff --git a/receiver/statsdreceiver/internal/transport/transport.go b/receiver/statsdreceiver/internal/transport/transport.go index 862e4d0666c5..9addfaa92d5e 100644 --- a/receiver/statsdreceiver/internal/transport/transport.go +++ b/receiver/statsdreceiver/internal/transport/transport.go @@ -7,16 +7,17 @@ package transport // import "github.com/open-telemetry/opentelemetry-collector-c type Transport string const ( - UDP Transport = "udp" - UDP4 Transport = "udp4" - UDP6 Transport = "udp6" + UDP Transport = "udp" + UDP4 Transport = "udp4" + UDP6 Transport = "udp6" + UnixGram Transport = "unixgram" ) // NewTransport creates a Transport based on the transport string or returns an empty Transport. func NewTransport(ts string) Transport { trans := Transport(ts) switch trans { - case UDP, UDP4, UDP6: + case UDP, UDP4, UDP6, UnixGram: return trans } return Transport("") @@ -25,7 +26,7 @@ func NewTransport(ts string) Transport { // String casts the transport to a String if the Transport is supported. Return an empty Transport overwise. func (trans Transport) String() string { switch trans { - case UDP, UDP4, UDP6: + case UDP, UDP4, UDP6, UnixGram: return string(trans) } return "" @@ -39,3 +40,12 @@ func (trans Transport) IsPacketTransport() bool { } return false } + +// Returns true if the transport is unix socket based. +func (trans Transport) IsUnixTransport() bool { + switch trans { + case UnixGram: + return true + } + return false +} diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index ffef4396623c..073df3c98f46 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -68,12 +68,11 @@ func New( } func buildTransportServer(config Config) (transport.Server, error) { + // TODO: Add TCP transport implementation trans := transport.NewTransport(strings.ToLower(config.NetAddr.Transport)) - // TODO: Add TCP/unix socket transport implementations if trans.IsPacketTransport() { - return transport.NewUDPServer(trans, config.NetAddr.Endpoint) + return transport.NewPacketServer(trans, config.NetAddr.Endpoint) } - return nil, fmt.Errorf("unsupported transport %q", config.NetAddr.Transport) } diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index 1c034bf5a858..c62627d26b1b 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -6,6 +6,7 @@ package statsdreceiver import ( "context" "errors" + "runtime" "testing" "time" @@ -20,6 +21,7 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + statsdtestutil "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/testutil" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client" ) @@ -135,6 +137,24 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { return c }, }, + { + name: "default_config with UDS listener (unixgram)", + addr: statsdtestutil.CreateTemporarySocket(t, ""), + configFn: func() *Config { + return &Config{ + NetAddr: confignet.NetAddr{ + Transport: "unixgram", + }, + AggregationInterval: 4 * time.Second, + } + }, + clientFn: func(t *testing.T, addr string) *client.StatsD { + c, err := client.NewStatsD("unixgram", addr) + require.NoError(t, err) + return c + }, + testSkip: runtime.GOOS != "linux", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From c7a82c12a712b3807cfceb1c9da883ea3c707c0e Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 13:34:11 +0200 Subject: [PATCH 13/17] fix rebase shenanigans --- .../statsdreceiver/internal/transport/client/client.go | 2 +- receiver/statsdreceiver/internal/transport/server_test.go | 8 +++++++- receiver/statsdreceiver/receiver_test.go | 6 ++++++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index 8b6e38ac44f2..fe4e009d2a2a 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -50,7 +50,7 @@ func (s *StatsD) connect() error { if err != nil { return err } - s.Conn, err = net.DialUnix(s.transport, nil, unixAddr) + s.conn, err = net.DialUnix(s.transport, nil, unixAddr) if err != nil { return err } diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index e28f28ce2678..3ea6ea47172a 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -26,6 +26,7 @@ func Test_Server_ListenAndServe(t *testing.T) { buildServerFn func(transport Transport, addr string) (Server, error) getFreeEndpointFn func(t testing.TB, transport string) string buildClientFn func(transport string, address string) (*client.StatsD, error) + testSkip bool }{ { name: "udp", @@ -38,10 +39,15 @@ func Test_Server_ListenAndServe(t *testing.T) { getFreeEndpointFn: statsdtestutil.CreateTemporarySocket, buildServerFn: NewPacketServer, buildClientFn: client.NewStatsD, - testSkip: runtime.GOOS != "linux", + // Tests on Mac/Windows give a "bind: invalid argument" error as unix sockets are not supported. + testSkip: runtime.GOOS != "linux", }, } for _, tt := range tests { + if tt.testSkip { + continue + } + t.Run(tt.name, func(t *testing.T) { trans := Transport(tt.name) addr := tt.getFreeEndpointFn(t, tt.name) diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index c62627d26b1b..ce301cc92514 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -118,6 +118,7 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { addr string configFn func() *Config clientFn func(t *testing.T, addr string) *client.StatsD + testSkip bool }{ { name: "default_config with 4s interval", @@ -153,10 +154,15 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { require.NoError(t, err) return c }, + // Tests on Mac/Windows give a "bind: invalid argument" error as unix sockets are not supported. testSkip: runtime.GOOS != "linux", }, } for _, tt := range tests { + if tt.testSkip { + continue + } + t.Run(tt.name, func(t *testing.T) { cfg := tt.configFn() cfg.NetAddr.Endpoint = tt.addr From bfa94439a5a81a458ea1caffe051f6ec718dd2af Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 13:35:51 +0200 Subject: [PATCH 14/17] add changelog --- ...statsdreceiver_add-uds-support-part-2.yaml | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100755 .chloggen/receiver_statsdreceiver_add-uds-support-part-2.yaml diff --git a/.chloggen/receiver_statsdreceiver_add-uds-support-part-2.yaml b/.chloggen/receiver_statsdreceiver_add-uds-support-part-2.yaml new file mode 100755 index 000000000000..72d57d2fc47e --- /dev/null +++ b/.chloggen/receiver_statsdreceiver_add-uds-support-part-2.yaml @@ -0,0 +1,21 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/statsd + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add UDS support + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: + - 21385 + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: \ No newline at end of file From ed1819b6c33ad423cf28fc5957947ca7838baba0 Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 13:47:55 +0200 Subject: [PATCH 15/17] refactor client to test util package --- .../{transport/client => testutil}/client.go | 36 +++++++------------ .../internal/testutil/metric.go | 20 +++++++++++ .../internal/testutil/temporary_socket.go | 5 ++- .../internal/transport/server_test.go | 9 +++-- receiver/statsdreceiver/receiver_test.go | 13 ++++--- 5 files changed, 46 insertions(+), 37 deletions(-) rename receiver/statsdreceiver/internal/{transport/client => testutil}/client.go (56%) create mode 100644 receiver/statsdreceiver/internal/testutil/metric.go diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/testutil/client.go similarity index 56% rename from receiver/statsdreceiver/internal/transport/client/client.go rename to receiver/statsdreceiver/internal/testutil/client.go index fe4e009d2a2a..165fa9596e88 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/testutil/client.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package client // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client" +package testutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/testutil" import ( "fmt" @@ -10,17 +10,17 @@ import ( "strings" ) -// StatsD defines the properties of a StatsD connection. -type StatsD struct { +// StatsDTestClient defines the properties of a StatsD connection. +type StatsDTestClient struct { transport string address string conn io.Writer } -// NewStatsD creates a new StatsD instance to support the need for testing +// NewStatsDTestClient creates a new StatsDTestClient instance to support the need for testing // the statsdreceiver package and is not intended/tested to be used in production. -func NewStatsD(transport string, address string) (*StatsD, error) { - statsd := &StatsD{ +func NewStatsDTestClient(transport string, address string) (*StatsDTestClient, error) { + statsd := &StatsDTestClient{ transport: transport, address: address, } @@ -33,8 +33,8 @@ func NewStatsD(transport string, address string) (*StatsD, error) { return statsd, nil } -// connect populates the StatsD.conn -func (s *StatsD) connect() error { +// connect populates the StatsDTestClient.conn +func (s *StatsDTestClient) connect() error { switch s.transport { case "udp": udpAddr, err := net.ResolveUDPAddr(s.transport, s.address) @@ -61,8 +61,8 @@ func (s *StatsD) connect() error { return nil } -// Disconnect closes the StatsD.conn. -func (s *StatsD) Disconnect() error { +// Disconnect closes the StatsDTestClient.conn. +func (s *StatsDTestClient) Disconnect() error { var err error if cl, ok := s.conn.(io.Closer); ok { err = cl.Close() @@ -71,23 +71,11 @@ func (s *StatsD) Disconnect() error { return err } -// SendMetric sends the input metric to the StatsD connection. -func (s *StatsD) SendMetric(metric Metric) error { +// SendMetric sends the input metric to the StatsDTestClient connection. +func (s *StatsDTestClient) SendMetric(metric Metric) error { _, err := io.Copy(s.conn, strings.NewReader(metric.String())) if err != nil { return fmt.Errorf("send metric on test client: %w", err) } return nil } - -// Metric contains the metric fields for a StatsD message. -type Metric struct { - Name string - Value string - Type string -} - -// String formats a Metric into a StatsD message. -func (m Metric) String() string { - return fmt.Sprintf("%s:%s|%s", m.Name, m.Value, m.Type) -} diff --git a/receiver/statsdreceiver/internal/testutil/metric.go b/receiver/statsdreceiver/internal/testutil/metric.go new file mode 100644 index 000000000000..4d648a42fe26 --- /dev/null +++ b/receiver/statsdreceiver/internal/testutil/metric.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/testutil" + +import ( + "fmt" +) + +// Metric contains the metric fields for a StatsDTestClient message. +type Metric struct { + Name string + Value string + Type string +} + +// String formats a Metric into a StatsDTestClient message. +func (m Metric) String() string { + return fmt.Sprintf("%s:%s|%s", m.Name, m.Value, m.Type) +} diff --git a/receiver/statsdreceiver/internal/testutil/temporary_socket.go b/receiver/statsdreceiver/internal/testutil/temporary_socket.go index c19152719ce1..e1e657e9db00 100644 --- a/receiver/statsdreceiver/internal/testutil/temporary_socket.go +++ b/receiver/statsdreceiver/internal/testutil/temporary_socket.go @@ -1,4 +1,7 @@ -package testutil +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/transport/testutil" import ( "crypto/rand" diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index 3ea6ea47172a..2c175fb8d71f 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -17,7 +17,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" statsdtestutil "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/testutil" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client" ) func Test_Server_ListenAndServe(t *testing.T) { @@ -25,20 +24,20 @@ func Test_Server_ListenAndServe(t *testing.T) { name string buildServerFn func(transport Transport, addr string) (Server, error) getFreeEndpointFn func(t testing.TB, transport string) string - buildClientFn func(transport string, address string) (*client.StatsD, error) + buildClientFn func(transport string, address string) (*statsdtestutil.StatsDTestClient, error) testSkip bool }{ { name: "udp", getFreeEndpointFn: testutil.GetAvailableLocalNetworkAddress, buildServerFn: NewPacketServer, - buildClientFn: client.NewStatsD, + buildClientFn: statsdtestutil.NewStatsDTestClient, }, { name: "unixgram", getFreeEndpointFn: statsdtestutil.CreateTemporarySocket, buildServerFn: NewPacketServer, - buildClientFn: client.NewStatsD, + buildClientFn: statsdtestutil.NewStatsDTestClient, // Tests on Mac/Windows give a "bind: invalid argument" error as unix sockets are not supported. testSkip: runtime.GOOS != "linux", }, @@ -74,7 +73,7 @@ func Test_Server_ListenAndServe(t *testing.T) { gc, err := tt.buildClientFn(tt.name, addr) require.NoError(t, err) require.NotNil(t, gc) - err = gc.SendMetric(client.Metric{ + err = gc.SendMetric(statsdtestutil.Metric{ Name: "test.metric", Value: "42", Type: "c", diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index ce301cc92514..3ae7a19dda5e 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -23,7 +23,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" statsdtestutil "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/testutil" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/client" ) func Test_statsdreceiver_New(t *testing.T) { @@ -117,7 +116,7 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { name string addr string configFn func() *Config - clientFn func(t *testing.T, addr string) *client.StatsD + clientFn func(t *testing.T, addr string) *statsdtestutil.StatsDTestClient testSkip bool }{ { @@ -132,8 +131,8 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { AggregationInterval: 4 * time.Second, } }, - clientFn: func(t *testing.T, addr string) *client.StatsD { - c, err := client.NewStatsD("udp", addr) + clientFn: func(t *testing.T, addr string) *statsdtestutil.StatsDTestClient { + c, err := statsdtestutil.NewStatsDTestClient("udp", addr) require.NoError(t, err) return c }, @@ -149,8 +148,8 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { AggregationInterval: 4 * time.Second, } }, - clientFn: func(t *testing.T, addr string) *client.StatsD { - c, err := client.NewStatsD("unixgram", addr) + clientFn: func(t *testing.T, addr string) *statsdtestutil.StatsDTestClient { + c, err := statsdtestutil.NewStatsDTestClient("unixgram", addr) require.NoError(t, err) return c }, @@ -181,7 +180,7 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { statsdClient := tt.clientFn(t, tt.addr) - statsdMetric := client.Metric{ + statsdMetric := statsdtestutil.Metric{ Name: "test.metric", Value: "42", Type: "c", From beb01bf08ef8ae061d98c376e6e243fc3051372c Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 15:18:56 +0200 Subject: [PATCH 16/17] address internal comments --- .../statsdreceiver/internal/transport/packet_server.go | 2 +- .../statsdreceiver/internal/transport/server_test.go | 10 +++++----- receiver/statsdreceiver/receiver.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/packet_server.go b/receiver/statsdreceiver/internal/transport/packet_server.go index dbfec422080f..78f709c3d6ed 100644 --- a/receiver/statsdreceiver/internal/transport/packet_server.go +++ b/receiver/statsdreceiver/internal/transport/packet_server.go @@ -68,7 +68,7 @@ func (psrv *packetServer) ListenAndServe( for { n, addr, err := psrv.packetConn.ReadFrom(buf) - if _, ok := psrv.packetConn.(*net.UnixConn); ok && addr == nil { + if psrv.transport.IsUnixTransport() && addr == nil { addr = &net.UnixAddr{ Net: "unixgram", Name: "UDS", diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index 2c175fb8d71f..1391e160bb4b 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -98,7 +98,7 @@ func Test_Server_ListenAndServe(t *testing.T) { } } -// Util to see that the port we are going to use for testing is deterministic and free for us. +// testFreeEndpoint is a helper to check if the port we are going to use for testing is deterministic and free for us. func testFreeEndpoint(t *testing.T, transport string, address string) { t.Helper() @@ -108,15 +108,15 @@ func testFreeEndpoint(t *testing.T, transport string, address string) { trans := NewTransport(transport) require.NotEqual(t, trans, Transport("")) + if trans.IsUnixTransport() { + // unix sockets rely on tempfiles so they are always free + return + } if trans.IsPacketTransport() { // Endpoint should be free. ln0, err0 = net.ListenPacket(transport, address) ln1, err1 = net.ListenPacket(transport, address) } - if trans.IsUnixTransport() { - // unix sockets rely on tempfiles so they are always free - return - } // Endpoint should be free. require.NoError(t, err0) diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 073df3c98f46..960b9628d254 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -68,8 +68,8 @@ func New( } func buildTransportServer(config Config) (transport.Server, error) { - // TODO: Add TCP transport implementation trans := transport.NewTransport(strings.ToLower(config.NetAddr.Transport)) + // TODO: Add TCP transport implementation if trans.IsPacketTransport() { return transport.NewPacketServer(trans, config.NetAddr.Endpoint) } From 116da7b20fd0cd5cd749fc55f7db7865ca2ba73d Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Thu, 3 Aug 2023 15:31:20 +0200 Subject: [PATCH 17/17] fix goporto imports --- receiver/statsdreceiver/internal/testutil/client.go | 2 +- receiver/statsdreceiver/internal/testutil/metric.go | 2 +- receiver/statsdreceiver/internal/testutil/temporary_socket.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/receiver/statsdreceiver/internal/testutil/client.go b/receiver/statsdreceiver/internal/testutil/client.go index 165fa9596e88..0c3ec6126966 100644 --- a/receiver/statsdreceiver/internal/testutil/client.go +++ b/receiver/statsdreceiver/internal/testutil/client.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package testutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/testutil" +package testutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/testutil" import ( "fmt" diff --git a/receiver/statsdreceiver/internal/testutil/metric.go b/receiver/statsdreceiver/internal/testutil/metric.go index 4d648a42fe26..3ef3aff16829 100644 --- a/receiver/statsdreceiver/internal/testutil/metric.go +++ b/receiver/statsdreceiver/internal/testutil/metric.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package testutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport/testutil" +package testutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/testutil" import ( "fmt" diff --git a/receiver/statsdreceiver/internal/testutil/temporary_socket.go b/receiver/statsdreceiver/internal/testutil/temporary_socket.go index e1e657e9db00..5844a5b89919 100644 --- a/receiver/statsdreceiver/internal/testutil/temporary_socket.go +++ b/receiver/statsdreceiver/internal/testutil/temporary_socket.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package testutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/transport/testutil" +package testutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/testutil" import ( "crypto/rand"