poc
sfc-gh-sili committed Aug 23, 2024
1 parent dadc331 commit 0c1f0c1
Showing 6 changed files with 178 additions and 30 deletions.
35 changes: 35 additions & 0 deletions exporter/exporterbatcher/batcher.go
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"

// type Batcher[T any] struct {
// 	activeBatch    *batch
// 	pendingBatches []*batch
//
// 	flushTimeout time.Duration // TODO
// 	flushFunc    func(req T) error
// }

// func (b *Batcher[T]) FlushIfNecessary() error {
// 	b.mu.Lock()
//
// 	var batchToExport *batch
// 	now := time.Now()
// 	if now.Sub(b.lastFlushTime) > b.flushTimeout || b.activeBatch.size() > b.minBatchSize {
// 		b.lastFlushTime = now
// 		batchToExport = b.activeBatch
// 		b.activeBatch = b.pendingBatches[0]
// 		b.pendingBatches = b.pendingBatches[1:]
// 	}
// 	b.timer.Reset(b.flushTimeout)
// 	b.mu.Unlock()
//
// 	if batchToExport != nil {
// 		return b.flushFunc(batchToExport.request()) // TODO: request() is a placeholder
// 	}
// 	return nil
// }

// func (b *Batcher[T]) Push(item T) error {
// 	if b.maxSize != 0 && b.activeBatch.size()+item.size() > b.maxSize {
// 		// TODO: split the item across the active batch and a new pending batch.
// 	}
// 	// TODO: otherwise merge the item into the active batch.
// 	return nil
// }
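A minimal compilable sketch of what the commented-out pseudocode above appears to be aiming at. The batch type, lastFlushTime, minBatchSize, and the items-slice flushFunc are assumptions for illustration, not part of this commit.

package exporterbatcher

import (
	"sync"
	"time"
)

// batch is a hypothetical accumulator; the POC has not defined it yet.
type batch[T any] struct {
	items []T
}

func (b *batch[T]) size() int { return len(b.items) }

type Batcher[T any] struct {
	mu             sync.Mutex
	timer          *time.Timer // assumed to be created by the consumer loop
	lastFlushTime  time.Time
	activeBatch    *batch[T]
	pendingBatches []*batch[T]

	flushTimeout time.Duration
	minBatchSize int
	flushFunc    func(items []T) error
}

// FlushIfNecessary rotates out the active batch when it is old or large
// enough, then exports it outside the lock.
func (b *Batcher[T]) FlushIfNecessary() error {
	b.mu.Lock()
	var toExport *batch[T]
	now := time.Now()
	if now.Sub(b.lastFlushTime) > b.flushTimeout || b.activeBatch.size() >= b.minBatchSize {
		b.lastFlushTime = now
		toExport = b.activeBatch
		if len(b.pendingBatches) > 0 {
			b.activeBatch, b.pendingBatches = b.pendingBatches[0], b.pendingBatches[1:]
		} else {
			b.activeBatch = &batch[T]{}
		}
	}
	b.timer.Reset(b.flushTimeout)
	b.mu.Unlock()

	if toExport != nil && len(toExport.items) > 0 {
		return b.flushFunc(toExport.items)
	}
	return nil
}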
16 changes: 15 additions & 1 deletion exporter/internal/queue/bounded_memory_queue.go
@@ -40,19 +40,33 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil)
}

func (q *boundedMemoryQueue[T]) ClaimAndRead(onClaim func()) (T, bool) {
item, ok := q.sizedChannel.pop()
if !ok {
return item.req, ok
}
q.sizedChannel.updateSize(-q.sizer.Sizeof(item.req))
onClaim()
return item.req, ok
}

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
item, ok := q.sizedChannel.pop()
if !ok {
return false
}
q.sizedChannel.updateSize(-q.sizer.Sizeof(item.req))
// the memory queue doesn't handle consume errors
_ = consumeFunc(item.ctx, item.req)
return true
}

// CommitConsume is a no-op for the in-memory queue: items are removed on read.
func (q *boundedMemoryQueue[T]) CommitConsume(ctx context.Context, index uint64) {
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
q.sizedChannel.shutdown()
57 changes: 57 additions & 0 deletions exporter/internal/queue/consumers.go
@@ -6,15 +6,22 @@ package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
import (
"context"
"sync"
"time"

"go.opentelemetry.io/collector/component"
// "go.opentelemetry.io/collector/exporter/exporterbatcher"
)

type Consumers[T any] struct {
queue Queue[T]
numConsumers int
consumeFunc func(context.Context, T) error
stopWG sync.WaitGroup

	mu           sync.Mutex
	timer        time.Time     // TODO: time of the last flush, to drive periodic flushing
	flushTimeout time.Duration // TODO: make configurable
	// batcher Batcher[T] // TODO
}

func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *Consumers[T] {
@@ -23,9 +30,59 @@ func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(con
numConsumers: numConsumers,
consumeFunc: consumeFunc,
stopWG: sync.WaitGroup{},
flushTimeout: 20 * time.Second, // TODO: placeholder default
}
}

func (qc *Consumers[T]) flushIfNecessary(ctx context.Context) error {
	// TODO: flush only when the active batch is non-empty, take req from the
	// batcher, and pass the real item index to CommitConsume.
	var req T
	if err := qc.consumeFunc(ctx, req); err == nil {
		qc.queue.CommitConsume(ctx, 1) // TODO: real item index
	}
	return nil
}

// Alternative Start for the POC, kept commented out: consumers claim items and feed them to the batcher instead of consuming directly.
// func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
// if err := qc.queue.Start(ctx, host); err != nil {
// return err
// }

// timer := time.NewTimer(1 * time.Second) // TODO
// allocConsumer := make(chan bool, 1)
// allocConsumer <- true // seed the first claim so one consumer starts reading

// var startWG sync.WaitGroup
// for i := 0; i < qc.numConsumers; i++ {
// qc.stopWG.Add(1)
// startWG.Add(1)
// go func() {
// startWG.Done()
// defer qc.stopWG.Done()
// for {
// select {
// case <-timer.C:
// // qc.batcher.FlushIfNecessary() // TODO
// case <-allocConsumer:
// _, ok := qc.queue.ClaimAndRead(func(){ allocConsumer <- true })
// if !ok {
// return
// }
// // Put item into the batch
// // qc.batcher.Push(item) // TODO
// // qc.batcher.FlushIfNecessary() // TODO
// }
// }
// }()
// }
// startWG.Wait()

// return nil
// }
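The commented-out loop above compresses several moving parts; a hypothetical, compilable rendering of the same handshake follows. claimAndRead stands in for qc.queue.ClaimAndRead and flush for the batcher's FlushIfNecessary; the claim-channel seeding is an assumption the pseudocode leaves implicit.

// claimLoop alternates between periodic flushes and claiming items. Passing
// the claim token back through onClaim lets the next read start as soon as
// the slot is free, which is the point of ClaimAndRead.
func claimLoop[T any](
	claimAndRead func(onClaim func()) (T, bool),
	flush func(),
	flushTimeout time.Duration,
) {
	timer := time.NewTimer(flushTimeout)
	defer timer.Stop()

	claim := make(chan bool, 1)
	claim <- true // seed the first claim so the loop starts reading

	for {
		select {
		case <-timer.C:
			flush()
			timer.Reset(flushTimeout)
		case <-claim:
			item, ok := claimAndRead(func() { claim <- true })
			if !ok {
				return // queue stopped and drained
			}
			_ = item // TODO in the POC: batcher.Push(item), then flush if necessary
		}
	}
}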

// Start ensures that queue and all consumers are started.
func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
if err := qc.queue.Start(ctx, host); err != nil {
52 changes: 35 additions & 17 deletions exporter/internal/queue/persistent_queue.go
@@ -188,33 +188,51 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (
return bytesToItemIndex(val)
}


func (pq *persistentQueue[T]) ClaimAndRead(onClaim func()) (T, bool) {
if _, ok := pq.sizedChannel.pop(); !ok {
var t T
return t, false
}
onClaim()

	req, _, consumed := pq.getNextItem(context.Background())
	if consumed {
		pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req))
	}
	return req, consumed
}

// CommitConsume deletes the item dispatched at the given index from storage.
func (pq *persistentQueue[T]) CommitConsume(ctx context.Context, index uint64) {
if err := pq.itemDispatchingFinish(ctx, index); err != nil {
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}
}

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
	// If we are stopped we still process all the other events in the channel before, but we
	// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
	if _, ok := pq.sizedChannel.pop(); !ok {
		return false
	}

	req, onProcessingFinished, ok := pq.getNextItem(context.Background())
	if !ok {
		return false
	}

	pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req))
	onProcessingFinished(consumeFunc(context.Background(), req))
	return true
}

4 changes: 4 additions & 0 deletions exporter/internal/queue/queue.go
@@ -25,6 +25,10 @@ type Queue[T any] interface {
// without violating capacity restrictions. If success returns no error.
// It returns ErrQueueIsFull if no space is currently available.
Offer(ctx context.Context, item T) error

	// ClaimAndRead removes the next element from the queue and returns it.
	// onClaim is invoked once the element has been claimed, so the caller can
	// free the consumer slot before processing finishes.
	ClaimAndRead(onClaim func()) (T, bool)

	// CommitConsume marks the item at the given index as processed, removing
	// it from storage. It is a no-op for the in-memory queue.
	CommitConsume(ctx context.Context, index uint64)

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
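For orientation, a sketch of how a caller inside this package might drive the two new methods together; the literal index mirrors the placeholder in consumers.go and is not a real API contract.

// drainOne claims one item, frees the consumer slot via onClaim, and commits
// only when consumption succeeds, so failed items stay in storage.
func drainOne[T any](ctx context.Context, q Queue[T], consume func(context.Context, T) error) bool {
	req, ok := q.ClaimAndRead(func() {
		// The slot is free again; another worker could start claiming here.
	})
	if !ok {
		return false // queue stopped and drained
	}
	if err := consume(ctx, req); err == nil {
		q.CommitConsume(ctx, 1) // TODO: the real item index, as in flushIfNecessary
	}
	return true
}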
44 changes: 32 additions & 12 deletions exporter/internal/queue/sized_channel.go
@@ -7,6 +7,7 @@ import "sync/atomic"

// sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
// The channel will accept elements until the total size of the elements reaches the capacity.
// Not thread safe.
type sizedChannel[T any] struct {
used *atomic.Int64

@@ -59,27 +60,46 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error
return nil
}

// REMOVE
// NOTE: the main change in sizedChannel is that pop() is separated into pop()
// and updateSize(). This is because we want to parallelize reading from the
// queue, and we only know an item's size after reading it. We also need to be
// able to update the queue size in case the batch ends up failing.
func (vcq *sizedChannel[T]) pop() (T, bool) {
el, ok := <-vcq.ch
return el, ok
}

func (vcq *sizedChannel[T]) updateSize(deltaSize int64) {
// The used size and the channel size might be not in sync with the queue in case it's restored from the disk
// because we don't flush the current queue size on the disk on every read/write.
// In that case we need to make sure it doesn't go below 0.
if vcq.used.Add(deltaSize) < 0 {
vcq.used.Store(0)
}
}
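The note above is the heart of this change; a hypothetical caller shows the intended split, under the assumption that a failed batch should credit the size back (sizeof and handle are stand-ins, not commit API):

// consumeOne pops first, learns the size, debits it, and credits it back if
// handling (for example, the batch export) fails.
func consumeOne[T any](vcq *sizedChannel[T], sizeof func(T) int64, handle func(T) error) bool {
	el, ok := vcq.pop()
	if !ok {
		return false
	}
	size := sizeof(el)
	vcq.updateSize(-size) // the item has left the channel
	if err := handle(el); err != nil {
		vcq.updateSize(size) // batch failed: restore the accounted size
	}
	return true
}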

// // pop removes the element from the queue and returns it.
// // The call blocks until there is an item available or the queue is stopped.
// // The function returns true when an item is consumed or false if the queue is stopped and emptied.
// // The callback is called before the element is removed from the queue. It must return the size of the element.
// func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
// el, ok := <-vcq.ch
// if !ok {
// return el, false
// }

// size := callback(el)

// // The used size and the channel size might be not in sync with the queue in case it's restored from the disk
// // because we don't flush the current queue size on the disk on every read/write.
// // In that case we need to make sure it doesn't go below 0.
// if vcq.used.Add(-size) < 0 {
// vcq.used.Store(0)
// }
// return el, true
// }

// syncSize updates the used size to 0 if the queue is empty.
// The caller must ensure that this call is not called concurrently with push.
// It's used by the persistent queue to ensure the used value correctly reflects the reality which may not be always
