Skip to content

Commit

Permalink
POC
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Sep 13, 2024
1 parent 5fc39ba commit 8a38fa6
Show file tree
Hide file tree
Showing 9 changed files with 1,074 additions and 792 deletions.
35 changes: 35 additions & 0 deletions exporter/exporterbatcher/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"

// type Batcher[T any] struct {
// batches batch*

// flushTimeout int // TODO
// flushFunc func(req T) err
// }

// func (b *Batcher[T]) FlushIfNecessary() error {
// mu.Lock()

// var batchToExport
// var now = time.Now()
// if now - lastFlushTime > flushTimeout || activeBatch.size() > minBatchSize {
// lastFlushTime = now
// batchToExport = activeBatch
// activeBatch = pendingBatches[0]
// pendingBatches = pendingBatches[1:]
// }
// qc.timer.Reset(batcher.FlushTimeout)
// mu.Unlock()
// flushFunc(req)
// }


// func (b *Batcher[T]) Push(item T) error {
// if maxSize != 0 && batches[0].size() + item.size() > maxSize:
// sdlfj


// }
82 changes: 41 additions & 41 deletions exporter/exporterhelper/batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,42 +61,42 @@ func newBatchSender(cfg exporterbatcher.Config, set exporter.Settings,
}

func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
bs.shutdownCh = make(chan struct{})
timer := time.NewTimer(bs.cfg.FlushTimeout)
go func() {
for {
select {
case <-bs.shutdownCh:
// There is a minimal chance that another request is added after the shutdown signal.
// This loop will handle that case.
for bs.activeRequests.Load() > 0 {
bs.mu.Lock()
if bs.activeBatch.request != nil {
bs.exportActiveBatch()
}
bs.mu.Unlock()
}
if !timer.Stop() {
<-timer.C
}
close(bs.shutdownCompleteCh)
return
case <-timer.C:
bs.mu.Lock()
nextFlush := bs.cfg.FlushTimeout
if bs.activeBatch.request != nil {
sinceLastFlush := time.Since(bs.lastFlushed)
if sinceLastFlush >= bs.cfg.FlushTimeout {
bs.exportActiveBatch()
} else {
nextFlush = bs.cfg.FlushTimeout - sinceLastFlush
}
}
bs.mu.Unlock()
timer.Reset(nextFlush)
}
}
}()
// bs.shutdownCh = make(chan struct{})
// timer := time.NewTimer(bs.cfg.FlushTimeout)
// go func() {
// for {
// select {
// case <-bs.shutdownCh:
// // There is a minimal chance that another request is added after the shutdown signal.
// // This loop will handle that case.
// for bs.activeRequests.Load() > 0 {
// bs.mu.Lock()
// if bs.activeBatch.request != nil {
// bs.exportActiveBatch()
// }
// bs.mu.Unlock()
// }
// if !timer.Stop() {
// <-timer.C
// }
// close(bs.shutdownCompleteCh)
// return
// case <-timer.C:
// bs.mu.Lock()
// nextFlush := bs.cfg.FlushTimeout
// if bs.activeBatch.request != nil {
// sinceLastFlush := time.Since(bs.lastFlushed)
// if sinceLastFlush >= bs.cfg.FlushTimeout {
// bs.exportActiveBatch()
// } else {
// nextFlush = bs.cfg.FlushTimeout - sinceLastFlush
// }
// }
// bs.mu.Unlock()
// timer.Reset(nextFlush)
// }
// }
// }()

return nil
}
Expand Down Expand Up @@ -231,10 +231,10 @@ func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) {
}

func (bs *batchSender) Shutdown(context.Context) error {
bs.stopped.Store(true)
if bs.shutdownCh != nil {
close(bs.shutdownCh)
<-bs.shutdownCompleteCh
}
// bs.stopped.Store(true)
// if bs.shutdownCh != nil {
// close(bs.shutdownCh)
// <-bs.shutdownCompleteCh
// }
return nil
}
Loading

0 comments on commit 8a38fa6

Please sign in to comment.