Skip to content

Commit

Permalink
Add UpdatesOnly query option to fetch data for only changed keys
Browse files Browse the repository at this point in the history
As mentioned long ago (and roughly outlined) in #2791, a limitation on blocking queries is that all K/V entries matching that prefix are returned, even though only 1 key may have changed. This diff adds a new `UpdatesOnly` option, that when set, introduces server-side filtering to remove all keys/entries (from a `Keys()` or `List()` query) that have an old `ModifyIndex`.

It uses `MinQueryIndex` as the threshold here, which follows the effective serialization/deserialization chain: `WaitIndex` --> `index` --> `MinQueryIndex`.
- `WaitIndex` being the client-side parameter, determining the index the "client" waits for on a blocking query
- `index` being the serialized HTTP query parameter representing the `WaitIndex`
- `MinQueryIndex` being the server-side struct (deserialized result of `index`)

While we (potentially?) wait for the streaming backend: https://developer.hashicorp.com/consul/api-docs/features/blocking#streaming-backend to be implemented for KV endpoints, this can be a good stop-gap solution to enable in-memory, incremental scans or queries over a prefix.
  • Loading branch information
tanmayghai18 committed Jan 28, 2025
1 parent 4bca5c5 commit c9997a8
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 47 deletions.
18 changes: 18 additions & 0 deletions agent/consul/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package consul

import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
)

type dirEntFilter struct {
Expand Down Expand Up @@ -116,3 +118,19 @@ func FilterEntries(f Filter) int {
// Return the size of the slice
return dst
}

func FilterKVList(state *state.Store, ws memdb.WatchSet, prefix string, entMeta *acl.EnterpriseMeta,
queryMeta *structs.QueryMeta, queryOptions structs.QueryOptions) (uint64, structs.DirEntries, error) {
// If the UpdatesOnly query option is set, utilize MinQueryIndex to only
// get back entries that have a ModifyIndex >= than that value.
//
// MinQueryIndex represents the minimum query index for blocking queries,
// corresponding to the index HTTP query parameter and the WaitIndex
// client-side parameter, after deserialization.
minQueryIndex := uint64(0)
if queryOptions.UpdatesOnly {
minQueryIndex = queryOptions.MinQueryIndex
}

return state.KVSList(ws, prefix, entMeta, minQueryIndex)
}
2 changes: 1 addition & 1 deletion agent/consul/fsm/commands_ce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ func TestFSM_TombstoneReap(t *testing.T) {
t.Fatalf("err: %v", err)
}

idx, _, err := fsm.state.KVSList(nil, "/remove", nil)
idx, _, err := fsm.state.KVSList(nil, "/remove", nil, 0)
if err != nil {
t.Fatalf("err: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/fsm/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestFSM_SnapshotRestore_CE(t *testing.T) {
Value: []byte("foo"),
})
fsm.state.KVSDelete(12, "/remove", nil)
idx, _, err := fsm.state.KVSList(nil, "/remove", nil)
idx, _, err := fsm.state.KVSList(nil, "/remove", nil, 0)
require.NoError(t, err)
require.EqualValues(t, 12, idx, "bad index")

Expand Down
14 changes: 7 additions & 7 deletions agent/consul/kvs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,16 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, ent, err := state.KVSList(ws, args.Key, &args.EnterpriseMeta)
index, entries, err := FilterKVList(state, ws, args.Key, &args.EnterpriseMeta, &reply.QueryMeta, args.QueryOptions)
if err != nil {
return err
}

total := len(ent)
ent = FilterDirEnt(authz, ent)
reply.QueryMeta.ResultsFilteredByACLs = total != len(ent)
total := len(entries)
entries = FilterDirEnt(authz, entries)
reply.QueryMeta.ResultsFilteredByACLs = total != len(entries)

if len(ent) == 0 {
if len(entries) == 0 {
// Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals)
if index == 0 {
Expand All @@ -224,7 +224,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
reply.Entries = nil
} else {
reply.Index = index
reply.Entries = ent
reply.Entries = entries
}
return nil
})
Expand Down Expand Up @@ -259,7 +259,7 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, entries, err := state.KVSList(ws, args.Prefix, &args.EnterpriseMeta)
index, entries, err := FilterKVList(state, ws, args.Prefix, &args.EnterpriseMeta, &reply.QueryMeta, args.QueryOptions)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions agent/consul/state/kvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func kvsGetTxn(tx ReadTxn,
// is the max index of the returned kvs entries or applicable tombstones, or
// else it's the full table indexes for kvs and tombstones.
func (s *Store) KVSList(ws memdb.WatchSet,
prefix string, entMeta *acl.EnterpriseMeta) (uint64, structs.DirEntries, error) {
prefix string, entMeta *acl.EnterpriseMeta, minQueryIndex uint64) (uint64, structs.DirEntries, error) {

tx := s.db.Txn(false)
defer tx.Abort()
Expand All @@ -221,18 +221,18 @@ func (s *Store) KVSList(ws memdb.WatchSet,
entMeta = structs.DefaultEnterpriseMetaInDefaultPartition()
}

return s.kvsListTxn(tx, ws, prefix, *entMeta)
return s.kvsListTxn(tx, ws, prefix, *entMeta, minQueryIndex)
}

// kvsListTxn is the inner method that gets a list of KVS entries matching a
// prefix.
func (s *Store) kvsListTxn(tx ReadTxn,
ws memdb.WatchSet, prefix string, entMeta acl.EnterpriseMeta) (uint64, structs.DirEntries, error) {
ws memdb.WatchSet, prefix string, entMeta acl.EnterpriseMeta, minQueryIndex uint64) (uint64, structs.DirEntries, error) {

// Get the table indexes.
idx := kvsMaxIndex(tx, entMeta)

lindex, entries, err := kvsListEntriesTxn(tx, ws, prefix, entMeta)
lindex, entries, err := kvsListEntriesTxn(tx, ws, prefix, entMeta, minQueryIndex)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
Expand Down
7 changes: 5 additions & 2 deletions agent/consul/state/kvs_ce.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func insertKVTxn(tx WriteTxn, entry *structs.DirEntry, updateMax bool, _ bool) e
return nil
}

func kvsListEntriesTxn(tx ReadTxn, ws memdb.WatchSet, prefix string, entMeta acl.EnterpriseMeta) (uint64, structs.DirEntries, error) {
func kvsListEntriesTxn(tx ReadTxn, ws memdb.WatchSet, prefix string, entMeta acl.EnterpriseMeta, minQueryIndex uint64) (uint64, structs.DirEntries, error) {
var ents structs.DirEntries
var lindex uint64

Expand All @@ -72,7 +72,10 @@ func kvsListEntriesTxn(tx ReadTxn, ws memdb.WatchSet, prefix string, entMeta acl
// Gather all of the keys found
for entry := entries.Next(); entry != nil; entry = entries.Next() {
e := entry.(*structs.DirEntry)
ents = append(ents, e)
// Filter out entries that have an old ModifyIndex (default=0, so keep everything)
if e.ModifyIndex >= minQueryIndex {
ents = append(ents, e)
}
if e.ModifyIndex > lindex {
lindex = e.ModifyIndex
}
Expand Down
Loading

0 comments on commit c9997a8

Please sign in to comment.