Skip to content

Commit

Permalink
feat(ARCO-291): Fix sort order
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Jan 22, 2025
1 parent 7f55c91 commit 0c60561
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 12 deletions.
23 changes: 13 additions & 10 deletions internal/callbacker/send_manager/send_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type SendManager struct {
ctx context.Context

queueProcessInterval time.Duration
backfillQueueInterval time.Duration
fillUpQueueInterval time.Duration
sortByTimestampInterval time.Duration
batchSendInterval time.Duration
batchSize int
Expand All @@ -54,7 +54,7 @@ const (
entriesBufferSize = 10000
batchSizeDefault = 50
queueProcessIntervalDefault = 5 * time.Second
backfillQueueIntervalDefault = 5 * time.Second
fillUpQueueIntervalDefault = 5 * time.Second
expirationDefault = 24 * time.Hour
sortByTimestampIntervalDefault = 10 * time.Second
batchSendIntervalDefault = 5 * time.Second
Expand Down Expand Up @@ -85,7 +85,7 @@ func WithQueueProcessInterval(d time.Duration) func(*SendManager) {

func WithBackfillQueueInterval(d time.Duration) func(*SendManager) {
return func(m *SendManager) {
m.backfillQueueInterval = d
m.fillUpQueueInterval = d
}
}

Expand Down Expand Up @@ -122,7 +122,7 @@ func New(url string, sender callbacker.SenderI, store SendManagerStore, logger *

queueProcessInterval: queueProcessIntervalDefault,
expiration: expirationDefault,
backfillQueueInterval: backfillQueueIntervalDefault,
fillUpQueueInterval: fillUpQueueIntervalDefault,
sortByTimestampInterval: sortByTimestampIntervalDefault,
batchSendInterval: batchSendIntervalDefault,
batchSize: batchSizeDefault,
Expand Down Expand Up @@ -153,7 +153,8 @@ func (m *SendManager) Enqueue(entry callbacker.CallbackEntry) {
m.callbackQueue.PushBack(entry)
}

func (m *SendManager) sortByTimestamp() error {
// sortByTimestampAsc sorts the callback queue by timestamp in ascending order
func (m *SendManager) sortByTimestampAsc() error {
current := m.callbackQueue.Front()
if m.callbackQueue.Front() == nil {
return nil
Expand All @@ -170,7 +171,7 @@ func (m *SendManager) sortByTimestamp() error {
if !ok {
return ErrElementIsNotCallbackEntry
}
if currentTime.Data.Timestamp.Before(indexTime.Data.Timestamp) {
if currentTime.Data.Timestamp.After(indexTime.Data.Timestamp) {
temp := current.Value
current.Value = index.Value
index.Value = temp
Expand All @@ -190,7 +191,7 @@ func (m *SendManager) CallbacksQueued() int {
func (m *SendManager) Start() {
queueTicker := time.NewTicker(m.queueProcessInterval)
sortQueueTicker := time.NewTicker(m.sortByTimestampInterval)
backfillQueueTicker := time.NewTicker(m.backfillQueueInterval)
backfillQueueTicker := time.NewTicker(m.fillUpQueueInterval)
batchSendTicker := time.NewTicker(m.batchSendInterval)

m.entriesWg.Add(1)
Expand Down Expand Up @@ -231,13 +232,13 @@ func (m *SendManager) Start() {
case <-m.ctx.Done():
return
case <-sortQueueTicker.C:
err = m.sortByTimestamp()
err = m.sortByTimestampAsc()
if err != nil {
m.logger.Error("Failed to sort by timestamp", slog.String("err", err.Error()))
}

case <-backfillQueueTicker.C:
m.backfillQueue()
m.fillUpQueue()

m.logger.Debug("Callback queue backfilled", slog.Int("callback elements", len(callbackBatch)), slog.Int("queue length", m.CallbacksQueued()), slog.String("url", m.url))
case <-batchSendTicker.C:
Expand Down Expand Up @@ -350,7 +351,8 @@ func (m *SendManager) sendBatch(batch []callbacker.CallbackEntry) (success, retr
return m.sender.SendBatch(m.url, token, callbacks)
}

func (m *SendManager) backfillQueue() {
// fillUpQueue calculates the capacity left in the queue and fills it up
func (m *SendManager) fillUpQueue() {
capacityLeft := m.bufferSize - m.callbackQueue.Len()
if capacityLeft == 0 {
return
Expand Down Expand Up @@ -419,6 +421,7 @@ func (m *SendManager) dequeueAll() []callbacker.CallbackEntry {
next = front.Next()
entry, ok := front.Value.(callbacker.CallbackEntry)
if !ok {
m.callbackQueue.Remove(front)
continue
}
callbacks = append(callbacks, entry)
Expand Down
2 changes: 1 addition & 1 deletion internal/callbacker/send_manager/send_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestSendManagerStart(t *testing.T) {
assert.Equal(t, tc.expectedSetMany, len(data))

for i := 0; i < len(data)-1; i++ {
assert.GreaterOrEqual(t, data[i].Timestamp, data[i+1].Timestamp)
assert.LessOrEqual(t, data[i].Timestamp, data[i+1].Timestamp)
}

return nil
Expand Down
3 changes: 2 additions & 1 deletion internal/callbacker/store/postgresql/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,13 @@ func (p *PostgreSQL) SetMany(ctx context.Context, data []*store.CallbackData) er
return err
}

// GetAndDelete returns deletes a number of callbacks limited by `limit` ordered by timestamp in ascending order
func (p *PostgreSQL) GetAndDelete(ctx context.Context, url string, limit int) ([]*store.CallbackData, error) {
const q = `DELETE FROM callbacker.callbacks
WHERE id IN (
SELECT id FROM callbacker.callbacks
WHERE url = $1
ORDER BY timestamp DESC
ORDER BY timestamp ASC
LIMIT $2
FOR UPDATE
)
Expand Down

0 comments on commit 0c60561

Please sign in to comment.