diff --git a/pkg/iterator/iterator.go b/pkg/iterator/iterator.go new file mode 100644 index 0000000..f9e02a1 --- /dev/null +++ b/pkg/iterator/iterator.go @@ -0,0 +1,127 @@ +// Copyright (c) 2023 Cloudnatively Services Pvt Ltd +// +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package iterator + +import ( + "time" +) + +type MinuteCheckPoint struct { + // minute start time. + time time.Time +} + +type QueryIterator[OK any, ERR any] struct { + rangeStartTime time.Time + rangeEndTime time.Time + ascending bool + index int + windows []MinuteCheckPoint + ready bool + finished bool + queryRunner func(time.Time, time.Time) (OK, ERR) + hasData func(time.Time, time.Time) bool +} + +func NewQueryIterator[OK any, ERR any](startTime time.Time, endTime time.Time, ascending bool, queryRunner func(time.Time, time.Time) (OK, ERR), hasData func(time.Time, time.Time) bool) QueryIterator[OK, ERR] { + iter := QueryIterator[OK, ERR]{ + rangeStartTime: startTime, + rangeEndTime: endTime, + ascending: ascending, + index: -1, + windows: []MinuteCheckPoint{}, + ready: true, + finished: false, + queryRunner: queryRunner, + hasData: hasData, + } + iter.populateNextNonEmpty() + return iter +} + +func (iter *QueryIterator[OK, ERR]) inRange(targetTime time.Time) bool { + return targetTime.Equal(iter.rangeStartTime) || (targetTime.After(iter.rangeStartTime) && targetTime.Before(iter.rangeEndTime)) +} + +func (iter *QueryIterator[OK, ERR]) Ready() bool { + return iter.ready +} + +func (iter *QueryIterator[OK, ERR]) Finished() bool { + return iter.finished && iter.index == len(iter.windows)-1 +} + +func (iter *QueryIterator[OK, ERR]) CanFetchPrev() bool { + return iter.index > 0 +} + +func (iter *QueryIterator[OK, ERR]) populateNextNonEmpty() { + var inspectMinute MinuteCheckPoint + + // this is initial condition when no checkpoint exists in the window + if len(iter.windows) == 0 { + if iter.ascending { + inspectMinute = MinuteCheckPoint{time: iter.rangeStartTime} + } else { + inspectMinute = MinuteCheckPoint{iter.rangeEndTime.Add(-time.Minute)} + } + } else { + inspectMinute = MinuteCheckPoint{time: nextMinute(iter.windows[len(iter.windows)-1].time, iter.ascending)} + } + + iter.ready = false + for iter.inRange(inspectMinute.time) { + if iter.hasData(inspectMinute.time, inspectMinute.time.Add(time.Minute)) { + iter.windows = append(iter.windows, inspectMinute) + iter.ready = true + return + } + inspectMinute = MinuteCheckPoint{ + time: nextMinute(inspectMinute.time, iter.ascending), + } + } + + // if the loops breaks we have crossed the range with no data + iter.ready = true + iter.finished = true +} + +func (iter *QueryIterator[OK, ERR]) Next() (OK, ERR) { + // This assumes that there is always a next index to fetch if this function is called + iter.index++ + currentMinute := iter.windows[iter.index] + if iter.index == len(iter.windows)-1 { + iter.ready = false + go iter.populateNextNonEmpty() + } + return iter.queryRunner(currentMinute.time, currentMinute.time.Add(time.Minute)) +} + +func (iter *QueryIterator[OK, ERR]) Prev() (OK, ERR) { + if iter.index > 0 { + iter.index-- + } + currentMinute := iter.windows[iter.index] + return iter.queryRunner(currentMinute.time, currentMinute.time.Add(time.Minute)) +} + +func nextMinute(current time.Time, ascending bool) time.Time { + if ascending { + return current.Add(time.Minute) + } + return current.Add(-time.Minute) +} diff --git a/pkg/iterator/iterator_test.go b/pkg/iterator/iterator_test.go new file mode 100644 index 0000000..7a6369e --- /dev/null +++ b/pkg/iterator/iterator_test.go @@ -0,0 +1,200 @@ +// Copyright (c) 2023 Cloudnatively Services Pvt Ltd +// +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package iterator + +import ( + "testing" + "time" + + "golang.org/x/exp/slices" +) + +// dummy query provider can be instantiated with counts +type DummyQueryProvider struct { + state map[string]int +} + +func (d *DummyQueryProvider) StartTime() time.Time { + keys := make([]time.Time, 0, len(d.state)) + for k := range d.state { + parsedTime, _ := time.Parse(time.RFC822Z, k) + keys = append(keys, parsedTime) + } + return slices.MinFunc(keys, func(a time.Time, b time.Time) int { + return a.Compare(b) + }) +} + +func (d *DummyQueryProvider) EndTime() time.Time { + keys := make([]time.Time, 0, len(d.state)) + for k := range d.state { + parsedTime, _ := time.Parse(time.RFC822Z, k) + keys = append(keys, parsedTime) + } + maxTime := slices.MaxFunc(keys, func(a time.Time, b time.Time) int { + return a.Compare(b) + }) + + return maxTime.Add(time.Minute) +} + +func (*DummyQueryProvider) QueryRunnerFunc() func(time.Time, time.Time) ([]map[string]interface{}, error) { + return func(t1, t2 time.Time) ([]map[string]interface{}, error) { + return make([]map[string]interface{}, 0), nil + } +} + +func (d *DummyQueryProvider) HasDataFunc() func(time.Time, time.Time) bool { + return func(t1, t2 time.Time) bool { + val, isExists := d.state[t1.Format(time.RFC822Z)] + if isExists && val > 0 { + return true + } + return false + } +} + +func DefaultTestScenario() DummyQueryProvider { + return DummyQueryProvider{ + state: map[string]int{ + "02 Jan 06 15:04 +0000": 10, + "02 Jan 06 15:05 +0000": 0, + "02 Jan 06 15:06 +0000": 0, + "02 Jan 06 15:07 +0000": 10, + "02 Jan 06 15:08 +0000": 0, + "02 Jan 06 15:09 +0000": 3, + "02 Jan 06 15:10 +0000": 0, + "02 Jan 06 15:11 +0000": 0, + "02 Jan 06 15:12 +0000": 1, + }, + } +} + +func TestIteratorConstruct(t *testing.T) { + scenario := DefaultTestScenario() + iter := NewQueryIterator(scenario.StartTime(), scenario.EndTime(), true, scenario.QueryRunnerFunc(), scenario.HasDataFunc()) + + currentWindow := iter.windows[0] + if !(currentWindow.time == scenario.StartTime()) { + t.Fatalf("window time does not match start, expected %s, actual %s", scenario.StartTime().String(), currentWindow.time.String()) + } +} + +func TestIteratorAscending(t *testing.T) { + scenario := DefaultTestScenario() + iter := NewQueryIterator(scenario.StartTime(), scenario.EndTime(), true, scenario.QueryRunnerFunc(), scenario.HasDataFunc()) + + iter.Next() + // busy loop waiting for iter to be ready + for !iter.Ready() { + } + + currentWindow := iter.windows[iter.index] + checkCurrentWindowIndex("02 Jan 06 15:04 +0000", currentWindow, t) + + // next should populate new window + if iter.finished == true { + t.Fatalf("Iter finished before expected") + } + if iter.ready == false { + t.Fatalf("Iter is not ready when it should be") + } + + iter.Next() + // busy loop waiting for iter to be ready + for !iter.Ready() { + } + + currentWindow = iter.windows[iter.index] + checkCurrentWindowIndex("02 Jan 06 15:07 +0000", currentWindow, t) + + iter.Next() + // busy loop waiting for iter to be ready + for !iter.Ready() { + } + + currentWindow = iter.windows[iter.index] + checkCurrentWindowIndex("02 Jan 06 15:09 +0000", currentWindow, t) + + iter.Next() + // busy loop waiting for iter to be ready + for !iter.Ready() { + } + + currentWindow = iter.windows[iter.index] + checkCurrentWindowIndex("02 Jan 06 15:12 +0000", currentWindow, t) + + if iter.finished != true { + t.Fatalf("iter should be finished now but it is not") + } +} + +func TestIteratorDescending(t *testing.T) { + scenario := DefaultTestScenario() + iter := NewQueryIterator(scenario.StartTime(), scenario.EndTime(), false, scenario.QueryRunnerFunc(), scenario.HasDataFunc()) + + iter.Next() + // busy loop waiting for iter to be ready + for !iter.Ready() { + } + + currentWindow := iter.windows[iter.index] + checkCurrentWindowIndex("02 Jan 06 15:12 +0000", currentWindow, t) + + // next should populate new window + if iter.finished == true { + t.Fatalf("Iter finished before expected") + } + if iter.ready == false { + t.Fatalf("Iter is not ready when it should be") + } + + iter.Next() + // busy loop waiting for iter to be ready + for !iter.Ready() { + } + + currentWindow = iter.windows[iter.index] + checkCurrentWindowIndex("02 Jan 06 15:09 +0000", currentWindow, t) + + iter.Next() + // busy loop waiting for iter to be ready + for !iter.Ready() { + } + + currentWindow = iter.windows[iter.index] + checkCurrentWindowIndex("02 Jan 06 15:07 +0000", currentWindow, t) + + iter.Next() + // busy loop waiting for iter to be ready + for !iter.Ready() { + } + + currentWindow = iter.windows[iter.index] + checkCurrentWindowIndex("02 Jan 06 15:04 +0000", currentWindow, t) + + if iter.finished != true { + t.Fatalf("iter should be finished now but it is not") + } +} + +func checkCurrentWindowIndex(expectedValue string, currentWindow MinuteCheckPoint, t *testing.T) { + expectedTime, _ := time.Parse(time.RFC822Z, expectedValue) + if !(currentWindow.time == expectedTime) { + t.Fatalf("window time does not match start, expected %s, actual %s", expectedTime.String(), currentWindow.time.String()) + } +} diff --git a/pkg/model/query.go b/pkg/model/query.go index 82d417a..91cca66 100644 --- a/pkg/model/query.go +++ b/pkg/model/query.go @@ -24,6 +24,10 @@ import ( "net/http" "os" "pb/pkg/config" + "pb/pkg/iterator" + "regexp" + "strings" + "sync" "time" "github.com/charmbracelet/bubbles/help" @@ -92,6 +96,11 @@ var ( key.NewBinding(key.WithKeys("ctrl+r"), key.WithHelp("ctrl r", "(re) run query")), } + pagiatorKeyBinds = []key.Binding{ + key.NewBinding(key.WithKeys("ctrl+r"), key.WithHelp("ctrl r", "Fetch Next Minute")), + key.NewBinding(key.WithKeys("ctrl+b"), key.WithHelp("ctrl b", "Fetch Prev Minute")), + } + QueryNavigationMap = []string{"query", "time", "table"} ) @@ -117,16 +126,17 @@ const ( ) type QueryModel struct { - width int - height int - table table.Model - query textarea.Model - timeRange TimeInputModel - profile config.Profile - help help.Model - status StatusBar - overlay uint - focused int + width int + height int + table table.Model + query textarea.Model + timeRange TimeInputModel + profile config.Profile + help help.Model + status StatusBar + queryIterator *iterator.QueryIterator[QueryData, FetchResult] + overlay uint + focused int } func (m *QueryModel) focusSelected() { @@ -145,6 +155,47 @@ func (m *QueryModel) currentFocus() string { return QueryNavigationMap[m.focused] } +func (m *QueryModel) initIterator() { + iter := createIteratorFromModel(m) + m.queryIterator = iter +} + +func createIteratorFromModel(m *QueryModel) *iterator.QueryIterator[QueryData, FetchResult] { + startTime := m.timeRange.start.Time() + endTime := m.timeRange.end.Time() + + startTime = startTime.Truncate(time.Minute) + endTime = endTime.Truncate(time.Minute).Add(time.Minute) + + regex := regexp.MustCompile(`^select\s+(?:\*|\w+(?:,\s*\w+)*)\s+from\s+(\w+)(?:\s+;)?$`) + matches := regex.FindStringSubmatch(m.query.Value()) + if matches == nil { + return nil + } + table := matches[1] + iter := iterator.NewQueryIterator( + startTime, endTime, + false, + func(t1, t2 time.Time) (QueryData, FetchResult) { + client := &http.Client{ + Timeout: time.Second * 50, + } + return fetchData(client, &m.profile, m.query.Value(), t1.UTC().Format(time.RFC3339), t2.UTC().Format(time.RFC3339)) + }, + func(t1, t2 time.Time) bool { + client := &http.Client{ + Timeout: time.Second * 50, + } + res, err := fetchData(client, &m.profile, "select count(*) as count from "+table, m.timeRange.StartValueUtc(), m.timeRange.EndValueUtc()) + if err == fetchErr { + return false + } + count := res.Records[0]["count"].(float64) + return count > 0 + }) + return &iter +} + func NewQueryModel(profile config.Profile, stream string, duration uint) QueryModel { w, h, _ := term.GetSize(int(os.Stdout.Fd())) @@ -184,22 +235,40 @@ func NewQueryModel(profile config.Profile, stream string, duration uint) QueryMo help := help.New() help.Styles.FullDesc = lipgloss.NewStyle().Foreground(FocusSecondry) - return QueryModel{ - width: w, - height: h, - table: table, - query: query, - timeRange: inputs, - overlay: overlayNone, - profile: profile, - help: help, - status: NewStatusBar(profile.URL, stream, w), + model := QueryModel{ + width: w, + height: h, + table: table, + query: query, + timeRange: inputs, + overlay: overlayNone, + profile: profile, + help: help, + queryIterator: nil, + status: NewStatusBar(profile.URL, stream, w), } + model.queryIterator = createIteratorFromModel(&model) + return model } func (m QueryModel) Init() tea.Cmd { - // Just return `nil`, which means "no I/O right now, please." - return NewFetchTask(m.profile, m.query.Value(), m.timeRange.StartValueUtc(), m.timeRange.EndValueUtc()) + return func() tea.Msg { + var ready sync.WaitGroup + ready.Add(1) + go func() { + m.initIterator() + for !m.queryIterator.Ready() { + time.Sleep(time.Millisecond * 100) + } + ready.Done() + }() + ready.Wait() + if m.queryIterator.Finished() { + return nil + } + + return IteratorNext(m.queryIterator)() + } } func (m QueryModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { @@ -256,7 +325,21 @@ func (m QueryModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { // common keybind if msg.Type == tea.KeyCtrlR { m.overlay = overlayNone - return m, NewFetchTask(m.profile, m.query.Value(), m.timeRange.StartValueUtc(), m.timeRange.EndValueUtc()) + if m.queryIterator == nil { + return m, NewFetchTask(m.profile, m.query.Value(), m.timeRange.StartValueUtc(), m.timeRange.EndValueUtc()) + } + if m.queryIterator.Ready() && !m.queryIterator.Finished() { + return m, IteratorNext(m.queryIterator) + } + return m, nil + } + + if msg.Type == tea.KeyCtrlB { + m.overlay = overlayNone + if m.queryIterator.CanFetchPrev() { + return m, IteratorPrev(m.queryIterator) + } + return m, nil } switch msg.Type { @@ -269,12 +352,14 @@ func (m QueryModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { switch m.currentFocus() { case "query": m.query, cmd = m.query.Update(msg) + m.initIterator() case "table": m.table, cmd = m.table.Update(msg) } cmds = append(cmds, cmd) case overlayInputs: m.timeRange, cmd = m.timeRange.Update(msg) + m.initIterator() cmds = append(cmds, cmd) } } @@ -314,12 +399,33 @@ func (m QueryModel) View() string { BorderForeground(FocusPrimary) } + mainViewRenderElements := []string{lipgloss.JoinHorizontal(lipgloss.Top, queryOuter.Render(m.query.View()), timeOuter.Render(time)), tableOuter.Render(m.table.View())} + + if m.queryIterator != nil { + inactiveStyle := lipgloss.NewStyle().Foreground(StandardPrimary) + activeStyle := lipgloss.NewStyle().Foreground(FocusPrimary) + var line strings.Builder + + if m.queryIterator.CanFetchPrev() { + line.WriteString(activeStyle.Render("<<")) + } else { + line.WriteString(inactiveStyle.Render("<<")) + } + + fmt.Fprintf(&line, " %d of many ", m.table.TotalRows()) + + if m.queryIterator.Ready() && !m.queryIterator.Finished() { + line.WriteString(activeStyle.Render(">>")) + } else { + line.WriteString(inactiveStyle.Render(">>")) + } + + mainViewRenderElements = append(mainViewRenderElements, line.String()) + } + switch m.overlay { case overlayNone: - mainView = lipgloss.JoinVertical(lipgloss.Left, - lipgloss.JoinHorizontal(lipgloss.Top, queryOuter.Render(m.query.View()), timeOuter.Render(time)), - tableOuter.Render(m.table.View()), - ) + mainView = lipgloss.JoinVertical(lipgloss.Left, mainViewRenderElements...) switch m.currentFocus() { case "query": helpKeys = TextAreaHelpKeys{}.FullHelp() @@ -334,7 +440,13 @@ func (m QueryModel) View() string { mainView = m.timeRange.View() helpKeys = m.timeRange.FullHelp() } - helpKeys = append(helpKeys, additionalKeyBinds) + + if m.queryIterator != nil { + helpKeys = append(helpKeys, pagiatorKeyBinds) + } else { + helpKeys = append(helpKeys, additionalKeyBinds) + } + helpView = m.help.FullHelpView(helpKeys) helpHeight := lipgloss.Height(helpView) @@ -377,6 +489,46 @@ func NewFetchTask(profile config.Profile, query string, startTime string, endTim } } +func IteratorNext(iter *iterator.QueryIterator[QueryData, FetchResult]) func() tea.Msg { + return func() tea.Msg { + res := FetchData{ + status: fetchErr, + schema: []string{}, + data: []map[string]interface{}{}, + } + + data, status := iter.Next() + + if status == fetchOk { + res.data = data.Records + res.schema = data.Fields + res.status = fetchOk + } + + return res + } +} + +func IteratorPrev(iter *iterator.QueryIterator[QueryData, FetchResult]) func() tea.Msg { + return func() tea.Msg { + res := FetchData{ + status: fetchErr, + schema: []string{}, + data: []map[string]interface{}{}, + } + + data, status := iter.Prev() + + if status == fetchOk { + res.data = data.Records + res.schema = data.Fields + res.status = fetchOk + } + + return res + } +} + func fetchData(client *http.Client, profile *config.Profile, query string, startTime string, endTime string) (data QueryData, res FetchResult) { data = QueryData{} res = fetchErr