Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Fix linter findings for revive:exported in plugins/inputs/n* #16205

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions plugins/inputs/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
var sampleConfig string

type Nats struct {
Server string
ResponseTimeout config.Duration
Server string `toml:"server"`
ResponseTimeout config.Duration `toml:"response_timeout"`

client *http.Client
}
Expand Down
85 changes: 43 additions & 42 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,12 @@ import (
//go:embed sample.conf
var sampleConfig string

var once sync.Once

var (
once sync.Once
defaultMaxUndeliveredMessages = 1000
)

type empty struct{}
type semaphore chan empty

type natsError struct {
conn *nats.Conn
sub *nats.Subscription
err error
}

func (e natsError) Error() string {
return fmt.Sprintf("%s url:%s id:%s sub:%s queue:%s",
e.err.Error(), e.conn.ConnectedUrl(), e.conn.ConnectedServerId(), e.sub.Subject, e.sub.Queue)
}

type natsConsumer struct {
type NatsConsumer struct {
QueueGroup string `toml:"queue_group"`
Subjects []string `toml:"subjects"`
Servers []string `toml:"servers"`
Expand Down Expand Up @@ -70,24 +55,32 @@ type natsConsumer struct {
cancel context.CancelFunc
}

func (*natsConsumer) SampleConfig() string {
return sampleConfig
type (
empty struct{}
semaphore chan empty
)

type natsError struct {
conn *nats.Conn
sub *nats.Subscription
err error
}

func (n *natsConsumer) SetParser(parser telegraf.Parser) {
n.parser = parser
func (e natsError) Error() string {
return fmt.Sprintf("%s url:%s id:%s sub:%s queue:%s",
e.err.Error(), e.conn.ConnectedUrl(), e.conn.ConnectedServerId(), e.sub.Subject, e.sub.Queue)
}

func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e error) {
select {
case n.errs <- natsError{conn: c, sub: s, err: e}:
default:
return
}
func (*NatsConsumer) SampleConfig() string {
return sampleConfig
}

// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
func (n *NatsConsumer) SetParser(parser telegraf.Parser) {
n.parser = parser
}

// Start the nats consumer. Caller must call *NatsConsumer.Stop() to clean up.
func (n *NatsConsumer) Start(acc telegraf.Accumulator) error {
n.acc = acc.WithTracking(n.MaxUndeliveredMessages)

options := []nats.Option{
Expand Down Expand Up @@ -193,9 +186,27 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
return nil
}

func (n *NatsConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}

func (n *NatsConsumer) Stop() {
n.cancel()
n.wg.Wait()
n.clean()
}

func (n *NatsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e error) {
select {
case n.errs <- natsError{conn: c, sub: s, err: e}:
default:
return
}
}

// receiver() reads all incoming messages from NATS, and parses them into
// telegraf metrics.
func (n *natsConsumer) receiver(ctx context.Context) {
func (n *NatsConsumer) receiver(ctx context.Context) {
sem := make(semaphore, n.MaxUndeliveredMessages)

for {
Expand Down Expand Up @@ -237,7 +248,7 @@ func (n *natsConsumer) receiver(ctx context.Context) {
}
}

func (n *natsConsumer) clean() {
func (n *NatsConsumer) clean() {
for _, sub := range n.subs {
if err := sub.Unsubscribe(); err != nil {
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
Expand All @@ -257,19 +268,9 @@ func (n *natsConsumer) clean() {
}
}

func (n *natsConsumer) Stop() {
n.cancel()
n.wg.Wait()
n.clean()
}

func (n *natsConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}

func init() {
inputs.Add("nats_consumer", func() telegraf.Input {
return &natsConsumer{
return &NatsConsumer{
Servers: []string{"nats://localhost:4222"},
Subjects: []string{"telegraf"},
QueueGroup: "telegraf_consumers",
Expand Down
28 changes: 12 additions & 16 deletions plugins/inputs/nats_consumer/nats_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestStartStop(t *testing.T) {
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()

plugin := &natsConsumer{
plugin := &NatsConsumer{
Servers: []string{fmt.Sprintf("nats://%s:%s", container.Address, container.Ports["4222"])},
Subjects: []string{"telegraf"},
QueueGroup: "telegraf_consumers",
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestSendReceive(t *testing.T) {
}

// Setup the plugin
plugin := &natsConsumer{
plugin := &NatsConsumer{
Servers: []string{addr},
Subjects: subjects,
QueueGroup: "telegraf_consumers",
Expand All @@ -161,15 +161,15 @@ func TestSendReceive(t *testing.T) {
defer plugin.Stop()

// Send all messages to the topics (random order due to Golang map)
publisher := &sender{Addr: addr}
require.NoError(t, publisher.Connect())
defer publisher.Disconnect()
publisher := &sender{addr: addr}
require.NoError(t, publisher.connect())
defer publisher.disconnect()
for topic, msgs := range tt.msgs {
for _, msg := range msgs {
require.NoError(t, publisher.Send(topic, msg))
require.NoError(t, publisher.send(topic, msg))
}
}
publisher.Disconnect()
publisher.disconnect()

// Wait for the metrics to be collected
require.Eventually(t, func() bool {
Expand All @@ -185,16 +185,12 @@ func TestSendReceive(t *testing.T) {
}

type sender struct {
Addr string

Username string
Password string

addr string
conn *nats.Conn
}

func (s *sender) Connect() error {
conn, err := nats.Connect(s.Addr)
func (s *sender) connect() error {
conn, err := nats.Connect(s.addr)
if err != nil {
return err
}
Expand All @@ -203,14 +199,14 @@ func (s *sender) Connect() error {
return nil
}

func (s *sender) Disconnect() {
func (s *sender) disconnect() {
if s.conn != nil && !s.conn.IsClosed() {
_ = s.conn.Flush()
s.conn.Close()
}
s.conn = nil
}

func (s *sender) Send(topic, msg string) error {
func (s *sender) send(topic, msg string) error {
return s.conn.Publish(topic, []byte(msg))
}
12 changes: 6 additions & 6 deletions plugins/inputs/neoom_beaam/neoom_beaam.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ func (n *NeoomBeaam) Start(telegraf.Accumulator) error {
return n.updateConfiguration()
}

func (n *NeoomBeaam) Stop() {
if n.client != nil {
n.client.CloseIdleConnections()
}
}

func (n *NeoomBeaam) Gather(acc telegraf.Accumulator) error {
// Refresh the config if requested
if n.RefreshConfig {
Expand All @@ -97,6 +91,12 @@ func (n *NeoomBeaam) Gather(acc telegraf.Accumulator) error {
return nil
}

func (n *NeoomBeaam) Stop() {
if n.client != nil {
n.client.CloseIdleConnections()
}
}

func (n *NeoomBeaam) updateConfiguration() error {
endpoint := n.Address + "/api/v1/site/configuration"
request, err := http.NewRequest("GET", endpoint, nil)
Expand Down
14 changes: 6 additions & 8 deletions plugins/inputs/neptune_apex/neptune_apex.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ var sampleConfig string
// Measurement is constant across all metrics.
const Measurement = "neptune_apex"

type NeptuneApex struct {
Servers []string `toml:"servers"`
ResponseTimeout config.Duration `toml:"response_timeout"`
httpClient *http.Client
}

type xmlReply struct {
SoftwareVersion string `xml:"software,attr"`
HardwareVersion string `xml:"hardware,attr"`
Expand Down Expand Up @@ -54,18 +60,10 @@ type outlet struct {
Xstatus *string `xml:"xstatus"`
}

// NeptuneApex implements telegraf.Input.
type NeptuneApex struct {
Servers []string
ResponseTimeout config.Duration
httpClient *http.Client
}

func (*NeptuneApex) SampleConfig() string {
return sampleConfig
}

// Gather implements telegraf.Input.Gather
func (n *NeptuneApex) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, server := range n.Servers {
Expand Down
4 changes: 2 additions & 2 deletions plugins/inputs/neptune_apex/neptune_apex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestParseXML(t *testing.T) {
}{
{
name: "Good test",
xmlResponse: []byte(APEX2016),
xmlResponse: []byte(apex2016),
wantMetrics: []telegraf.Metric{
testutil.MustMetric(
Measurement,
Expand Down Expand Up @@ -532,7 +532,7 @@ func fakeHTTPClient(h http.Handler) (*http.Client, func()) {
}

// Sample configuration from a 2016 version Neptune Apex.
const APEX2016 = `<?xml version="1.0"?>
const apex2016 = `<?xml version="1.0"?>
<status software="5.04_7A18" hardware="1.0">
<hostname>apex</hostname>
<serial>AC5:12345</serial>
Expand Down
20 changes: 10 additions & 10 deletions plugins/inputs/net/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ import (
//go:embed sample.conf
var sampleConfig string

type NetIOStats struct {
filter filter.Filter
ps system.PS
type Net struct {
Interfaces []string `toml:"interfaces"`
IgnoreProtocolStats bool `toml:"ignore_protocol_stats"`

skipChecks bool
IgnoreProtocolStats bool
Interfaces []string
filter filter.Filter
ps system.PS
skipChecks bool
}

func (*NetIOStats) SampleConfig() string {
func (*Net) SampleConfig() string {
return sampleConfig
}

func (n *NetIOStats) Init() error {
func (n *Net) Init() error {
if !n.IgnoreProtocolStats {
config.PrintOptionValueDeprecationNotice("inputs.net", "ignore_protocol_stats", "false",
telegraf.DeprecationInfo{
Expand All @@ -48,7 +48,7 @@ func (n *NetIOStats) Init() error {
return nil
}

func (n *NetIOStats) Gather(acc telegraf.Accumulator) error {
func (n *Net) Gather(acc telegraf.Accumulator) error {
netio, err := n.ps.NetIO()
if err != nil {
return fmt.Errorf("error getting net io info: %w", err)
Expand Down Expand Up @@ -153,6 +153,6 @@ func getInterfaceSpeed(ioName string) int64 {

func init() {
inputs.Add("net", func() telegraf.Input {
return &NetIOStats{ps: system.NewSystemPS()}
return &Net{ps: system.NewSystemPS()}
})
}
6 changes: 3 additions & 3 deletions plugins/inputs/net/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestNetIOStats(t *testing.T) {

t.Setenv("HOST_SYS", filepath.Join("testdata", "general", "sys"))

plugin := &NetIOStats{ps: &mps, skipChecks: true}
plugin := &Net{ps: &mps, skipChecks: true}

var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestNetIOStatsSpeedUnsupported(t *testing.T) {

t.Setenv("HOST_SYS", filepath.Join("testdata", "general", "sys"))

plugin := &NetIOStats{ps: &mps, skipChecks: true}
plugin := &Net{ps: &mps, skipChecks: true}

var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestNetIOStatsNoSpeedFile(t *testing.T) {

t.Setenv("HOST_SYS", filepath.Join("testdata", "general", "sys"))

plugin := &NetIOStats{ps: &mps, skipChecks: true}
plugin := &Net{ps: &mps, skipChecks: true}

var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
Expand Down
Loading
Loading