Skip to content

Commit

Permalink
add test and fix replay
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott committed Nov 5, 2024
1 parent 3a0f62c commit 57f2ce3
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 1 deletion.
92 changes: 92 additions & 0 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/ring"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
"github.com/grafana/tempo/tempodb/wal"
)

Expand Down Expand Up @@ -259,6 +261,96 @@ func TestSearchWAL(t *testing.T) {
require.Equal(t, uint32(1), results.Metrics.InspectedTraces)
}

func TestRediscoverLocalBlocks(t *testing.T) {
tmpDir := t.TempDir()

ctx := user.InjectOrgID(context.Background(), "test")
ingester, traces, traceIDs := defaultIngester(t, tmpDir)

// force cut all traces
for _, instance := range ingester.instances {
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err, "unexpected error cutting traces")
}

// force complete all blocks
for _, instance := range ingester.instances {
blockID, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(t, err)

err = instance.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

err = instance.ClearCompletingBlock(blockID)
require.NoError(t, err)
}

// create new ingester. this should rediscover local blocks
ingester, _, _ = defaultIngester(t, tmpDir)

// should be able to find old traces that were replayed
for i, traceID := range traceIDs {
foundTrace, err := ingester.FindTraceByID(ctx, &tempopb.TraceByIDRequest{
TraceID: traceID,
})
require.NoError(t, err, "unexpected error querying")
require.NotNil(t, foundTrace.Trace)
trace.SortTrace(foundTrace.Trace)
equal := proto.Equal(traces[i], foundTrace.Trace)
require.True(t, equal)
}
}

func TestRediscoverDropsInvalidBlocks(t *testing.T) {
tmpDir := t.TempDir()

ctx := user.InjectOrgID(context.Background(), "test")
ingester, _, _ := defaultIngester(t, tmpDir)

// force cut all traces
for _, instance := range ingester.instances {
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err, "unexpected error cutting traces")
}

// force complete all blocks
for _, instance := range ingester.instances {
blockID, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(t, err)

err = instance.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

err = instance.ClearCompletingBlock(blockID)
require.NoError(t, err)
}

// create new ingester. this should rediscover local blocks. there should be 1 block
ingester, _, _ = defaultIngester(t, tmpDir)

instance, ok := ingester.instances["test"]
require.True(t, ok)
require.Len(t, instance.completeBlocks, 1)

// now mangle a complete block
instance, ok = ingester.instances["test"]
require.True(t, ok)
require.Len(t, instance.completeBlocks, 1)

// this cheats by reaching into the internals of the block and overwriting the parquet file directly. if this test starts failing
// it could be b/c the block internals changed and this no longer breaks a block
block := instance.completeBlocks[0]
err := block.writer.Write(ctx, vparquet4.DataFileName, uuid.UUID(block.BlockMeta().BlockID), "test", []byte("mangled"), nil)
require.NoError(t, err)

// create new ingester. this should rediscover local blocks. there should be 0 blocks
ingester, _, _ = defaultIngester(t, tmpDir)

instance, ok = ingester.instances["test"]
require.True(t, ok)
require.Len(t, instance.completeBlocks, 0)
}

// TODO - This test is flaky and commented out until it's fixed
// TestWalReplayDeletesLocalBlocks simulates the condition where an ingester restarts after a wal is completed
// to the local disk, but before the wal is deleted. On startup both blocks exist, and the ingester now errs
Expand Down
5 changes: 4 additions & 1 deletion modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,12 +630,15 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er

// validate the block before adding it to the list. if we drop a block here and its not in the wal this is data loss, but there is no way to recover. this is likely due to disk
// level corruption
if err := b.Validate(ctx); err != nil {
err = b.Validate(ctx)
if err != nil && !errors.Is(err, common.ErrUnsupported) {
level.Error(log.Logger).Log("msg", "local block failed validation, dropping", "tenantID", i.instanceID, "block", id.String(), "error", err)
err = i.local.ClearBlock(id, i.instanceID)
if err != nil {
return nil, fmt.Errorf("deleting invalid local block tenant %v block %v: %w", i.instanceID, id.String(), err)
}

continue
}

ib := NewLocalBlock(ctx, b, i.local)
Expand Down

0 comments on commit 57f2ce3

Please sign in to comment.