From 3d3396bb74672855ce5ed952ff6011e1de8a1e48 Mon Sep 17 00:00:00 2001 From: Jared Harper Date: Thu, 2 May 2024 15:02:55 -0700 Subject: [PATCH] remove global reporter --- internal/exporter/exporter.go | 21 ++++++---- internal/exporter/exporter_test.go | 9 ++-- internal/reporter/oboe.go | 23 ++++++----- internal/reporter/reporter.go | 66 ++++-------------------------- internal/reporter/reporter_test.go | 39 ++++-------------- internal/reporter/test_reporter.go | 13 ------ swo/agent.go | 39 ++---------------- swo/agent_test.go | 12 +----- swo/log.go | 13 +++--- swo/log_test.go | 30 +++++++------- 10 files changed, 70 insertions(+), 195 deletions(-) diff --git a/internal/exporter/exporter.go b/internal/exporter/exporter.go index 5956a99f..f950ff8d 100644 --- a/internal/exporter/exporter.go +++ b/internal/exporter/exporter.go @@ -30,9 +30,10 @@ import ( ) type exporter struct { + r reporter.Reporter } -func exportSpan(_ context.Context, s sdktrace.ReadOnlySpan) { +func (e *exporter) exportSpan(_ context.Context, s sdktrace.ReadOnlySpan) { evt := reporter.CreateEntryEvent(s.SpanContext(), s.StartTime(), s.Parent()) layer := fmt.Sprintf("%s:%s", strings.ToUpper(s.SpanKind().String()), s.Name()) evt.SetLayer(layer) @@ -68,7 +69,7 @@ func exportSpan(_ context.Context, s sdktrace.ReadOnlySpan) { evt.AddKVs(s.Attributes()) - if err := reporter.ReportEvent(evt); err != nil { + if err := e.r.ReportEvent(evt); err != nil { log.Warning("cannot send entry event", err) return } @@ -88,7 +89,7 @@ func exportSpan(_ context.Context, s sdktrace.ReadOnlySpan) { } } evt.AddKVs(otEvt.Attributes) - if err := reporter.ReportEvent(evt); err != nil { + if err := e.r.ReportEvent(evt); err != nil { log.Warningf("could not send %s event: %s", s.Name(), err) continue } @@ -96,7 +97,7 @@ func exportSpan(_ context.Context, s sdktrace.ReadOnlySpan) { evt = reporter.CreateExitEvent(s.SpanContext(), s.EndTime()) evt.AddKV(attribute.String(constants.Layer, layer)) - if err := reporter.ReportEvent(evt); err != nil { + if err := e.r.ReportEvent(evt); err != nil { log.Warning("cannot send exit event", err) return } @@ -104,17 +105,19 @@ func exportSpan(_ context.Context, s sdktrace.ReadOnlySpan) { } func (e *exporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { - reporter.WaitForReady(ctx) + e.r.WaitForReady(ctx) for _, s := range spans { - exportSpan(ctx, s) + e.exportSpan(ctx, s) } return nil } func (e *exporter) Shutdown(ctx context.Context) error { - return reporter.Shutdown(ctx) + return e.r.Shutdown(ctx) } -func NewExporter() sdktrace.SpanExporter { - return &exporter{} +func NewExporter(r reporter.Reporter) sdktrace.SpanExporter { + return &exporter{ + r: r, + } } diff --git a/internal/exporter/exporter_test.go b/internal/exporter/exporter_test.go index e4093da1..84c24e87 100644 --- a/internal/exporter/exporter_test.go +++ b/internal/exporter/exporter_test.go @@ -34,8 +34,7 @@ import ( func TestExportSpan(t *testing.T) { r := &capturingReporter{} - defer reporter.SetGlobalReporter(r)() - tr, cb := testutils.TracerWithExporter(NewExporter()) + tr, cb := testutils.TracerWithExporter(NewExporter(r)) defer cb() ctx := context.Background() @@ -156,8 +155,7 @@ func TestExportSpan(t *testing.T) { func TestExportSpanBacktrace(t *testing.T) { r := &capturingReporter{} - defer reporter.SetGlobalReporter(r)() - tr, cb := testutils.TracerWithExporter(NewExporter()) + tr, cb := testutils.TracerWithExporter(NewExporter(r)) defer cb() ctx := context.Background() @@ -199,8 +197,7 @@ func getBsonFromEvent(t *testing.T, event reporter.Event) map[string]interface{} func TestExportSpanStatusCodes(t *testing.T) { r := &capturingReporter{} - defer reporter.SetGlobalReporter(r)() - tr, cb := testutils.TracerWithExporter(NewExporter()) + tr, cb := testutils.TracerWithExporter(NewExporter(r)) defer cb() permutations := []struct { diff --git a/internal/reporter/oboe.go b/internal/reporter/oboe.go index 0f0eddd0..24a8edef 100644 --- a/internal/reporter/oboe.go +++ b/internal/reporter/oboe.go @@ -174,15 +174,15 @@ func createInitMessage(tid trace.TraceID, r *resource.Resource) Event { return evt } -func sendInitMessage(r *resource.Resource) { - if Closed() { +func sendInitMessage(r Reporter, rsrc *resource.Resource) { + if r.Closed() { log.Info(errors.Wrap(ErrReporterIsClosed, "send init message")) return } tid := trace.TraceID{0} rand.Random(tid[:]) - evt := createInitMessage(tid, r) - if err := ReportStatus(evt); err != nil { + evt := createInitMessage(tid, rsrc) + if err := r.ReportStatus(evt); err != nil { log.Error("could not send init message", err) } } @@ -348,13 +348,14 @@ func (tm TriggerTraceMode) Requested() bool { } func oboeSampleRequest(continued bool, url string, triggerTrace TriggerTraceMode, swState w3cfmt.SwTraceState) SampleDecision { - if usingTestReporter { - if r, ok := globalReporter.(*TestReporter); ok { - if !r.UseSettings { - return SampleDecision{r.ShouldTrace, 0, SAMPLE_SOURCE_NONE, true, ttEmpty, 0, 0, false} // trace tests - } - } - } + // TODO: ick! + //if usingTestReporter { + // if r, ok := globalReporter.(*TestReporter); ok { + // if !r.UseSettings { + // return SampleDecision{r.ShouldTrace, 0, SAMPLE_SOURCE_NONE, true, ttEmpty, 0, 0, false} // trace tests + // } + // } + //} var setting *oboeSettings var ok bool diff --git a/internal/reporter/reporter.go b/internal/reporter/reporter.go index 3e551dea..b17afdfb 100644 --- a/internal/reporter/reporter.go +++ b/internal/reporter/reporter.go @@ -64,9 +64,6 @@ const ( kvMaxCustomMetrics = "MaxCustomMetrics" ) -// currently used reporter -var globalReporter Reporter = &nullReporter{} - var ( periodicTasksDisabled = false // disable periodic tasks, for testing ) @@ -84,18 +81,18 @@ func (r *nullReporter) WaitForReady(context.Context) bool { return true } func (r *nullReporter) SetServiceKey(string) error { return nil } func (r *nullReporter) GetServiceName() string { return "" } -func Start(r *resource.Resource, registry interface{}) error { +func Start(rsrc *resource.Resource, registry interface{}) (Reporter, error) { log.SetLevelFromStr(config.DebugLevel()) if reg, ok := registry.(metrics.LegacyRegistry); !ok { - return fmt.Errorf("metrics registry must implement metrics.LegacyRegistry") + return nil, fmt.Errorf("metrics registry must implement metrics.LegacyRegistry") } else { - initReporter(r, reg) - sendInitMessage(r) - return nil + rptr := initReporter(rsrc, reg) + sendInitMessage(rptr, rsrc) + return rptr, nil } } -func initReporter(r *resource.Resource, registry metrics.LegacyRegistry) { +func initReporter(r *resource.Resource, registry metrics.LegacyRegistry) Reporter { var rt string if !config.GetEnabled() { log.Warning("SolarWinds Observability APM agent is disabled.") @@ -107,39 +104,10 @@ func initReporter(r *resource.Resource, registry metrics.LegacyRegistry) { if sn, ok := r.Set().Value(semconv.ServiceNameKey); ok { otelServiceName = strings.TrimSpace(sn.AsString()) } - setGlobalReporter(rt, otelServiceName, registry) -} - -func setGlobalReporter(reporterType string, otelServiceName string, registry metrics.LegacyRegistry) { - // Close the previous reporter - if globalReporter != nil { - globalReporter.ShutdownNow() - } - - switch strings.ToLower(reporterType) { - case "none": - globalReporter = newNullReporter() - default: - globalReporter = newGRPCReporter(otelServiceName, registry) + if rt == "none" { + return newNullReporter() } -} - -// WaitForReady waits until the reporter becomes ready or the context is canceled. -func WaitForReady(ctx context.Context) bool { - // globalReporter is not protected by a mutex as currently it's only modified - // from the init() function. - return globalReporter.WaitForReady(ctx) -} - -// Shutdown flushes the metrics and stops the reporter. It blocked until the reporter -// is shutdown or the context is canceled. -func Shutdown(ctx context.Context) error { - return globalReporter.Shutdown(ctx) -} - -// Closed indicates if the reporter has been shutdown -func Closed() bool { - return globalReporter.Closed() + return newGRPCReporter(otelServiceName, registry) } func ShouldTraceRequestWithURL(traced bool, url string, ttMode TriggerTraceMode, swState w3cfmt.SwTraceState) SampleDecision { @@ -205,19 +173,3 @@ func argsToMap(capacity, ratePerSec, tRCap, tRRate, tSCap, tSRate float64, return args } - -func SetServiceKey(key string) error { - return globalReporter.SetServiceKey(key) -} - -func ReportStatus(e Event) error { - return globalReporter.ReportStatus(e) -} - -func ReportEvent(e Event) error { - return globalReporter.ReportEvent(e) -} - -func GetServiceName() string { - return globalReporter.GetServiceName() -} diff --git a/internal/reporter/reporter_test.go b/internal/reporter/reporter_test.go index b642a8e8..aad535e4 100644 --- a/internal/reporter/reporter_test.go +++ b/internal/reporter/reporter_test.go @@ -85,13 +85,8 @@ func TestGRPCReporter(t *testing.T) { setEnv("SW_APM_COLLECTOR", addr) setEnv("SW_APM_TRUSTEDPATH", testCertFile) config.Load() - oldReporter := globalReporter registry := metrics.NewLegacyRegistry() - setGlobalReporter("ssl", "", registry) - - require.IsType(t, &grpcReporter{}, globalReporter) - - r := globalReporter.(*grpcReporter) + r := newGRPCReporter("myservice", registry).(*grpcReporter) // Test WaitForReady // The reporter is not ready when there is no default setting. @@ -141,7 +136,6 @@ func TestGRPCReporter(t *testing.T) { // stop test reporter server.Stop() - globalReporter = oldReporter // assert data received require.Len(t, server.events, 1) @@ -177,13 +171,8 @@ func TestShutdownGRPCReporter(t *testing.T) { setEnv("SW_APM_COLLECTOR", addr) setEnv("SW_APM_TRUSTEDPATH", testCertFile) config.Load() - oldReporter := globalReporter registry := metrics.NewLegacyRegistry() - setGlobalReporter("ssl", "", registry) - - require.IsType(t, &grpcReporter{}, globalReporter) - - r := globalReporter.(*grpcReporter) + r := newGRPCReporter("myservice", registry).(*grpcReporter) r.ShutdownNow() require.Equal(t, true, r.Closed()) @@ -192,7 +181,6 @@ func TestShutdownGRPCReporter(t *testing.T) { // stop test reporter server.Stop() - globalReporter = oldReporter } func TestSetServiceKey(t *testing.T) { @@ -240,14 +228,11 @@ func TestInvalidKey(t *testing.T) { // set gRPC reporter config.Load() - oldReporter := globalReporter log.SetLevel(log.INFO) registry := metrics.NewLegacyRegistry() - setGlobalReporter("ssl", "", registry) - require.IsType(t, &grpcReporter{}, globalReporter) - r := globalReporter.(*grpcReporter) + r := newGRPCReporter("myservice", registry).(*grpcReporter) ev1 := CreateInfoEvent(validSpanContext, time.Now()) ev1.SetLayer("hello-from-invalid-key") require.NoError(t, r.ReportEvent(ev1)) @@ -261,7 +246,6 @@ func TestInvalidKey(t *testing.T) { // Tear down everything. server.Stop() - globalReporter = oldReporter setEnv("SW_APM_SERVICE_KEY", oldKey) patterns := []string{ @@ -457,8 +441,8 @@ func TestInitReporter(t *testing.T) { setEnv("SW_APM_ENABLED", "false") config.Load() registry := metrics.NewLegacyRegistry() - initReporter(resource.Empty(), registry) - require.IsType(t, &nullReporter{}, globalReporter) + r := initReporter(resource.Empty(), registry) + require.IsType(t, &nullReporter{}, r) // Test enable agent require.NoError(t, os.Unsetenv("SW_APM_ENABLED")) @@ -466,9 +450,9 @@ func TestInitReporter(t *testing.T) { config.Load() require.True(t, config.GetEnabled()) - initReporter(resource.NewWithAttributes("", semconv.ServiceName("my service name")), registry) - require.IsType(t, &grpcReporter{}, globalReporter) - require.Equal(t, "my service name", globalReporter.GetServiceName()) + r = initReporter(resource.NewWithAttributes("", semconv.ServiceName("my service name")), registry) + require.IsType(t, &grpcReporter{}, r) + require.Equal(t, "my service name", r.GetServiceName()) } func TestCollectMetricsNextInterval(t *testing.T) { @@ -501,14 +485,9 @@ func testProxy(t *testing.T, proxyUrl string) { server := StartTestGRPCServer(t, addr) time.Sleep(100 * time.Millisecond) - oldReporter := globalReporter - defer func() { globalReporter = oldReporter }() registry := metrics.NewLegacyRegistry() - setGlobalReporter("ssl", "", registry) - - require.IsType(t, &grpcReporter{}, globalReporter) - r := globalReporter.(*grpcReporter) + r := newGRPCReporter("myservice", registry).(*grpcReporter) // Test WaitForReady // The reporter is not ready when there is no default setting. diff --git a/internal/reporter/test_reporter.go b/internal/reporter/test_reporter.go index dc0ebed1..e281d181 100644 --- a/internal/reporter/test_reporter.go +++ b/internal/reporter/test_reporter.go @@ -52,14 +52,6 @@ func TestReporterSettingType(tp int) TestReporterOption { return func(r *TestReporter) { r.SettingType = tp } } -func SetGlobalReporter(r Reporter) func() { - old := globalReporter - globalReporter = r - return func() { - globalReporter = old - } -} - // SetTestReporter sets and returns a test Reporter that captures raw event bytes // for making assertions about using the graphtest package. func SetTestReporter(options ...TestReporterOption) *TestReporter { @@ -76,10 +68,6 @@ func SetTestReporter(options ...TestReporterOption) *TestReporter { r.wg.Add(1) go r.resultWriter() - if _, ok := oldReporter.(*nullReporter); ok { - oldReporter = globalReporter - } - globalReporter = r usingTestReporter = true // start with clean slate @@ -134,7 +122,6 @@ func (r *TestReporter) Close(numBufs int) { } usingTestReporter = false if _, ok := oldReporter.(*nullReporter); !ok { - globalReporter = oldReporter oldReporter = &nullReporter{} } } diff --git a/swo/agent.go b/swo/agent.go index d15ea18c..55d88d1e 100644 --- a/swo/agent.go +++ b/swo/agent.go @@ -42,35 +42,6 @@ var ( errInvalidLogLevel = errors.New("invalid log level") ) -// WaitForReady checks if the agent is ready. It returns true is the agent is ready, -// or false if it is not. -// -// A call to this method will block until the agent is ready or the context is -// canceled, or the agent is already closed. -// The agent is considered ready if there is a valid default setting for sampling. -func WaitForReady(ctx context.Context) bool { - if Closed() { - return false - } - return reporter.WaitForReady(ctx) -} - -// Shutdown flush the metrics and stops the agent. The call will block until the agent -// flushes and is successfully shutdown or the context is canceled. It returns nil -// for successful shutdown and or error when the context is canceled or the agent -// has already been closed before. -// -// This function should be called only once. -func Shutdown(ctx context.Context) error { - return reporter.Shutdown(ctx) -} - -// Closed denotes if the agent is closed (by either calling Shutdown explicitly -// or being triggered from some internal error). -func Closed() bool { - return reporter.Closed() -} - // SetLogLevel changes the logging level of the library // Valid logging levels: DEBUG, INFO, WARN, ERROR func SetLogLevel(level string) error { @@ -92,11 +63,6 @@ func SetLogOutput(w io.Writer) { log.SetOutput(w) } -// SetServiceKey sets the service key of the agent -func SetServiceKey(key string) error { - return reporter.SetServiceKey(key) -} - func createResource(resourceAttrs ...attribute.KeyValue) (*resource.Resource, error) { return resource.New(context.Background(), resource.WithContainer(), @@ -122,11 +88,12 @@ func Start(resourceAttrs ...attribute.KeyValue) (func(), error) { }, err } registry := metrics.NewLegacyRegistry() - if err = reporter.Start(resrc, registry); err != nil { + _reporter, err := reporter.Start(resrc, registry) + if err != nil { return func() {}, err } - exprtr := exporter.NewExporter() + exprtr := exporter.NewExporter(_reporter) smplr := sampler.NewSampler() config.Load() isAppoptics := strings.Contains(strings.ToLower(config.GetCollector()), "appoptics.com") diff --git a/swo/agent_test.go b/swo/agent_test.go index 370254e9..7b99fcc2 100644 --- a/swo/agent_test.go +++ b/swo/agent_test.go @@ -20,15 +20,13 @@ import ( "github.com/solarwinds/apm-go/internal/log" "github.com/solarwinds/apm-go/internal/testutils" "github.com/solarwinds/apm-go/internal/utils" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/trace" "os" "strings" "testing" - "time" - - "github.com/stretchr/testify/assert" ) func TestSetGetLogLevel(t *testing.T) { @@ -47,14 +45,6 @@ func TestSetGetLogLevel(t *testing.T) { require.NoError(t, SetLogLevel(oldLevel)) } -func TestShutdown(t *testing.T) { - require.NoError(t, Shutdown(context.Background())) - assert.True(t, Closed()) - ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24) - defer cancel() - assert.False(t, WaitForReady(ctx)) -} - func TestSetLogOutput(t *testing.T) { oldLevel := GetLogLevel() _ = SetLogLevel("DEBUG") diff --git a/swo/log.go b/swo/log.go index f3720508..4dc2fb6c 100644 --- a/swo/log.go +++ b/swo/log.go @@ -17,7 +17,6 @@ package swo import ( "context" "fmt" - "github.com/solarwinds/apm-go/internal/reporter" "go.opentelemetry.io/otel/trace" ) @@ -32,11 +31,10 @@ type LoggableTraceContext struct { // Example: trace_id=d4261c67357f99f39958b14f99da7e6c span_id=1280450002ba77b3 trace_flags=01 resource.service.name=my-service func (l LoggableTraceContext) String() string { return fmt.Sprintf( - "trace_id=%s span_id=%s trace_flags=%s resource.service.name=%s", + "trace_id=%s span_id=%s trace_flags=%s", l.TraceID, l.SpanID, l.TraceFlags, - l.ServiceName, ) } @@ -53,9 +51,10 @@ func LoggableTrace(ctx context.Context) LoggableTraceContext { // LoggableTraceFromSpanContext returns a LoggableTraceContext from a given SpanContext and the configured service name func LoggableTraceFromSpanContext(ctx trace.SpanContext) LoggableTraceContext { return LoggableTraceContext{ - TraceID: ctx.TraceID(), - SpanID: ctx.SpanID(), - TraceFlags: ctx.TraceFlags(), - ServiceName: reporter.GetServiceName(), + TraceID: ctx.TraceID(), + SpanID: ctx.SpanID(), + TraceFlags: ctx.TraceFlags(), + // TODO FIXME + //ServiceName: reporter.GetServiceName(), } } diff --git a/swo/log_test.go b/swo/log_test.go index a6501c55..0c0631c5 100644 --- a/swo/log_test.go +++ b/swo/log_test.go @@ -29,10 +29,10 @@ func TestLoggableTraceIDFromContext(t *testing.T) { ctx := context.Background() lt := LoggableTrace(ctx) require.Equal(t, LoggableTraceContext{ - TraceID: trace.TraceID{}, - SpanID: trace.SpanID{}, - TraceFlags: 0, - ServiceName: "test-reporter-service", + TraceID: trace.TraceID{}, + SpanID: trace.SpanID{}, + TraceFlags: 0, + // ServiceName: "test-reporter-service", }, lt) sc := trace.NewSpanContext(trace.SpanContextConfig{ TraceID: trace.TraceID{0x22}, @@ -41,20 +41,20 @@ func TestLoggableTraceIDFromContext(t *testing.T) { }) require.False(t, lt.IsValid()) require.Equal(t, - "trace_id=00000000000000000000000000000000 span_id=0000000000000000 trace_flags=00 resource.service.name=test-reporter-service", + "trace_id=00000000000000000000000000000000 span_id=0000000000000000 trace_flags=00", lt.String()) ctx = trace.ContextWithSpanContext(ctx, sc) lt = LoggableTrace(ctx) require.Equal(t, LoggableTraceContext{ - TraceID: sc.TraceID(), - SpanID: sc.SpanID(), - TraceFlags: sc.TraceFlags(), - ServiceName: "test-reporter-service", + TraceID: sc.TraceID(), + SpanID: sc.SpanID(), + TraceFlags: sc.TraceFlags(), + //ServiceName: "test-reporter-service", }, lt) require.True(t, lt.IsValid()) require.Equal(t, - "trace_id=22000000000000000000000000000000 span_id=1100000000000000 trace_flags=01 resource.service.name=test-reporter-service", + "trace_id=22000000000000000000000000000000 span_id=1100000000000000 trace_flags=01", lt.String()) sc = trace.NewSpanContext(trace.SpanContextConfig{ @@ -65,13 +65,13 @@ func TestLoggableTraceIDFromContext(t *testing.T) { ctx = trace.ContextWithSpanContext(ctx, sc) lt = LoggableTrace(ctx) require.Equal(t, LoggableTraceContext{ - TraceID: sc.TraceID(), - SpanID: sc.SpanID(), - TraceFlags: sc.TraceFlags(), - ServiceName: "test-reporter-service", + TraceID: sc.TraceID(), + SpanID: sc.SpanID(), + TraceFlags: sc.TraceFlags(), + //ServiceName: "test-reporter-service", }, lt) require.True(t, lt.IsValid()) require.Equal(t, - "trace_id=33000000000000000000000000000000 span_id=aa00000000000000 trace_flags=01 resource.service.name=test-reporter-service", + "trace_id=33000000000000000000000000000000 span_id=aa00000000000000 trace_flags=01", lt.String()) }