Skip to content

Commit

Permalink
poc
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Aug 23, 2024
1 parent dadc331 commit 6439a13
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 43 deletions.
15 changes: 9 additions & 6 deletions exporter/internal/queue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) })
func (pq *persistentQueue[T]) ClaimAndRead(onClaim func()) (T, bool) {
item, ok := q.sizedChannel.pop()
if !ok {
return false
return nil, false
}
// the memory queue doesn't handle consume errors
_ = consumeFunc(item.ctx, item.req)
return true
q.sizedChannel.updateSize(-q.sizer.Sizeof(item))
onClaim()
return item
}

func (pq *persistentQueue[T]) CommitConsume(ctx context.Context, index uint64) {
}

// Shutdown closes the queue channel to initiate draining of the queue.
Expand Down
70 changes: 68 additions & 2 deletions exporter/internal/queue/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
import (
"context"
"sync"
"time"

"go.opentelemetry.io/collector/component"
)
Expand All @@ -15,6 +16,11 @@ type Consumers[T any] struct {
numConsumers int
consumeFunc func(context.Context, T) error
stopWG sync.WaitGroup

mu sync.Mutex 

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / checks

illegal character U+3000

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

invalid character U+3000 in identifier

Check failure on line 20 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

invalid character U+3000 in identifier
timer time.Time
flushTimeout int // TODO
batcher Batcher[T]
}

func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *Consumers[T] {
Expand All @@ -23,6 +29,30 @@ func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(con
numConsumers: numConsumers,
consumeFunc: consumeFunc,
stopWG: sync.WaitGroup{},
flushTimeout: 20

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected newline in composite literal; possibly missing comma or }

Check failure on line 32 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected newline in composite literal; possibly missing comma or }
}
}

type batch struct {
ctx context.Context
request Request
}

func (qc *Consumers[T]) flushIfNecessary() error {
qc.mu.Lock()

// TODO: move these to batcher
check last flush time and compare with now

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / checks

expected ';', found last

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected last at end of statement

Check failure on line 45 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected last at end of statement
take an active batch

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected an at end of statement

Check failure on line 46 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected an at end of statement
update active batch

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected active at end of statement

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected active at end of statement

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected active at end of statement

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected active at end of statement

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected active at end of statement

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected active at end of statement

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected active at end of statement

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected active at end of statement

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected active at end of statement

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected active at end of statement

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected active at end of statement

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected active at end of statement

Check failure on line 47 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected active at end of statement

qc.timer.Reset(batcher.FlushTimeout)
qc.mu.Unlock()

if the active batch is not empty {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected active, expected {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected active, expected {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / checks

expected ';', found active

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected active, expected {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected active, expected {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected active, expected {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected active, expected {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected active, expected {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected active, expected {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected active, expected {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected active, expected {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected active, expected {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected active, expected {

Check failure on line 52 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected active, expected {
if err := qc.consumeFunc(ctx, batch.req); err == nil {
qc.queue.CommitConsume()
}
}
}

Expand All @@ -32,6 +62,7 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
return err
}

qc.timer = time.NewTimer(bs.cfg.FlushTimeout) // TODO
var startWG sync.WaitGroup
for i := 0; i < qc.numConsumers; i++ {
qc.stopWG.Add(1)
Expand All @@ -40,8 +71,18 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
startWG.Done()
defer qc.stopWG.Done()
for {
if !qc.queue.Consume(qc.consumeFunc) {
return
select {
case <-timer.C:
qc.batcher.flushIfNecessary() // TODO
case <- allocConsumer:
item, ctx, ok := qc.queue.Read(func(){ qc.allocConsumer <- true })
if !ok {
return
}
// Put item into the batch
qc.batcher.push(item) // TODO
qc.batcher.flushIfNecessary() // TODO
case <- shutdownCh:
}
}
}()
Expand All @@ -51,6 +92,31 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
return nil
}

// // Start ensures that queue and all consumers are started.
// func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
// if err := qc.queue.Start(ctx, host); err != nil {
// return err
// }

// var startWG sync.WaitGroup
// for i := 0; i < qc.numConsumers; i++ {
// qc.stopWG.Add(1)
// startWG.Add(1)
// go func() {
// startWG.Done()
// defer qc.stopWG.Done()
// for {
// if !qc.queue.Consume(/*preConsumeCallback=*/nil, qc.consumeFunc) {
// return
// }
// }
// }()
// }
// startWG.Wait()

// return nil
// }

// Shutdown ensures that queue and all consumers are stopped.
func (qc *Consumers[T]) Shutdown(ctx context.Context) error {
if err := qc.queue.Shutdown(ctx); err != nil {
Expand Down
79 changes: 52 additions & 27 deletions exporter/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,36 +188,61 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (
return bytesToItemIndex(val)
}

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
for {
var (
req T
onProcessingFinished func(error)
consumed bool
)

// If we are stopped we still process all the other events in the channel before, but we
// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
_, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
if !consumed {
return 0
}
return pq.set.Sizer.Sizeof(req)
})
if !ok {
return false
}
if consumed {
onProcessingFinished(consumeFunc(context.Background(), req))
return true
}

func (pq *persistentQueue[T]) ClaimAndRead(onClaim func()) (T, bool) {
if _, ok := pq.sizedChannel.pop(); !ok {
return nil, false
}
onClaim()

var (
req T
onProcessingFinished func(error)
consumed bool
)
req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
if consumed {
pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req))
}
return req
}

func (pq *persistentQueue[T]) CommitConsume(ctx context.Context, index uint64) {
if err = pq.itemDispatchingFinish(ctx, index); err != nil {
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}
}

// // Consume applies the provided function on the head of queue.
// // The call blocks until there is an item available or the queue is stopped.
// // The function returns true when an item is consumed or false if the queue is stopped.
// func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
// for {
// var (
// req T
// onProcessingFinished func(error)
// consumed bool
// )

// // If we are stopped we still process all the other events in the channel before, but we
// // return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
// _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
// req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
// if !consumed {
// return 0
// }
// return pq.set.Sizer.Sizeof(req)
// })
// if !ok {
// return false
// }
// if consumed {
// onProcessingFinished(consumeFunc(context.Background(), req))
// return true
// }
// }
// }

func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
// If the queue is not initialized, there is nothing to shut down.
if pq.client == nil {
Expand Down
4 changes: 4 additions & 0 deletions exporter/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type Queue[T any] interface {
// without violating capacity restrictions. If success returns no error.
// It returns ErrQueueIsFull if no space is currently available.
Offer(ctx context.Context, item T) error

ClaimAndRead(onClaim func()) (T, bool)
CommitConsume(ctx context.Context, index uint64)

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
Expand Down
39 changes: 31 additions & 8 deletions exporter/internal/queue/sized_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "sync/atomic"

// sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
// The channel will accept elements until the total size of the elements reaches the capacity.
// Not thread safe.
type sizedChannel[T any] struct {
used *atomic.Int64

Expand Down Expand Up @@ -59,27 +60,49 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error
return nil
}

// pop removes the element from the queue and returns it.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
// The callback is called before the element is removed from the queue. It must return the size of the element.
func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
// REMOVE
// NOTE: the main change in size_channel is that pop() is seprated into pop() and updateSize()
// This is because we want to parallize "reading" from the queue, and we only know the item size
// after reading. We also need to update the queue size in case the batch end up failing.

func (vcq *sizedChannel[T]) pop() (T, bool) {
el, ok := <-vcq.ch
if !ok {
return el, false
}
}

size := callback(el)

func (vcq *sizedChannel[T]) updateSize(deltaSize int64) (T, bool) {
// The used size and the channel size might be not in sync with the queue in case it's restored from the disk
// because we don't flush the current queue size on the disk on every read/write.
// In that case we need to make sure it doesn't go below 0.
if vcq.used.Add(-size) < 0 {
if vcq.used.Add(deltaSize) < 0 {
vcq.used.Store(0)
}
return el, true
}

// // pop removes the element from the queue and returns it.
// // The call blocks until there is an item available or the queue is stopped.
// // The function returns true when an item is consumed or false if the queue is stopped and emptied.
// // The callback is called before the element is removed from the queue. It must return the size of the element.
// func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
// el, ok := <-vcq.ch
// if !ok {
// return el, false
// }

// size := callback(el)

// // The used size and the channel size might be not in sync with the queue in case it's restored from the disk
// // because we don't flush the current queue size on the disk on every read/write.
// // In that case we need to make sure it doesn't go below 0.
// if vcq.used.Add(-size) < 0 {
// vcq.used.Store(0)
// }
// return el, true
// }

// syncSize updates the used size to 0 if the queue is empty.
// The caller must ensure that this call is not called concurrently with push.
// It's used by the persistent queue to ensure the used value correctly reflects the reality which may not be always
Expand Down

0 comments on commit 6439a13

Please sign in to comment.