Skip to content

Commit

Permalink
tempo-cli: support dropping multiple traces in a single operation
Browse files Browse the repository at this point in the history
  • Loading branch information
ndk committed Nov 9, 2024
1 parent e19ab71 commit d9e4f30
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## main / unreleased
* [FEATURE] tempo-cli: support dropping multiple traces in a single operation [#4266](https://github.com/grafana/tempo/pull/4266) (@ndk)
* [CHANGE] **BREAKING CHANGE** Change the AWS Lambda serverless build tooling output from "main" to "bootstrap". Refer to https://aws.amazon.com/blogs/compute/migrating-aws-lambda-functions-from-the-go1-x-runtime-to-the-custom-runtime-on-amazon-linux-2/ for migration steps [#3852](https://github.com/grafana/tempo/pull/3852) (@zatlodan)
* [ENHANCEMENT] The span multiplier now also sources its value from the resource attributes. [#4210](https://github.com/grafana/tempo/pull/4210)
* [FEATURE] Export cost attribution usage metrics from distributor [#4162](https://github.com/grafana/tempo/pull/4162) (@mdisibio)
Expand Down
111 changes: 72 additions & 39 deletions cmd/tempo-cli/cmd-rewrite-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"strconv"
"strings"

"github.com/go-kit/log"
"github.com/google/uuid"
Expand All @@ -19,16 +20,16 @@ import (
"github.com/grafana/tempo/tempodb/encoding/common"
)

type dropTraceCmd struct {
type dropTracesCmd struct {
backendOptions

TraceID string `arg:"" help:"trace ID to retrieve"`
TraceIDs string `arg:"" help:"Trace IDs to drop"`
TenantID string `arg:"" help:"tenant ID to search"`
DropTrace bool `name:"drop-trace" help:"actually attempt to drop the trace" default:"false"`
}

func (cmd *dropTraceCmd) Run(ctx *globalOptions) error {
fmt.Printf("beginning process to drop trace %v from tenant %v\n", cmd.TraceID, cmd.TenantID)
func (cmd *dropTracesCmd) Run(opts *globalOptions) error {
fmt.Printf("beginning process to drop traces %v from tenant %v\n", cmd.TraceIDs, cmd.TenantID)
fmt.Println("**warning**: compaction must be disabled or a compactor may duplicate a block as this process is rewriting it")
fmt.Println("")
if cmd.DropTrace {
Expand All @@ -38,51 +39,74 @@ func (cmd *dropTraceCmd) Run(ctx *globalOptions) error {
fmt.Println("")
}

r, w, c, err := loadBackend(&cmd.backendOptions, ctx)
if err != nil {
return err
}
ctx := context.Background()

id, err := util.HexStringToTraceID(cmd.TraceID)
r, w, c, err := loadBackend(&cmd.backendOptions, opts)
if err != nil {
return err
}

blocks, err := blocksWithTraceID(context.Background(), r, cmd.TenantID, id)
if err != nil {
return err
type pair struct {
traceIDs []common.ID
blockMeta *backend.BlockMeta
}
tracesByBlock := map[backend.UUID]pair{}

if len(blocks) == 0 {
fmt.Println("\ntrace not found in any block. aborting")
return nil
}
// Group trace IDs by blocks
ids := strings.Split(cmd.TraceIDs, ",")
for _, id := range ids {
traceID, err := util.HexStringToTraceID(id)
if err != nil {
return err
}

// print out blocks that have the trace id
fmt.Println("\n\ntrace found in:")
for _, block := range blocks {
fmt.Printf(" %v sz: %d traces: %d\n", block.BlockID, block.Size_, block.TotalObjects)
}
// Each trace ID is looked up separately across all blocks. This lookup could be significantly faster if common.BackendBlock supported bulk searches.
blocks, err := blocksWithTraceID(ctx, r, cmd.TenantID, traceID)
if err != nil {
return err
}

if !cmd.DropTrace {
fmt.Println("**not dropping trace, use --drop-trace to actually drop**")
return nil
if len(blocks) == 0 {
fmt.Printf("\ntrace %s not found in any block. skipping\n", util.TraceIDToHexString(traceID))
}
for _, block := range blocks {
p, ok := tracesByBlock[block.BlockID]
if !ok {
p = pair{blockMeta: block}
}
p.traceIDs = append(p.traceIDs, traceID)
tracesByBlock[block.BlockID] = p
}
}

fmt.Println("rewriting blocks:")
for _, block := range blocks {
fmt.Printf(" rewriting %v\n", block.BlockID)
newBlock, err := rewriteBlock(context.Background(), r, w, block, id)
// Remove traces from blocks
for _, p := range tracesByBlock {
// print out trace IDs to be removed in the block
strTraceIDs := make([]string, len(p.traceIDs))
for i, tid := range p.traceIDs {
strTraceIDs[i] = util.TraceIDToHexString(tid)
}
fmt.Printf("\nFound %d traces: %v in block: %v\n", len(strTraceIDs), strTraceIDs, p.blockMeta.BlockID)
fmt.Printf("blockInfo: ID: %v, Size: %d Total Traces: %d\n", p.blockMeta.BlockID, p.blockMeta.Size_, p.blockMeta.TotalObjects)

if !cmd.DropTrace {
fmt.Println("**not dropping trace, use --drop-trace to actually drop**")
continue
}

fmt.Printf(" rewriting %v\n", p.blockMeta.BlockID)
newMeta, err := rewriteBlock(ctx, r, w, p.blockMeta, p.traceIDs)
if err != nil {
return err
}
fmt.Printf(" rewrote to new block: %v\n", newBlock.BlockID)
}
if newMeta == nil {
fmt.Printf(" block %v was removed\n", p.blockMeta.BlockID)
} else {
fmt.Printf(" rewrote to new block: %v\n", newMeta.BlockID)
}

fmt.Println("marking old blocks compacted")
for _, block := range blocks {
fmt.Printf(" marking %v\n", block.BlockID)
err = c.MarkBlockCompacted((uuid.UUID)(block.BlockID), block.TenantID)
fmt.Printf(" marking %v compacted\n", p.blockMeta.BlockID)
err = c.MarkBlockCompacted((uuid.UUID)(p.blockMeta.BlockID), p.blockMeta.TenantID)
if err != nil {
return err
}
Expand All @@ -93,7 +117,7 @@ func (cmd *dropTraceCmd) Run(ctx *globalOptions) error {
return nil
}

func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta *backend.BlockMeta, traceID common.ID) (*backend.BlockMeta, error) {
func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta *backend.BlockMeta, traceIDs []common.ID) (*backend.BlockMeta, error) {
enc, err := encoding.FromVersion(meta.Version)
if err != nil {
return nil, fmt.Errorf("error getting encoder: %w", err)
Expand Down Expand Up @@ -131,7 +155,12 @@ func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta

// hook to drop every trace whose ID matches one of the requested trace IDs
DropObject: func(id common.ID) bool {
return bytes.Equal(id, traceID)
for _, tid := range traceIDs {
if bytes.Equal(id, tid) {
return true
}
}
return false
},

// setting to prevent panics. should we track and report these?
Expand All @@ -152,20 +181,24 @@ func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta
}

if len(out) != 1 {
if meta.TotalObjects == int64(len(traceIDs)) {
// we removed all traces from the block
return nil, nil
}
return nil, fmt.Errorf("expected 1 block, got %d", len(out))
}

newMeta := out[0]

if newMeta.TotalObjects != meta.TotalObjects-1 {
if newMeta.TotalObjects != meta.TotalObjects-int64(len(traceIDs)) {
return nil, fmt.Errorf("expected output to have one less object then in. out: %d in: %d", newMeta.TotalObjects, meta.TotalObjects)
}

return newMeta, nil
}

func blocksWithTraceID(ctx context.Context, r backend.Reader, tenantID string, traceID common.ID) ([]*backend.BlockMeta, error) {
blockIDs, _, err := r.Blocks(context.Background(), tenantID)
blockIDs, _, err := r.Blocks(ctx, tenantID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -210,7 +243,7 @@ func isInBlock(ctx context.Context, r backend.Reader, blockNum int, id uuid.UUID
fmt.Print(strconv.Itoa(blockNum))
}

meta, err := r.BlockMeta(context.Background(), id, tenantID)
meta, err := r.BlockMeta(ctx, id, tenantID)
if err != nil && !errors.Is(err, backend.ErrDoesNotExist) {
return nil, err
}
Expand Down
Loading

0 comments on commit d9e4f30

Please sign in to comment.