diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go index 98e1b281176a..d39c205d9b60 100644 --- a/exporter/internal/queue/bounded_memory_queue.go +++ b/exporter/internal/queue/bounded_memory_queue.go @@ -43,13 +43,24 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped and emptied. -func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { - item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) +func (q *boundedMemoryQueue[T]) Claim( + preConsumeCallback func(), + consumeFunc func(context.Context, T) error) bool { + + item, ok := q.sizedChannel.pop() if !ok { return false } - // the memory queue doesn't handle consume errors - _ = consumeFunc(item.ctx, item.req) + q.sizedChannel.updateSize(-q.sizer.Sizeof(item)) + return item +} + +func (pq *persistentQueue[T]) Read(item memQueueEl[T]) (T, context.Context) { + return item +} + +func (pq *persistentQueue[T]) Consume(item memQueueEl[T]) { + consumeFunc(item.context.Background(), item.req); return true } diff --git a/exporter/internal/queue/consumers.go b/exporter/internal/queue/consumers.go index 7c57fea96200..b9520746a839 100644 --- a/exporter/internal/queue/consumers.go +++ b/exporter/internal/queue/consumers.go @@ -26,12 +26,27 @@ func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(con } } +func (qc *Consumers[T]) flush() error { + grab a mutex + check last flush time and compare with now + + take an active batch + update active batch + + resetTimer + release mutex + + if the active batch is not empty + nextSender.send() +} + // Start ensures that queue and all consumers are started. 
func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error { if err := qc.queue.Start(ctx, host); err != nil { return err } + timer := time.NewTimer(bs.cfg.FlushTimeout) // TODO var startWG sync.WaitGroup for i := 0; i < qc.numConsumers; i++ { qc.stopWG.Add(1) @@ -40,8 +55,15 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error { startWG.Done() defer qc.stopWG.Done() for { - if !qc.queue.Consume(qc.consumeFunc) { - return + select { + case <-timer.C: + qc.flushIfNecessary() + case <- allocConsumer: + Claim() + qc.allocConsumer <- true + qc.Read() + qc.flushIfNecessary() + case <- shutdownCh: } } }() @@ -51,6 +73,31 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error { return nil } +// // Start ensures that queue and all consumers are started. +// func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error { +// if err := qc.queue.Start(ctx, host); err != nil { +// return err +// } + +// var startWG sync.WaitGroup +// for i := 0; i < qc.numConsumers; i++ { +// qc.stopWG.Add(1) +// startWG.Add(1) +// go func() { +// startWG.Done() +// defer qc.stopWG.Done() +// for { +// if !qc.queue.Consume(/*preConsumeCallback=*/nil, qc.consumeFunc) { +// return +// } +// } +// }() +// } +// startWG.Wait() + +// return nil +// } + // Shutdown ensures that queue and all consumers are stopped. 
func (qc *Consumers[T]) Shutdown(ctx context.Context) error { if err := qc.queue.Shutdown(ctx); err != nil { diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go index 7dd646c6ef30..ade8a1f901ec 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -188,35 +188,63 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) ( return bytesToItemIndex(val) } + +func (pq *persistentQueue[T]) Claim( + preConsumeCallback func(), + consumeFunc func(context.Context, T) error) bool { + _, ok := pq.sizedChannel.pop() + if !ok { + return false + } +} + +func (pq *persistentQueue[T]) Read() (T, context.Context) { + var ( + req T + onProcessingFinished func(error) + consumed bool + ) + req, onProcessingFinished, consumed = pq.getNextItem(context.Background()) + if consumed { + pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req)) + } + return req, context +} + +func (pq *persistentQueue[T]) Consume() { + onProcessingFinished(consumeFunc(context.Background(), req); + return true +} + // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped. -func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { - for { - var ( - req T - onProcessingFinished func(error) - consumed bool - ) - - // If we are stopped we still process all the other events in the channel before, but we - // return fast in the `getNextItem`, so we will free the channel fast and get to the stop. 
- _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 { - req, onProcessingFinished, consumed = pq.getNextItem(context.Background()) - if !consumed { - return 0 - } - return pq.set.Sizer.Sizeof(req) - }) - if !ok { - return false - } - if consumed { - onProcessingFinished(consumeFunc(context.Background(), req)) - return true - } - } -} +// func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { +// for { +// var ( +// req T +// onProcessingFinished func(error) +// consumed bool +// ) + +// // If we are stopped we still process all the other events in the channel before, but we +// // return fast in the `getNextItem`, so we will free the channel fast and get to the stop. +// _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 { +// req, onProcessingFinished, consumed = pq.getNextItem(context.Background()) +// if !consumed { +// return 0 +// } +// return pq.set.Sizer.Sizeof(req) +// }) +// if !ok { +// return false +// } +// if consumed { +// onProcessingFinished(consumeFunc(context.Background(), req)) +// return true +// } +// } +// } func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error { // If the queue is not initialized, there is nothing to shut down. diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go index 35bc504579ed..3cb208fd003f 100644 --- a/exporter/internal/queue/queue.go +++ b/exporter/internal/queue/queue.go @@ -25,6 +25,11 @@ type Queue[T any] interface { // without violating capacity restrictions. If success returns no error. // It returns ErrQueueIsFull if no space is currently available. Offer(ctx context.Context, item T) error + + Claim() + Read() + Remove() + // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped. 
diff --git a/exporter/internal/queue/sized_channel.go b/exporter/internal/queue/sized_channel.go index 1702a38ac2f6..ff0b05861c23 100644 --- a/exporter/internal/queue/sized_channel.go +++ b/exporter/internal/queue/sized_channel.go @@ -7,6 +7,7 @@ import "sync/atomic" // sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements. // The channel will accept elements until the total size of the elements reaches the capacity. +// Not thread safe. type sizedChannel[T any] struct { used *atomic.Int64 @@ -59,27 +60,49 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error return nil } -// pop removes the element from the queue and returns it. -// The call blocks until there is an item available or the queue is stopped. -// The function returns true when an item is consumed or false if the queue is stopped and emptied. -// The callback is called before the element is removed from the queue. It must return the size of the element. -func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) { +// REMOVE +// NOTE: the main change in sized_channel is that pop() is separated into pop() and updateSize(). +// This is because we want to parallelize "reading" from the queue, and we only know the item size +// after reading. We also need to update the queue size in case the batch ends up failing. + +func (vcq *sizedChannel[T]) pop() (T, bool) { el, ok := <-vcq.ch if !ok { return el, false } +} - size := callback(el) - +func (vcq *sizedChannel[T]) updateSize(deltaSize int64) (T, bool) { // The used size and the channel size might be not in sync with the queue in case it's restored from the disk // because we don't flush the current queue size on the disk on every read/write. // In that case we need to make sure it doesn't go below 0. - if vcq.used.Add(-size) < 0 { + if vcq.used.Add(deltaSize) < 0 { vcq.used.Store(0) } return el, true } +// // pop removes the element from the queue and returns it. 
+// // The call blocks until there is an item available or the queue is stopped. +// // The function returns true when an item is consumed or false if the queue is stopped and emptied. +// // The callback is called before the element is removed from the queue. It must return the size of the element. +// func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) { +// el, ok := <-vcq.ch +// if !ok { +// return el, false +// } + +// size := callback(el) + +// // The used size and the channel size might be not in sync with the queue in case it's restored from the disk +// // because we don't flush the current queue size on the disk on every read/write. +// // In that case we need to make sure it doesn't go below 0. +// if vcq.used.Add(-size) < 0 { +// vcq.used.Store(0) +// } +// return el, true +// } + // syncSize updates the used size to 0 if the queue is empty. // The caller must ensure that this call is not called concurrently with push. // It's used by the persistent queue to ensure the used value correctly reflects the reality which may not be always