Skip to content

Commit

Permalink
remove global reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
swi-jared committed May 2, 2024
1 parent 9afc27b commit 3d3396b
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 195 deletions.
21 changes: 12 additions & 9 deletions internal/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (
)

type exporter struct {
r reporter.Reporter
}

func exportSpan(_ context.Context, s sdktrace.ReadOnlySpan) {
func (e *exporter) exportSpan(_ context.Context, s sdktrace.ReadOnlySpan) {
evt := reporter.CreateEntryEvent(s.SpanContext(), s.StartTime(), s.Parent())
layer := fmt.Sprintf("%s:%s", strings.ToUpper(s.SpanKind().String()), s.Name())
evt.SetLayer(layer)
Expand Down Expand Up @@ -68,7 +69,7 @@ func exportSpan(_ context.Context, s sdktrace.ReadOnlySpan) {

evt.AddKVs(s.Attributes())

if err := reporter.ReportEvent(evt); err != nil {
if err := e.r.ReportEvent(evt); err != nil {
log.Warning("cannot send entry event", err)
return
}
Expand All @@ -88,33 +89,35 @@ func exportSpan(_ context.Context, s sdktrace.ReadOnlySpan) {
}
}
evt.AddKVs(otEvt.Attributes)
if err := reporter.ReportEvent(evt); err != nil {
if err := e.r.ReportEvent(evt); err != nil {
log.Warningf("could not send %s event: %s", s.Name(), err)
continue
}
}

evt = reporter.CreateExitEvent(s.SpanContext(), s.EndTime())
evt.AddKV(attribute.String(constants.Layer, layer))
if err := reporter.ReportEvent(evt); err != nil {
if err := e.r.ReportEvent(evt); err != nil {
log.Warning("cannot send exit event", err)
return
}

}

func (e *exporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
reporter.WaitForReady(ctx)
e.r.WaitForReady(ctx)
for _, s := range spans {
exportSpan(ctx, s)
e.exportSpan(ctx, s)
}
return nil
}

func (e *exporter) Shutdown(ctx context.Context) error {
return reporter.Shutdown(ctx)
return e.r.Shutdown(ctx)
}

func NewExporter() sdktrace.SpanExporter {
return &exporter{}
func NewExporter(r reporter.Reporter) sdktrace.SpanExporter {
return &exporter{
r: r,
}
}
9 changes: 3 additions & 6 deletions internal/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ import (

func TestExportSpan(t *testing.T) {
r := &capturingReporter{}
defer reporter.SetGlobalReporter(r)()
tr, cb := testutils.TracerWithExporter(NewExporter())
tr, cb := testutils.TracerWithExporter(NewExporter(r))
defer cb()

ctx := context.Background()
Expand Down Expand Up @@ -156,8 +155,7 @@ func TestExportSpan(t *testing.T) {

func TestExportSpanBacktrace(t *testing.T) {
r := &capturingReporter{}
defer reporter.SetGlobalReporter(r)()
tr, cb := testutils.TracerWithExporter(NewExporter())
tr, cb := testutils.TracerWithExporter(NewExporter(r))
defer cb()

ctx := context.Background()
Expand Down Expand Up @@ -199,8 +197,7 @@ func getBsonFromEvent(t *testing.T, event reporter.Event) map[string]interface{}

func TestExportSpanStatusCodes(t *testing.T) {
r := &capturingReporter{}
defer reporter.SetGlobalReporter(r)()
tr, cb := testutils.TracerWithExporter(NewExporter())
tr, cb := testutils.TracerWithExporter(NewExporter(r))
defer cb()

permutations := []struct {
Expand Down
23 changes: 12 additions & 11 deletions internal/reporter/oboe.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ func createInitMessage(tid trace.TraceID, r *resource.Resource) Event {
return evt
}

func sendInitMessage(r *resource.Resource) {
if Closed() {
func sendInitMessage(r Reporter, rsrc *resource.Resource) {
if r.Closed() {
log.Info(errors.Wrap(ErrReporterIsClosed, "send init message"))
return
}
tid := trace.TraceID{0}
rand.Random(tid[:])
evt := createInitMessage(tid, r)
if err := ReportStatus(evt); err != nil {
evt := createInitMessage(tid, rsrc)
if err := r.ReportStatus(evt); err != nil {
log.Error("could not send init message", err)
}
}
Expand Down Expand Up @@ -348,13 +348,14 @@ func (tm TriggerTraceMode) Requested() bool {
}

func oboeSampleRequest(continued bool, url string, triggerTrace TriggerTraceMode, swState w3cfmt.SwTraceState) SampleDecision {
if usingTestReporter {
if r, ok := globalReporter.(*TestReporter); ok {
if !r.UseSettings {
return SampleDecision{r.ShouldTrace, 0, SAMPLE_SOURCE_NONE, true, ttEmpty, 0, 0, false} // trace tests
}
}
}
// TODO: ick!
//if usingTestReporter {
// if r, ok := globalReporter.(*TestReporter); ok {
// if !r.UseSettings {
// return SampleDecision{r.ShouldTrace, 0, SAMPLE_SOURCE_NONE, true, ttEmpty, 0, 0, false} // trace tests
// }
// }
//}

var setting *oboeSettings
var ok bool
Expand Down
66 changes: 9 additions & 57 deletions internal/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ const (
kvMaxCustomMetrics = "MaxCustomMetrics"
)

// currently used reporter
var globalReporter Reporter = &nullReporter{}

var (
periodicTasksDisabled = false // disable periodic tasks, for testing
)
Expand All @@ -84,18 +81,18 @@ func (r *nullReporter) WaitForReady(context.Context) bool { return true }
func (r *nullReporter) SetServiceKey(string) error { return nil }
func (r *nullReporter) GetServiceName() string { return "" }

func Start(r *resource.Resource, registry interface{}) error {
func Start(rsrc *resource.Resource, registry interface{}) (Reporter, error) {
log.SetLevelFromStr(config.DebugLevel())
if reg, ok := registry.(metrics.LegacyRegistry); !ok {
return fmt.Errorf("metrics registry must implement metrics.LegacyRegistry")
return nil, fmt.Errorf("metrics registry must implement metrics.LegacyRegistry")
} else {
initReporter(r, reg)
sendInitMessage(r)
return nil
rptr := initReporter(rsrc, reg)
sendInitMessage(rptr, rsrc)
return rptr, nil
}
}

func initReporter(r *resource.Resource, registry metrics.LegacyRegistry) {
func initReporter(r *resource.Resource, registry metrics.LegacyRegistry) Reporter {
var rt string
if !config.GetEnabled() {
log.Warning("SolarWinds Observability APM agent is disabled.")
Expand All @@ -107,39 +104,10 @@ func initReporter(r *resource.Resource, registry metrics.LegacyRegistry) {
if sn, ok := r.Set().Value(semconv.ServiceNameKey); ok {
otelServiceName = strings.TrimSpace(sn.AsString())
}
setGlobalReporter(rt, otelServiceName, registry)
}

func setGlobalReporter(reporterType string, otelServiceName string, registry metrics.LegacyRegistry) {
// Close the previous reporter
if globalReporter != nil {
globalReporter.ShutdownNow()
}

switch strings.ToLower(reporterType) {
case "none":
globalReporter = newNullReporter()
default:
globalReporter = newGRPCReporter(otelServiceName, registry)
if rt == "none" {
return newNullReporter()
}
}

// WaitForReady waits until the reporter becomes ready or the context is canceled.
func WaitForReady(ctx context.Context) bool {
// globalReporter is not protected by a mutex as currently it's only modified
// from the init() function.
return globalReporter.WaitForReady(ctx)
}

// Shutdown flushes the metrics and stops the reporter. It blocked until the reporter
// is shutdown or the context is canceled.
func Shutdown(ctx context.Context) error {
return globalReporter.Shutdown(ctx)
}

// Closed indicates if the reporter has been shutdown
func Closed() bool {
return globalReporter.Closed()
return newGRPCReporter(otelServiceName, registry)
}

func ShouldTraceRequestWithURL(traced bool, url string, ttMode TriggerTraceMode, swState w3cfmt.SwTraceState) SampleDecision {
Expand Down Expand Up @@ -205,19 +173,3 @@ func argsToMap(capacity, ratePerSec, tRCap, tRRate, tSCap, tSRate float64,

return args
}

func SetServiceKey(key string) error {
return globalReporter.SetServiceKey(key)
}

func ReportStatus(e Event) error {
return globalReporter.ReportStatus(e)
}

func ReportEvent(e Event) error {
return globalReporter.ReportEvent(e)
}

func GetServiceName() string {
return globalReporter.GetServiceName()
}
39 changes: 9 additions & 30 deletions internal/reporter/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,8 @@ func TestGRPCReporter(t *testing.T) {
setEnv("SW_APM_COLLECTOR", addr)
setEnv("SW_APM_TRUSTEDPATH", testCertFile)
config.Load()
oldReporter := globalReporter
registry := metrics.NewLegacyRegistry()
setGlobalReporter("ssl", "", registry)

require.IsType(t, &grpcReporter{}, globalReporter)

r := globalReporter.(*grpcReporter)
r := newGRPCReporter("myservice", registry).(*grpcReporter)

// Test WaitForReady
// The reporter is not ready when there is no default setting.
Expand Down Expand Up @@ -141,7 +136,6 @@ func TestGRPCReporter(t *testing.T) {

// stop test reporter
server.Stop()
globalReporter = oldReporter

// assert data received
require.Len(t, server.events, 1)
Expand Down Expand Up @@ -177,13 +171,8 @@ func TestShutdownGRPCReporter(t *testing.T) {
setEnv("SW_APM_COLLECTOR", addr)
setEnv("SW_APM_TRUSTEDPATH", testCertFile)
config.Load()
oldReporter := globalReporter
registry := metrics.NewLegacyRegistry()
setGlobalReporter("ssl", "", registry)

require.IsType(t, &grpcReporter{}, globalReporter)

r := globalReporter.(*grpcReporter)
r := newGRPCReporter("myservice", registry).(*grpcReporter)
r.ShutdownNow()

require.Equal(t, true, r.Closed())
Expand All @@ -192,7 +181,6 @@ func TestShutdownGRPCReporter(t *testing.T) {

// stop test reporter
server.Stop()
globalReporter = oldReporter
}

func TestSetServiceKey(t *testing.T) {
Expand Down Expand Up @@ -240,14 +228,11 @@ func TestInvalidKey(t *testing.T) {

// set gRPC reporter
config.Load()
oldReporter := globalReporter

log.SetLevel(log.INFO)
registry := metrics.NewLegacyRegistry()
setGlobalReporter("ssl", "", registry)
require.IsType(t, &grpcReporter{}, globalReporter)

r := globalReporter.(*grpcReporter)
r := newGRPCReporter("myservice", registry).(*grpcReporter)
ev1 := CreateInfoEvent(validSpanContext, time.Now())
ev1.SetLayer("hello-from-invalid-key")
require.NoError(t, r.ReportEvent(ev1))
Expand All @@ -261,7 +246,6 @@ func TestInvalidKey(t *testing.T) {

// Tear down everything.
server.Stop()
globalReporter = oldReporter
setEnv("SW_APM_SERVICE_KEY", oldKey)

patterns := []string{
Expand Down Expand Up @@ -457,18 +441,18 @@ func TestInitReporter(t *testing.T) {
setEnv("SW_APM_ENABLED", "false")
config.Load()
registry := metrics.NewLegacyRegistry()
initReporter(resource.Empty(), registry)
require.IsType(t, &nullReporter{}, globalReporter)
r := initReporter(resource.Empty(), registry)
require.IsType(t, &nullReporter{}, r)

// Test enable agent
require.NoError(t, os.Unsetenv("SW_APM_ENABLED"))
setEnv("SW_APM_REPORTER", "ssl")
config.Load()
require.True(t, config.GetEnabled())

initReporter(resource.NewWithAttributes("", semconv.ServiceName("my service name")), registry)
require.IsType(t, &grpcReporter{}, globalReporter)
require.Equal(t, "my service name", globalReporter.GetServiceName())
r = initReporter(resource.NewWithAttributes("", semconv.ServiceName("my service name")), registry)
require.IsType(t, &grpcReporter{}, r)
require.Equal(t, "my service name", r.GetServiceName())
}

func TestCollectMetricsNextInterval(t *testing.T) {
Expand Down Expand Up @@ -501,14 +485,9 @@ func testProxy(t *testing.T, proxyUrl string) {
server := StartTestGRPCServer(t, addr)
time.Sleep(100 * time.Millisecond)

oldReporter := globalReporter
defer func() { globalReporter = oldReporter }()
registry := metrics.NewLegacyRegistry()
setGlobalReporter("ssl", "", registry)

require.IsType(t, &grpcReporter{}, globalReporter)

r := globalReporter.(*grpcReporter)
r := newGRPCReporter("myservice", registry).(*grpcReporter)

// Test WaitForReady
// The reporter is not ready when there is no default setting.
Expand Down
13 changes: 0 additions & 13 deletions internal/reporter/test_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,6 @@ func TestReporterSettingType(tp int) TestReporterOption {
return func(r *TestReporter) { r.SettingType = tp }
}

func SetGlobalReporter(r Reporter) func() {
old := globalReporter
globalReporter = r
return func() {
globalReporter = old
}
}

// SetTestReporter sets and returns a test Reporter that captures raw event bytes
// for making assertions about using the graphtest package.
func SetTestReporter(options ...TestReporterOption) *TestReporter {
Expand All @@ -76,10 +68,6 @@ func SetTestReporter(options ...TestReporterOption) *TestReporter {
r.wg.Add(1)
go r.resultWriter()

if _, ok := oldReporter.(*nullReporter); ok {
oldReporter = globalReporter
}
globalReporter = r
usingTestReporter = true

// start with clean slate
Expand Down Expand Up @@ -134,7 +122,6 @@ func (r *TestReporter) Close(numBufs int) {
}
usingTestReporter = false
if _, ok := oldReporter.(*nullReporter); !ok {
globalReporter = oldReporter
oldReporter = &nullReporter{}
}
}
Expand Down
Loading

0 comments on commit 3d3396b

Please sign in to comment.