Skip to content

Commit

Permalink
Add Query Iterator for paginating select queries (#19)
Browse files Browse the repository at this point in the history
Co-authored-by: Nitish Tiwari <[email protected]>
  • Loading branch information
trueleo and nitisht authored Sep 27, 2023
1 parent dbb3632 commit e638b2c
Show file tree
Hide file tree
Showing 3 changed files with 515 additions and 28 deletions.
127 changes: 127 additions & 0 deletions pkg/iterator/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) 2023 Cloudnatively Services Pvt Ltd
//
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package iterator

import (
"time"
)

type MinuteCheckPoint struct {
// minute start time.
time time.Time
}

type QueryIterator[OK any, ERR any] struct {
rangeStartTime time.Time
rangeEndTime time.Time
ascending bool
index int
windows []MinuteCheckPoint
ready bool
finished bool
queryRunner func(time.Time, time.Time) (OK, ERR)
hasData func(time.Time, time.Time) bool
}

func NewQueryIterator[OK any, ERR any](startTime time.Time, endTime time.Time, ascending bool, queryRunner func(time.Time, time.Time) (OK, ERR), hasData func(time.Time, time.Time) bool) QueryIterator[OK, ERR] {
iter := QueryIterator[OK, ERR]{
rangeStartTime: startTime,
rangeEndTime: endTime,
ascending: ascending,
index: -1,
windows: []MinuteCheckPoint{},
ready: true,
finished: false,
queryRunner: queryRunner,
hasData: hasData,
}
iter.populateNextNonEmpty()
return iter
}

func (iter *QueryIterator[OK, ERR]) inRange(targetTime time.Time) bool {
return targetTime.Equal(iter.rangeStartTime) || (targetTime.After(iter.rangeStartTime) && targetTime.Before(iter.rangeEndTime))
}

func (iter *QueryIterator[OK, ERR]) Ready() bool {
return iter.ready
}

func (iter *QueryIterator[OK, ERR]) Finished() bool {
return iter.finished && iter.index == len(iter.windows)-1
}

func (iter *QueryIterator[OK, ERR]) CanFetchPrev() bool {
return iter.index > 0
}

func (iter *QueryIterator[OK, ERR]) populateNextNonEmpty() {
var inspectMinute MinuteCheckPoint

// this is initial condition when no checkpoint exists in the window
if len(iter.windows) == 0 {
if iter.ascending {
inspectMinute = MinuteCheckPoint{time: iter.rangeStartTime}
} else {
inspectMinute = MinuteCheckPoint{iter.rangeEndTime.Add(-time.Minute)}
}
} else {
inspectMinute = MinuteCheckPoint{time: nextMinute(iter.windows[len(iter.windows)-1].time, iter.ascending)}
}

iter.ready = false
for iter.inRange(inspectMinute.time) {
if iter.hasData(inspectMinute.time, inspectMinute.time.Add(time.Minute)) {
iter.windows = append(iter.windows, inspectMinute)
iter.ready = true
return
}
inspectMinute = MinuteCheckPoint{
time: nextMinute(inspectMinute.time, iter.ascending),
}
}

// if the loops breaks we have crossed the range with no data
iter.ready = true
iter.finished = true
}

func (iter *QueryIterator[OK, ERR]) Next() (OK, ERR) {
// This assumes that there is always a next index to fetch if this function is called
iter.index++
currentMinute := iter.windows[iter.index]
if iter.index == len(iter.windows)-1 {
iter.ready = false
go iter.populateNextNonEmpty()
}
return iter.queryRunner(currentMinute.time, currentMinute.time.Add(time.Minute))
}

func (iter *QueryIterator[OK, ERR]) Prev() (OK, ERR) {
if iter.index > 0 {
iter.index--
}
currentMinute := iter.windows[iter.index]
return iter.queryRunner(currentMinute.time, currentMinute.time.Add(time.Minute))
}

func nextMinute(current time.Time, ascending bool) time.Time {
if ascending {
return current.Add(time.Minute)
}
return current.Add(-time.Minute)
}
208 changes: 208 additions & 0 deletions pkg/iterator/iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Copyright (c) 2023 Cloudnatively Services Pvt Ltd
//
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package iterator

import (
"testing"
"time"

"golang.org/x/exp/slices"
)

// dummy query provider can be instantiated with counts
type DummyQueryProvider struct {
state map[string]int
}

func (d *DummyQueryProvider) StartTime() time.Time {
keys := make([]time.Time, 0, len(d.state))
for k := range d.state {
parsedTime, _ := time.Parse(time.RFC822Z, k)
keys = append(keys, parsedTime)
}
return slices.MinFunc(keys, func(a time.Time, b time.Time) int {
return a.Compare(b)
})
}

func (d *DummyQueryProvider) EndTime() time.Time {
keys := make([]time.Time, 0, len(d.state))
for k := range d.state {
parsedTime, _ := time.Parse(time.RFC822Z, k)
keys = append(keys, parsedTime)
}
maxTime := slices.MaxFunc(keys, func(a time.Time, b time.Time) int {
return a.Compare(b)
})

return maxTime.Add(time.Minute)
}

func (*DummyQueryProvider) QueryRunnerFunc() func(time.Time, time.Time) ([]map[string]interface{}, error) {
return func(t1, t2 time.Time) ([]map[string]interface{}, error) {
return make([]map[string]interface{}, 0), nil
}
}

func (d *DummyQueryProvider) HasDataFunc() func(time.Time, time.Time) bool {
return func(t1, t2 time.Time) bool {
val, isExists := d.state[t1.Format(time.RFC822Z)]
if isExists && val > 0 {
return true
}
return false
}
}

func DefaultTestScenario() DummyQueryProvider {
return DummyQueryProvider{
state: map[string]int{
"02 Jan 06 15:04 +0000": 10,
"02 Jan 06 15:05 +0000": 0,
"02 Jan 06 15:06 +0000": 0,
"02 Jan 06 15:07 +0000": 10,
"02 Jan 06 15:08 +0000": 0,
"02 Jan 06 15:09 +0000": 3,
"02 Jan 06 15:10 +0000": 0,
"02 Jan 06 15:11 +0000": 0,
"02 Jan 06 15:12 +0000": 1,
},
}
}

func TestIteratorConstruct(t *testing.T) {
scenario := DefaultTestScenario()
iter := NewQueryIterator(scenario.StartTime(), scenario.EndTime(), true, scenario.QueryRunnerFunc(), scenario.HasDataFunc())

currentWindow := iter.windows[0]
if !(currentWindow.time == scenario.StartTime()) {
t.Fatalf("window time does not match start, expected %s, actual %s", scenario.StartTime().String(), currentWindow.time.String())
}
}

func TestIteratorAscending(t *testing.T) {
scenario := DefaultTestScenario()
iter := NewQueryIterator(scenario.StartTime(), scenario.EndTime(), true, scenario.QueryRunnerFunc(), scenario.HasDataFunc())

iter.Next()
// busy loop waiting for iter to be ready
for !iter.Ready() {
continue
}

currentWindow := iter.windows[iter.index]
checkCurrentWindowIndex("02 Jan 06 15:04 +0000", currentWindow, t)

// next should populate new window
if iter.finished == true {
t.Fatalf("Iter finished before expected")
}
if iter.ready == false {
t.Fatalf("Iter is not ready when it should be")
}

iter.Next()
// busy loop waiting for iter to be ready
for !iter.Ready() {
continue
}

currentWindow = iter.windows[iter.index]
checkCurrentWindowIndex("02 Jan 06 15:07 +0000", currentWindow, t)

iter.Next()
// busy loop waiting for iter to be ready
for !iter.Ready() {
continue
}

currentWindow = iter.windows[iter.index]
checkCurrentWindowIndex("02 Jan 06 15:09 +0000", currentWindow, t)

iter.Next()
// busy loop waiting for iter to be ready
for !iter.Ready() {
continue
}

currentWindow = iter.windows[iter.index]
checkCurrentWindowIndex("02 Jan 06 15:12 +0000", currentWindow, t)

if iter.finished != true {
t.Fatalf("iter should be finished now but it is not")
}
}

func TestIteratorDescending(t *testing.T) {
scenario := DefaultTestScenario()
iter := NewQueryIterator(scenario.StartTime(), scenario.EndTime(), false, scenario.QueryRunnerFunc(), scenario.HasDataFunc())

iter.Next()
// busy loop waiting for iter to be ready
for !iter.Ready() {
continue
}

currentWindow := iter.windows[iter.index]
checkCurrentWindowIndex("02 Jan 06 15:12 +0000", currentWindow, t)

// next should populate new window
if iter.finished == true {
t.Fatalf("Iter finished before expected")
}
if iter.ready == false {
t.Fatalf("Iter is not ready when it should be")
}

iter.Next()
// busy loop waiting for iter to be ready
for !iter.Ready() {
continue
}

currentWindow = iter.windows[iter.index]
checkCurrentWindowIndex("02 Jan 06 15:09 +0000", currentWindow, t)

iter.Next()
// busy loop waiting for iter to be ready
for !iter.Ready() {
continue
}

currentWindow = iter.windows[iter.index]
checkCurrentWindowIndex("02 Jan 06 15:07 +0000", currentWindow, t)

iter.Next()
// busy loop waiting for iter to be ready
for !iter.Ready() {
continue
}

currentWindow = iter.windows[iter.index]
checkCurrentWindowIndex("02 Jan 06 15:04 +0000", currentWindow, t)

if iter.finished != true {
t.Fatalf("iter should be finished now but it is not")
}
}

func checkCurrentWindowIndex(expectedValue string, currentWindow MinuteCheckPoint, t *testing.T) {
expectedTime, _ := time.Parse(time.RFC822Z, expectedValue)
if !(currentWindow.time == expectedTime) {
t.Fatalf("window time does not match start, expected %s, actual %s", expectedTime.String(), currentWindow.time.String())
}
}
Loading

0 comments on commit e638b2c

Please sign in to comment.