From d820d9baa3bb0860803a8215b96d58982dc53940 Mon Sep 17 00:00:00 2001 From: Rob Date: Thu, 17 Aug 2023 12:03:37 -0600 Subject: [PATCH] Span filtering (#93) * feat: span filtering * feat(go): span filtering * feat(rust): span filtering * chore: tests * chore: tests * chore: update rust examples * chore: rust examples * fix: set default min duration to 50us * fix: iota go * chore: PR feedback, fix deno test * fix: node test * fix: node test * fix: always include root span, reduce default min duration to 20us * chore: better var naming * chore: use time.Microsecond * 20 --- .github/workflows/ci.yml | 4 +- demo-iota/go/main.go | 37 +++++++++------- go/adapter.go | 31 ++++++++++--- go/adapter/datadog/adapter.go | 2 +- go/adapter/honeycomb/adapter.go | 2 +- go/adapter/lightstep/adapter.go | 2 +- go/adapter/otel_stdout/adapter.go | 2 +- go/adapter/stdout/adapter.go | 2 +- go/trace_ctx.go | 39 +++++++++-------- .../observe-sdk-datadog/test/deno/index.ts | 8 +++- .../observe-sdk-datadog/test/node/index.js | 8 +++- .../observe-sdk-datadog/test/web/index.js | 7 ++- .../observe-sdk-honeycomb/test/deno/index.ts | 7 ++- .../observe-sdk-honeycomb/test/node/index.js | 7 ++- .../observe-sdk-honeycomb/test/web/index.js | 7 ++- .../observe-sdk-lightstep/test/deno/index.ts | 8 +++- .../observe-sdk-lightstep/test/node/index.js | 7 ++- .../observe-sdk-lightstep/test/web/index.js | 7 ++- .../observe-sdk-stdout/test/deno/index.ts | 7 ++- .../observe-sdk-stdout/test/node/index.js | 7 ++- js/src/lib/adapters/datadog/mod.ts | 4 +- js/src/lib/adapters/honeycomb/mod.ts | 8 ++-- js/src/lib/adapters/lightstep/mod.ts | 6 +-- js/src/lib/adapters/stdout/mod.ts | 6 +-- js/src/lib/collectors/span/mod.ts | 14 +++++- js/src/lib/mod.ts | 12 +++++- rust/examples/basic.rs | 4 +- rust/examples/datadog.rs | 9 +++- rust/examples/honeycomb.rs | 9 +++- rust/examples/lightstep.rs | 9 +++- rust/examples/many.rs | 7 ++- rust/examples/otel-stdout.rs | 4 +- rust/examples/zipkin.rs | 4 +- rust/src/adapter/datadog.rs | 2 +- rust/src/adapter/honeycomb.rs | 2 +- rust/src/adapter/lightstep.rs | 2 +- rust/src/adapter/mod.rs | 30 ++++++++++++- rust/src/context.rs | 43 +++++++++++++------ 38 files changed, 271 insertions(+), 105 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 263a8e1..90af958 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,7 +26,7 @@ jobs: - name: Run Deno test run: | cd js/packages/observe-sdk-stdout - npm run build:esm + npm run build npm run test:deno > out.txt # test the expected content of the formatted output @@ -36,7 +36,7 @@ jobs: - name: Run Node test run: | cd js/packages/observe-sdk-stdout - npm run build:cjs + npm run build npm link pushd test/node npm link @dylibso/observe-sdk-stdout diff --git a/demo-iota/go/main.go b/demo-iota/go/main.go index be09b3b..cde4ad1 100644 --- a/demo-iota/go/main.go +++ b/demo-iota/go/main.go @@ -2,9 +2,9 @@ package main import ( "bytes" + "context" "fmt" "io" - "io/ioutil" "log" "net/http" "os" @@ -24,8 +24,6 @@ type server struct { } func main() { - // - // Adapter API config := datadog.DefaultDatadogConfig() config.ServiceName = "iota" config.AgentHost = "http://ddagent:8126" @@ -97,6 +95,7 @@ func upload(res http.ResponseWriter, req *http.Request) { } func (s *server) runModule(res http.ResponseWriter, req *http.Request) { + ctx := context.Background() if req.Method != http.MethodPost { res.WriteHeader(http.StatusMethodNotAllowed) return @@ -111,7 +110,7 @@ func (s *server) runModule(res http.ResponseWriter, req *http.Request) { } path := filepath.Join(os.TempDir(), name) - wasm, err := ioutil.ReadFile(path) + wasm, err := os.ReadFile(path) if err != nil { log.Println("name error: no module found", err) res.WriteHeader(http.StatusNotFound) @@ -119,32 +118,40 @@ func (s *server) runModule(res http.ResponseWriter, req *http.Request) { return } - // - // Collector API - collector := observe.NewCollector(nil) - ctx, r, err := collector.InitRuntime() + s.adapter.Start(ctx) + defer s.adapter.Stop(true) + + cfg := wazero.NewRuntimeConfig().WithCustomSections(true) + rt := wazero.NewRuntimeWithConfig(ctx, cfg) + traceCtx, err := s.adapter.NewTraceCtx(ctx, rt, wasm, nil) if err != nil { log.Panicln(err) } - defer r.Close(ctx) // This closes everything this Runtime created. - - wasi_snapshot_preview1.MustInstantiate(ctx, r) + wasi_snapshot_preview1.MustInstantiate(ctx, rt) output := &bytes.Buffer{} config := wazero.NewModuleConfig().WithStdin(req.Body).WithStdout(output) defer req.Body.Close() - s.adapter.Start(collector, wasm) - mod, err := r.InstantiateWithConfig(ctx, wasm, config) + mod, err := rt.InstantiateWithConfig(ctx, wasm, config) if err != nil { log.Println("module instance error:", err) res.WriteHeader(http.StatusInternalServerError) res.Write([]byte("Internal Service Error")) return } - s.adapter.Stop(collector) - log.Println("stopped collector, sent to datadog") defer mod.Close(ctx) + meta := datadog.DatadogMetadata{ + ResourceName: "iota-go", + HttpUrl: req.URL.String(), + HttpStatusCode: 200, + SpanKind: datadog.Server, + HttpClientIp: req.RemoteAddr, + } + traceCtx.Metadata(meta) + traceCtx.Finish() + log.Println("stopped collector, sent to datadog") + res.WriteHeader(http.StatusOK) res.Header().Add("content-type", "application/json") res.Write(output.Bytes()) diff --git a/go/adapter.go b/go/adapter.go index 04d06fb..ea6087c 100644 --- a/go/adapter.go +++ b/go/adapter.go @@ -34,11 +34,11 @@ type AdapterBase struct { flusher Flusher } -func (a *AdapterBase) NewTraceCtx(ctx context.Context, r wazero.Runtime, wasm []byte, config *Config) (*TraceCtx, error) { - if config == nil { - config = NewDefaultConfig() +func (a *AdapterBase) NewTraceCtx(ctx context.Context, r wazero.Runtime, wasm []byte, opts *Options) (*TraceCtx, error) { + if opts == nil { + opts = NewDefaultOptions() } - return newTraceCtx(ctx, a.TraceEvents, r, wasm, config) + return newTraceCtx(ctx, a.TraceEvents, r, wasm, opts) } func NewAdapterBase(batchSize int, flushPeriod time.Duration) AdapterBase { @@ -57,7 +57,7 @@ func (b *AdapterBase) HandleTraceEvent(te TraceEvent) { b.eventBucket.addEvent(te, b.flusher) } -func (b *AdapterBase) Start(a Adapter, ctx context.Context) { +func (b *AdapterBase) Start(ctx context.Context, a Adapter) { b.stop = make(chan bool) go func() { @@ -83,3 +83,24 @@ func (b *AdapterBase) Stop(wait bool) { b.eventBucket.Wait() } } + +// Definition of how to filter our Spans to reduce noise +type SpanFilter struct { + MinDuration time.Duration +} + +// Specify options to change what or how the adapter receives ObserveEvents +type Options struct { + SpanFilter *SpanFilter + ChannelBufferSize int +} + +// Create a default configuration +func NewDefaultOptions() *Options { + return &Options{ + ChannelBufferSize: 1024, + SpanFilter: &SpanFilter{ + MinDuration: time.Microsecond * 20, + }, + } +} diff --git a/go/adapter/datadog/adapter.go b/go/adapter/datadog/adapter.go index 2af71ec..0f6f079 100644 --- a/go/adapter/datadog/adapter.go +++ b/go/adapter/datadog/adapter.go @@ -56,7 +56,7 @@ func NewDatadogAdapter(config *DatadogConfig) (*DatadogAdapter, error) { } func (d *DatadogAdapter) Start(ctx context.Context) { - d.AdapterBase.Start(d, ctx) + d.AdapterBase.Start(ctx, d) } func (d *DatadogAdapter) Stop(wait bool) { diff --git a/go/adapter/honeycomb/adapter.go b/go/adapter/honeycomb/adapter.go index 98aae93..d6e8892 100644 --- a/go/adapter/honeycomb/adapter.go +++ b/go/adapter/honeycomb/adapter.go @@ -41,7 +41,7 @@ func NewHoneycombAdapter(config *HoneycombConfig) *HoneycombAdapter { } func (h *HoneycombAdapter) Start(ctx context.Context) { - h.AdapterBase.Start(h, ctx) + h.AdapterBase.Start(ctx, h) } func (h *HoneycombAdapter) HandleTraceEvent(te observe.TraceEvent) { diff --git a/go/adapter/lightstep/adapter.go b/go/adapter/lightstep/adapter.go index a0cf7dc..fd37a88 100644 --- a/go/adapter/lightstep/adapter.go +++ b/go/adapter/lightstep/adapter.go @@ -41,7 +41,7 @@ func NewLightstepAdapter(config *LightstepConfig) *LightstepAdapter { } func (h *LightstepAdapter) Start(ctx context.Context) { - h.AdapterBase.Start(h, ctx) + h.AdapterBase.Start(ctx, h) } func (h *LightstepAdapter) HandleTraceEvent(te observe.TraceEvent) { diff --git a/go/adapter/otel_stdout/adapter.go b/go/adapter/otel_stdout/adapter.go index 1c9389c..8863489 100644 --- a/go/adapter/otel_stdout/adapter.go +++ b/go/adapter/otel_stdout/adapter.go @@ -86,5 +86,5 @@ func (o *OtelStdoutAdapter) makeCallSpans(event observe.CallEvent, parentId []by } func (o *OtelStdoutAdapter) Start(ctx context.Context) { - o.AdapterBase.Start(o, ctx) + o.AdapterBase.Start(ctx, o) } diff --git a/go/adapter/stdout/adapter.go b/go/adapter/stdout/adapter.go index 7b4ac55..2a5f1cb 100644 --- a/go/adapter/stdout/adapter.go +++ b/go/adapter/stdout/adapter.go @@ -59,5 +59,5 @@ func (s *StdoutAdapter) printEvents(event observe.CallEvent, indentation int) { } func (s *StdoutAdapter) Start(ctx context.Context) { - s.AdapterBase.Start(s, ctx) + s.AdapterBase.Start(ctx, s) } diff --git a/go/trace_ctx.go b/go/trace_ctx.go index e304010..4318a18 100644 --- a/go/trace_ctx.go +++ b/go/trace_ctx.go @@ -10,18 +10,6 @@ import ( "github.com/tetratelabs/wazero/experimental" ) -// The configuration object for the observe SDK -type Config struct { - ChannelBufferSize int -} - -// Create a default configuration -func NewDefaultConfig() *Config { - return &Config{ - ChannelBufferSize: 1024, - } -} - // TraceCtx holds the context for a trace, or wasm module invocation. // It collects holds a channel to the Adapter and from the wazero Listener // It will collect events throughout the invocation of the function. Calling @@ -31,14 +19,14 @@ type TraceCtx struct { raw chan RawEvent events []Event stack []CallEvent - Config *Config + Options *Options names map[uint32]string telemetryId TelemetryId adapterMeta interface{} } // Creates a new TraceCtx. Used internally by the Adapter. The user should create the trace context from the Adapter. -func newTraceCtx(ctx context.Context, eventsChan chan TraceEvent, r wazero.Runtime, data []byte, config *Config) (*TraceCtx, error) { +func newTraceCtx(ctx context.Context, eventsChan chan TraceEvent, r wazero.Runtime, data []byte, opts *Options) (*TraceCtx, error) { names, err := parseNames(data) if err != nil { return nil, err @@ -46,10 +34,10 @@ func newTraceCtx(ctx context.Context, eventsChan chan TraceEvent, r wazero.Runti traceCtx := &TraceCtx{ adapter: eventsChan, - raw: make(chan RawEvent, config.ChannelBufferSize), + raw: make(chan RawEvent, opts.ChannelBufferSize), names: names, telemetryId: NewTraceId(), - Config: config, + Options: opts, } err = traceCtx.init(ctx, r) @@ -120,14 +108,27 @@ func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error { fn.Stop(end) fn.Raw = append(fn.Raw, ev) - f, ok := t.popFunction() + // if there is no function left to pop, we are exiting the root function of the trace + f, ok := t.peekFunction() if !ok { t.events = append(t.events, fn) return } - f.within = append(f.within, fn) - t.pushFunction(f) + // if the function duration is less than minimum duration, disregard + funcDuration := fn.Duration.Microseconds() + minSpanDuration := t.Options.SpanFilter.MinDuration.Microseconds() + if funcDuration < minSpanDuration { + return + } + + // the function is within another function + f, ok = t.popFunction() + if ok { + f.within = append(f.within, fn) + t.pushFunction(f) + } + }).Export("instrument_exit") functions.WithFunc(func(ctx context.Context, amt int32) { diff --git a/js/packages/observe-sdk-datadog/test/deno/index.ts b/js/packages/observe-sdk-datadog/test/deno/index.ts index 011f85c..bca491c 100644 --- a/js/packages/observe-sdk-datadog/test/deno/index.ts +++ b/js/packages/observe-sdk-datadog/test/deno/index.ts @@ -4,7 +4,13 @@ import Context from "https://deno.land/std@0.192.0/wasi/snapshot_preview1.ts"; const adapter = new DatadogAdapter(); const bytes = await Deno.readFile("../../test-data/test.c.instr.wasm"); -const traceContext = await adapter.start(bytes); + +const opts = { + spanFilter: { + minDurationMicroseconds: 100, + } +}; +const traceContext = await adapter.start(bytes, opts); const module = new WebAssembly.Module(bytes); const runtime = new Context({ diff --git a/js/packages/observe-sdk-datadog/test/node/index.js b/js/packages/observe-sdk-datadog/test/node/index.js index 71d6fb9..1cf67e8 100644 --- a/js/packages/observe-sdk-datadog/test/node/index.js +++ b/js/packages/observe-sdk-datadog/test/node/index.js @@ -10,9 +10,15 @@ const wasi = new WASI({ }); const adapter = new DatadogAdapter(); +const opts = { + spanFilter: { + minDurationMicroseconds: 100, + } +}; const bytes = fs.readFileSync("../../test-data/test.c.instr.wasm"); -adapter.start(bytes).then((traceContext) => { + +adapter.start(bytes, opts).then((traceContext) => { const module = new WebAssembly.Module(bytes); WebAssembly.instantiate(module, { diff --git a/js/packages/observe-sdk-datadog/test/web/index.js b/js/packages/observe-sdk-datadog/test/web/index.js index 9a1f52e..fb979ad 100644 --- a/js/packages/observe-sdk-datadog/test/web/index.js +++ b/js/packages/observe-sdk-datadog/test/web/index.js @@ -3,10 +3,15 @@ import { File, OpenFile, WASI } from "@bjorn3/browser_wasi_shim"; const f = async () => { const adapter = new DatadogAdapter(); + const opts = { + spanFilter: { + minDurationMicroseconds: 100, + } + }; const resp = await fetch("count_vowels.instr.wasm"); const bytes = await resp.arrayBuffer(); - const traceContext = await adapter.start(bytes); + const traceContext = await adapter.start(bytes, opts); let fds = [ new OpenFile( diff --git a/js/packages/observe-sdk-honeycomb/test/deno/index.ts b/js/packages/observe-sdk-honeycomb/test/deno/index.ts index dc9dc9a..12cf9e3 100644 --- a/js/packages/observe-sdk-honeycomb/test/deno/index.ts +++ b/js/packages/observe-sdk-honeycomb/test/deno/index.ts @@ -13,9 +13,14 @@ const config: HoneycombConfig = { host: 'https://api.honeycomb.io', } const adapter = new HoneycombAdapter(config); +const opts = { + spanFilter: { + minDurationMicroseconds: 100, + } +}; const bytes = await Deno.readFile("../../test-data/test.c.instr.wasm"); -const traceContext = await adapter.start(bytes); +const traceContext = await adapter.start(bytes, opts); const module = new WebAssembly.Module(bytes); const runtime = new Context({ diff --git a/js/packages/observe-sdk-honeycomb/test/node/index.js b/js/packages/observe-sdk-honeycomb/test/node/index.js index e19ec52..ede2cde 100644 --- a/js/packages/observe-sdk-honeycomb/test/node/index.js +++ b/js/packages/observe-sdk-honeycomb/test/node/index.js @@ -18,9 +18,14 @@ const config = { host: 'https://api.honeycomb.io', } const adapter = new HoneycombAdapter(config); +const opts = { + spanFilter: { + minDurationMicroseconds: 100, + } +}; const bytes = fs.readFileSync("../../test-data/test.c.instr.wasm"); -adapter.start(bytes).then((traceContext) => { +adapter.start(bytes, opts).then((traceContext) => { const module = new WebAssembly.Module(bytes); WebAssembly.instantiate(module, { diff --git a/js/packages/observe-sdk-honeycomb/test/web/index.js b/js/packages/observe-sdk-honeycomb/test/web/index.js index 88ab1d3..8d1be35 100644 --- a/js/packages/observe-sdk-honeycomb/test/web/index.js +++ b/js/packages/observe-sdk-honeycomb/test/web/index.js @@ -10,10 +10,15 @@ const f = async () => { host: 'https://api.honeycomb.io', } const adapter = new HoneycombAdapter(config); + const opts = { + spanFilter: { + minDurationMicroseconds: 100, + } + }; const resp = await fetch("test.c.instr.wasm"); const bytes = await resp.arrayBuffer(); - const traceContext = await adapter.start(bytes); + const traceContext = await adapter.start(bytes, opts); let fds = [ new OpenFile( diff --git a/js/packages/observe-sdk-lightstep/test/deno/index.ts b/js/packages/observe-sdk-lightstep/test/deno/index.ts index 46a1cfd..1d701dd 100644 --- a/js/packages/observe-sdk-lightstep/test/deno/index.ts +++ b/js/packages/observe-sdk-lightstep/test/deno/index.ts @@ -13,9 +13,13 @@ const config: LightstepConfig = { host: 'https://ingest.lightstep.com', } const adapter = new LightstepAdapter(config); - +const opts = { + spanFilter: { + minDurationMicroseconds: 100, + } +}; const bytes = await Deno.readFile("../../test-data/test.c.instr.wasm"); -const traceContext = await adapter.start(bytes); +const traceContext = await adapter.start(bytes, opts); const module = new WebAssembly.Module(bytes); const runtime = new Context({ diff --git a/js/packages/observe-sdk-lightstep/test/node/index.js b/js/packages/observe-sdk-lightstep/test/node/index.js index bac7867..e813679 100644 --- a/js/packages/observe-sdk-lightstep/test/node/index.js +++ b/js/packages/observe-sdk-lightstep/test/node/index.js @@ -18,9 +18,14 @@ const config = { host: 'https://ingest.lightstep.com', } const adapter = new LightstepAdapter(config); +const opts = { + spanFilter: { + minDurationMicroseconds: 100, + } +}; const bytes = fs.readFileSync("../../test-data/test.c.instr.wasm"); -adapter.start(bytes).then((traceContext) => { +adapter.start(bytes, opts).then((traceContext) => { const module = new WebAssembly.Module(bytes); WebAssembly.instantiate(module, { diff --git a/js/packages/observe-sdk-lightstep/test/web/index.js b/js/packages/observe-sdk-lightstep/test/web/index.js index 4fa2cb8..d99bf48 100644 --- a/js/packages/observe-sdk-lightstep/test/web/index.js +++ b/js/packages/observe-sdk-lightstep/test/web/index.js @@ -10,10 +10,15 @@ const f = async () => { host: 'https://ingest.lightstep.com', } const adapter = new LightstepAdapter(config); + const opts = { + spanFilter: { + minDurationMicroseconds: 100, + } + }; const resp = await fetch("test.c.instr.wasm"); const bytes = await resp.arrayBuffer(); - const traceContext = await adapter.start(bytes); + const traceContext = await adapter.start(bytes, opts); let fds = [ new OpenFile( diff --git a/js/packages/observe-sdk-stdout/test/deno/index.ts b/js/packages/observe-sdk-stdout/test/deno/index.ts index 02a8ecb..2c7a11a 100644 --- a/js/packages/observe-sdk-stdout/test/deno/index.ts +++ b/js/packages/observe-sdk-stdout/test/deno/index.ts @@ -2,9 +2,12 @@ import { StdOutAdapter } from "../../dist/esm/index.js"; // TODO: test as Deno e import Context from "https://deno.land/std@0.192.0/wasi/snapshot_preview1.ts"; const adapter = new StdOutAdapter(); - const bytes = await Deno.readFile("../../test-data/test.c.instr.wasm"); -const traceContext = await adapter.start(bytes); +const traceContext = await adapter.start(bytes, { + spanFilter: { + minDurationMicroseconds: 0, + } +}); const module = new WebAssembly.Module(bytes); const runtime = new Context({ diff --git a/js/packages/observe-sdk-stdout/test/node/index.js b/js/packages/observe-sdk-stdout/test/node/index.js index 8d2a6cf..839c429 100644 --- a/js/packages/observe-sdk-stdout/test/node/index.js +++ b/js/packages/observe-sdk-stdout/test/node/index.js @@ -10,9 +10,12 @@ const wasi = new WASI({ }); const adapter = new StdOutAdapter(); - const bytes = fs.readFileSync("../../test-data/test.c.instr.wasm"); -adapter.start(bytes).then((traceContext) => { +adapter.start(bytes, { + spanFilter: { + minDurationMicroseconds: 0, + } +}).then((traceContext) => { const module = new WebAssembly.Module(bytes); WebAssembly.instantiate(module, { diff --git a/js/src/lib/adapters/datadog/mod.ts b/js/src/lib/adapters/datadog/mod.ts index b1c5cda..6a284aa 100644 --- a/js/src/lib/adapters/datadog/mod.ts +++ b/js/src/lib/adapters/datadog/mod.ts @@ -6,6 +6,7 @@ import { FunctionCall, MemoryGrow, ObserveEvent, + Options, TelemetryId, WASM, } from "../../mod.ts"; @@ -111,8 +112,9 @@ export class DatadogAdapter extends Adapter { public async start( wasm: WASM, + opts?: Options, ): Promise { - const spanCollector = new SpanCollector(this); + const spanCollector = new SpanCollector(this, opts); await spanCollector.setNames(wasm); this.startTraceInterval(); diff --git a/js/src/lib/adapters/honeycomb/mod.ts b/js/src/lib/adapters/honeycomb/mod.ts index 063ea46..bb0d328 100644 --- a/js/src/lib/adapters/honeycomb/mod.ts +++ b/js/src/lib/adapters/honeycomb/mod.ts @@ -1,7 +1,7 @@ -import { Adapter, ObserveEvent, WASM } from "../../mod.ts"; +import { Adapter, ObserveEvent, Options, WASM } from "../../mod.ts"; import { SpanCollector } from "../../collectors/span/mod.ts"; import { traceFromEvents, Trace, TracesData } from "../../formatters/opentelemetry.ts"; -import { AdapterConfig } from "../../../lib/mod"; +import { AdapterConfig } from "../../../lib/mod.ts"; const defaultConfig: HoneycombConfig = { apiKey: '', @@ -29,9 +29,9 @@ export class HoneycombAdapter extends Adapter { } } - public async start(wasm: WASM): Promise { + public async start(wasm: WASM, opts?: Options): Promise { super.startTraceInterval(); - const collector = new SpanCollector(this); + const collector = new SpanCollector(this, opts); await collector.setNames(wasm); return collector; } diff --git a/js/src/lib/adapters/lightstep/mod.ts b/js/src/lib/adapters/lightstep/mod.ts index c9f49ad..314e0e1 100644 --- a/js/src/lib/adapters/lightstep/mod.ts +++ b/js/src/lib/adapters/lightstep/mod.ts @@ -1,4 +1,4 @@ -import { Adapter, ObserveEvent, WASM } from "../../mod.ts"; +import { Adapter, ObserveEvent, Options, WASM } from "../../mod.ts"; import { SpanCollector } from "../../collectors/span/mod.ts"; import { traceFromEvents, Trace, TracesData } from "../../formatters/opentelemetry.ts"; import { AdapterConfig } from "../../mod.ts"; @@ -29,9 +29,9 @@ export class LightstepAdapter extends Adapter { } } - public async start(wasm: WASM): Promise { + public async start(wasm: WASM, opts?: Options): Promise { super.startTraceInterval(); - const collector = new SpanCollector(this); + const collector = new SpanCollector(this, opts); await collector.setNames(wasm); return collector; } diff --git a/js/src/lib/adapters/stdout/mod.ts b/js/src/lib/adapters/stdout/mod.ts index 11e0a28..b67f5d7 100644 --- a/js/src/lib/adapters/stdout/mod.ts +++ b/js/src/lib/adapters/stdout/mod.ts @@ -1,9 +1,9 @@ -import { Adapter, FunctionCall, MemoryGrow, ObserveEvent, WASM } from "../../mod.ts"; +import { Adapter, FunctionCall, MemoryGrow, ObserveEvent, Options, WASM } from "../../mod.ts"; import { SpanCollector } from "../../collectors/span/mod.ts"; export class StdOutAdapter extends Adapter { - public async start(wasm: WASM): Promise { - const collector = new SpanCollector(this); + public async start(wasm: WASM, opts?: Options): Promise { + const collector = new SpanCollector(this, opts); await collector.setNames(wasm); return collector; } diff --git a/js/src/lib/collectors/span/mod.ts b/js/src/lib/collectors/span/mod.ts index 312d83c..a2b9088 100644 --- a/js/src/lib/collectors/span/mod.ts +++ b/js/src/lib/collectors/span/mod.ts @@ -2,6 +2,7 @@ import * as wasm from "./modsurfer-demangle/modsurfer_demangle_bg.wasm"; import { __wbg_set_wasm } from "./modsurfer-demangle/modsurfer_demangle_bg.js"; import { demangle } from "./modsurfer-demangle/modsurfer_demangle.js"; import { parseNameSection } from "../../parser/mod.ts"; +import { Microseconds } from "../../mod.ts"; import { Adapter, @@ -14,19 +15,19 @@ import { NamesMap, now, ObserveEvent, + Options, WASM } from "../../mod.ts"; // @ts-ignore - The esbuild wasm plugin provides a `default` function to initialize the wasm wasm.default().then((bytes) => __wbg_set_wasm(bytes)); - export class SpanCollector implements Collector { meta?: any; names: NamesMap; stack: Array; events: ObserveEvent[]; - constructor(private adapter: Adapter) { + constructor(private adapter: Adapter, private opts: Options = new Options()) { this.stack = []; this.events = []; this.names = new Map(); @@ -71,11 +72,20 @@ export class SpanCollector implements Collector { } fn.stop(end); + // if the stack length is 0, we are exiting the root function of the trace if (this.stack.length === 0) { this.events.push(fn); return; } + // if the function duration is less than minimum duration, disregard + const funcDuration = fn.duration() * 1e-3; + const minSpanDuration = this.opts.spanFilter.minDurationMicroseconds; + if (funcDuration < minSpanDuration) { + return; + } + + // the function is within another function const f = this.stack.pop()!; f.within.push(fn); this.stack.push(f); diff --git a/js/src/lib/mod.ts b/js/src/lib/mod.ts index d6f07d5..9fbc5c1 100644 --- a/js/src/lib/mod.ts +++ b/js/src/lib/mod.ts @@ -6,6 +6,7 @@ export const now = (): Nanoseconds => { export type WASM = Uint8Array | WebAssembly.Module; export type Nanoseconds = number; +export type Microseconds = number; export type ObserveEvent = FunctionCall | MemoryGrow | CustomEvent; export type MemoryGrowAmount = number; export type FunctionId = number; @@ -73,7 +74,7 @@ export abstract class Adapter { traceIntervalId: number | undefined | NodeJS.Timer = undefined; config: AdapterConfig; - abstract start(wasm: WASM): Promise; + abstract start(wasm: WASM, opts?: Options): Promise; abstract collect(events: Array, metadata: any): void; @@ -110,3 +111,12 @@ export const newSpanId = (): TelemetryId => { export const newTraceId = (): TelemetryId => { return newTelemetryId(); }; +export interface SpanFilter { + minDurationMicroseconds: Microseconds +} + +export class Options { + spanFilter: SpanFilter = { + minDurationMicroseconds: 20 + } +} diff --git a/rust/examples/basic.rs b/rust/examples/basic.rs index 8406216..e1ccb82 100644 --- a/rust/examples/basic.rs +++ b/rust/examples/basic.rs @@ -1,4 +1,4 @@ -use dylibso_observe_sdk::adapter::stdout::StdoutAdapter; +use dylibso_observe_sdk::adapter::{default_options, stdout::StdoutAdapter}; #[tokio::main] pub async fn main() -> anyhow::Result<()> { @@ -27,7 +27,7 @@ pub async fn main() -> anyhow::Result<()> { // Provide the observability functions to the `Linker` to be made available // to the instrumented guest code. These are safe to add and are a no-op // if guest code is uninstrumented. - let trace_ctx = adapter.start(&mut linker, &data)?; + let trace_ctx = adapter.start(&mut linker, &data, default_options())?; let instance = linker.instantiate(&mut store, &module)?; diff --git a/rust/examples/datadog.rs b/rust/examples/datadog.rs index 5ac8887..39ba0a1 100644 --- a/rust/examples/datadog.rs +++ b/rust/examples/datadog.rs @@ -1,5 +1,5 @@ use dylibso_observe_sdk::adapter::datadog::{ - AdapterMetadata, DatadogAdapter, DatadogConfig, DatadogMetadata, + AdapterMetadata, DatadogAdapter, DatadogConfig, DatadogMetadata, Options, SpanFilter, }; /// You need the datadog agent running on localhost for this example to work @@ -28,10 +28,15 @@ pub async fn main() -> anyhow::Result<()> { let mut linker = wasmtime::Linker::new(&engine); wasmtime_wasi::add_to_linker(&mut linker, |wasi| wasi)?; + let options = Options { + span_filter: SpanFilter { + min_duration_microseconds: std::time::Duration::from_micros(100), + }, + }; // Provide the observability functions to the `Linker` to be made available // to the instrumented guest code. These are safe to add and are a no-op // if guest code is uninstrumented. - let trace_ctx = adapter.start(&mut linker, &data)?; + let trace_ctx = adapter.start(&mut linker, &data, options)?; let instance = linker.instantiate(&mut store, &module)?; diff --git a/rust/examples/honeycomb.rs b/rust/examples/honeycomb.rs index e94b129..865e219 100644 --- a/rust/examples/honeycomb.rs +++ b/rust/examples/honeycomb.rs @@ -1,5 +1,5 @@ use dylibso_observe_sdk::adapter::honeycomb::{ - AdapterMetadata, Attribute, HoneycombAdapter, HoneycombConfig, Value, + AdapterMetadata, Attribute, HoneycombAdapter, HoneycombConfig, Options, SpanFilter, Value, }; #[tokio::main] @@ -31,10 +31,15 @@ pub async fn main() -> anyhow::Result<()> { let mut linker = wasmtime::Linker::new(&engine); wasmtime_wasi::add_to_linker(&mut linker, |wasi| wasi)?; + let options = Options { + span_filter: SpanFilter { + min_duration_microseconds: std::time::Duration::from_micros(20), + }, + }; // Provide the observability functions to the `Linker` to be made available // to the instrumented guest code. These are safe to add and are a no-op // if guest code is uninstrumented. - let trace_ctx = adapter.start(&mut linker, &data)?; + let trace_ctx = adapter.start(&mut linker, &data, options)?; let instance = linker.instantiate(&mut store, &module)?; diff --git a/rust/examples/lightstep.rs b/rust/examples/lightstep.rs index 5a4b4d9..b6b7a51 100644 --- a/rust/examples/lightstep.rs +++ b/rust/examples/lightstep.rs @@ -1,5 +1,5 @@ use dylibso_observe_sdk::adapter::lightstep::{ - AdapterMetadata, Attribute, LightstepAdapter, LightstepConfig, Value, + AdapterMetadata, Attribute, LightstepAdapter, LightstepConfig, Options, SpanFilter, Value, }; #[tokio::main] @@ -31,10 +31,15 @@ pub async fn main() -> anyhow::Result<()> { let mut linker = wasmtime::Linker::new(&engine); wasmtime_wasi::add_to_linker(&mut linker, |wasi| wasi)?; + let options = Options { + span_filter: SpanFilter { + min_duration_microseconds: std::time::Duration::from_micros(0), + }, + }; // Provide the observability functions to the `Linker` to be made available // to the instrumented guest code. These are safe to add and are a no-op // if guest code is uninstrumented. - let trace_ctx = adapter.start(&mut linker, &data)?; + let trace_ctx = adapter.start(&mut linker, &data, options)?; let instance = linker.instantiate(&mut store, &module)?; diff --git a/rust/examples/many.rs b/rust/examples/many.rs index 6e1a8bc..0b6be33 100644 --- a/rust/examples/many.rs +++ b/rust/examples/many.rs @@ -1,4 +1,7 @@ -use dylibso_observe_sdk::{adapter::otelstdout::OtelStdoutAdapter, new_trace_id}; +use dylibso_observe_sdk::{ + adapter::{default_options, otelstdout::OtelStdoutAdapter}, + new_trace_id, +}; use rand::{seq::SliceRandom, thread_rng}; #[tokio::main] @@ -33,7 +36,7 @@ pub async fn main() -> anyhow::Result<()> { // Provide the observability functions to the `Linker` to be made // available to the instrumented guest code. These are safe to add // and are a no-op if guest code is uninstrumented. - let trace_ctx = adapter.start(&mut linker, &data)?; + let trace_ctx = adapter.start(&mut linker, &data, default_options())?; let instance = linker.instantiate(&mut store, &module)?; instances.push((trace_ctx, instance, store)); diff --git a/rust/examples/otel-stdout.rs b/rust/examples/otel-stdout.rs index c49de71..c37d93b 100644 --- a/rust/examples/otel-stdout.rs +++ b/rust/examples/otel-stdout.rs @@ -1,4 +1,4 @@ -use dylibso_observe_sdk::adapter::otelstdout::OtelStdoutAdapter; +use dylibso_observe_sdk::adapter::{default_options, otelstdout::OtelStdoutAdapter}; #[tokio::main] pub async fn main() -> anyhow::Result<()> { @@ -27,7 +27,7 @@ pub async fn main() -> anyhow::Result<()> { // Provide the observability functions to the `Linker` to be made available // to the instrumented guest code. These are safe to add and are a no-op // if guest code is uninstrumented. - let trace_ctx = adapter.start(&mut linker, &data)?; + let trace_ctx = adapter.start(&mut linker, &data, default_options())?; let instance = linker.instantiate(&mut store, &module)?; diff --git a/rust/examples/zipkin.rs b/rust/examples/zipkin.rs index b967cab..6475659 100644 --- a/rust/examples/zipkin.rs +++ b/rust/examples/zipkin.rs @@ -1,4 +1,4 @@ -use dylibso_observe_sdk::adapter::zipkin::ZipkinAdapter; +use dylibso_observe_sdk::adapter::{default_options, zipkin::ZipkinAdapter}; #[tokio::main] pub async fn main() -> anyhow::Result<()> { @@ -27,7 +27,7 @@ pub async fn main() -> anyhow::Result<()> { // Provide the observability functions to the `Linker` to be made available // to the instrumented guest code. These are safe to add and are a no-op // if guest code is uninstrumented. - let trace_ctx = adapter.start(&mut linker, &data)?; + let trace_ctx = adapter.start(&mut linker, &data, default_options())?; let instance = linker.instantiate(&mut store, &module)?; diff --git a/rust/src/adapter/datadog.rs b/rust/src/adapter/datadog.rs index fb19ac9..627a9e8 100644 --- a/rust/src/adapter/datadog.rs +++ b/rust/src/adapter/datadog.rs @@ -20,7 +20,7 @@ use super::{ Adapter, AdapterHandle, }; -pub use super::AdapterMetadata; +pub use super::{AdapterMetadata, Options, SpanFilter}; #[derive(Debug, Clone)] pub enum DatadogTraceType { diff --git a/rust/src/adapter/honeycomb.rs b/rust/src/adapter/honeycomb.rs index 8bc072a..f9681dd 100644 --- a/rust/src/adapter/honeycomb.rs +++ b/rust/src/adapter/honeycomb.rs @@ -11,7 +11,7 @@ use super::{ pub use super::{ otel_formatter::{Attribute, Value}, - AdapterMetadata, + AdapterMetadata, Options, SpanFilter, }; /// Config options for HoneycombAdapter diff --git a/rust/src/adapter/lightstep.rs b/rust/src/adapter/lightstep.rs index 45bf2c3..c68e4a4 100644 --- a/rust/src/adapter/lightstep.rs +++ b/rust/src/adapter/lightstep.rs @@ -11,7 +11,7 @@ use super::{ pub use super::{ otel_formatter::{Attribute, Value}, - AdapterMetadata, + AdapterMetadata, Options, SpanFilter, }; /// Config options for LightstepAdapter diff --git a/rust/src/adapter/mod.rs b/rust/src/adapter/mod.rs index 18528be..861b6e0 100644 --- a/rust/src/adapter/mod.rs +++ b/rust/src/adapter/mod.rs @@ -89,8 +89,13 @@ pub struct AdapterHandle { } impl AdapterHandle { - pub fn start(&self, linker: &mut Linker, data: &[u8]) -> Result { - let (collector, collector_rx) = add_to_linker(linker, data)?; + pub fn start( + &self, + linker: &mut Linker, + data: &[u8], + options: Options, + ) -> Result { + let (collector, collector_rx) = add_to_linker(linker, data, options)?; Collector::start(collector_rx, self.clone()); Ok(TraceContext { collector }) } @@ -107,3 +112,24 @@ pub enum AdapterMetadata { Datadog(DatadogMetadata), OpenTelemetry(Vec), } + +/// SpanFilter allows for specification of how to filter out spans +#[derive(Clone)] +pub struct SpanFilter { + pub min_duration_microseconds: std::time::Duration, +} + +/// Options allow you to tune certain characteristics of your telemetry +#[derive(Clone)] +pub struct Options { + pub span_filter: SpanFilter, +} + +/// default_options is a convenience method for setting sane default options +pub fn default_options() -> Options { + Options { + span_filter: SpanFilter { + min_duration_microseconds: std::time::Duration::from_micros(20), + }, + } +} diff --git a/rust/src/context.rs b/rust/src/context.rs index 3de0e95..faffc2c 100644 --- a/rust/src/context.rs +++ b/rust/src/context.rs @@ -11,8 +11,8 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use wasmtime::{Caller, FuncType, Linker, Val, ValType}; use crate::{ - collector::CollectorHandle, wasm_instr::WasmInstrInfo, Allocation, Event, FunctionCall, Log, - Metric, MetricFormat, Tags, + adapter::Options, collector::CollectorHandle, wasm_instr::WasmInstrInfo, Allocation, Event, + FunctionCall, Log, Metric, MetricFormat, Tags, }; /// The InstrumentationContext holds the implementations @@ -32,10 +32,13 @@ use crate::{ pub struct InstrumentationContext { collector: CollectorHandle, stack: Vec, + options: Options, } impl InstrumentationContext { - fn new() -> ( + fn new( + options: Options, + ) -> ( Arc>, CollectorHandle, Receiver, @@ -47,6 +50,7 @@ impl InstrumentationContext { Arc::new(Mutex::new(InstrumentationContext { collector: events_tx.clone(), stack: Vec::new(), + options, })), events_tx, events_rx, @@ -79,17 +83,28 @@ impl InstrumentationContext { // } func.end = SystemTime::now(); - if let Some(mut f) = self.stack.pop() { - f.within.push(Event::Func(func.clone())); - self.stack.push(f); - } - - // only push the end of the final call onto the channel - // this will contain all the other calls within it + // if the stack is empty, we are exiting the root function of the trace if self.stack.is_empty() { if let Err(e) = self.collector.try_send(Event::Func(func)) { error!("error recording function exit: {}", e); }; + } else { + // if the function duration is less than minimum duration, disregard + let func_duration = func.end.duration_since(func.start)?.as_micros(); + let min_span_duration = self + .options + .span_filter + .min_duration_microseconds + .as_micros(); + if func_duration < min_span_duration { + return Ok(()); + } + + // the function is within another function + if let Some(mut f) = self.stack.pop() { + f.within.push(Event::Func(func.clone())); + self.stack.push(f); + } } return Ok(()); @@ -400,8 +415,12 @@ const MODULE_NAME: &str = "dylibso_observe"; type EventChannel = (Sender, Receiver); /// Link observability import functions required by instrumented wasm code -pub fn add_to_linker(linker: &mut Linker, data: &[u8]) -> Result { - let (ctx, events_tx, events_rx) = InstrumentationContext::new(); +pub fn add_to_linker( + linker: &mut Linker, + data: &[u8], + options: Options, +) -> Result { + let (ctx, events_tx, events_rx) = InstrumentationContext::new(options); // load the static wasm-instr info let wasm_instr_info = WasmInstrInfo::new(data)?;