Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UpdatesOnly query option to fetch data for only changed keys #22108

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions agent/consul/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package consul

import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
)

type dirEntFilter struct {
Expand Down Expand Up @@ -116,3 +118,19 @@ func FilterEntries(f Filter) int {
// Return the size of the slice
return dst
}

func FilterKVList(state *state.Store, ws memdb.WatchSet, prefix string, entMeta *acl.EnterpriseMeta,
queryMeta *structs.QueryMeta, queryOptions structs.QueryOptions) (uint64, structs.DirEntries, error) {
// If the UpdatesOnly query option is set, utilize MinQueryIndex to only
// get back entries that have a ModifyIndex >= than that value.
//
// MinQueryIndex represents the minimum query index for blocking queries,
// corresponding to the index HTTP query parameter and the WaitIndex
// client-side parameter, after deserialization.
minQueryIndex := uint64(0)
if queryOptions.UpdatesOnly {
minQueryIndex = queryOptions.MinQueryIndex
}

return state.KVSList(ws, prefix, entMeta, minQueryIndex)
}
2 changes: 1 addition & 1 deletion agent/consul/fsm/commands_ce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ func TestFSM_TombstoneReap(t *testing.T) {
t.Fatalf("err: %v", err)
}

idx, _, err := fsm.state.KVSList(nil, "/remove", nil)
idx, _, err := fsm.state.KVSList(nil, "/remove", nil, 0)
if err != nil {
t.Fatalf("err: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/fsm/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestFSM_SnapshotRestore_CE(t *testing.T) {
Value: []byte("foo"),
})
fsm.state.KVSDelete(12, "/remove", nil)
idx, _, err := fsm.state.KVSList(nil, "/remove", nil)
idx, _, err := fsm.state.KVSList(nil, "/remove", nil, 0)
require.NoError(t, err)
require.EqualValues(t, 12, idx, "bad index")

Expand Down
14 changes: 7 additions & 7 deletions agent/consul/kvs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,16 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, ent, err := state.KVSList(ws, args.Key, &args.EnterpriseMeta)
index, entries, err := FilterKVList(state, ws, args.Key, &args.EnterpriseMeta, &reply.QueryMeta, args.QueryOptions)
if err != nil {
return err
}

total := len(ent)
ent = FilterDirEnt(authz, ent)
reply.QueryMeta.ResultsFilteredByACLs = total != len(ent)
total := len(entries)
entries = FilterDirEnt(authz, entries)
reply.QueryMeta.ResultsFilteredByACLs = total != len(entries)
Copy link
Author

@tanmayghai18 tanmayghai18 Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open q: similar to how we hydrate ResultsFilteredByACLs in the QueryMeta introduced by #11569, does it make sense to do the same thing here for filtering by query index?


if len(ent) == 0 {
if len(entries) == 0 {
// Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals)
if index == 0 {
Expand All @@ -224,7 +224,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
reply.Entries = nil
} else {
reply.Index = index
reply.Entries = ent
reply.Entries = entries
}
return nil
})
Expand Down Expand Up @@ -259,7 +259,7 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, entries, err := state.KVSList(ws, args.Prefix, &args.EnterpriseMeta)
index, entries, err := FilterKVList(state, ws, args.Prefix, &args.EnterpriseMeta, &reply.QueryMeta, args.QueryOptions)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions agent/consul/state/kvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func kvsGetTxn(tx ReadTxn,
// is the max index of the returned kvs entries or applicable tombstones, or
// else it's the full table indexes for kvs and tombstones.
func (s *Store) KVSList(ws memdb.WatchSet,
prefix string, entMeta *acl.EnterpriseMeta) (uint64, structs.DirEntries, error) {
prefix string, entMeta *acl.EnterpriseMeta, minQueryIndex uint64) (uint64, structs.DirEntries, error) {

tx := s.db.Txn(false)
defer tx.Abort()
Expand All @@ -221,18 +221,18 @@ func (s *Store) KVSList(ws memdb.WatchSet,
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
}

return s.kvsListTxn(tx, ws, prefix, *entMeta)
return s.kvsListTxn(tx, ws, prefix, *entMeta, minQueryIndex)
}

// kvsListTxn is the inner method that gets a list of KVS entries matching a
// prefix.
func (s *Store) kvsListTxn(tx ReadTxn,
ws memdb.WatchSet, prefix string, entMeta acl.EnterpriseMeta) (uint64, structs.DirEntries, error) {
ws memdb.WatchSet, prefix string, entMeta acl.EnterpriseMeta, minQueryIndex uint64) (uint64, structs.DirEntries, error) {

// Get the table indexes.
idx := kvsMaxIndex(tx, entMeta)

lindex, entries, err := kvsListEntriesTxn(tx, ws, prefix, entMeta)
lindex, entries, err := kvsListEntriesTxn(tx, ws, prefix, entMeta, minQueryIndex)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
Expand Down
7 changes: 5 additions & 2 deletions agent/consul/state/kvs_ce.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func insertKVTxn(tx WriteTxn, entry *structs.DirEntry, updateMax bool, _ bool) e
return nil
}

func kvsListEntriesTxn(tx ReadTxn, ws memdb.WatchSet, prefix string, entMeta acl.EnterpriseMeta) (uint64, structs.DirEntries, error) {
func kvsListEntriesTxn(tx ReadTxn, ws memdb.WatchSet, prefix string, entMeta acl.EnterpriseMeta, minQueryIndex uint64) (uint64, structs.DirEntries, error) {
var ents structs.DirEntries
var lindex uint64

Expand All @@ -72,7 +72,10 @@ func kvsListEntriesTxn(tx ReadTxn, ws memdb.WatchSet, prefix string, entMeta acl
// Gather all of the keys found
for entry := entries.Next(); entry != nil; entry = entries.Next() {
e := entry.(*structs.DirEntry)
ents = append(ents, e)
// Filter out entries that have an old ModifyIndex (default=0, so keep everything)
if e.ModifyIndex >= minQueryIndex {
ents = append(ents, e)
}
if e.ModifyIndex > lindex {
lindex = e.ModifyIndex
}
Expand Down
Loading