Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add statsdreceiver Unixgram Support #36608

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
27 changes: 27 additions & 0 deletions .chloggen/statsdreceiver-uds.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: statsdreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add UDS support to statsdreceiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21385]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 9 additions & 0 deletions receiver/statsdreceiver/internal/transport/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ func (s *StatsD) connect() error {
if err != nil {
return err
}
case "unixgram":
michaelli321 marked this conversation as resolved.
Show resolved Hide resolved
unixAddr, err := net.ResolveUnixAddr(s.transport, s.address)
if err != nil {
return err
}
s.conn, err = net.DialUnix(s.transport, nil, unixAddr)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown/unsupported transport: %s", s.transport)
}
Expand Down
84 changes: 84 additions & 0 deletions receiver/statsdreceiver/internal/transport/packet_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

import (
"errors"
"net"

"go.opentelemetry.io/collector/consumer"
)

type packetServer struct {
packetConn net.PacketConn
transport Transport
}

// ListenAndServe starts the server ready to receive metrics.
func (u *packetServer) ListenAndServe(
nextConsumer consumer.Metrics,
reporter Reporter,
transferChan chan<- Metric,
) error {
if nextConsumer == nil || reporter == nil {
return errNilListenAndServeParameters
}

buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6)
for {
n, addr, err := u.packetConn.ReadFrom(buf)
if addr == nil && u.transport == UDS {
addr = &udsAddr{
network: u.transport.String(),
address: u.packetConn.LocalAddr().String(),
}
}

if n > 0 {
u.handlePacket(n, buf, addr, transferChan)
}
if err != nil {
reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v",
u.transport,
u.packetConn.LocalAddr(),
err)
var netErr net.Error
if errors.As(err, &netErr) {
if netErr.Timeout() {
continue
}
}
return err
}
}
}

// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream.
func (u *packetServer) handlePacket(
numBytes int,
data []byte,
addr net.Addr,
transferChan chan<- Metric,
) {
splitPacket := NewSplitBytes(data[:numBytes], '\n')
for splitPacket.Next() {
chunk := splitPacket.Chunk()
if len(chunk) > 0 {
transferChan <- Metric{string(chunk), addr}
}
}
}

type udsAddr struct {
network string
address string
}

func (u *udsAddr) Network() string {
return u.network
}

func (u *udsAddr) String() string {
return u.address
}
7 changes: 5 additions & 2 deletions receiver/statsdreceiver/internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
TCP Transport = "tcp"
TCP4 Transport = "tcp4"
TCP6 Transport = "tcp6"
UDS Transport = "unixgram"
)

// NewTransport creates a Transport based on the transport string or returns an empty Transport.
Expand All @@ -31,14 +32,16 @@ func NewTransport(ts string) Transport {
return trans
case TCP, TCP4, TCP6:
return trans
case UDS:
return trans
}
return Transport("")
}

// String casts the transport to a String if the Transport is supported. Return an empty Transport overwise.
func (trans Transport) String() string {
switch trans {
case UDP, UDP4, UDP6, TCP, TCP4, TCP6:
case UDP, UDP4, UDP6, TCP, TCP4, TCP6, UDS:
return string(trans)
}
return ""
Expand All @@ -47,7 +50,7 @@ func (trans Transport) String() string {
// IsPacketTransport returns true if the transport is packet based.
func (trans Transport) IsPacketTransport() bool {
switch trans {
case UDP, UDP4, UDP6:
case UDP, UDP4, UDP6, UDS:
return true
}
return false
Expand Down
60 changes: 5 additions & 55 deletions receiver/statsdreceiver/internal/transport/udp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@
package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

import (
"errors"
"fmt"
"net"

"go.opentelemetry.io/collector/consumer"
)

type udpServer struct {
packetConn net.PacketConn
transport Transport
packetServer
}

// Ensure that Server is implemented on UDP Server.
Expand All @@ -31,60 +27,14 @@ func NewUDPServer(transport Transport, address string) (Server, error) {
}

return &udpServer{
packetConn: conn,
transport: transport,
packetServer: packetServer{
packetConn: conn,
transport: transport,
},
}, nil
}

// ListenAndServe starts the server ready to receive metrics.
func (u *udpServer) ListenAndServe(
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
nextConsumer consumer.Metrics,
reporter Reporter,
transferChan chan<- Metric,
) error {
if nextConsumer == nil || reporter == nil {
return errNilListenAndServeParameters
}

buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6)
for {
n, addr, err := u.packetConn.ReadFrom(buf)
if n > 0 {
u.handlePacket(n, buf, addr, transferChan)
}
if err != nil {
reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v",
u.transport,
u.packetConn.LocalAddr(),
err)
var netErr net.Error
if errors.As(err, &netErr) {
if netErr.Timeout() {
continue
}
}
return err
}
}
}

// Close closes the server.
func (u *udpServer) Close() error {
return u.packetConn.Close()
}

// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream.
func (u *udpServer) handlePacket(
numBytes int,
data []byte,
addr net.Addr,
transferChan chan<- Metric,
) {
splitPacket := NewSplitBytes(data[:numBytes], '\n')
for splitPacket.Next() {
chunk := splitPacket.Chunk()
if len(chunk) > 0 {
transferChan <- Metric{string(chunk), addr}
}
}
}
60 changes: 60 additions & 0 deletions receiver/statsdreceiver/internal/transport/uds_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

import (
"fmt"
"net"
"os"
)

type udsServer struct {
packetServer
}

// Ensure that Server is implemented on UDS Server.
var _ (Server) = (*udsServer)(nil)

// NewUDSServer creates a transport.Server using Unixgram as its transport.
func NewUDSServer(transport Transport, socketPath string) (Server, error) {
if !transport.IsPacketTransport() {
return nil, fmt.Errorf("NewUDSServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport)
}

if err := prepareSocket(socketPath); err != nil {
return nil, err
}

conn, err := net.ListenPacket(transport.String(), socketPath)
if err != nil {
return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err)
}

return &udsServer{
packetServer: packetServer{
packetConn: conn,
transport: transport,
},
}, nil
}

// Close closes the server.
func (u *udsServer) Close() error {
os.Remove(u.packetConn.LocalAddr().String())
return u.packetConn.Close()
}

func prepareSocket(socketPath string) error {
if _, err := os.Stat(socketPath); err == nil {
// File exists, remove it
michaelli321 marked this conversation as resolved.
Show resolved Hide resolved
if err = os.Remove(socketPath); err != nil {
return fmt.Errorf("failed to remove existing socket file: %w", err)
}
} else if !os.IsNotExist(err) {
// Return any error that's not "file does not exist"
return fmt.Errorf("failed to stat socket file: %w", err)
}

return nil
}
12 changes: 9 additions & 3 deletions receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,21 @@ func newReceiver(
config Config,
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))

if config.NetAddr.Endpoint == "" {
config.NetAddr.Endpoint = "localhost:8125"
if trans == transport.UDS {
config.NetAddr.Endpoint = "/var/run/statsd-receiver.sock"
michaelli321 marked this conversation as resolved.
Show resolved Hide resolved
} else {
config.NetAddr.Endpoint = "localhost:8125"
}
}

rep, err := newReporter(set)
if err != nil {
return nil, err
}

trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
LongLivedCtx: true,
ReceiverID: set.ID,
Expand All @@ -80,13 +85,14 @@ func newReceiver(
}

func buildTransportServer(config Config) (transport.Server, error) {
// TODO: Add unix socket transport implementations
trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
switch trans {
case transport.UDP, transport.UDP4, transport.UDP6:
return transport.NewUDPServer(trans, config.NetAddr.Endpoint)
case transport.TCP, transport.TCP4, transport.TCP6:
return transport.NewTCPServer(trans, config.NetAddr.Endpoint)
case transport.UDS:
return transport.NewUDSServer(trans, config.NetAddr.Endpoint)
}

return nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport))
Expand Down
18 changes: 18 additions & 0 deletions receiver/statsdreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,24 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) {
return c
},
},
{
name: "UDS server with 4s interval",
addr: "/tmp/statsd_test.sock",
configFn: func() *Config {
return &Config{
NetAddr: confignet.AddrConfig{
Endpoint: "/tmp/statsd_test.sock",
Transport: confignet.TransportTypeUnixgram,
},
AggregationInterval: 4 * time.Second,
}
},
clientFn: func(t *testing.T, addr string) *client.StatsD {
c, err := client.NewStatsD("unixgram", addr)
require.NoError(t, err)
return c
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading