Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(event_hubs): refactored to plugin/commons #16525

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions plugins/common/eh/eh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package eh

import (
"context"
"time"

eventhub "github.com/Azure/azure-event-hubs-go/v3"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
)

var sampleConfig string

/*
** Wrapper interface for eventhub.Hub
*/

type EventHubInterface interface {
GetHub(s string) error
Close(ctx context.Context) error
SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error
}

type EventHub struct {
hub *eventhub.Hub
}

func (eh *EventHub) GetHub(s string) error {
hub, err := eventhub.NewHubFromConnectionString(s)

if err != nil {
return err
}

eh.hub = hub

return nil
}

func (eh *EventHub) Close(ctx context.Context) error {
return eh.hub.Close(ctx)
}

func (eh *EventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error {
return eh.hub.SendBatch(ctx, iterator, opts...)
}

/* End wrapper interface */

type EventHubs struct {
Log telegraf.Logger `toml:"-"`
ConnectionString string `toml:"connection_string"`
Timeout config.Duration `toml:"timeout"`
PartitionKey string `toml:"partition_key"`
MaxMessageSize int `toml:"max_message_size"`

Hub EventHubInterface
}

func (*EventHubs) SampleConfig() string {
return sampleConfig
}

func (e *EventHubs) Init() error {
err := e.Hub.GetHub(e.ConnectionString)

if err != nil {
return err
}

return nil
}

func (e *EventHubs) Connect() error {
return nil
}

func (e *EventHubs) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()

err := e.Hub.Close(ctx)

if err != nil {
return err
}

return nil
}
34 changes: 34 additions & 0 deletions plugins/common/eh/eh_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package eh

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/config"
)

/*
** Integration test (requires an Event Hubs instance)
*/

func TestInit(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

connstring := "Endpoint=sb://telegraf.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=+"

// Configure the plugin to target the newly created hub
e := &EventHubs{
Hub: &EventHub{},
ConnectionString: connstring,
Timeout: config.Duration(time.Second * 5),
}

err := e.Init()
// Verify that we can connect to Event Hubs
require.NoError(t, err)
e.Close()
}
81 changes: 15 additions & 66 deletions plugins/outputs/event_hubs/event_hubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,100 +7,47 @@ import (
"time"

eventhub "github.com/Azure/azure-event-hubs-go/v3"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
eh_commons "github.com/influxdata/telegraf/plugins/common/eh"

"github.com/influxdata/telegraf/plugins/outputs"
)

//go:embed sample.conf
var sampleConfig string

/*
** Wrapper interface for eventhub.Hub
*/

type EventHubInterface interface {
GetHub(s string) error
Close(ctx context.Context) error
SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error
}

type eventHub struct {
hub *eventhub.Hub
}

func (eh *eventHub) GetHub(s string) error {
hub, err := eventhub.NewHubFromConnectionString(s)

if err != nil {
return err
}

eh.hub = hub

return nil
}

func (eh *eventHub) Close(ctx context.Context) error {
return eh.hub.Close(ctx)
}

func (eh *eventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error {
return eh.hub.SendBatch(ctx, iterator, opts...)
}

/* End wrapper interface */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment can be removed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

THanks for the review @DStrand1 . I will update the PR


type EventHubs struct {
Log telegraf.Logger `toml:"-"`
ConnectionString string `toml:"connection_string"`
Timeout config.Duration `toml:"timeout"`
PartitionKey string `toml:"partition_key"`
MaxMessageSize int `toml:"max_message_size"`

Hub EventHubInterface
batchOptions []eventhub.BatchOption
eh_commons.EventHubs
serializer telegraf.Serializer
batchOptions []eventhub.BatchOption
}

const (
defaultRequestTimeout = time.Second * 30
)

func (*EventHubs) SampleConfig() string {
//go:embed sample.conf
var sampleConfig string

func (e *EventHubs) SampleConfig() string {
return sampleConfig
}

func (e *EventHubs) Init() error {
err := e.Hub.GetHub(e.ConnectionString)

if err != nil {
return err
}

if e.MaxMessageSize > 0 {
e.batchOptions = append(e.batchOptions, eventhub.BatchWithMaxSizeInBytes(e.MaxMessageSize))
}

return nil
return e.EventHubs.Init()
}

func (*EventHubs) Connect() error {
func (e *EventHubs) Connect() error {
return nil
}

func (e *EventHubs) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()

err := e.Hub.Close(ctx)

if err != nil {
return err
}

return nil
return e.EventHubs.Close()
}

func (e *EventHubs) SetSerializer(serializer telegraf.Serializer) {
Expand Down Expand Up @@ -146,8 +93,10 @@ func (e *EventHubs) Write(metrics []telegraf.Metric) error {
func init() {
outputs.Add("event_hubs", func() telegraf.Output {
return &EventHubs{
Hub: &eventHub{},
Timeout: config.Duration(defaultRequestTimeout),
EventHubs: eh_commons.EventHubs{
Hub: &eh_commons.EventHub{},
Timeout: config.Duration(defaultRequestTimeout),
},
}
})
}
30 changes: 18 additions & 12 deletions plugins/outputs/event_hubs/event_hubs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/config"
eh_commons "github.com/influxdata/telegraf/plugins/common/eh"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/testutil"
)
Expand Down Expand Up @@ -47,14 +48,16 @@ func TestInitAndWrite(t *testing.T) {
require.NoError(t, serializer.Init())

mockHub := &mockEventHub{}
e := &EventHubs{
Hub: mockHub,
ConnectionString: "mock",
Timeout: config.Duration(time.Second * 5),
MaxMessageSize: 1000000,
serializer: serializer,
}

e := EventHubs{
EventHubs: eh_commons.EventHubs{
Hub: mockHub,
ConnectionString: "mock",
Timeout: config.Duration(time.Second * 5),
MaxMessageSize: 1000000,
},
serializer: serializer,
}
mockHub.On("GetHub", mock.Anything).Return(nil).Once()
require.NoError(t, e.Init())
mockHub.AssertExpectations(t)
Expand Down Expand Up @@ -104,11 +107,14 @@ func TestInitAndWriteIntegration(t *testing.T) {
// Configure the plugin to target the newly created hub
serializer := &json.Serializer{}
require.NoError(t, serializer.Init())
e := &EventHubs{
Hub: &eventHub{},
ConnectionString: testHubCS,
Timeout: config.Duration(time.Second * 5),
serializer: serializer,
e := EventHubs{

EventHubs: eh_commons.EventHubs{
Hub: &eh_commons.EventHub{},
ConnectionString: testHubCS,
Timeout: config.Duration(time.Second * 5),
},
serializer: serializer,
}

// Verify that we can connect to Event Hubs
Expand Down
Loading