Skip to content

Commit

Permalink
Span filtering (#93)
Browse files Browse the repository at this point in the history
* feat: span filtering

* feat(go): span filtering

* feat(rust): span filtering

* chore: tests

* chore: tests

* chore: update rust examples

* chore: rust examples

* fix: set default min duration to 50us

* fix: iota go

* chore: PR feedback, fix deno test

* fix: node test

* fix: node test

* fix: always include root span, reduce default min duration to 20us

* chore: better var naming

* chore: use time.Microsecond * 20
  • Loading branch information
wikiwong authored Aug 17, 2023
1 parent 51ec1d9 commit d820d9b
Show file tree
Hide file tree
Showing 38 changed files with 271 additions and 105 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Run Deno test
run: |
cd js/packages/observe-sdk-stdout
npm run build:esm
npm run build
npm run test:deno > out.txt
# test the expected content of the formatted output
Expand All @@ -36,7 +36,7 @@ jobs:
- name: Run Node test
run: |
cd js/packages/observe-sdk-stdout
npm run build:cjs
npm run build
npm link
pushd test/node
npm link @dylibso/observe-sdk-stdout
Expand Down
37 changes: 22 additions & 15 deletions demo-iota/go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package main

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
Expand All @@ -24,8 +24,6 @@ type server struct {
}

func main() {
//
// Adapter API
config := datadog.DefaultDatadogConfig()
config.ServiceName = "iota"
config.AgentHost = "http://ddagent:8126"
Expand Down Expand Up @@ -97,6 +95,7 @@ func upload(res http.ResponseWriter, req *http.Request) {
}

func (s *server) runModule(res http.ResponseWriter, req *http.Request) {
ctx := context.Background()
if req.Method != http.MethodPost {
res.WriteHeader(http.StatusMethodNotAllowed)
return
Expand All @@ -111,40 +110,48 @@ func (s *server) runModule(res http.ResponseWriter, req *http.Request) {
}

path := filepath.Join(os.TempDir(), name)
wasm, err := ioutil.ReadFile(path)
wasm, err := os.ReadFile(path)
if err != nil {
log.Println("name error: no module found", err)
res.WriteHeader(http.StatusNotFound)
res.Write([]byte("Not Found"))
return
}

//
// Collector API
collector := observe.NewCollector(nil)
ctx, r, err := collector.InitRuntime()
s.adapter.Start(ctx)
defer s.adapter.Stop(true)

cfg := wazero.NewRuntimeConfig().WithCustomSections(true)
rt := wazero.NewRuntimeWithConfig(ctx, cfg)
traceCtx, err := s.adapter.NewTraceCtx(ctx, rt, wasm, nil)
if err != nil {
log.Panicln(err)
}
defer r.Close(ctx) // This closes everything this Runtime created.

wasi_snapshot_preview1.MustInstantiate(ctx, r)
wasi_snapshot_preview1.MustInstantiate(ctx, rt)
output := &bytes.Buffer{}
config := wazero.NewModuleConfig().WithStdin(req.Body).WithStdout(output)
defer req.Body.Close()

s.adapter.Start(collector, wasm)
mod, err := r.InstantiateWithConfig(ctx, wasm, config)
mod, err := rt.InstantiateWithConfig(ctx, wasm, config)
if err != nil {
log.Println("module instance error:", err)
res.WriteHeader(http.StatusInternalServerError)
res.Write([]byte("Internal Service Error"))
return
}
s.adapter.Stop(collector)
log.Println("stopped collector, sent to datadog")
defer mod.Close(ctx)

meta := datadog.DatadogMetadata{
ResourceName: "iota-go",
HttpUrl: req.URL.String(),
HttpStatusCode: 200,
SpanKind: datadog.Server,
HttpClientIp: req.RemoteAddr,
}
traceCtx.Metadata(meta)
traceCtx.Finish()
log.Println("stopped collector, sent to datadog")

res.WriteHeader(http.StatusOK)
res.Header().Add("content-type", "application/json")
res.Write(output.Bytes())
Expand Down
31 changes: 26 additions & 5 deletions go/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ type AdapterBase struct {
flusher Flusher
}

func (a *AdapterBase) NewTraceCtx(ctx context.Context, r wazero.Runtime, wasm []byte, config *Config) (*TraceCtx, error) {
if config == nil {
config = NewDefaultConfig()
func (a *AdapterBase) NewTraceCtx(ctx context.Context, r wazero.Runtime, wasm []byte, opts *Options) (*TraceCtx, error) {
if opts == nil {
opts = NewDefaultOptions()
}
return newTraceCtx(ctx, a.TraceEvents, r, wasm, config)
return newTraceCtx(ctx, a.TraceEvents, r, wasm, opts)
}

func NewAdapterBase(batchSize int, flushPeriod time.Duration) AdapterBase {
Expand All @@ -57,7 +57,7 @@ func (b *AdapterBase) HandleTraceEvent(te TraceEvent) {
b.eventBucket.addEvent(te, b.flusher)
}

func (b *AdapterBase) Start(a Adapter, ctx context.Context) {
func (b *AdapterBase) Start(ctx context.Context, a Adapter) {
b.stop = make(chan bool)

go func() {
Expand All @@ -83,3 +83,24 @@ func (b *AdapterBase) Stop(wait bool) {
b.eventBucket.Wait()
}
}

// Definition of how to filter our Spans to reduce noise
type SpanFilter struct {
MinDuration time.Duration
}

// Specify options to change what or how the adapter receives ObserveEvents
type Options struct {
SpanFilter *SpanFilter
ChannelBufferSize int
}

// Create a default configuration
func NewDefaultOptions() *Options {
return &Options{
ChannelBufferSize: 1024,
SpanFilter: &SpanFilter{
MinDuration: time.Microsecond * 20,
},
}
}
2 changes: 1 addition & 1 deletion go/adapter/datadog/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewDatadogAdapter(config *DatadogConfig) (*DatadogAdapter, error) {
}

func (d *DatadogAdapter) Start(ctx context.Context) {
d.AdapterBase.Start(d, ctx)
d.AdapterBase.Start(ctx, d)
}

func (d *DatadogAdapter) Stop(wait bool) {
Expand Down
2 changes: 1 addition & 1 deletion go/adapter/honeycomb/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewHoneycombAdapter(config *HoneycombConfig) *HoneycombAdapter {
}

func (h *HoneycombAdapter) Start(ctx context.Context) {
h.AdapterBase.Start(h, ctx)
h.AdapterBase.Start(ctx, h)
}

func (h *HoneycombAdapter) HandleTraceEvent(te observe.TraceEvent) {
Expand Down
2 changes: 1 addition & 1 deletion go/adapter/lightstep/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewLightstepAdapter(config *LightstepConfig) *LightstepAdapter {
}

func (h *LightstepAdapter) Start(ctx context.Context) {
h.AdapterBase.Start(h, ctx)
h.AdapterBase.Start(ctx, h)
}

func (h *LightstepAdapter) HandleTraceEvent(te observe.TraceEvent) {
Expand Down
2 changes: 1 addition & 1 deletion go/adapter/otel_stdout/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,5 @@ func (o *OtelStdoutAdapter) makeCallSpans(event observe.CallEvent, parentId []by
}

func (o *OtelStdoutAdapter) Start(ctx context.Context) {
o.AdapterBase.Start(o, ctx)
o.AdapterBase.Start(ctx, o)
}
2 changes: 1 addition & 1 deletion go/adapter/stdout/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ func (s *StdoutAdapter) printEvents(event observe.CallEvent, indentation int) {
}

func (s *StdoutAdapter) Start(ctx context.Context) {
s.AdapterBase.Start(s, ctx)
s.AdapterBase.Start(ctx, s)
}
39 changes: 20 additions & 19 deletions go/trace_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,6 @@ import (
"github.com/tetratelabs/wazero/experimental"
)

// The configuration object for the observe SDK
type Config struct {
ChannelBufferSize int
}

// Create a default configuration
func NewDefaultConfig() *Config {
return &Config{
ChannelBufferSize: 1024,
}
}

// TraceCtx holds the context for a trace, or wasm module invocation.
// It collects holds a channel to the Adapter and from the wazero Listener
// It will collect events throughout the invocation of the function. Calling
Expand All @@ -31,25 +19,25 @@ type TraceCtx struct {
raw chan RawEvent
events []Event
stack []CallEvent
Config *Config
Options *Options
names map[uint32]string
telemetryId TelemetryId
adapterMeta interface{}
}

// Creates a new TraceCtx. Used internally by the Adapter. The user should create the trace context from the Adapter.
func newTraceCtx(ctx context.Context, eventsChan chan TraceEvent, r wazero.Runtime, data []byte, config *Config) (*TraceCtx, error) {
func newTraceCtx(ctx context.Context, eventsChan chan TraceEvent, r wazero.Runtime, data []byte, opts *Options) (*TraceCtx, error) {
names, err := parseNames(data)
if err != nil {
return nil, err
}

traceCtx := &TraceCtx{
adapter: eventsChan,
raw: make(chan RawEvent, config.ChannelBufferSize),
raw: make(chan RawEvent, opts.ChannelBufferSize),
names: names,
telemetryId: NewTraceId(),
Config: config,
Options: opts,
}

err = traceCtx.init(ctx, r)
Expand Down Expand Up @@ -120,14 +108,27 @@ func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error {
fn.Stop(end)
fn.Raw = append(fn.Raw, ev)

f, ok := t.popFunction()
// if there is no function left to pop, we are exiting the root function of the trace
f, ok := t.peekFunction()
if !ok {
t.events = append(t.events, fn)
return
}

f.within = append(f.within, fn)
t.pushFunction(f)
// if the function duration is less than minimum duration, disregard
funcDuration := fn.Duration.Microseconds()
minSpanDuration := t.Options.SpanFilter.MinDuration.Microseconds()
if funcDuration < minSpanDuration {
return
}

// the function is within another function
f, ok = t.popFunction()
if ok {
f.within = append(f.within, fn)
t.pushFunction(f)
}

}).Export("instrument_exit")

functions.WithFunc(func(ctx context.Context, amt int32) {
Expand Down
8 changes: 7 additions & 1 deletion js/packages/observe-sdk-datadog/test/deno/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import Context from "https://deno.land/[email protected]/wasi/snapshot_preview1.ts";
const adapter = new DatadogAdapter();

const bytes = await Deno.readFile("../../test-data/test.c.instr.wasm");
const traceContext = await adapter.start(bytes);

const opts = {
spanFilter: {
minDurationMicroseconds: 100,
}
};
const traceContext = await adapter.start(bytes, opts);
const module = new WebAssembly.Module(bytes);

const runtime = new Context({
Expand Down
8 changes: 7 additions & 1 deletion js/packages/observe-sdk-datadog/test/node/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@ const wasi = new WASI({
});

const adapter = new DatadogAdapter();
const opts = {
spanFilter: {
minDurationMicroseconds: 100,
}
};

const bytes = fs.readFileSync("../../test-data/test.c.instr.wasm");
adapter.start(bytes).then((traceContext) => {

adapter.start(bytes, opts).then((traceContext) => {
const module = new WebAssembly.Module(bytes);

WebAssembly.instantiate(module, {
Expand Down
7 changes: 6 additions & 1 deletion js/packages/observe-sdk-datadog/test/web/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ import { File, OpenFile, WASI } from "@bjorn3/browser_wasi_shim";

const f = async () => {
const adapter = new DatadogAdapter();
const opts = {
spanFilter: {
minDurationMicroseconds: 100,
}
};
const resp = await fetch("count_vowels.instr.wasm");

const bytes = await resp.arrayBuffer();
const traceContext = await adapter.start(bytes);
const traceContext = await adapter.start(bytes, opts);

let fds = [
new OpenFile(
Expand Down
7 changes: 6 additions & 1 deletion js/packages/observe-sdk-honeycomb/test/deno/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ const config: HoneycombConfig = {
host: 'https://api.honeycomb.io',
}
const adapter = new HoneycombAdapter(config);
const opts = {
spanFilter: {
minDurationMicroseconds: 100,
}
};

const bytes = await Deno.readFile("../../test-data/test.c.instr.wasm");
const traceContext = await adapter.start(bytes);
const traceContext = await adapter.start(bytes, opts);
const module = new WebAssembly.Module(bytes);

const runtime = new Context({
Expand Down
7 changes: 6 additions & 1 deletion js/packages/observe-sdk-honeycomb/test/node/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ const config = {
host: 'https://api.honeycomb.io',
}
const adapter = new HoneycombAdapter(config);
const opts = {
spanFilter: {
minDurationMicroseconds: 100,
}
};

const bytes = fs.readFileSync("../../test-data/test.c.instr.wasm");
adapter.start(bytes).then((traceContext) => {
adapter.start(bytes, opts).then((traceContext) => {
const module = new WebAssembly.Module(bytes);

WebAssembly.instantiate(module, {
Expand Down
7 changes: 6 additions & 1 deletion js/packages/observe-sdk-honeycomb/test/web/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ const f = async () => {
host: 'https://api.honeycomb.io',
}
const adapter = new HoneycombAdapter(config);
const opts = {
spanFilter: {
minDurationMicroseconds: 100,
}
};
const resp = await fetch("test.c.instr.wasm");

const bytes = await resp.arrayBuffer();
const traceContext = await adapter.start(bytes);
const traceContext = await adapter.start(bytes, opts);

let fds = [
new OpenFile(
Expand Down
8 changes: 6 additions & 2 deletions js/packages/observe-sdk-lightstep/test/deno/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ const config: LightstepConfig = {
host: 'https://ingest.lightstep.com',
}
const adapter = new LightstepAdapter(config);

const opts = {
spanFilter: {
minDurationMicroseconds: 100,
}
};
const bytes = await Deno.readFile("../../test-data/test.c.instr.wasm");
const traceContext = await adapter.start(bytes);
const traceContext = await adapter.start(bytes, opts);
const module = new WebAssembly.Module(bytes);

const runtime = new Context({
Expand Down
Loading

0 comments on commit d820d9b

Please sign in to comment.