Skip to content

Commit

Permalink
poc
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Aug 23, 2024
1 parent dadc331 commit 69220eb
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 40 deletions.
19 changes: 15 additions & 4 deletions exporter/internal/queue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,24 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) })
func (q *boundedMemoryQueue[T]) Claim(
preConsumeCallback func(),
consumeFunc func(context.Context, T) error) bool {

item, ok := q.sizedChannel.pop()
if !ok {
return false
}
// the memory queue doesn't handle consume errors
_ = consumeFunc(item.ctx, item.req)
q.sizedChannel.updateSize(-q.sizer.Sizeof(item))
return item
}

func (pq *persistentQueue[T]) Read(item memQueueEl[T]) (T, context.Context) {
return item
}

func (pq *persistentQueue[T]) Consume(item memQueueEl[T]) {
consumeFunc(item.context.Background(), item.req);
return true
}

Expand Down
51 changes: 49 additions & 2 deletions exporter/internal/queue/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,27 @@ func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(con
}
}

func (qc *Consumers[T]) flush() error {
grab a mutex

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected a at end of statement

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected a at end of statement

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected a at end of statement

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected a at end of statement

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected a at end of statement

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected a at end of statement

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected a at end of statement

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected a at end of statement

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / checks

expected ';', found a

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected a at end of statement

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected a at end of statement

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected a at end of statement

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected a at end of statement

Check failure on line 30 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected a at end of statement
check last flush time and compare with now

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected last at end of statement

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected last at end of statement

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected last at end of statement

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected last at end of statement

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected last at end of statement

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected last at end of statement

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected last at end of statement

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected last at end of statement

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected last at end of statement

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected last at end of statement

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected last at end of statement

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected last at end of statement

Check failure on line 31 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected last at end of statement

take an active batch

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected an at end of statement

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected an at end of statement

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected an at end of statement

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected an at end of statement

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected an at end of statement

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected an at end of statement

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected an at end of statement

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected an at end of statement

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected an at end of statement

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected an at end of statement

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected an at end of statement

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected an at end of statement

Check failure on line 33 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected an at end of statement
update active batch

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected active at end of statement

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected active at end of statement

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected active at end of statement

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected active at end of statement

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected active at end of statement

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected active at end of statement

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected active at end of statement

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected active at end of statement

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected active at end of statement

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected active at end of statement

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected active at end of statement

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected active at end of statement

Check failure on line 34 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected active at end of statement

resetTimer
release mutex

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected mutex at end of statement

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected mutex at end of statement

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected mutex at end of statement

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected mutex at end of statement

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected mutex at end of statement

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected mutex at end of statement

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected mutex at end of statement

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected mutex at end of statement

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected mutex at end of statement

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected mutex at end of statement

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected mutex at end of statement

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected mutex at end of statement

Check failure on line 37 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected mutex at end of statement

if the active batch is not empty

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected active, expected {

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected active, expected {

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected active, expected {

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected active, expected {

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected active, expected {

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected active, expected {

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected active, expected {

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected active, expected {

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / checks

expected ';', found active

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected active, expected {

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected active, expected {

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected active, expected {

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected active, expected {

Check failure on line 39 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected active, expected {
nextSender.send()
}

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected }, expected expression

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected }, expected expression

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected }, expected expression

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected }, expected expression

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected }, expected expression

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected }, expected expression

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected }, expected expression

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected }, expected expression

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected }, expected expression

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected }, expected expression

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected }, expected expression

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected }, expected expression

Check failure on line 41 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected }, expected expression

// Start ensures that queue and all consumers are started.
func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected context in argument list; possibly missing comma or )

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected context in argument list; possibly missing comma or )

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected context in argument list; possibly missing comma or )

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected context in argument list; possibly missing comma or )

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected context in argument list; possibly missing comma or )

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected context in argument list; possibly missing comma or )

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected context in argument list; possibly missing comma or )

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / windows-unittest

syntax error: unexpected context in argument list; possibly missing comma or )

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected context in argument list; possibly missing comma or )

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected context in argument list; possibly missing comma or )

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected context in argument list; possibly missing comma or )

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / test-coverage

syntax error: unexpected context in argument list; possibly missing comma or )

Check failure on line 44 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected context in argument list; possibly missing comma or )
if err := qc.queue.Start(ctx, host); err != nil {
return err
}

timer := time.NewTimer(bs.cfg.FlushTimeout) // TODO
var startWG sync.WaitGroup
for i := 0; i < qc.numConsumers; i++ {
qc.stopWG.Add(1)
Expand All @@ -40,8 +55,15 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
startWG.Done()
defer qc.stopWG.Done()
for {
if !qc.queue.Consume(qc.consumeFunc) {
return
select {
case <-timer.C:
qc.flushIfNecessary()
case <- allocConsumer:
Claim()
qc.allocConsumer <- true
qc.Read()
qc.flushIfNecessary()
case <- shutdownCh:
}
}
}()
Expand All @@ -51,6 +73,31 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
return nil
}

// // Start ensures that queue and all consumers are started.
// func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
// if err := qc.queue.Start(ctx, host); err != nil {
// return err
// }

// var startWG sync.WaitGroup
// for i := 0; i < qc.numConsumers; i++ {
// qc.stopWG.Add(1)
// startWG.Add(1)
// go func() {
// startWG.Done()
// defer qc.stopWG.Done()
// for {
// if !qc.queue.Consume(/*preConsumeCallback=*/nil, qc.consumeFunc) {
// return
// }
// }
// }()
// }
// startWG.Wait()

// return nil
// }

// Shutdown ensures that queue and all consumers are stopped.
func (qc *Consumers[T]) Shutdown(ctx context.Context) error {

Check failure on line 102 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / checks

missing ',' in argument list
if err := qc.queue.Shutdown(ctx); err != nil {

Check failure on line 103 in exporter/internal/queue/consumers.go

View workflow job for this annotation

GitHub Actions / checks

missing ',' in argument list
Expand Down
80 changes: 54 additions & 26 deletions exporter/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,35 +188,63 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (
return bytesToItemIndex(val)
}


func (pq *persistentQueue[T]) Claim(
preConsumeCallback func(),
consumeFunc func(context.Context, T) error) bool {
_, ok := pq.sizedChannel.pop()
if !ok {
return false
}
}

func (pq *persistentQueue[T]) Read() (T, context.Context) {
var (
req T
onProcessingFinished func(error)
consumed bool
)
req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
if consumed {
pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req))
}
return req, context
}

func (pq *persistentQueue[T]) Consume() {
onProcessingFinished(consumeFunc(context.Background(), req);

Check failure on line 215 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected semicolon in argument list; possibly missing comma or )

Check failure on line 215 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected semicolon in argument list; possibly missing comma or )

Check failure on line 215 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected semicolon in argument list; possibly missing comma or )

Check failure on line 215 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected semicolon in argument list; possibly missing comma or )

Check failure on line 215 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected semicolon in argument list; possibly missing comma or )

Check failure on line 215 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected semicolon in argument list; possibly missing comma or )

Check failure on line 215 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected semicolon in argument list; possibly missing comma or )

Check failure on line 215 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / checks

missing ',' in argument list

Check failure on line 215 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected semicolon in argument list; possibly missing comma or )

Check failure on line 215 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected semicolon in argument list; possibly missing comma or )

Check failure on line 215 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected semicolon in argument list; possibly missing comma or )

Check failure on line 215 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected semicolon in argument list; possibly missing comma or )
return true

Check failure on line 216 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / Integration test

syntax error: unexpected return at end of statement

Check failure on line 216 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

syntax error: unexpected return at end of statement

Check failure on line 216 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

syntax error: unexpected return at end of statement

Check failure on line 216 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

syntax error: unexpected return at end of statement

Check failure on line 216 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

syntax error: unexpected return at end of statement

Check failure on line 216 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

syntax error: unexpected return at end of statement

Check failure on line 216 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

syntax error: unexpected return at end of statement

Check failure on line 216 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

syntax error: unexpected return at end of statement

Check failure on line 216 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

syntax error: unexpected return at end of statement

Check failure on line 216 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

syntax error: unexpected return at end of statement

Check failure on line 216 in exporter/internal/queue/persistent_queue.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

syntax error: unexpected return at end of statement
}

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
for {
var (
req T
onProcessingFinished func(error)
consumed bool
)

// If we are stopped we still process all the other events in the channel before, but we
// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
_, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
if !consumed {
return 0
}
return pq.set.Sizer.Sizeof(req)
})
if !ok {
return false
}
if consumed {
onProcessingFinished(consumeFunc(context.Background(), req))
return true
}
}
}
// func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
// for {
// var (
// req T
// onProcessingFinished func(error)
// consumed bool
// )

// // If we are stopped we still process all the other events in the channel before, but we
// // return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
// _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
// req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
// if !consumed {
// return 0
// }
// return pq.set.Sizer.Sizeof(req)
// })
// if !ok {
// return false
// }
// if consumed {
// onProcessingFinished(consumeFunc(context.Background(), req))
// return true
// }
// }
// }

func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
// If the queue is not initialized, there is nothing to shut down.
Expand Down
5 changes: 5 additions & 0 deletions exporter/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type Queue[T any] interface {
// without violating capacity restrictions. If success returns no error.
// It returns ErrQueueIsFull if no space is currently available.
Offer(ctx context.Context, item T) error

Claim()
Read()
Remove()

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
Expand Down
39 changes: 31 additions & 8 deletions exporter/internal/queue/sized_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "sync/atomic"

// sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
// The channel will accept elements until the total size of the elements reaches the capacity.
// Not thread safe.
type sizedChannel[T any] struct {
used *atomic.Int64

Expand Down Expand Up @@ -59,27 +60,49 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error
return nil
}

// pop removes the element from the queue and returns it.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
// The callback is called before the element is removed from the queue. It must return the size of the element.
func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
// REMOVE
// NOTE: the main change in size_channel is that pop() is seprated into pop() and updateSize()
// This is because we want to parallize "reading" from the queue, and we only know the item size
// after reading. We also need to update the queue size in case the batch end up failing.

func (vcq *sizedChannel[T]) pop() (T, bool) {
el, ok := <-vcq.ch
if !ok {
return el, false
}
}

size := callback(el)

func (vcq *sizedChannel[T]) updateSize(deltaSize int64) (T, bool) {
// The used size and the channel size might be not in sync with the queue in case it's restored from the disk
// because we don't flush the current queue size on the disk on every read/write.
// In that case we need to make sure it doesn't go below 0.
if vcq.used.Add(-size) < 0 {
if vcq.used.Add(deltaSize) < 0 {
vcq.used.Store(0)
}
return el, true
}

// // pop removes the element from the queue and returns it.
// // The call blocks until there is an item available or the queue is stopped.
// // The function returns true when an item is consumed or false if the queue is stopped and emptied.
// // The callback is called before the element is removed from the queue. It must return the size of the element.
// func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
// el, ok := <-vcq.ch
// if !ok {
// return el, false
// }

// size := callback(el)

// // The used size and the channel size might be not in sync with the queue in case it's restored from the disk
// // because we don't flush the current queue size on the disk on every read/write.
// // In that case we need to make sure it doesn't go below 0.
// if vcq.used.Add(-size) < 0 {
// vcq.used.Store(0)
// }
// return el, true
// }

// syncSize updates the used size to 0 if the queue is empty.
// The caller must ensure that this call is not called concurrently with push.
// It's used by the persistent queue to ensure the used value correctly reflects the reality which may not be always
Expand Down

0 comments on commit 69220eb

Please sign in to comment.