Skip to content

Commit

Permalink
enforce batch sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 committed Dec 20, 2023
1 parent acc21bb commit db7525f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,13 @@ func (b *shard) sendItems(trigger trigger) {
var err error

var parent context.Context
isSDK := allSame(contexts)
isSingleCtx := allSame(contexts)

// For SDK's we can reuse the parent context because there is
// only one possible parent. This is not the case
// for collector batchprocessors which must break the parent context
// because batch items can be incoming from multiple receivers.
if isSDK {
if isSingleCtx {
fmt.Println("ISSDK")
parent = contexts[0]
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,13 @@ func TestBatchProcessorCancelContext(t *testing.T) {
func TestBatchProcessorUnbrokenParentContext(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 100
cfg.SendBatchMaxSize = 100
cfg.Timeout = 3 * time.Second
cfg.MaxInFlightBytesMiB = 2
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
requestCount := 5
spansPerRequest := 200
requestCount := 10
spansPerRequest := 20
exp := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
Expand Down Expand Up @@ -394,13 +395,10 @@ func TestBatchProcessorUnbrokenParentContext(t *testing.T) {
// need to flush tracerprovider
tp.ForceFlush(bg)
td := exp.GetSpans()
fmt.Println(td)
fmt.Println(len(exp.GetSpans()))

assert.Equal(t, requestCount+1, len(td))
numBatches := (spansPerRequest*requestCount) / int(cfg.SendBatchMaxSize)
assert.Equal(t, numBatches+1, len(td))
for i := range td {
fmt.Println("LINKS")
fmt.Println(td[i].ChildSpanCount)
if !td[i].Parent.HasTraceID() {
assert.Equal(t, td[i].SpanContext, rootSp.SpanContext())
continue
Expand Down Expand Up @@ -496,11 +494,11 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) {
wg.Wait()

// need to flush tracerprovider
// for i := range callCtxs {
// tp.ForceFlush(callCtxs[i])
// fmt.Println(len(exp.GetSpans()))
// }
tp.ForceFlush(bg)
for i := range callCtxs {
tp.ForceFlush(callCtxs[i])
fmt.Println(len(exp.GetSpans()))
}
// tp.ForceFlush(bg)

td := exp.GetSpans()
// fmt.Println(td)
Expand Down

0 comments on commit db7525f

Please sign in to comment.