From 51be5d32a083da282f0e85202314736350bf8aff Mon Sep 17 00:00:00 2001 From: Juan Manuel Perez Date: Mon, 30 Oct 2023 11:35:00 +0100 Subject: [PATCH] fix rebase shenanigans --- receiver/statsdreceiver/internal/transport/client/client.go | 2 +- receiver/statsdreceiver/internal/transport/server_test.go | 5 ++--- receiver/statsdreceiver/internal/transport/tcp_server.go | 4 ++++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index fad9df45e0af..8b9fd7f06ba3 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -47,7 +47,7 @@ func (s *StatsD) connect() error { } case "tcp": var err error - s.Conn, err = net.Dial("tcp", s.address) + s.conn, err = net.Dial(s.transport, s.address) if err != nil { return err } diff --git a/receiver/statsdreceiver/internal/transport/server_test.go b/receiver/statsdreceiver/internal/transport/server_test.go index d59934cb630d..a774c0fd4154 100644 --- a/receiver/statsdreceiver/internal/transport/server_test.go +++ b/receiver/statsdreceiver/internal/transport/server_test.go @@ -45,11 +45,10 @@ func Test_Server_ListenAndServe(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - trans := Transport(tt.name) addr := tt.getFreeEndpointFn(t, tt.name) testFreeEndpoint(t, tt.name, addr) - srv, err := tt.buildServerFn(trans, addr) + srv, err := tt.buildServerFn(tt.transport, addr) require.NoError(t, err) require.NotNil(t, srv) @@ -67,7 +66,7 @@ func Test_Server_ListenAndServe(t *testing.T) { runtime.Gosched() - gc, err := tt.buildClientFn(tt.name, addr) + gc, err := tt.buildClientFn(tt.transport.String(), addr) require.NoError(t, err) require.NotNil(t, gc) err = gc.SendMetric(client.Metric{ diff --git a/receiver/statsdreceiver/internal/transport/tcp_server.go b/receiver/statsdreceiver/internal/transport/tcp_server.go index 513532b04d8f..f776bcd88f04 100644 --- a/receiver/statsdreceiver/internal/transport/tcp_server.go +++ b/receiver/statsdreceiver/internal/transport/tcp_server.go @@ -25,6 +25,7 @@ type tcpServer struct { stopChan chan struct{} } +// Ensure that Server is implemented on TCP Server. var _ Server = (*tcpServer)(nil) // NewTCPServer creates a transport.Server using TCP as its transport. @@ -46,6 +47,7 @@ func NewTCPServer(transport Transport, address string) (Server, error) { return &tsrv, nil } +// ListenAndServe starts the server ready to receive metrics. func (t *tcpServer) ListenAndServe(nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric) error { if nextConsumer == nil || reporter == nil { return errNilListenAndServeParameters @@ -76,6 +78,7 @@ LOOP: return errTCPServerDone } +// handleConn is helper that parses the buffer and split it line by line to be parsed upstream. func (t *tcpServer) handleConn(c net.Conn, transferChan chan<- Metric) { payload := make([]byte, 4096) var remainder []byte @@ -103,6 +106,7 @@ func (t *tcpServer) handleConn(c net.Conn, transferChan chan<- Metric) { } } +// Close closes the server. func (t *tcpServer) Close() error { close(t.stopChan) t.wg.Wait()