Skip to content

Commit

Permalink
feat: tempo-cli: add support for dropping multiple traces in single o…
Browse files Browse the repository at this point in the history
…peration
  • Loading branch information
ndk committed Nov 3, 2024
1 parent e19ab71 commit f66c463
Show file tree
Hide file tree
Showing 15 changed files with 176 additions and 34 deletions.
76 changes: 42 additions & 34 deletions cmd/tempo-cli/cmd-rewrite-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"strconv"
"strings"

"github.com/go-kit/log"
"github.com/google/uuid"
Expand Down Expand Up @@ -43,48 +44,55 @@ func (cmd *dropTraceCmd) Run(ctx *globalOptions) error {
return err
}

id, err := util.HexStringToTraceID(cmd.TraceID)
if err != nil {
return err
traceIDs := strings.Split(cmd.TraceID, ",")
ids := make([]common.ID, 0, len(traceIDs))
for _, traceID := range traceIDs {
id, err := util.HexStringToTraceID(traceID)
if err != nil {
return err
}
ids = append(ids, id)
}

blocks, err := blocksWithTraceID(context.Background(), r, cmd.TenantID, id)
if err != nil {
return err
}
for _, id := range ids {
blocks, err := blocksWithTraceID(context.Background(), r, cmd.TenantID, id)
if err != nil {
return err
}

if len(blocks) == 0 {
fmt.Println("\ntrace not found in any block. aborting")
return nil
}
if len(blocks) == 0 {
fmt.Println("\ntrace not found in any block. skipping")
continue
}

// print out blocks that have the trace id
fmt.Println("\n\ntrace found in:")
for _, block := range blocks {
fmt.Printf(" %v sz: %d traces: %d\n", block.BlockID, block.Size_, block.TotalObjects)
}
// print out blocks that have the trace id
fmt.Println("\n\ntrace found in:")
for _, block := range blocks {
fmt.Printf(" %v sz: %d traces: %d\n", block.BlockID, block.Size_, block.TotalObjects)
}

if !cmd.DropTrace {
fmt.Println("**not dropping trace, use --drop-trace to actually drop**")
return nil
}
if !cmd.DropTrace {
fmt.Println("**not dropping trace, use --drop-trace to actually drop**")
continue
}

fmt.Println("rewriting blocks:")
for _, block := range blocks {
fmt.Printf(" rewriting %v\n", block.BlockID)
newBlock, err := rewriteBlock(context.Background(), r, w, block, id)
if err != nil {
return err
fmt.Println("rewriting blocks:")
for _, block := range blocks {
fmt.Printf(" rewriting %v\n", block.BlockID)
newBlock, err := rewriteBlock(context.Background(), r, w, block, id)
if err != nil {
return err
}
fmt.Printf(" rewrote to new block: %v\n", newBlock.BlockID)
}
fmt.Printf(" rewrote to new block: %v\n", newBlock.BlockID)
}

fmt.Println("marking old blocks compacted")
for _, block := range blocks {
fmt.Printf(" marking %v\n", block.BlockID)
err = c.MarkBlockCompacted((uuid.UUID)(block.BlockID), block.TenantID)
if err != nil {
return err
fmt.Println("marking old blocks compacted")
for _, block := range blocks {
fmt.Printf(" marking %v\n", block.BlockID)
err = c.MarkBlockCompacted((uuid.UUID)(block.BlockID), block.TenantID)
if err != nil {
return err
}
}
}

Expand Down
127 changes: 127 additions & 0 deletions cmd/tempo-cli/cmd-rewrite-blocks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package main

import (
"context"
"encoding/hex"
"fmt"
"os"
"strings"
"testing"

"github.com/parquet-go/parquet-go"
"github.com/stretchr/testify/require"

tempo_io "github.com/grafana/tempo/pkg/io"
"github.com/grafana/tempo/pkg/parquetquery"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
)

func TestDropTraceCmd(t *testing.T) {
for s := 2; s < 10; s++ {
s := s
t.Run(fmt.Sprintf("drop every %d trace", s), func(t *testing.T) {
tempDir := t.TempDir()
err := os.CopyFS(tempDir, os.DirFS("./test-data2"))
require.NoError(t, err)

cmd := dropTraceCmd{
backendOptions: backendOptions{
Backend: "local",
Bucket: tempDir,
},
TenantID: "single-tenant",
DropTrace: true,
}

before := getAllTraceIDs(t, cmd.backendOptions.Bucket, cmd.TenantID)

var toRemove []string
for i, traceID := range before {
if i%s == 0 {
toRemove = append(toRemove, traceID)
}
}
cmd.TraceID = strings.Join(toRemove, ",")

err = cmd.Run(&globalOptions{})
require.NoError(t, err)

after := getAllTraceIDs(t, cmd.backendOptions.Bucket, cmd.TenantID)
after = append(after, toRemove...)

require.ElementsMatch(t, before, after)
})
}
}

func getAllTraceIDs(t *testing.T, dir string, tenant string) []string {
t.Helper()

rawR, _, _, err := local.New(&local.Config{
Path: dir,
})
require.NoError(t, err)

reader := backend.NewReader(rawR)
ctx := context.Background()

tenants, err := reader.Tenants(ctx)
require.NoError(t, err)
require.Equal(t, []string{tenant}, tenants)

blocks, _, err := reader.Blocks(ctx, tenant)
require.NoError(t, err)

var traceIDs []string
for _, block := range blocks {
meta, err := reader.BlockMeta(ctx, block, tenant)
require.NoError(t, err)
rr := vparquet4.NewBackendReaderAt(ctx, reader, vparquet4.DataFileName, meta)
br := tempo_io.NewBufferedReaderAt(rr, int64(meta.Size_), 2*1024*1024, 64)
parquetSchema := parquet.SchemaOf(&vparquet4.Trace{})
o := []parquet.FileOption{
parquet.SkipBloomFilters(true),
parquet.SkipPageIndex(true),
parquet.FileSchema(parquetSchema),
parquet.FileReadMode(parquet.ReadModeAsync),
}
pf, err := parquet.OpenFile(br, int64(meta.Size_), o...)
require.NoError(t, err)
r := parquet.NewReader(pf, parquetSchema)
defer func() {
err := r.Close()
require.NoError(t, err)
}()
traceIDIndex, _ := parquetquery.GetColumnIndexByPath(pf, vparquet4.TraceIDColumnName)
require.GreaterOrEqual(t, traceIDIndex, 0)
defer func() {
err := r.Close()
require.NoError(t, err)
}()

rows := make([]parquet.Row, r.NumRows())
n, err := r.ReadRows(rows)
require.NoError(t, err)
require.Len(t, rows, n)

getTraceID := func(row parquet.Row) common.ID {
for _, v := range row {
if v.Column() == traceIDIndex {
return v.ByteArray()
}
}

return nil
}

for _, row := range rows {
traceID := getTraceID(row)
traceIDs = append(traceIDs, hex.EncodeToString(traceID))
}
}

return traceIDs
}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"rowGroups":["6IyULoGuhkdAfMbAitQ5UQ=="]}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"format":"vParquet4","blockID":"54aa0b42-d9e3-40ae-ab2e-3f96b2036232","tenantID":"single-tenant","startTime":"2024-11-03T12:50:54Z","endTime":"2024-11-03T12:51:57Z","totalObjects":10,"size":59698,"compactionLevel":0,"encoding":"none","indexPageSize":0,"totalRecords":1,"dataEncoding":"","bloomShards":1,"footerSize":17870}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"rowGroups":["+dkSCCdj5rqDenl14oO13Q=="]}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"format":"vParquet4","blockID":"715bd008-5ae1-4c5d-a020-53b7313c2b0c","tenantID":"single-tenant","startTime":"2024-11-03T12:50:55Z","endTime":"2024-11-03T12:51:52Z","totalObjects":5,"size":47783,"compactionLevel":0,"encoding":"none","indexPageSize":0,"totalRecords":1,"dataEncoding":"","bloomShards":1,"footerSize":17770}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"rowGroups":["9gWrxA79L52xrT54Fi9mJw=="]}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"format":"vParquet4","blockID":"c498e1b0-3d66-465a-92a7-6e2ad0176465","tenantID":"single-tenant","startTime":"2024-11-03T12:50:56Z","endTime":"2024-11-03T12:51:42Z","totalObjects":5,"size":48843,"compactionLevel":0,"encoding":"none","indexPageSize":0,"totalRecords":1,"dataEncoding":"","bloomShards":1,"footerSize":17718}
1 change: 1 addition & 0 deletions cmd/tempo-cli/test-data2/tempo_cluster_seed.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"UID":"990fa882-530a-4c44-899d-e7011e8d673a","created_at":"2024-11-03T12:50:59.446432341Z","version":{"version":"v2.6.1","revision":"e19ab7152","branch":"main","buildUser":"","buildDate":"","goVersion":"go1.23.2"}}

0 comments on commit f66c463

Please sign in to comment.