From ed2a9525884bc94d2660adab7bbbb9abbad8c386 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 31 Oct 2024 09:44:01 -0400 Subject: [PATCH 1/5] Add validate method to block Signed-off-by: Joe Elliott --- tempodb/encoding/common/interfaces.go | 1 + tempodb/encoding/v2/backend_block.go | 4 ++ tempodb/encoding/v2/wal_block.go | 4 ++ tempodb/encoding/vparquet2/block.go | 4 ++ tempodb/encoding/vparquet2/wal_block.go | 4 ++ tempodb/encoding/vparquet3/block.go | 5 ++ tempodb/encoding/vparquet3/wal_block.go | 4 ++ tempodb/encoding/vparquet4/block.go | 42 ++++++++++++ tempodb/encoding/vparquet4/block_test.go | 81 ++++++++++++++++++++++++ tempodb/encoding/vparquet4/wal_block.go | 5 ++ 10 files changed, 154 insertions(+) create mode 100644 tempodb/encoding/vparquet4/block_test.go diff --git a/tempodb/encoding/common/interfaces.go b/tempodb/encoding/common/interfaces.go index 1ce00c99fe5..5d9446e63d4 100644 --- a/tempodb/encoding/common/interfaces.go +++ b/tempodb/encoding/common/interfaces.go @@ -100,6 +100,7 @@ type BackendBlock interface { Searcher BlockMeta() *backend.BlockMeta + Validate(ctx context.Context) error } type WALBlock interface { diff --git a/tempodb/encoding/v2/backend_block.go b/tempodb/encoding/v2/backend_block.go index 99ad1830b25..75608a984cc 100644 --- a/tempodb/encoding/v2/backend_block.go +++ b/tempodb/encoding/v2/backend_block.go @@ -175,3 +175,7 @@ func (b *BackendBlock) FetchTagValues(context.Context, traceql.FetchTagValuesReq func (b *BackendBlock) FetchTagNames(context.Context, traceql.FetchTagsRequest, traceql.FetchTagsCallback, common.MetricsCallback, common.SearchOptions) error { return common.ErrUnsupported } + +func (b *BackendBlock) Validate(_ context.Context) error { + return common.ErrUnsupported +} diff --git a/tempodb/encoding/v2/wal_block.go b/tempodb/encoding/v2/wal_block.go index 7f4f8aca7b3..5d264821b1a 100644 --- a/tempodb/encoding/v2/wal_block.go +++ b/tempodb/encoding/v2/wal_block.go @@ -315,6 +315,10 @@ func (a *walBlock) FetchTagNames(context.Context, traceql.FetchTagsRequest, trac return common.ErrUnsupported } +func (a *walBlock) Validate(context.Context) error { + return common.ErrUnsupported +} + func (a *walBlock) fullFilename() string { filename := a.fullFilenameSeparator("+") _, e1 := os.Stat(filename) diff --git a/tempodb/encoding/vparquet2/block.go b/tempodb/encoding/vparquet2/block.go index 93de508dfa3..317c4edc058 100644 --- a/tempodb/encoding/vparquet2/block.go +++ b/tempodb/encoding/vparquet2/block.go @@ -44,3 +44,7 @@ func (b *backendBlock) FetchTagValues(context.Context, traceql.FetchTagValuesReq func (b *backendBlock) FetchTagNames(context.Context, traceql.FetchTagsRequest, traceql.FetchTagsCallback, common.MetricsCallback, common.SearchOptions) error { return common.ErrUnsupported } + +func (b *backendBlock) Validate(context.Context) error { + return common.ErrUnsupported +} diff --git a/tempodb/encoding/vparquet2/wal_block.go b/tempodb/encoding/vparquet2/wal_block.go index 6b7f451b638..5f089e0fbb4 100644 --- a/tempodb/encoding/vparquet2/wal_block.go +++ b/tempodb/encoding/vparquet2/wal_block.go @@ -344,6 +344,10 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui return nil } +func (b *walBlock) Validate(context.Context) error { + return common.ErrUnsupported +} + func (b *walBlock) adjustTimeRangeForSlack(start, end uint32) (uint32, uint32) { now := time.Now() startOfRange := uint32(now.Add(-b.ingestionSlack).Unix()) diff --git a/tempodb/encoding/vparquet3/block.go b/tempodb/encoding/vparquet3/block.go index 55dd67fd66c..05f7a64f1ae 100644 --- a/tempodb/encoding/vparquet3/block.go +++ b/tempodb/encoding/vparquet3/block.go @@ -1,6 +1,7 @@ package vparquet3 import ( + "context" "sync" "github.com/grafana/tempo/tempodb/backend" @@ -33,3 +34,7 @@ func newBackendBlock(meta *backend.BlockMeta, r backend.Reader) *backendBlock { func (b *backendBlock) BlockMeta() *backend.BlockMeta { return b.meta } + +func (b *backendBlock) Validate(context.Context) error { + return common.ErrUnsupported +} diff --git a/tempodb/encoding/vparquet3/wal_block.go b/tempodb/encoding/vparquet3/wal_block.go index 7d9e639e6e3..69c89d75867 100644 --- a/tempodb/encoding/vparquet3/wal_block.go +++ b/tempodb/encoding/vparquet3/wal_block.go @@ -355,6 +355,10 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui return nil } +func (b *walBlock) Validate(context.Context) error { + return common.ErrUnsupported +} + func (b *walBlock) adjustTimeRangeForSlack(start, end uint32) (uint32, uint32) { now := time.Now() startOfRange := uint32(now.Add(-b.ingestionSlack).Unix()) diff --git a/tempodb/encoding/vparquet4/block.go b/tempodb/encoding/vparquet4/block.go index f1da95471cc..467234abcf1 100644 --- a/tempodb/encoding/vparquet4/block.go +++ b/tempodb/encoding/vparquet4/block.go @@ -1,8 +1,13 @@ package vparquet4 import ( + "context" + "encoding/binary" + "errors" + "fmt" "sync" + "github.com/google/uuid" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" "go.opentelemetry.io/otel" @@ -33,3 +38,40 @@ func newBackendBlock(meta *backend.BlockMeta, r backend.Reader) *backendBlock { func (b *backendBlock) BlockMeta() *backend.BlockMeta { return b.meta } + +// Validate will do a basic sanity check of the state of the parquet file. This can be extended to do more checks in the future. +// This method should lean towards being cost effective over complete. +func (b *backendBlock) Validate(ctx context.Context) error { + if b.meta == nil { + return errors.New("block meta is nil") + } + + // read last 8 bytes of the file to confirm its at least complete. the last 4 should be ascii "PAR1" + // and the 4 bytes before that should be the length of the footer + buff := make([]byte, 8) + err := b.r.ReadRange(ctx, DataFileName, uuid.UUID(b.meta.BlockID), b.meta.TenantID, b.meta.Size_-8, buff, nil) + if err != nil { + return fmt.Errorf("failed to read parquet magic footer: %w", err) + } + + if string(buff[4:]) != "PAR1" { + return fmt.Errorf("invalid parquet magic footer: %x", buff[4:]) + } + + footerSize := int64(binary.LittleEndian.Uint32(buff[:4])) + if footerSize != int64(b.meta.FooterSize) { + return fmt.Errorf("unexpected parquet footer size: %d", footerSize) + } + + // read the first byte from all blooms to confirm they exist + buff = make([]byte, 1) + for i := 0; i < int(b.meta.BloomShardCount); i++ { + bloomName := common.BloomName(i) + err = b.r.ReadRange(ctx, bloomName, uuid.UUID(b.meta.BlockID), b.meta.TenantID, 0, buff, nil) + if err != nil { + return fmt.Errorf("failed to read first byte of bloom(%d): %w", i, err) + } + } + + return nil +} diff --git a/tempodb/encoding/vparquet4/block_test.go b/tempodb/encoding/vparquet4/block_test.go new file mode 100644 index 00000000000..6a07373f742 --- /dev/null +++ b/tempodb/encoding/vparquet4/block_test.go @@ -0,0 +1,81 @@ +package vparquet4 + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/grafana/tempo/pkg/util/test" + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/backend/local" + "github.com/grafana/tempo/tempodb/encoding/common" + "github.com/stretchr/testify/require" +) + +func TestValidateFailsOnCorruptParquetFile(t *testing.T) { + ctx := context.Background() + block, w := validBlock(t) + meta := block.meta + + err := block.Validate(ctx) + require.NoError(t, err) + + // Corrupt the file + err = w.Write(ctx, DataFileName, uuid.UUID(meta.BlockID), meta.TenantID, []byte{0, 0, 0, 0, 0, 0, 0, 0}, nil) + require.NoError(t, err) + + err = block.Validate(ctx) + require.Error(t, err) +} + +func TestValidateFailsOnMissingBloom(t *testing.T) { + ctx := context.Background() + block, w := validBlock(t) + meta := block.meta + + err := block.Validate(ctx) + require.NoError(t, err) + + // remove a bloom + err = w.Delete(ctx, common.BloomName(0), backend.KeyPathForBlock(uuid.UUID(meta.BlockID), meta.TenantID)) + require.NoError(t, err) + + err = block.Validate(ctx) + require.Error(t, err) +} + +func validBlock(t *testing.T) (*backendBlock, backend.Writer) { + t.Helper() + + ctx := context.Background() + + rawR, rawW, _, err := local.New(&local.Config{ + Path: t.TempDir(), + }) + require.NoError(t, err) + + r := backend.NewReader(rawR) + w := backend.NewWriter(rawW) + + iter := newTestIterator() + + iter.Add(test.MakeTrace(10, nil), 100, 401) + iter.Add(test.MakeTrace(10, nil), 101, 402) + iter.Add(test.MakeTrace(10, nil), 102, 403) + + cfg := &common.BlockConfig{ + BloomFP: 0.01, + BloomShardSizeBytes: 100 * 1024, + } + + meta := backend.NewBlockMeta("fake", uuid.New(), VersionString, backend.EncNone, "") + meta.TotalObjects = 1 + meta.StartTime = time.Unix(300, 0) + meta.EndTime = time.Unix(305, 0) + + outMeta, err := CreateBlock(ctx, cfg, meta, iter, r, w) + require.NoError(t, err) + + return newBackendBlock(outMeta, r), w +} diff --git a/tempodb/encoding/vparquet4/wal_block.go b/tempodb/encoding/vparquet4/wal_block.go index 2fc8fcb2934..627d1180c2a 100644 --- a/tempodb/encoding/vparquet4/wal_block.go +++ b/tempodb/encoding/vparquet4/wal_block.go @@ -355,6 +355,11 @@ func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end ui return nil } +// TODO: potentially add validation to wal blocks and use in the wal replay code in the ingester. +func (b *walBlock) Validate(context.Context) error { + return common.ErrUnsupported +} + // It controls the block start/end date as a sliding window. func (b *walBlock) adjustTimeRangeForSlack(start, end uint32) (uint32, uint32) { now := time.Now() From 2643445fd51151910bcd316489bab22134e5441b Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 31 Oct 2024 09:49:43 -0400 Subject: [PATCH 2/5] Add Validate usage in the ingester Signed-off-by: Joe Elliott --- .../generator/processor/localblocks/processor_test.go | 2 ++ modules/ingester/instance.go | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/modules/generator/processor/localblocks/processor_test.go b/modules/generator/processor/localblocks/processor_test.go index fcd90aa35c4..c7be41cde46 100644 --- a/modules/generator/processor/localblocks/processor_test.go +++ b/modules/generator/processor/localblocks/processor_test.go @@ -361,3 +361,5 @@ func (m *mockBlock) FetchTagNames(context.Context, traceql.FetchTagsRequest, tra } func (m *mockBlock) BlockMeta() *backend.BlockMeta { return m.meta } + +func (m *mockBlock) Validate(context.Context) error { return nil } diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 5d467e1e3fa..3d03736478e 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -597,7 +597,6 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er var rediscoveredBlocks []*LocalBlock for _, id := range ids { - // Ignore blocks that have a matching wal. The wal will be replayed and the local block recreated. // NOTE - Wal replay must be done beforehand. if hasWal(id) { @@ -629,6 +628,16 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er return nil, err } + // validate the block before adding it to the list. if we drop a block here and its not in the wal this is data loss, but there is no way to recover. this is likely due to disk + // level corruption + if err := b.Validate(ctx); err != nil { + level.Error(log.Logger).Log("msg", "local block failed validation, dropping", "tenantID", i.instanceID, "block", id.String(), "error", err) + err = i.local.ClearBlock(id, i.instanceID) + if err != nil { + return nil, fmt.Errorf("deleting invalid local block tenant %v block %v: %w", i.instanceID, id.String(), err) + } + } + ib := NewLocalBlock(ctx, b, i.local) rediscoveredBlocks = append(rediscoveredBlocks, ib) From 3a0f62cdc0b98feadaa0c8388fd98fa4eb5eb3aa Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 31 Oct 2024 09:52:36 -0400 Subject: [PATCH 3/5] changelog Signed-off-by: Joe Elliott --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1202c163dfd..c42618ff6e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ * [ENHANCEMENT] Changed log level from INFO to DEBUG for the TempoDB Find operation using traceId to reduce excessive/unwanted logs in log search. [#4179](https://github.com/grafana/tempo/pull/4179) (@Aki0x137) * [ENHANCEMENT] Pushdown collection of results from generators in the querier [#4119](https://github.com/grafana/tempo/pull/4119) (@electron0zero) * [ENHANCEMENT] Send semver version in api/stattus/buildinfo for cloud deployments [#4110](https://github.com/grafana/tempo/pull/4110) [@Aki0x137] +* [ENHANCEMENT] Add completed block validation on startup.[#4256](https://github.com/grafana/tempo/pull/4256) (@joe-elliott) * [ENHANCEMENT] Speedup DistinctString and ScopedDistinctString collectors [#4109](https://github.com/grafana/tempo/pull/4109) (@electron0zero) * [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero) * [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero) From 57f2ce318366e4ccfe8d05e375715382b3db3dd7 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 5 Nov 2024 16:28:20 -0500 Subject: [PATCH 4/5] add test and fix replay Signed-off-by: Joe Elliott --- modules/ingester/ingester_test.go | 92 +++++++++++++++++++++++++++++++ modules/ingester/instance.go | 5 +- 2 files changed, 96 insertions(+), 1 deletion(-) diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index 6aafa5c5c9c..d05c5ba36fd 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" + "github.com/google/uuid" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/ring" @@ -32,6 +33,7 @@ import ( "github.com/grafana/tempo/tempodb/backend/local" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" + "github.com/grafana/tempo/tempodb/encoding/vparquet4" "github.com/grafana/tempo/tempodb/wal" ) @@ -259,6 +261,96 @@ func TestSearchWAL(t *testing.T) { require.Equal(t, uint32(1), results.Metrics.InspectedTraces) } +func TestRediscoverLocalBlocks(t *testing.T) { + tmpDir := t.TempDir() + + ctx := user.InjectOrgID(context.Background(), "test") + ingester, traces, traceIDs := defaultIngester(t, tmpDir) + + // force cut all traces + for _, instance := range ingester.instances { + err := instance.CutCompleteTraces(0, true) + require.NoError(t, err, "unexpected error cutting traces") + } + + // force complete all blocks + for _, instance := range ingester.instances { + blockID, err := instance.CutBlockIfReady(0, 0, true) + require.NoError(t, err) + + err = instance.CompleteBlock(context.Background(), blockID) + require.NoError(t, err) + + err = instance.ClearCompletingBlock(blockID) + require.NoError(t, err) + } + + // create new ingester. this should rediscover local blocks + ingester, _, _ = defaultIngester(t, tmpDir) + + // should be able to find old traces that were replayed + for i, traceID := range traceIDs { + foundTrace, err := ingester.FindTraceByID(ctx, &tempopb.TraceByIDRequest{ + TraceID: traceID, + }) + require.NoError(t, err, "unexpected error querying") + require.NotNil(t, foundTrace.Trace) + trace.SortTrace(foundTrace.Trace) + equal := proto.Equal(traces[i], foundTrace.Trace) + require.True(t, equal) + } +} + +func TestRediscoverDropsInvalidBlocks(t *testing.T) { + tmpDir := t.TempDir() + + ctx := user.InjectOrgID(context.Background(), "test") + ingester, _, _ := defaultIngester(t, tmpDir) + + // force cut all traces + for _, instance := range ingester.instances { + err := instance.CutCompleteTraces(0, true) + require.NoError(t, err, "unexpected error cutting traces") + } + + // force complete all blocks + for _, instance := range ingester.instances { + blockID, err := instance.CutBlockIfReady(0, 0, true) + require.NoError(t, err) + + err = instance.CompleteBlock(context.Background(), blockID) + require.NoError(t, err) + + err = instance.ClearCompletingBlock(blockID) + require.NoError(t, err) + } + + // create new ingester. this should rediscover local blocks. there should be 1 block + ingester, _, _ = defaultIngester(t, tmpDir) + + instance, ok := ingester.instances["test"] + require.True(t, ok) + require.Len(t, instance.completeBlocks, 1) + + // now mangle a complete block + instance, ok = ingester.instances["test"] + require.True(t, ok) + require.Len(t, instance.completeBlocks, 1) + + // this cheats by reaching into the internals of the block and overwriting the parquet file directly. if this test starts failing + // it could be b/c the block internals changed and this no longer breaks a block + block := instance.completeBlocks[0] + err := block.writer.Write(ctx, vparquet4.DataFileName, uuid.UUID(block.BlockMeta().BlockID), "test", []byte("mangled"), nil) + require.NoError(t, err) + + // create new ingester. this should rediscover local blocks. there should be 0 blocks + ingester, _, _ = defaultIngester(t, tmpDir) + + instance, ok = ingester.instances["test"] + require.True(t, ok) + require.Len(t, instance.completeBlocks, 0) +} + // TODO - This test is flaky and commented out until it's fixed // TestWalReplayDeletesLocalBlocks simulates the condition where an ingester restarts after a wal is completed // to the local disk, but before the wal is deleted. On startup both blocks exist, and the ingester now errs diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 3d03736478e..ee1973a1826 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -630,12 +630,15 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er // validate the block before adding it to the list. if we drop a block here and its not in the wal this is data loss, but there is no way to recover. this is likely due to disk // level corruption - if err := b.Validate(ctx); err != nil { + err = b.Validate(ctx) + if err != nil && !errors.Is(err, common.ErrUnsupported) { level.Error(log.Logger).Log("msg", "local block failed validation, dropping", "tenantID", i.instanceID, "block", id.String(), "error", err) err = i.local.ClearBlock(id, i.instanceID) if err != nil { return nil, fmt.Errorf("deleting invalid local block tenant %v block %v: %w", i.instanceID, id.String(), err) } + + continue } ib := NewLocalBlock(ctx, b, i.local) From f0364d195d6495f14ce0ad46d764c6a2003afc87 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 5 Nov 2024 16:29:55 -0500 Subject: [PATCH 5/5] increment metric Signed-off-by: Joe Elliott --- modules/ingester/instance.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index ee1973a1826..2ab44c0af08 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -633,6 +633,8 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er err = b.Validate(ctx) if err != nil && !errors.Is(err, common.ErrUnsupported) { level.Error(log.Logger).Log("msg", "local block failed validation, dropping", "tenantID", i.instanceID, "block", id.String(), "error", err) + metricReplayErrorsTotal.WithLabelValues(i.instanceID).Inc() + err = i.local.ClearBlock(id, i.instanceID) if err != nil { return nil, fmt.Errorf("deleting invalid local block tenant %v block %v: %w", i.instanceID, id.String(), err)