Skip to content

Commit

Permalink
Showing 15 changed files with 737 additions and 2 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -31,7 +31,9 @@ Main (unreleased)

- Introduce the `remotecfg` service that enables loading configuration from a
remote endpoint. (@tpaschalis)


- Add `otelcol.connector.host_info` component to gather usage metrics for cloud users. (@rlankfo, @jcreixell)

### Enhancements

- Include line numbers in profiles produced by `pyrsocope.java` component. (@korniltsev)
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
@@ -65,6 +65,7 @@ import (
_ "github.com/grafana/agent/component/otelcol/auth/headers" // Import otelcol.auth.headers
_ "github.com/grafana/agent/component/otelcol/auth/oauth2" // Import otelcol.auth.oauth2
_ "github.com/grafana/agent/component/otelcol/auth/sigv4" // Import otelcol.auth.sigv4
_ "github.com/grafana/agent/component/otelcol/connector/host_info" // Import otelcol.connector.host_info
_ "github.com/grafana/agent/component/otelcol/connector/servicegraph" // Import otelcol.connector.servicegraph
_ "github.com/grafana/agent/component/otelcol/connector/spanlogs" // Import otelcol.connector.spanlogs
_ "github.com/grafana/agent/component/otelcol/connector/spanmetrics" // Import otelcol.connector.spanmetrics
31 changes: 31 additions & 0 deletions component/otelcol/connector/host_info/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package host_info

import (
"fmt"
"time"

"go.opentelemetry.io/collector/component"
)

// Config defines the configuration options for the host_info connector.
type Config struct {
// HostIdentifiers defines the list of resource attributes used to derive
// a unique `grafana.host.id` value. In most cases, this should be [ "host.id" ]
HostIdentifiers []string `mapstructure:"host_identifiers"`
MetricsFlushInterval time.Duration `mapstructure:"metrics_flush_interval"`
}

var _ component.ConfigValidator = (*Config)(nil)

// Validate checks if the configuration is valid
func (c Config) Validate() error {
if len(c.HostIdentifiers) == 0 {
return fmt.Errorf("at least one host identifier is required")
}

if c.MetricsFlushInterval > 5*time.Minute || c.MetricsFlushInterval < 15*time.Second {
return fmt.Errorf("%q is not a valid flush interval", c.MetricsFlushInterval)
}

return nil
}
49 changes: 49 additions & 0 deletions component/otelcol/connector/host_info/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package host_info

import (
"testing"
"time"

"gotest.tools/assert"
)

func TestValidate(t *testing.T) {
tests := []struct {
name string
cfg *Config
err string
}{
{
name: "valid config",
cfg: &Config{
HostIdentifiers: []string{"host.id"},
MetricsFlushInterval: 1 * time.Minute,
},
},
{
name: "invalid host identifiers",
cfg: &Config{
HostIdentifiers: nil,
},
err: "at least one host identifier is required",
},
{
name: "invalid metrics flush interval",
cfg: &Config{
HostIdentifiers: []string{"host.id"},
MetricsFlushInterval: 1 * time.Second,
},
err: "\"1s\" is not a valid flush interval",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := tc.cfg.Validate()
if tc.err != "" {
assert.Error(t, err, tc.err)
} else {
assert.NilError(t, err)
}
})
}
}
119 changes: 119 additions & 0 deletions component/otelcol/connector/host_info/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package host_info

import (
"context"
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

const (
hostInfoMetric = "traces_host_info"
hostIdentifierAttr = "grafana.host.id"
)

var _ connector.Traces = (*connectorImp)(nil)

type connectorImp struct {
config Config
logger *zap.Logger

started bool
done chan struct{}
shutdownOnce sync.Once

metricsConsumer consumer.Metrics
hostMetrics *hostMetrics
}

func newConnector(logger *zap.Logger, config component.Config) *connectorImp {
logger.Info("Building host_info connector")
cfg := config.(*Config)
return &connectorImp{
config: *cfg,
logger: logger,
done: make(chan struct{}),
hostMetrics: newHostMetrics(),
}
}

// Capabilities implements connector.Traces.
func (c *connectorImp) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

// ConsumeTraces implements connector.Traces.
func (c *connectorImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
for i := 0; i < td.ResourceSpans().Len(); i++ {
resourceSpan := td.ResourceSpans().At(i)

for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
attrs := resourceSpan.Resource().Attributes()
mapping := attrs.AsRaw()

for key, val := range mapping {
for _, attrName := range c.config.HostIdentifiers {
if key == attrName {
c.hostMetrics.add(val.(string))
break
}
}
}
}
}
return nil
}

// Start implements connector.Traces.
func (c *connectorImp) Start(ctx context.Context, host component.Host) error {
c.logger.Info("Starting host_info connector")
c.started = true
ticker := time.NewTicker(c.config.MetricsFlushInterval)
go func() {
for {
select {
case <-c.done:
ticker.Stop()
return
case <-ticker.C:
if err := c.flush(ctx); err != nil {
c.logger.Error("Error consuming metrics", zap.Error(err))
}
}
}
}()
return nil
}

// Shutdown implements connector.Traces.
func (c *connectorImp) Shutdown(ctx context.Context) error {
c.shutdownOnce.Do(func() {
c.logger.Info("Stopping host_info connector")
if c.started {
// flush metrics on shutdown
if err := c.flush(ctx); err != nil {
c.logger.Error("Error consuming metrics", zap.Error(err))
}
c.done <- struct{}{}
c.started = false
}
})
return nil
}

func (c *connectorImp) flush(ctx context.Context) error {
var err error

metrics, count := c.hostMetrics.metrics()
if count > 0 {
c.hostMetrics.reset()
c.logger.Debug("Flushing metrics", zap.Int("count", count))
err = c.metricsConsumer.ConsumeMetrics(ctx, *metrics)
}
return err
}
57 changes: 57 additions & 0 deletions component/otelcol/connector/host_info/connector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package host_info

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/consumer/consumertest"
)

func TestNewConnector(t *testing.T) {
for _, tc := range []struct {
name string
hostIdentifiers []string
metricsFlushInterval *time.Duration
expectedConfig *Config
}{
{
name: "default config",
expectedConfig: createDefaultConfig().(*Config),
},
{
name: "other config",
hostIdentifiers: []string{"host.id", "host.name", "k8s.node.uid"},
metricsFlushInterval: durationPtr(15 * time.Second),
expectedConfig: &Config{
HostIdentifiers: []string{"host.id", "host.name", "k8s.node.uid"},
MetricsFlushInterval: 15 * time.Second,
},
},
} {
t.Run(tc.name, func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
if tc.hostIdentifiers != nil {
cfg.HostIdentifiers = tc.hostIdentifiers
}
if tc.metricsFlushInterval != nil {
cfg.MetricsFlushInterval = *tc.metricsFlushInterval
}

c, err := factory.CreateTracesToMetrics(context.Background(), connectortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
imp := c.(*connectorImp)

assert.NoError(t, err)
assert.NotNil(t, imp)
assert.Equal(t, tc.expectedConfig.HostIdentifiers, imp.config.HostIdentifiers)
assert.Equal(t, tc.expectedConfig.MetricsFlushInterval, imp.config.MetricsFlushInterval)
})
}
}

func durationPtr(t time.Duration) *time.Duration {
return &t
}
35 changes: 35 additions & 0 deletions component/otelcol/connector/host_info/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package host_info

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
)

const (
typeStr = "hostinfoconnector"
)

func NewFactory() connector.Factory {
return connector.NewFactory(
typeStr,
createDefaultConfig,
connector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelAlpha),
)
}

func createDefaultConfig() component.Config {
return &Config{
HostIdentifiers: []string{"host.id"},
MetricsFlushInterval: 60 * time.Second,
}
}

func createTracesToMetricsConnector(_ context.Context, params connector.CreateSettings, cfg component.Config, next consumer.Metrics) (connector.Traces, error) {
c := newConnector(params.Logger, cfg)
c.metricsConsumer = next
return c, nil
}
21 changes: 21 additions & 0 deletions component/otelcol/connector/host_info/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package host_info

import (
"testing"
"time"

"go.opentelemetry.io/collector/component/componenttest"
"gotest.tools/assert"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

assert.DeepEqual(t, &Config{
HostIdentifiers: []string{"host.id"},
MetricsFlushInterval: 60 * time.Second,
}, cfg)

assert.NilError(t, componenttest.CheckConfigStruct(cfg))
}
Loading

0 comments on commit 7a8d6c5

Please sign in to comment.