Skip to content

Commit

Permalink
Add disk caching in ingester SearchTagValuesV2 for completed blocks (#…
Browse files Browse the repository at this point in the history
…4069)

* ignore /tmp dir in the root

* instrument localblock searcher methods

* instrument walBlock

* disk cache works locally

* fix fmt

* fix lint

* refactor the disk caching logic

* add tests and cleanup

* update CHANGELOG.md

* address review comments

* cacheKey without start and end
  • Loading branch information
electron0zero authored Sep 19, 2024
1 parent 9fdc032 commit cc6943a
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
private-key.key
integration/e2e/e2e_integration_test[0-9]*
.tempo.yaml
/tmp
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [CHANGE] TraceByID: don't allow concurrent_shards greater than query_shards. [#4074](https://github.com/grafana/tempo/pull/4074) (@electron0zero)
* **BREAKING CHANGE** tempo-query is no longer a jaeger instance with grpcPlugin. Its now a standalone server. Serving a grpc api for jaeger on `0.0.0.0:7777` by default. [#3840](https://github.com/grafana/tempo/issues/3840) (@frzifus)
Expand Down
155 changes: 134 additions & 21 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"fmt"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"github.com/segmentio/fasthash/fnv1a"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
Expand Down Expand Up @@ -235,7 +237,7 @@ func (i *instance) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsRequ
query := traceql.ExtractMatchers(req.Query)

searchBlock := func(ctx context.Context, s common.Searcher, spanName string) error {
ctx, span := tracer.Start(ctx, "instance.SearchTags."+spanName)
ctx, span := tracer.Start(ctx, "instance.SearchTagsV2."+spanName)
defer span.End()

if s == nil {
Expand Down Expand Up @@ -396,7 +398,9 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag

engine := traceql.NewEngine()

wg := boundedwaitgroup.New(20) // TODO: Make configurable
// we usually have 5-10 blocks on an ingester so cap of 20 is more than enough and usually more than the blocks
// we need to search, and this also acts as the limit on the amount of search load on the ingester.
wg := boundedwaitgroup.New(20)
var anyErr atomic.Error
var inspectedBlocks atomic.Int32
var maxBlocks int32
Expand All @@ -417,27 +421,104 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
}

query := traceql.ExtractMatchers(req.Query)
// cacheKey will be same for all blocks in a request so only compute it once
// NOTE: cacheKey is derived from the tag name and query, so if we start respecting start and end, add them to the cacheKey
cacheKey := searchTagValuesV2CacheKey(req, limit, "cache_search_tagvaluesv2")

searchBlock := func(ctx context.Context, s common.Searcher) error {
// helper functions as closures, to access local variables
performSearch := func(ctx context.Context, s common.Searcher, collector *collector.DistinctValue[tempopb.TagValue]) error {
if traceql.IsEmptyQuery(query) {
return s.SearchTagValuesV2(ctx, tag, traceql.MakeCollectTagValueFunc(collector.Collect), common.DefaultSearchOptions())
}

// Otherwise, use the filtered search
fetcher := traceql.NewTagValuesFetcherWrapper(func(ctx context.Context, req traceql.FetchTagValuesRequest, cb traceql.FetchTagValuesCallback) error {
return s.FetchTagValues(ctx, req, cb, common.DefaultSearchOptions())
})

return engine.ExecuteTagValues(ctx, tag, query, traceql.MakeCollectTagValueFunc(collector.Collect), fetcher)
}

exitEarly := func() bool {
if anyErr.Load() != nil {
return nil // Early exit if any error has occurred
return true // Early exit if any error has occurred
}

if maxBlocks > 0 && inspectedBlocks.Inc() > maxBlocks {
return true
}

return false // Continue searching
}

searchBlock := func(ctx context.Context, s common.Searcher, spanName string) error {
ctx, span := tracer.Start(ctx, "instance.SearchTagValuesV2."+spanName)
defer span.End()

if exitEarly() {
return nil
}

// if the query is empty, use the old search
if traceql.IsEmptyQuery(query) {
return s.SearchTagValuesV2(ctx, tag, traceql.MakeCollectTagValueFunc(valueCollector.Collect), common.DefaultSearchOptions())
return performSearch(ctx, s, valueCollector)
}

searchBlockWithCache := func(ctx context.Context, b *LocalBlock, spanName string) error {
ctx, span := tracer.Start(ctx, "instance.SearchTagValuesV2."+spanName)
defer span.End()

if exitEarly() {
return nil
}

// otherwise use the filtered search
fetcher := traceql.NewTagValuesFetcherWrapper(func(ctx context.Context, req traceql.FetchTagValuesRequest, cb traceql.FetchTagValuesCallback) error {
return s.FetchTagValues(ctx, req, cb, common.DefaultSearchOptions())
})
// check the cache first
cacheData, err := b.GetDiskCache(ctx, cacheKey)
if err != nil {
// just log the error and move on...we will search the block
_ = level.Warn(log.Logger).Log("msg", "GetDiskCache failed", "err", err)
}

return engine.ExecuteTagValues(ctx, tag, query, traceql.MakeCollectTagValueFunc(valueCollector.Collect), fetcher)
// we got data...unmarshall, and add values to central collector
if len(cacheData) > 0 && err == nil {
resp := &tempopb.SearchTagValuesV2Response{}
err = proto.Unmarshal(cacheData, resp)
if err != nil {
return err
}
span.SetAttributes(attribute.Bool("cached", true))
for _, v := range resp.TagValues {
if valueCollector.Collect(*v) {
break // we have reached the limit, so stop
}
}
return nil
}

span.SetAttributes(attribute.Bool("cached", false))
// results not in cache, so search the block
// using a local collector to collect values from the block and set cache
localCol := collector.NewDistinctValue[tempopb.TagValue](limit, func(v tempopb.TagValue) int { return len(v.Type) + len(v.Value) })
localErr := performSearch(ctx, b, localCol)
if localErr != nil {
return localErr
}

// marshal the local collector and set the cache
values := localCol.Values()
valuesProto, err := valuesToTagValuesV2RespProto(values)
if err == nil && len(valuesProto) > 0 {
err2 := b.SetDiskCache(ctx, cacheKey, valuesProto)
if err2 != nil {
_ = level.Warn(log.Logger).Log("msg", "SetDiskCache failed", "err", err2)
}
}

// add values to the central collector
for _, v := range values {
if valueCollector.Collect(v) {
break // we have reached the limit, so stop
}
}
return nil
}

// head block
Expand All @@ -451,11 +532,9 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
if i.headBlock != nil {
wg.Add(1)
go func() {
ctx, span := tracer.Start(ctx, "instance.SearchTagValuesV2.headBlock")
defer span.End()
defer i.headBlockMtx.RUnlock()
defer wg.Done()
if err := searchBlock(ctx, i.headBlock); err != nil {
if err := searchBlock(ctx, i.headBlock, "headBlock"); err != nil {
anyErr.Store(fmt.Errorf("unexpected error searching head block (%s): %w", i.headBlock.BlockMeta().BlockID, err))
}
}()
Expand All @@ -469,10 +548,8 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
for _, b := range i.completeBlocks {
wg.Add(1)
go func(b *LocalBlock) {
ctx, span := tracer.Start(ctx, "instance.SearchTagValuesV2.completedBlock")
defer span.End()
defer wg.Done()
if err := searchBlock(ctx, b); err != nil {
if err := searchBlockWithCache(ctx, b, "completeBlocks"); err != nil {
anyErr.Store(fmt.Errorf("unexpected error searching complete block (%s): %w", b.BlockMeta().BlockID, err))
}
}(b)
Expand All @@ -482,10 +559,8 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
for _, b := range i.completingBlocks {
wg.Add(1)
go func(b common.WALBlock) {
ctx, span := tracer.Start(ctx, "instance.SearchTagValuesV2.completingBlock")
defer span.End()
defer wg.Done()
if err := searchBlock(ctx, b); err != nil {
if err := searchBlock(ctx, b, "completingBlocks"); err != nil {
anyErr.Store(fmt.Errorf("unexpected error searching completing block (%s): %w", b.BlockMeta().BlockID, err))
}
}(b)
Expand Down Expand Up @@ -522,3 +597,41 @@ func includeBlock(b *backend.BlockMeta, req *tempopb.SearchRequest) bool {

return b.StartTime.Unix() <= end && b.EndTime.Unix() >= start
}

// searchTagValuesV2CacheKey builds the disk-cache key for a SearchTagValuesV2
// request from the tag name, the query (in canonical TraceQL form, so that
// equivalent queries share a key), and the value-collector limit. Returns ""
// if the query does not parse (should never happen for a validated request).
func searchTagValuesV2CacheKey(req *tempopb.SearchTagValuesRequest, limit int, prefix string) string {
	canonicalQuery := req.Query
	if canonicalQuery != "" {
		ast, err := traceql.Parse(canonicalQuery)
		if err != nil {
			// this should never happen but in case it happens
			return ""
		}
		// force the query into a canonical form
		canonicalQuery = ast.String()
	}

	// NOTE: we are not adding req.Start and req.End to the cache key because we don't respect the start and end
	// please add them to cacheKey if we start respecting them
	hash := fnv1a.HashString64(req.TagName)
	hash = fnv1a.AddString64(hash, canonicalQuery)
	hash = fnv1a.AddUint64(hash, uint64(limit))

	return fmt.Sprintf("%s_%v.buf", prefix, hash)
}

// valuesToTagValuesV2RespProto converts TagValues to protobuf marshalled bytes.
// this is slightly modified version of valuesToV2Response from querier.go
func valuesToTagValuesV2RespProto(tagValues []tempopb.TagValue) ([]byte, error) {
	resp := &tempopb.SearchTagValuesV2Response{}
	resp.TagValues = make([]*tempopb.TagValue, 0, len(tagValues))

	// Index the slice directly instead of taking the address of the range
	// variable: with Go versions before 1.22, `v2 := &v` would alias the single
	// per-loop variable, making every response entry point at the last value.
	for i := range tagValues {
		resp.TagValues = append(resp.TagValues, &tagValues[i])
	}

	data, err := proto.Marshal(resp)
	if err != nil {
		return nil, err
	}
	return data, nil
}
31 changes: 31 additions & 0 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,21 @@ func TestInstanceSearchTagAndValuesV2(t *testing.T) {
require.NoError(t, i.ClearCompletingBlock(blockID)) // Clear the completing block

testSearchTagsAndValuesV2(t, userCtx, i, tagKey, queryThatMatches, expectedTagValues, expectedEventTagValues, expectedLinkTagValues)

// test that we are creating cache files for search tag values v2
// check that we have cache files for all complete blocks for all the cache keys
limit := i.limiter.limits.MaxBytesPerTagValuesQuery("fake")
cacheKeys := cacheKeysForTestSearchTagValuesV2(tagKey, queryThatMatches, limit)
for _, cacheKey := range cacheKeys {
for _, b := range i.completeBlocks {
cache, err := b.GetDiskCache(context.Background(), cacheKey)
require.NoError(t, err)
require.NotEmpty(t, cache)
}
}

// test search is returning same results with cache
testSearchTagsAndValuesV2(t, userCtx, i, tagKey, queryThatMatches, expectedTagValues, expectedEventTagValues, expectedLinkTagValues)
}

// nolint:revive,unparam
Expand Down Expand Up @@ -427,6 +442,22 @@ func testSearchTagsAndValuesV2(
assert.Equal(t, expectedTagValues, tagValues)
}

// cacheKeysForTestSearchTagValuesV2 returns the disk-cache keys that a
// SearchTagValuesV2 call is expected to produce for tagKey in each supported
// attribute scope, using the same prefix the ingester uses.
func cacheKeysForTestSearchTagValuesV2(tagKey, query string, limit int) []string {
	scopes := []string{"span", "event", "link", "instrumentation"}

	keys := make([]string, 0, len(scopes))
	for _, scope := range scopes {
		req := &tempopb.SearchTagValuesRequest{
			TagName: fmt.Sprintf("%s.%s", scope, tagKey),
			Query:   query,
		}
		keys = append(keys, searchTagValuesV2CacheKey(req, limit, "cache_search_tagvaluesv2"))
	}

	return keys
}

// TestInstanceSearchTagsSpecialCases tests that SearchTags errors on an unknown scope and
// returns known intrinsics for the "intrinsic" scope
func TestInstanceSearchTagsSpecialCases(t *testing.T) {
Expand Down
51 changes: 50 additions & 1 deletion modules/ingester/local_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package ingester

import (
"context"
"errors"
"fmt"
"time"

"github.com/grafana/tempo/pkg/traceql"
"go.uber.org/atomic"

"github.com/grafana/tempo/pkg/tempopb"
Expand All @@ -26,7 +28,10 @@ type LocalBlock struct {
flushedTime atomic.Int64 // protecting flushedTime b/c it's accessed from the store on flush and from the ingester instance checking flush time
}

var _ common.Finder = (*LocalBlock)(nil)
// Compile-time checks that LocalBlock satisfies the read interfaces used by
// the ingester's trace-lookup and search paths.
var (
	_ common.Finder = (*LocalBlock)(nil)
	_ common.Searcher = (*LocalBlock)(nil)
)

// NewLocalBlock creates a local block
func NewLocalBlock(ctx context.Context, existingBlock common.BackendBlock, l *local.Backend) *LocalBlock {
Expand Down Expand Up @@ -54,6 +59,36 @@ func (c *LocalBlock) FindTraceByID(ctx context.Context, id common.ID, opts commo
return c.BackendBlock.FindTraceByID(ctx, id, opts)
}

// Search delegates to the underlying backend block, wrapping the call in a
// tracing span so local-block searches show up in instance traces.
func (c *LocalBlock) Search(ctx context.Context, req *tempopb.SearchRequest, opts common.SearchOptions) (*tempopb.SearchResponse, error) {
	derivedCtx, span := tracer.Start(ctx, "LocalBlock.Search")
	defer span.End()

	return c.BackendBlock.Search(derivedCtx, req, opts)
}

// SearchTagValuesV2 delegates to the underlying backend block, wrapping the
// call in a tracing span.
func (c *LocalBlock) SearchTagValuesV2(ctx context.Context, tag traceql.Attribute, cb common.TagValuesCallbackV2, opts common.SearchOptions) error {
	derivedCtx, span := tracer.Start(ctx, "LocalBlock.SearchTagValuesV2")
	defer span.End()

	return c.BackendBlock.SearchTagValuesV2(derivedCtx, tag, cb, opts)
}

// Fetch delegates to the underlying backend block, wrapping the call in a
// tracing span.
func (c *LocalBlock) Fetch(ctx context.Context, req traceql.FetchSpansRequest, opts common.SearchOptions) (traceql.FetchSpansResponse, error) {
	derivedCtx, span := tracer.Start(ctx, "LocalBlock.Fetch")
	defer span.End()

	return c.BackendBlock.Fetch(derivedCtx, req, opts)
}

// FetchTagValues delegates to the underlying backend block, wrapping the call
// in a tracing span.
func (c *LocalBlock) FetchTagValues(ctx context.Context, req traceql.FetchTagValuesRequest, cb traceql.FetchTagValuesCallback, opts common.SearchOptions) error {
	derivedCtx, span := tracer.Start(ctx, "LocalBlock.FetchTagValues")
	defer span.End()

	return c.BackendBlock.FetchTagValues(derivedCtx, req, cb, opts)
}

// FetchTagNames delegates to the underlying backend block, wrapping the call
// in a tracing span.
func (c *LocalBlock) FetchTagNames(ctx context.Context, req traceql.FetchTagsRequest, cb traceql.FetchTagsCallback, opts common.SearchOptions) error {
	derivedCtx, span := tracer.Start(ctx, "LocalBlock.FetchTagNames")
	defer span.End()

	return c.BackendBlock.FetchTagNames(derivedCtx, req, cb, opts)
}

// FlushedTime returns the time the block was flushed. Will return 0
//
// if the block was never flushed
Expand Down Expand Up @@ -90,3 +125,17 @@ func (c *LocalBlock) Write(ctx context.Context, w backend.Writer) error {
err = c.SetFlushed(ctx)
return err
}

// SetDiskCache writes data to the local backend under cacheKey, scoped to this
// block's ID and tenant, so later identical requests can skip the block search.
func (c *LocalBlock) SetDiskCache(ctx context.Context, cacheKey string, data []byte) error {
	return c.writer.Write(ctx, cacheKey, c.BlockMeta().BlockID, c.BlockMeta().TenantID, data, nil)
}

// GetDiskCache reads the cached bytes stored under cacheKey for this block.
// A missing cache file is a cache miss and returns (nil, nil), not an error.
func (c *LocalBlock) GetDiskCache(ctx context.Context, cacheKey string) ([]byte, error) {
	meta := c.BlockMeta()

	data, err := c.reader.Read(ctx, cacheKey, meta.BlockID, meta.TenantID, nil)
	if errors.Is(err, backend.ErrDoesNotExist) {
		// not on disk yet: report a miss rather than an error
		return nil, nil
	}

	return data, err
}
1 change: 1 addition & 0 deletions tempodb/encoding/vparquet4/block_search_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ func (b *backendBlock) SearchTagValuesV2(ctx context.Context, tag traceql.Attrib
if err != nil {
return fmt.Errorf("unexpected error opening parquet file: %w", err)
}
// TODO(suraj): push this BytesRead to SLO middleware
defer func() { span.SetAttributes(attribute.Int64("inspectedBytes", int64(rr.BytesRead()))) }()

return searchTagValues(derivedCtx, tag, cb, pf, b.meta.DedicatedColumns)
Expand Down
Loading

0 comments on commit cc6943a

Please sign in to comment.