From 6439a13346d1ec8ccc7ce7d877dcfc85583ed556 Mon Sep 17 00:00:00 2001
From: Sindy Li
Date: Thu, 22 Aug 2024 18:32:13 -0700
Subject: [PATCH] poc

---
 .../internal/queue/bounded_memory_queue.go  | 15 ++--
 exporter/internal/queue/consumers.go        | 70 +++++++++++++++-
 exporter/internal/queue/persistent_queue.go | 79 ++++++++++++-------
 exporter/internal/queue/queue.go            |  4 +
 exporter/internal/queue/sized_channel.go    | 39 +++++++--
 5 files changed, 164 insertions(+), 43 deletions(-)

diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go
index 98e1b281176a..effd543fae3c 100644
--- a/exporter/internal/queue/bounded_memory_queue.go
+++ b/exporter/internal/queue/bounded_memory_queue.go
@@ -43,14 +43,17 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
-// Consume applies the provided function on the head of queue.
-// The call blocks until there is an item available or the queue is stopped.
-// The function returns true when an item is consumed or false if the queue is stopped and emptied.
-func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
-	item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) })
+// ClaimAndRead removes the item at the head of the queue and returns it.
+// The call blocks until there is an item available or the queue is stopped.
+// onClaim is invoked once the item has been claimed, before it is returned.
+func (q *boundedMemoryQueue[T]) ClaimAndRead(onClaim func()) (T, bool) {
+	item, ok := q.sizedChannel.pop()
 	if !ok {
-		return false
+		var zero T
+		return zero, false
 	}
-	// the memory queue doesn't handle consume errors
-	_ = consumeFunc(item.ctx, item.req)
-	return true
+	q.sizedChannel.updateSize(-q.sizer.Sizeof(item.req))
+	onClaim()
+	return item.req, true
+}
+
+// CommitConsume is a no-op for the memory queue: an item is gone once it has
+// been read from the channel.
+func (q *boundedMemoryQueue[T]) CommitConsume(ctx context.Context, index uint64) {
 }
 
 // Shutdown closes the queue channel to initiate draining of the queue.
diff --git a/exporter/internal/queue/consumers.go b/exporter/internal/queue/consumers.go
index 7c57fea96200..ec77ea000e88 100644
--- a/exporter/internal/queue/consumers.go
+++ b/exporter/internal/queue/consumers.go
@@ -6,6 +6,7 @@ package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
 import (
 	"context"
 	"sync"
+	"time"
 
 	"go.opentelemetry.io/collector/component"
 )
@@ -15,6 +16,11 @@ type Consumers[T any] struct {
 	numConsumers int
 	consumeFunc  func(context.Context, T) error
 	stopWG       sync.WaitGroup
+
+	mu            sync.Mutex
+	timer         *time.Timer
+	flushTimeout  time.Duration // TODO: wire the real value from config
+	allocConsumer chan bool     // TODO: seed with numConsumers tokens on Start
+	batcher       Batcher[T]    // TODO: Batcher is not implemented yet
 }
 
 func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *Consumers[T] {
@@ -23,6 +29,30 @@ func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(con
 		numConsumers: numConsumers,
 		consumeFunc:  consumeFunc,
 		stopWG:       sync.WaitGroup{},
+		flushTimeout: 20, // TODO: placeholder, take from config
 	}
 }
 
+type batch struct {
+	ctx     context.Context
+	request Request
+}
+
+func (qc *Consumers[T]) flushIfNecessary() error {
+	qc.mu.Lock()
+
+	// TODO: move these steps into the batcher:
+	//   - check the last flush time and compare it with now
+	//   - take the active batch
+	//   - update the active batch
+
+	qc.timer.Reset(qc.flushTimeout)
+	qc.mu.Unlock()
+
+	// TODO: if the active batch is not empty:
+	//   if err := qc.consumeFunc(batch.ctx, batch.request); err == nil {
+	//       qc.queue.CommitConsume(batch.ctx, index)
+	//   }
+	return nil
+}
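+
+// A rough sketch of the Batcher shape the TODOs above assume; every name here
+// is a placeholder rather than a settled API:
+//
+//	type Batcher[T any] struct {
+//		mu           sync.Mutex
+//		lastFlush    time.Time
+//		FlushTimeout time.Duration
+//		activeBatch  []T
+//	}
+//
+//	func (b *Batcher[T]) push(item T) {
+//		b.mu.Lock()
+//		defer b.mu.Unlock()
+//		b.activeBatch = append(b.activeBatch, item)
+//	}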
 
@@ -32,6 +62,7 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
 		return err
 	}
 
+	qc.timer = time.NewTimer(qc.flushTimeout) // TODO: take the timeout from the batcher config
 	var startWG sync.WaitGroup
 	for i := 0; i < qc.numConsumers; i++ {
 		qc.stopWG.Add(1)
@@ -40,8 +71,18 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
 			startWG.Done()
 			defer qc.stopWG.Done()
 			for {
-				if !qc.queue.Consume(qc.consumeFunc) {
-					return
+				select {
+				case <-qc.timer.C:
+					qc.flushIfNecessary() // TODO: move into the batcher
+				case <-qc.allocConsumer:
+					// TODO: ClaimAndRead should also surface the request
+					// context and the storage index for CommitConsume.
+					item, ok := qc.queue.ClaimAndRead(func() { qc.allocConsumer <- true })
+					if !ok {
+						return
+					}
+					// Put the item into the active batch.
+					qc.batcher.push(item) // TODO
+					qc.flushIfNecessary() // TODO
+				// TODO: add a shutdown case that drains the active batch:
+				// case <-shutdownCh:
 				}
 			}
 		}()
@@ -51,6 +92,31 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
 
 	return nil
 }
 
+// // Start ensures that queue and all consumers are started.
+// func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
+// 	if err := qc.queue.Start(ctx, host); err != nil {
+// 		return err
+// 	}
+
+// 	var startWG sync.WaitGroup
+// 	for i := 0; i < qc.numConsumers; i++ {
+// 		qc.stopWG.Add(1)
+// 		startWG.Add(1)
+// 		go func() {
+// 			startWG.Done()
+// 			defer qc.stopWG.Done()
+// 			for {
+// 				if !qc.queue.Consume(/*preConsumeCallback=*/nil, qc.consumeFunc) {
+// 					return
+// 				}
+// 			}
+// 		}()
+// 	}
+// 	startWG.Wait()
+
+// 	return nil
+// }
+
 // Shutdown ensures that queue and all consumers are stopped.
 func (qc *Consumers[T]) Shutdown(ctx context.Context) error {
 	if err := qc.queue.Shutdown(ctx); err != nil {
diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go
index 7dd646c6ef30..aec6c6ae3def 100644
--- a/exporter/internal/queue/persistent_queue.go
+++ b/exporter/internal/queue/persistent_queue.go
@@ -188,36 +188,61 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (
 	return bytesToItemIndex(val)
 }
 
-// Consume applies the provided function on the head of queue.
-// The call blocks until there is an item available or the queue is stopped.
-// The function returns true when an item is consumed or false if the queue is stopped.
-func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
-	for {
-		var (
-			req                  T
-			onProcessingFinished func(error)
-			consumed             bool
-		)
-
-		// If we are stopped we still process all the other events in the channel before, but we
-		// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
-		_, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
-			req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
-			if !consumed {
-				return 0
-			}
-			return pq.set.Sizer.Sizeof(req)
-		})
-		if !ok {
-			return false
-		}
-		if consumed {
-			onProcessingFinished(consumeFunc(context.Background(), req))
-			return true
-		}
-	}
-}
+
+// ClaimAndRead blocks until an item is available or the queue is stopped.
+// onClaim is invoked as soon as the read slot is claimed, before the item is
+// read from storage.
+func (pq *persistentQueue[T]) ClaimAndRead(onClaim func()) (T, bool) {
+	var zero T
+	// Claim a slot on the channel first; the actual item is read from storage.
+	if _, ok := pq.sizedChannel.pop(); !ok {
+		return zero, false
+	}
+	onClaim()
+
+	// TODO: getNextItem's onProcessingFinished callback is dropped here;
+	// committing now happens through CommitConsume instead.
+	req, _, consumed := pq.getNextItem(context.Background())
+	if !consumed {
+		return zero, false
+	}
+	pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req))
+	return req, true
+}
+
+// CommitConsume marks the item at the given index as dispatched and deletes it
+// from storage.
+func (pq *persistentQueue[T]) CommitConsume(ctx context.Context, index uint64) {
+	if err := pq.itemDispatchingFinish(ctx, index); err != nil {
+		pq.logger.Error("Error deleting item from queue", zap.Error(err))
+	}
+}
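+
+// Sketch of the intended two-phase consume flow, assuming a hypothetical
+// export function; note that CommitConsume needs the storage index, which
+// ClaimAndRead does not surface yet:
+//
+//	item, ok := pq.ClaimAndRead(onClaim)
+//	if !ok {
+//		return // the queue was stopped
+//	}
+//	if err := export(ctx, item); err == nil {
+//		pq.CommitConsume(ctx, index) // delete from storage only on success
+//	}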
 
+// // Consume applies the provided function on the head of queue.
+// // The call blocks until there is an item available or the queue is stopped.
+// // The function returns true when an item is consumed or false if the queue is stopped.
+// func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
+// 	for {
+// 		var (
+// 			req                  T
+// 			onProcessingFinished func(error)
+// 			consumed             bool
+// 		)
+
+// 		// If we are stopped we still process all the other events in the channel before, but we
+// 		// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
+// 		_, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
+// 			req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
+// 			if !consumed {
+// 				return 0
+// 			}
+// 			return pq.set.Sizer.Sizeof(req)
+// 		})
+// 		if !ok {
+// 			return false
+// 		}
+// 		if consumed {
+// 			onProcessingFinished(consumeFunc(context.Background(), req))
+// 			return true
+// 		}
+// 	}
+// }
+
 func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
 	// If the queue is not initialized, there is nothing to shut down.
 	if pq.client == nil {
diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go
index 35bc504579ed..baaba0ad3f3a 100644
--- a/exporter/internal/queue/queue.go
+++ b/exporter/internal/queue/queue.go
@@ -25,6 +25,10 @@ type Queue[T any] interface {
 	// without violating capacity restrictions. If success returns no error.
 	// It returns ErrQueueIsFull if no space is currently available.
 	Offer(ctx context.Context, item T) error
+
+	// ClaimAndRead blocks until an item is available or the queue is stopped.
+	// onClaim is invoked once the read slot is claimed, before the item is returned.
+	ClaimAndRead(onClaim func()) (T, bool)
+	// CommitConsume removes the item at the given index after a successful export.
+	CommitConsume(ctx context.Context, index uint64)
+
 	// Consume applies the provided function on the head of queue.
 	// The call blocks until there is an item available or the queue is stopped.
 	// The function returns true when an item is consumed or false if the queue is stopped.
diff --git a/exporter/internal/queue/sized_channel.go b/exporter/internal/queue/sized_channel.go
index 1702a38ac2f6..ff0b05861c23 100644
--- a/exporter/internal/queue/sized_channel.go
+++ b/exporter/internal/queue/sized_channel.go
@@ -7,6 +7,7 @@ import "sync/atomic"
 
 // sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
 // The channel will accept elements until the total size of the elements reaches the capacity.
+// Not thread-safe.
 type sizedChannel[T any] struct {
 	used *atomic.Int64
 
@@ -59,27 +60,49 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error
 	return nil
 }
 
-// pop removes the element from the queue and returns it.
-// The call blocks until there is an item available or the queue is stopped.
-// The function returns true when an item is consumed or false if the queue is stopped and emptied.
-// The callback is called before the element is removed from the queue. It must return the size of the element.
-func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
+// NOTE (remove before merging): the main change in sized_channel is that pop()
+// is separated into pop() and updateSize(). This is because we want to
+// parallelize "reading" from the queue, and we only know the item size after
+// reading. We also need to update the queue size in case the batch ends up
+// failing.
+
+// pop removes the element from the queue and returns it.
+// The call blocks until there is an item available or the queue is stopped.
+// Unlike the previous version it does not touch the used size; the caller must
+// settle that via updateSize once the element's size is known.
+func (vcq *sizedChannel[T]) pop() (T, bool) {
 	el, ok := <-vcq.ch
 	if !ok {
 		return el, false
 	}
+	return el, true
+}
 
-	size := callback(el)
-
+// updateSize adjusts the tracked total size by deltaSize: negative after a
+// successful read, positive when a failed batch has to be re-counted.
+func (vcq *sizedChannel[T]) updateSize(deltaSize int64) {
 	// The used size and the channel size might be not in sync with the queue in case it's restored from the disk
 	// because we don't flush the current queue size on the disk on every read/write.
 	// In that case we need to make sure it doesn't go below 0.
-	if vcq.used.Add(-size) < 0 {
+	if vcq.used.Add(deltaSize) < 0 {
 		vcq.used.Store(0)
 	}
-	return el, true
 }
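+
+// Sketch of how the split is meant to be used by a caller that learns the
+// size only after reading (sizer here is illustrative):
+//
+//	el, ok := vcq.pop()
+//	if !ok {
+//		return
+//	}
+//	size := sizer.Sizeof(el)
+//	vcq.updateSize(-size) // settle the accounting after a successful read
+//	// and if the batch later fails and the item must be re-counted:
+//	// vcq.updateSize(size)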
 
+// // pop removes the element from the queue and returns it.
+// // The call blocks until there is an item available or the queue is stopped.
+// // The function returns true when an item is consumed or false if the queue is stopped and emptied.
+// // The callback is called before the element is removed from the queue. It must return the size of the element.
+// func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
+// 	el, ok := <-vcq.ch
+// 	if !ok {
+// 		return el, false
+// 	}
+
+// 	size := callback(el)
+
+// 	// The used size and the channel size might be not in sync with the queue in case it's restored from the disk
+// 	// because we don't flush the current queue size on the disk on every read/write.
+// 	// In that case we need to make sure it doesn't go below 0.
+// 	if vcq.used.Add(-size) < 0 {
+// 		vcq.used.Store(0)
+// 	}
+// 	return el, true
+// }
+
 // syncSize updates the used size to 0 if the queue is empty.
 // The caller must ensure that this call is not called concurrently with push.
 // It's used by the persistent queue to ensure the used value correctly reflects the reality which may not be always