Skip to content

Commit

Permalink
feat: start hiding beats monitoring behind otel abstraction (#15360)
Browse files Browse the repository at this point in the history
* feat: start hiding beats monitoring behind otel abstraction

* lint: remove unused methods

* Remove MetricReader from RunnerParams

* Various fixes

- Only adapt go-docappender metrics when using the Elasticsearch output
- Increment processed events properly.
- Create object hierarchy rather than dotted metric names so _source
  remains backwards compatible. Revert system test changes.
- Report elasticsearch.indexers.active; use correct names for go-docappender
  indexer creation/destruction OTel metrics.

... and add a unit test for all of that

* Fix flaky test

---------

Co-authored-by: Andrew Wilkins <[email protected]>
(cherry picked from commit 3048b8f)
  • Loading branch information
kruskall authored and mergify[bot] committed Jan 24, 2025
1 parent c2d5774 commit f48e0df
Show file tree
Hide file tree
Showing 11 changed files with 457 additions and 134 deletions.
3 changes: 3 additions & 0 deletions cmd/apm-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func main() {
return beater.NewRunner(beater.RunnerParams{
Config: args.Config,
Logger: args.Logger,

MeterProvider: args.MeterProvider,
MetricsGatherer: args.MetricsGatherer,
})
},
})
Expand Down
231 changes: 223 additions & 8 deletions internal/beatcmd/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import (
"time"

"github.com/gofrs/uuid/v5"
"go.elastic.co/apm/module/apmotel/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.uber.org/zap"
"go.uber.org/zap/exp/zapslog"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -76,6 +81,10 @@ type Beat struct {

rawConfig *config.C
newRunner NewRunnerFunc

metricReader *sdkmetric.ManualReader
meterProvider *sdkmetric.MeterProvider
metricGatherer *apmotel.Gatherer
}

// BeatParams holds parameters for NewBeat.
Expand Down Expand Up @@ -109,6 +118,18 @@ func NewBeat(args BeatParams) (*Beat, error) {
beatName = hostname
}

exporter, err := apmotel.NewGatherer()
if err != nil {
return nil, err
}

metricReader := sdkmetric.NewManualReader()
meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(exporter),
sdkmetric.WithReader(metricReader),
)
otel.SetMeterProvider(meterProvider)

eid := uuid.FromStringOrNil(metricreport.EphemeralID().String())
b := &Beat{
Beat: beat.Beat{
Expand All @@ -127,9 +148,12 @@ func NewBeat(args BeatParams) (*Beat, error) {
BeatConfig: cfg.APMServer,
Registry: reload.NewRegistry(),
},
Config: cfg,
newRunner: args.NewRunner,
rawConfig: rawConfig,
Config: cfg,
newRunner: args.NewRunner,
rawConfig: rawConfig,
metricReader: metricReader,
meterProvider: meterProvider,
metricGatherer: &exporter,
}

if err := b.init(); err != nil {
Expand Down Expand Up @@ -374,7 +398,7 @@ func (b *Beat) Run(ctx context.Context) error {
}

if b.Manager.Enabled() {
reloader, err := NewReloader(b.Info, b.Registry, b.newRunner)
reloader, err := NewReloader(b.Info, b.Registry, b.newRunner, b.meterProvider, b.metricGatherer)
if err != nil {
return err
}
Expand All @@ -390,9 +414,11 @@ func (b *Beat) Run(ctx context.Context) error {
return errors.New("no output defined, please define one under the output section")
}
runner, err := b.newRunner(RunnerParams{
Config: b.rawConfig,
Info: b.Info,
Logger: logp.NewLogger(""),
Config: b.rawConfig,
Info: b.Info,
Logger: logp.NewLogger(""),
MeterProvider: b.meterProvider,
MetricsGatherer: b.metricGatherer,
})
if err != nil {
return err
Expand All @@ -410,7 +436,12 @@ func (b *Beat) Run(ctx context.Context) error {
// is then exposed through the HTTP monitoring endpoint (e.g. /info and /state)
// and/or pushed to Elasticsearch through the x-pack monitoring feature.
func (b *Beat) registerMetrics() {
// Each group of metrics is registered by its own helper: the "info"
// namespace, the "state" namespace, and the output stats adapted from
// OTel metrics (the latter only when the Elasticsearch output is used).
b.registerInfoMetrics()
b.registerStateMetrics()
b.registerStatsMetrics()
}

func (b *Beat) registerInfoMetrics() {
infoRegistry := monitoring.GetNamespace("info").GetRegistry()
monitoring.NewString(infoRegistry, "version").Set(b.Info.Version)
monitoring.NewString(infoRegistry, "beat").Set(b.Info.Beat)
Expand All @@ -436,7 +467,9 @@ func (b *Beat) registerMetrics() {
monitoring.NewString(infoRegistry, "gid").Set(u.Gid)
}
}()
}

func (b *Beat) registerStateMetrics() {
stateRegistry := monitoring.GetNamespace("state").GetRegistry()

// state.service
Expand All @@ -457,6 +490,188 @@ func (b *Beat) registerMetrics() {
monitoring.NewBool(managementRegistry, "enabled").Set(b.Manager.Enabled())
}

// registerStatsMetrics bridges go-docappender's OTel metrics into the beats
// monitoring registry. It registers three lazily evaluated entries:
//
//   - "output" and "pipeline" under the "libbeat" registry
//   - "output.elasticsearch" directly under the default registry
//
// Nothing is registered unless the configured output is "elasticsearch".
// Every time one of the entries is visited, a fresh snapshot is collected
// from b.metricReader; if collection fails the entry reports nothing.
func (b *Beat) registerStatsMetrics() {
	if b.Config.Output.Name() != "elasticsearch" {
		return
	}

	// docappenderScopes collects a snapshot from the manual reader and
	// returns only the scopes emitted by go-docappender, in their original
	// order. The boolean is false when collection failed.
	docappenderScopes := func() ([]metricdata.ScopeMetrics, bool) {
		var snapshot metricdata.ResourceMetrics
		if err := b.metricReader.Collect(context.Background(), &snapshot); err != nil {
			return nil, false
		}
		var matched []metricdata.ScopeMetrics
		for _, scope := range snapshot.ScopeMetrics {
			if scope.Scope.Name == "github.com/elastic/go-docappender" {
				matched = append(matched, scope)
			}
		}
		return matched, true
	}

	libbeat := monitoring.Default.GetRegistry("libbeat")

	monitoring.NewFunc(libbeat, "output", func(_ monitoring.Mode, v monitoring.Visitor) {
		scopes, ok := docappenderScopes()
		if !ok {
			return
		}
		v.OnRegistryStart()
		defer v.OnRegistryFinished()
		monitoring.ReportString(v, "type", "elasticsearch")
		for _, sm := range scopes {
			addDocappenderLibbeatOutputMetrics(context.Background(), v, sm)
		}
	})

	monitoring.NewFunc(libbeat, "pipeline", func(_ monitoring.Mode, v monitoring.Visitor) {
		scopes, ok := docappenderScopes()
		if !ok {
			return
		}
		v.OnRegistryStart()
		defer v.OnRegistryFinished()
		for _, sm := range scopes {
			addDocappenderLibbeatPipelineMetrics(context.Background(), v, sm)
		}
	})

	monitoring.NewFunc(monitoring.Default, "output.elasticsearch", func(_ monitoring.Mode, v monitoring.Visitor) {
		scopes, ok := docappenderScopes()
		if !ok {
			return
		}
		v.OnRegistryStart()
		defer v.OnRegistryFinished()
		for _, sm := range scopes {
			addDocappenderOutputElasticsearchMetrics(context.Background(), v, sm)
		}
	})
}

// getScalarInt64 extracts the value of an int64 sum or gauge that carries
// exactly one data point with no attributes. Any other aggregation type,
// or any other number or shape of data points, yields (0, false).
func getScalarInt64(data metricdata.Aggregation) (int64, bool) {
	var points []metricdata.DataPoint[int64]
	switch agg := data.(type) {
	case metricdata.Sum[int64]:
		points = agg.DataPoints
	case metricdata.Gauge[int64]:
		points = agg.DataPoints
	default:
		return 0, false
	}

	// Only a lone, attribute-free data point counts as a scalar.
	if len(points) == 1 && points[0].Attributes.Len() == 0 {
		return points[0].Value, true
	}
	return 0, false
}

// addDocappenderLibbeatOutputMetrics translates go-docappender's OTel
// metrics into the libbeat output metrics reported to the visitor:
//
//   - an "events" group holding acked/failed/toomany (derived from the
//     per-status data points of "elasticsearch.events.processed"), plus
//     total, active and batches taken from scalar metrics
//   - a "write" group holding "bytes", emitted only when a positive
//     "elasticsearch.flushed.bytes" value was observed
//
// Values are reported in the order the metrics appear in sm. The ctx
// parameter is currently unused.
func addDocappenderLibbeatOutputMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
	var flushedBytes int64

	v.OnRegistryStart()
	v.OnKey("events")
	for _, m := range sm.Metrics {
		// key is the libbeat name for metrics that map one-to-one onto
		// a scalar; the other metrics are handled inside the switch.
		var key string
		switch m.Name {
		case "elasticsearch.events.processed":
			var acked, toomany, failed int64
			// A failed assertion leaves sum empty, so zeros are reported.
			sum, _ := m.Data.(metricdata.Sum[int64])
			for _, point := range sum.DataPoints {
				status, found := point.Attributes.Value(attribute.Key("status"))
				if !found {
					continue
				}
				outcome := status.AsString()
				if outcome == "Success" {
					acked += point.Value
					continue
				}
				// Every non-success status counts as failed; "TooMany"
				// is additionally tracked on its own.
				failed += point.Value
				if outcome == "TooMany" {
					toomany += point.Value
				}
			}
			monitoring.ReportInt(v, "acked", acked)
			monitoring.ReportInt(v, "failed", failed)
			monitoring.ReportInt(v, "toomany", toomany)
			continue
		case "elasticsearch.flushed.bytes":
			// Remembered for the separate "write" group below.
			if value, ok := getScalarInt64(m.Data); ok {
				flushedBytes = value
			}
			continue
		case "elasticsearch.events.count":
			key = "total"
		case "elasticsearch.events.queued":
			key = "active"
		case "elasticsearch.bulk_requests.count":
			key = "batches"
		default:
			continue
		}
		if value, ok := getScalarInt64(m.Data); ok {
			monitoring.ReportInt(v, key, value)
		}
	}
	v.OnRegistryFinished()

	if flushedBytes <= 0 {
		return
	}
	v.OnRegistryStart()
	v.OnKey("write")
	monitoring.ReportInt(v, "bytes", flushedBytes)
	v.OnRegistryFinished()
}

// addDocappenderLibbeatPipelineMetrics reports the libbeat pipeline
// "events" group to the visitor. Its only member, "total", is taken from
// go-docappender's scalar "elasticsearch.events.count" metric and is
// omitted when that metric is absent or not a scalar. The ctx parameter
// is currently unused.
func addDocappenderLibbeatPipelineMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
	v.OnRegistryStart()
	defer v.OnRegistryFinished()
	v.OnKey("events")

	for _, metric := range sm.Metrics {
		if metric.Name != "elasticsearch.events.count" {
			continue
		}
		if total, ok := getScalarInt64(metric.Data); ok {
			monitoring.ReportInt(v, "total", total)
		}
	}
}

// addDocappenderOutputElasticsearchMetrics reports the non-libbeat
// Elasticsearch output metrics, meant to live under "output.elasticsearch":
//
//   - "bulk_requests": completed and available
//   - "indexers": created, destroyed and active
//
// Source values come from go-docappender's scalar OTel metrics; a metric
// that is missing or not a scalar contributes zero. Both groups are always
// emitted. The ctx parameter is currently unused.
func addDocappenderOutputElasticsearchMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
	var completed, available int64
	var created, destroyed int64

	for _, metric := range sm.Metrics {
		// Pick where this metric's value should be stored, if anywhere.
		var dst *int64
		switch metric.Name {
		case "elasticsearch.bulk_requests.count":
			dst = &completed
		case "elasticsearch.bulk_requests.available":
			dst = &available
		case "elasticsearch.indexer.created":
			dst = &created
		case "elasticsearch.indexer.destroyed":
			dst = &destroyed
		default:
			continue
		}
		if value, ok := getScalarInt64(metric.Data); ok {
			*dst = value
		}
	}

	v.OnRegistryStart()
	v.OnKey("bulk_requests")
	monitoring.ReportInt(v, "completed", completed)
	monitoring.ReportInt(v, "available", available)
	v.OnRegistryFinished()

	v.OnRegistryStart()
	v.OnKey("indexers")
	monitoring.ReportInt(v, "created", created)
	monitoring.ReportInt(v, "destroyed", destroyed)
	// NOTE(review): the +1 presumably accounts for an indexer that is
	// running without having been counted as "created" — confirm against
	// go-docappender before changing.
	monitoring.ReportInt(v, "active", created-destroyed+1)
	v.OnRegistryFinished()
}

// registerElasticsearchVerfication registers a global callback to make sure
// the Elasticsearch instance we are connecting to has a valid license, and is
// at least on the same version as APM Server.
Expand Down
Loading

0 comments on commit f48e0df

Please sign in to comment.