Skip to content

Commit

Permalink
add root spans, send single span requests
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 committed Dec 20, 2023
1 parent 8d2086d commit 1d1b62e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ func (b *shard) sendItems(trigger trigger) {
parent, sp = b.tracer.Tracer("otel").Start(context.Background(), "concurrent_batch_processor/export", trace.WithLinks(links...))
sp.End()
}
fmt.Println(parent)
err = b.batch.export(parent, req)

latency := time.Since(before)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,12 @@ type testClient struct {
batcher processor.Traces
started []sdktrace.ReadWriteSpan
ended chan sdktrace.ReadOnlySpan
ctxMap map[context.Context][]sdktrace.ReadOnlySpan
}

func NewClient(batcher processor.Traces) sdktrace.SpanProcessor {
return &testClient{
batcher: batcher,
ended: make(chan sdktrace.ReadOnlySpan, 1),
ctxMap: make(map[context.Context][]sdktrace.ReadOnlySpan),
}
}

Expand All @@ -368,20 +366,15 @@ func (c *testClient) OnEnd(s sdktrace.ReadOnlySpan) {
c.ended <- s
}

func (c *testClient) split(ctx context.Context) {
c.mtx.Lock()
defer c.mtx.Unlock()
s := <-c.ended
c.ctxMap[ctx] = append(c.ctxMap[ctx], s)
return
}

func (c *testClient) Shutdown(ctx context.Context) error {
return nil
}

func (c *testClient) ForceFlush(ctx context.Context) error {
return c.batcher.ConsumeTraces(ctx, c.d2pd(c.ctxMap[ctx]))
span := <-c.ended
spans := []sdktrace.ReadOnlySpan{span}

return c.batcher.ConsumeTraces(ctx, c.d2pd(spans))
}

func (c *testClient) d2pd(in []sdktrace.ReadOnlySpan) ptrace.Traces {
Expand Down Expand Up @@ -436,7 +429,7 @@ func copyLinks(dest ptrace.SpanLinkSlice, links []sdktrace.Link) {
func TestBatchProcessorCancelContextMultiple(t *testing.T) {
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 200
cfg.SendBatchSize = 50
cfg.Timeout = 3 * time.Second
cfg.MaxInFlightBytesMiB = 2
creationSet := processortest.NewNopCreateSettings()
Expand All @@ -448,14 +441,15 @@ func TestBatchProcessorCancelContextMultiple(t *testing.T) {
)
otel.SetTracerProvider(tp)
tracer := tp.Tracer("otel")
bg, rootSp := tracer.Start(context.Background(), "test_start_parent")
defer rootSp.End()
bp, err := newBatchTracesProcessor(creationSet, sink, cfg, true)
require.NoError(t, err)
c := NewClient(bp)

tp.RegisterSpanProcessor(c)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))

bg := context.Background()
callCtxs := []context.Context{
client.NewContext(bg, client.Info{
Metadata: client.NewMetadata(map[string][]string{
Expand Down Expand Up @@ -491,40 +485,40 @@ func TestBatchProcessorCancelContextMultiple(t *testing.T) {
var expectedSpanIDs []trace.SpanID
for i := 0; i < requestCount; i++ {
num := i % len(callCtxs)
_, span := tracer.Start(callCtxs[num], fmt.Sprintf("span%d", i))
ctx, span := tracer.Start(callCtxs[num], fmt.Sprintf("span%d", i))
span.End()

c.(*testClient).split(callCtxs[num])
expectedTraceIDs = append(expectedTraceIDs, span.SpanContext().TraceID())
expectedSpanIDs = append(expectedSpanIDs, span.SpanContext().SpanID())
}


for i := range callCtxs {
wg.Add(1)
go func() {
err = tp.ForceFlush(callCtxs[i])
assert.NoError(t, err)
err = tp.ForceFlush(ctx)
assert.NoError(t, err)
wg.Done()
}()
// c.(*testClient).split(callCtxs[num])
expectedTraceIDs = append(expectedTraceIDs, span.SpanContext().TraceID())
expectedSpanIDs = append(expectedSpanIDs, span.SpanContext().SpanID())
}

// expectedTraceIDs = append(expectedTraceIDs, rootSp.SpanContext().TraceID())
// expectedSpanIDs = append(expectedSpanIDs, rootSp.SpanContext().SpanID())

wg.Wait()

td := exp.GetSpans()
fmt.Println(td)
fmt.Println(len(exp.GetSpans()))
var actualTraceIDs []trace.TraceID
var actualSpanIDs []trace.SpanID
for i := range td {
fmt.Println("LINKS")
fmt.Println(td[i].Links)
fmt.Println(td[i].Parent)
actualTraceIDs = append(actualTraceIDs, td[i].SpanContext.TraceID())
actualSpanIDs = append(actualSpanIDs, td[i].SpanContext.SpanID())
}

assert.ElementsMatch(t, expectedSpanIDs, actualSpanIDs)
assert.ElementsMatch(t, expectedTraceIDs, actualTraceIDs)
// require.NoError(t, bp.Shutdown(context.Background()))
require.NoError(t, bp.Shutdown(context.Background()))
}

func TestBatchProcessorSpansDelivered(t *testing.T) {
Expand Down

0 comments on commit 1d1b62e

Please sign in to comment.