diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go
index fc1e1d06a18..e83d955234b 100644
--- a/cmd/tempo/app/modules.go
+++ b/cmd/tempo/app/modules.go
@@ -658,7 +658,7 @@ func (t *App) setupModuleManager() error {
 		// composite targets
 		SingleBinary:         {Compactor, QueryFrontend, Querier, Ingester, Distributor, MetricsGenerator},
 		ScalableSingleBinary: {SingleBinary},
-		//GeneratorBuilder:   {MetricsGenerator, BlockBuilder},
+		// GeneratorBuilder:   {MetricsGenerator, BlockBuilder},
 	}
 
 	for mod, targets := range deps {
diff --git a/integration/e2e/ingest/ingest_storage_test.go b/integration/e2e/ingest/ingest_storage_test.go
index 3173a608743..8cbf5f4ada0 100644
--- a/integration/e2e/ingest/ingest_storage_test.go
+++ b/integration/e2e/ingest/ingest_storage_test.go
@@ -104,5 +104,4 @@ func TestIngest(t *testing.T) {
 	require.NoError(t, err)
 
 	util.SearchStreamAndAssertTrace(t, context.Background(), grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
-
 }
diff --git a/integration/e2e/ingest/kafka.go b/integration/e2e/ingest/kafka.go
index 3caa78fc6fd..be39258c384 100644
--- a/integration/e2e/ingest/kafka.go
+++ b/integration/e2e/ingest/kafka.go
@@ -39,7 +39,7 @@ func NewKafka() *e2e.HTTPService {
 		"kafka",
 		kafkaImage,
 		e2e.NewCommand("/etc/confluent/docker/run"),
-		//e2e.NewCmdReadinessProbe(e2e.NewCommand("kafka-topics", "--bootstrap-server", "broker:29092", "--list")),
+		// e2e.NewCmdReadinessProbe(e2e.NewCommand("kafka-topics", "--bootstrap-server", "broker:29092", "--list")),
 		e2e.NewCmdReadinessProbe(e2e.NewCommand("sh", "-c", "nc -z localhost 9092 || exit 1")), // TODO: A bit unstable, sometimes it fails
 		9092,
 		29092,
diff --git a/modules/blockbuilder/blockbuilder_test.go b/modules/blockbuilder/blockbuilder_test.go
new file mode 100644
index 00000000000..cb495a953a5
--- /dev/null
+++ b/modules/blockbuilder/blockbuilder_test.go
@@ -0,0 +1,159 @@
+package blockbuilder
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/grafana/dskit/flagext"
+	"github.com/grafana/dskit/ring"
+	"github.com/grafana/dskit/services"
+	"github.com/grafana/tempo/modules/storage"
+	"github.com/grafana/tempo/pkg/ingest"
+	"github.com/grafana/tempo/pkg/ingest/testkafka"
+	"github.com/grafana/tempo/pkg/util"
+	"github.com/grafana/tempo/pkg/util/test"
+	"github.com/grafana/tempo/tempodb"
+	"github.com/grafana/tempo/tempodb/backend"
+	"github.com/grafana/tempo/tempodb/backend/local"
+	"github.com/grafana/tempo/tempodb/encoding"
+	"github.com/grafana/tempo/tempodb/encoding/common"
+	"github.com/grafana/tempo/tempodb/wal"
+	"github.com/stretchr/testify/require"
+	"github.com/twmb/franz-go/pkg/kgo"
+	"github.com/twmb/franz-go/pkg/kmsg"
+	"go.uber.org/atomic"
+)
+
+const testTopic = "test-topic"
+
+func TestBlockbuilder(t *testing.T) {
+	ctx, cancel := context.WithCancelCause(context.Background())
+	t.Cleanup(func() { cancel(errors.New("test done")) })
+
+	k, address := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test-topic")
+	t.Cleanup(k.Close)
+
+	kafkaCommits := atomic.NewInt32(0)
+	k.ControlKey(kmsg.OffsetCommit.Int16(), func(kmsg.Request) (kmsg.Response, error, bool) {
+		kafkaCommits.Add(1)
+		return nil, nil, false
+	})
+
+	store := newStore(t)
+	store.EnablePolling(ctx, nil)
+	cfg := blockbuilderConfig(t, address)
+
+	b := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
+	require.NoError(t, services.StartAndAwaitRunning(ctx, b))
+	t.Cleanup(func() {
+		require.NoError(t, services.StopAndAwaitTerminated(ctx, b))
+	})
+
+	req := test.MakePushBytesRequest(t, 10, nil)
+	records, err := ingest.Encode(0, util.FakeTenantID, req, 1_000_000)
+	require.NoError(t, err)
+
+	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
+
+	res := client.ProduceSync(ctx, records...)
+	require.NoError(t, res.FirstErr())
+
+	// Wait for record to be consumed and committed.
+	require.Eventually(t, func() bool {
+		return kafkaCommits.Load() > 0
+	}, time.Minute, time.Second)
+
+	// Wait for the block to be flushed.
+	require.Eventually(t, func() bool {
+		return len(store.BlockMetas(util.FakeTenantID)) == 1
+	}, time.Minute, time.Second)
+}
+
+func blockbuilderConfig(_ *testing.T, address string) Config {
+	cfg := Config{}
+	flagext.DefaultValues(&cfg)
+
+	flagext.DefaultValues(&cfg.blockConfig)
+
+	flagext.DefaultValues(&cfg.IngestStorageConfig.Kafka)
+	cfg.IngestStorageConfig.Kafka.Address = address
+	cfg.IngestStorageConfig.Kafka.Topic = testTopic
+	cfg.IngestStorageConfig.Kafka.ConsumerGroup = "test-consumer-group"
+
+	cfg.AssignedPartitions = []int32{0}
+	cfg.LookbackOnNoCommit = 1 * time.Minute
+	cfg.ConsumeCycleDuration = 5 * time.Second
+
+	return cfg
+}
+
+func newStore(t *testing.T) storage.Store {
+	tmpDir := t.TempDir()
+	s, err := storage.NewStore(storage.Config{
+		Trace: tempodb.Config{
+			Backend: backend.Local,
+			Local: &local.Config{
+				Path: tmpDir,
+			},
+			Block: &common.BlockConfig{
+				IndexDownsampleBytes: 2,
+				BloomFP:              0.01,
+				BloomShardSizeBytes:  100_000,
+				Version:              encoding.LatestEncoding().Version(),
+				Encoding:             backend.EncLZ4_1M,
+				IndexPageSizeBytes:   1000,
+			},
+			WAL: &wal.Config{
+				Filepath: tmpDir,
+			},
+			BlocklistPoll:         5 * time.Second,
+			BlocklistPollFallback: true,
+		},
+	}, nil, test.NewTestingLogger(t))
+	require.NoError(t, err)
+	return s
+}
+
+var _ ring.PartitionRingReader = (*mockPartitionRingReader)(nil)
+
+func newPartitionRingReader() *mockPartitionRingReader {
+	return &mockPartitionRingReader{
+		r: ring.NewPartitionRing(ring.PartitionRingDesc{
+			Partitions: map[int32]ring.PartitionDesc{
+				0: {State: ring.PartitionActive},
+			},
+		}),
+	}
+}
+
+type mockPartitionRingReader struct {
+	r *ring.PartitionRing
+}
+
+func (m *mockPartitionRingReader) PartitionRing() *ring.PartitionRing {
+	return m.r
+}
+
+var _ Overrides = (*mockOverrides)(nil)
+
+type mockOverrides struct {
+	dc backend.DedicatedColumns
+}
+
+func (m *mockOverrides) DedicatedColumns(_ string) backend.DedicatedColumns { return m.dc }
+
+func newKafkaClient(t *testing.T, config ingest.KafkaConfig) *kgo.Client {
+	writeClient, err := kgo.NewClient(
+		kgo.SeedBrokers(config.Address),
+		kgo.AllowAutoTopicCreation(),
+		kgo.DefaultProduceTopic(config.Topic),
+		// We will choose the partition of each record.
+		kgo.RecordPartitioner(kgo.ManualPartitioner()),
+	)
+	require.NoError(t, err)
+	t.Cleanup(writeClient.Close)
+
+	return writeClient
+}
diff --git a/modules/blockbuilder/config.go b/modules/blockbuilder/config.go
index a5dbcb437c3..8fd79cf9cbc 100644
--- a/modules/blockbuilder/config.go
+++ b/modules/blockbuilder/config.go
@@ -16,6 +16,10 @@ type BlockConfig struct {
 	BlockCfg common.BlockConfig `yaml:"-,inline"`
 }
 
+func (c *BlockConfig) RegisterFlags(f *flag.FlagSet) {
+	c.RegisterFlagsAndApplyDefaults("", f)
+}
+
 func (c *BlockConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
 	f.Uint64Var(&c.MaxBlockBytes, prefix+".max-block-bytes", 20*1024*1024, "Maximum size of a block.") // TODO - Review default
 
@@ -38,6 +42,10 @@ func (c *Config) Validate() error {
 	return nil
 }
 
+func (c *Config) RegisterFlags(f *flag.FlagSet) {
+	c.RegisterFlagsAndApplyDefaults("", f)
+}
+
 func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
 	f.Var(newPartitionAssignmentVar(&c.AssignedPartitions), prefix+".assigned-partitions", "List of partitions assigned to this block builder.")
 	f.DurationVar(&c.ConsumeCycleDuration, prefix+".consume-cycle-duration", 5*time.Minute, "Interval between consumption cycles.")
diff --git a/modules/blockbuilder/partition_writer.go b/modules/blockbuilder/partition_writer.go
index 2edf20ca4f3..549e1467411 100644
--- a/modules/blockbuilder/partition_writer.go
+++ b/modules/blockbuilder/partition_writer.go
@@ -43,7 +43,11 @@ func newPartitionProcessor(logger log.Logger, blockCfg BlockConfig, overrides Ov
 }
 
 func (p *writer) PushBytes(tenant string, req *tempopb.PushBytesRequest) error {
-	level.Info(p.logger).Log("msg", "pushing bytes", "tenant", tenant, "num_traces", len(req.Traces))
+	level.Info(p.logger).Log(
+		"msg", "pushing bytes",
+		"tenant", tenant,
+		"num_traces", len(req.Traces),
+	)
 
 	i, err := p.instanceForTenant(tenant)
 	if err != nil {
diff --git a/modules/blockbuilder/tenant_store.go b/modules/blockbuilder/tenant_store.go
index 24be2fa570c..45ee79c6151 100644
--- a/modules/blockbuilder/tenant_store.go
+++ b/modules/blockbuilder/tenant_store.go
@@ -12,6 +12,17 @@ import (
 	"github.com/grafana/tempo/tempodb/encoding"
 	"github.com/grafana/tempo/tempodb/encoding/common"
 	"github.com/grafana/tempo/tempodb/wal"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec(
+	prometheus.CounterOpts{
+		Namespace: "tempo",
+		Subsystem: "block_builder",
+		Name:      "flushed_blocks",
+	},
+	[]string{"tenant_id"},
 )
 
 type tenantStore struct {
@@ -27,7 +38,6 @@ type tenantStore struct {
 }
 
 func newTenantStore(tenantID string, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) {
-
 	i := &tenantStore{
 		tenantID: tenantID,
 		cfg:      cfg,
@@ -49,14 +59,11 @@ func (i *tenantStore) cutHeadBlock() error {
 		i.walBlocks = append(i.walBlocks, i.headBlock)
 		i.headBlock = nil
 	}
+
 	return nil
 }
 
 func (i *tenantStore) resetHeadBlock() error {
-	if err := i.cutHeadBlock(); err != nil {
-		return err
-	}
-
 	meta := &backend.BlockMeta{
 		BlockID:  backend.NewUUID(), // TODO - Deterministic UUID
 		TenantID: i.tenantID,
@@ -106,9 +113,13 @@ func (i *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error {
 		if err := store.WriteBlock(ctx, block); err != nil {
 			return err
 		}
+		metricBlockBuilderFlushedBlocks.WithLabelValues(i.tenantID).Inc()
 	}
 
-	return nil
+	// Clear the blocks
+	i.walBlocks = i.walBlocks[:0]
+
+	return i.resetHeadBlock()
 }
 
 func (i *tenantStore) buildWriteableBlock(ctx context.Context, b common.WALBlock) (tempodb.WriteableBlock, error) {
diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go
index 95ad35e5ce2..fb3aa932fe9 100644
--- a/modules/distributor/distributor_test.go
+++ b/modules/distributor/distributor_test.go
@@ -1623,7 +1623,7 @@ func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) (*Dist
 	l := dslog.Level{}
 	_ = l.Set("error")
 	mw := receiver.MultiTenancyMiddleware()
-	d, err := New(distributorConfig, clientConfig, ingestersRing, generator_client.Config{}, nil, overrides, mw, logger, l, prometheus.NewPedanticRegistry())
+	d, err := New(distributorConfig, clientConfig, ingestersRing, generator_client.Config{}, nil, nil, overrides, mw, logger, l, prometheus.NewPedanticRegistry())
 	require.NoError(t, err)
 
 	return d, ingesters
diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go
index 84a20a920d2..1cdea075ddb 100644
--- a/modules/ingester/instance_search_test.go
+++ b/modules/ingester/instance_search_test.go
@@ -340,7 +340,7 @@ func TestInstanceSearchTagAndValuesV2(t *testing.T) {
 
 	// test that we are creating cache files for search tag values v2
 	// check that we have cache files for all complete blocks for all the cache keys
-	limit := i.limiter.limits.MaxBytesPerTagValuesQuery("fake")
+	limit := i.limiter.Limits().MaxBytesPerTagValuesQuery("fake")
 	cacheKeys := cacheKeysForTestSearchTagValuesV2(tagKey, queryThatMatches, limit)
 	for _, cacheKey := range cacheKeys {
 		for _, b := range i.completeBlocks {
diff --git a/pkg/ingest/blockbuilder/consumer.go b/pkg/ingest/blockbuilder/consumer.go
index 763d7594958..29b334c3698 100644
--- a/pkg/ingest/blockbuilder/consumer.go
+++ b/pkg/ingest/blockbuilder/consumer.go
@@ -10,8 +10,7 @@ import (
 	"github.com/twmb/franz-go/plugin/kprom"
 )
 
-type Consumer struct {
-}
+type Consumer struct{}
 
 type PartitionsFn func(context.Context, *kgo.Client, map[string][]int32)
 
diff --git a/pkg/ingest/encoding.go b/pkg/ingest/encoding.go
index 77b849103f2..65916f69e10 100644
--- a/pkg/ingest/encoding.go
+++ b/pkg/ingest/encoding.go
@@ -31,7 +31,8 @@ func Encode(partitionID int32, tenantID string, req *tempopb.PushBytesRequest, m
 	var records []*kgo.Record
 
 	batch := encoderPool.Get().(*tempopb.PushBytesRequest)
-	defer encoderPool.Put(req)
+	batch.Reset()
+	defer encoderPool.Put(batch)
 
 	if batch.Traces == nil {
 		batch.Traces = make([]tempopb.PreallocBytes, 0, 1024) // TODO - Why 1024? Lucky number?
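
The encoding.go hunk above fixes a pooling bug: Encode was returning the caller's req to encoderPool instead of the pooled batch, and never cleared the pooled object before reuse. A minimal, self-contained sketch of the get/reset/put pattern the fix restores follows; the pooledRequest type and encode function are illustrative stand-ins, not Tempo's actual tempopb types or Encode signature.

    package main

    import (
        "fmt"
        "sync"
    )

    // pooledRequest stands in for the pooled *tempopb.PushBytesRequest.
    type pooledRequest struct {
        payloads [][]byte
    }

    func (r *pooledRequest) Reset() { r.payloads = r.payloads[:0] }

    var requestPool = sync.Pool{
        New: func() any { return &pooledRequest{} },
    }

    func encode(req *pooledRequest) int {
        // Take scratch space from the pool, clear whatever a previous user left in
        // it, and return that same object to the pool when done. Putting the
        // caller's req back (the old bug) would let the pool hand out memory the
        // caller still owns.
        batch := requestPool.Get().(*pooledRequest)
        batch.Reset()
        defer requestPool.Put(batch)

        batch.payloads = append(batch.payloads, req.payloads...)
        return len(batch.payloads)
    }

    func main() {
        fmt.Println(encode(&pooledRequest{payloads: [][]byte{[]byte("a"), []byte("b")}}))
    }
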
diff --git a/pkg/ingest/encoding_test.go b/pkg/ingest/encoding_test.go
index 957f9e6090a..cf64e8607bf 100644
--- a/pkg/ingest/encoding_test.go
+++ b/pkg/ingest/encoding_test.go
@@ -3,28 +3,28 @@ package ingest
 import (
 	"math/rand"
 	"testing"
-	"time"
 
-	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/tempo/pkg/tempopb"
 )
 
 func TestEncoderDecoder(t *testing.T) {
 	tests := []struct {
 		name        string
-		stream      logproto.Stream
+		req         *tempopb.PushBytesRequest
 		maxSize     int
 		expectSplit bool
 	}{
 		{
 			name:        "Small trace, no split",
-			stream:      generateStream(10, 100),
+			req:         generateRequest(10, 100),
 			maxSize:     1024 * 1024,
 			expectSplit: false,
 		},
 		{
 			name:        "Large trace, expect split",
-			stream:      generateStream(1000, 1000),
+			req:         generateRequest(1000, 1000),
 			maxSize:     1024 * 10,
 			expectSplit: true,
 		},
@@ -32,10 +32,9 @@ func TestEncoderDecoder(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			decoder, err := NewDecoder()
-			require.NoError(t, err)
+			decoder := NewDecoder()
 
-			records, err := Encode(0, "test-tenant", tt.stream, tt.maxSize)
+			records, err := Encode(0, "test-tenant", tt.req, tt.maxSize)
 			require.NoError(t, err)
 
 			if tt.expectSplit {
@@ -44,32 +43,28 @@ func TestEncoderDecoder(t *testing.T) {
 				require.Equal(t, 1, len(records))
 			}
 
-			var decodedEntries []logproto.Entry
-			var decodedLabels labels.Labels
+			var decodedEntries []tempopb.PreallocBytes
+			var decodedIDs []tempopb.PreallocBytes
 
 			for _, record := range records {
-				stream, ls, err := decoder.Decode(record.Value)
+				decoder.Reset()
+				req, err := decoder.Decode(record.Value)
 				require.NoError(t, err)
-				decodedEntries = append(decodedEntries, stream.Entries...)
-				if decodedLabels == nil {
-					decodedLabels = ls
-				} else {
-					require.Equal(t, decodedLabels, ls)
-				}
+				decodedEntries = append(decodedEntries, req.Traces...)
+				decodedIDs = append(decodedIDs, req.Ids...)
 			}
 
-			require.Equal(t, tt.stream.Labels, decodedLabels.String())
-			require.Equal(t, len(tt.stream.Entries), len(decodedEntries))
-			for i, entry := range tt.stream.Entries {
-				require.Equal(t, entry.Timestamp.UTC(), decodedEntries[i].Timestamp.UTC())
-				require.Equal(t, entry.Line, decodedEntries[i].Line)
+			require.Equal(t, len(tt.req.Traces), len(decodedEntries))
+			for i := range tt.req.Traces {
+				require.Equal(t, tt.req.Traces[i], decodedEntries[i])
+				require.Equal(t, tt.req.Ids[i], decodedIDs[i])
 			}
 		})
 	}
 }
 
 func TestEncoderSingleEntryTooLarge(t *testing.T) {
-	stream := generateStream(1, 1000)
+	stream := generateRequest(1, 1000)
 
 	_, err := Encode(0, "test-tenant", stream, 100)
 	require.Error(t, err)
@@ -77,34 +72,29 @@
 }
 
 func TestDecoderInvalidData(t *testing.T) {
-	decoder, err := NewDecoder()
-	require.NoError(t, err)
+	decoder := NewDecoder()
 
-	_, _, err = decoder.Decode([]byte("invalid data"))
+	_, err := decoder.Decode([]byte("invalid data"))
 	require.Error(t, err)
 }
 
 func TestEncoderDecoderEmptyStream(t *testing.T) {
-	decoder, err := NewDecoder()
-	require.NoError(t, err)
+	decoder := NewDecoder()
 
-	stream := logproto.Stream{
-		Labels: `{app="test"}`,
-	}
+	req := &tempopb.PushBytesRequest{}
 
-	records, err := Encode(0, "test-tenant", stream, 10<<20)
+	records, err := Encode(0, "test-tenant", req, 10<<20)
 	require.NoError(t, err)
 	require.Len(t, records, 1)
 
-	decodedStream, decodedLabels, err := decoder.Decode(records[0].Value)
+	decodedReq, err := decoder.Decode(records[0].Value)
 	require.NoError(t, err)
 
-	require.Equal(t, stream.Labels, decodedLabels.String())
-	require.Empty(t, decodedStream.Entries)
+	require.Equal(t, req.Traces, decodedReq.Traces)
 }
 
 func BenchmarkEncodeDecode(b *testing.B) {
-	decoder, _ := NewDecoder()
-	stream := generateStream(1000, 200)
+	decoder := NewDecoder()
+	stream := generateRequest(1000, 200)
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
@@ -113,7 +103,7 @@ func BenchmarkEncodeDecode(b *testing.B) {
 			b.Fatal(err)
 		}
 		for _, record := range records {
-			_, _, err := decoder.Decode(record.Value)
+			_, err := decoder.Decode(record.Value)
 			if err != nil {
 				b.Fatal(err)
 			}
@@ -122,28 +112,26 @@
 }
 
 // Helper function to generate a test trace
-func generateStream(entries, lineLength int) logproto.Stream {
-	stream := logproto.Stream{
-		Labels:  `{app="test", env="prod"}`,
-		Entries: make([]logproto.Entry, entries),
+func generateRequest(entries, lineLength int) *tempopb.PushBytesRequest {
+	stream := &tempopb.PushBytesRequest{
+		Traces: make([]tempopb.PreallocBytes, entries),
+		Ids:    make([]tempopb.PreallocBytes, entries),
 	}
 
 	for i := 0; i < entries; i++ {
-		stream.Entries[i] = logproto.Entry{
-			Timestamp: time.Now(),
-			Line:      generateRandomString(lineLength),
-		}
+		stream.Traces[i].Slice = generateRandomString(lineLength)
+		stream.Ids[i].Slice = generateRandomString(lineLength)
 	}
 
 	return stream
 }
 
 // Helper function to generate a random string
-func generateRandomString(length int) string {
+func generateRandomString(length int) []byte {
 	const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
 	b := make([]byte, length)
 	for i := range b {
 		b[i] = charset[rand.Intn(len(charset))]
 	}
-	return string(b)
+	return b
 }
diff --git a/pkg/ingest/util.go b/pkg/ingest/util.go
index 8e0cbb64a5e..d2bbac77ada 100644
--- a/pkg/ingest/util.go
+++ b/pkg/ingest/util.go
@@ -6,10 +6,8 @@ import (
 	"strconv"
 )
 
-var (
-	// Regular expression used to parse the ingester numeric ID.
-	ingesterIDRegexp = regexp.MustCompile("-([0-9]+)$")
-)
+// Regular expression used to parse the ingester numeric ID.
+var ingesterIDRegexp = regexp.MustCompile("-([0-9]+)$")
 
 // IngesterPartitionID returns the partition ID owner the the given ingester.
 func IngesterPartitionID(ingesterID string) (int32, error) {
diff --git a/pkg/util/test/logger.go b/pkg/util/test/logger.go
new file mode 100644
index 00000000000..efbeef90fa7
--- /dev/null
+++ b/pkg/util/test/logger.go
@@ -0,0 +1,42 @@
+package test
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+)
+
+var _ log.Logger = (*TestingLogger)(nil)
+
+type TestingLogger struct {
+	t   testing.TB
+	mtx *sync.Mutex
+}
+
+func NewTestingLogger(t testing.TB) *TestingLogger {
+	return &TestingLogger{
+		t:   t,
+		mtx: &sync.Mutex{},
+	}
+}
+
+// WithT returns a new logger that logs to t. Writes between the new logger and the original logger are synchronized.
+func (l *TestingLogger) WithT(t testing.TB) log.Logger {
+	return &TestingLogger{
+		t:   t,
+		mtx: l.mtx,
+	}
+}
+
+func (l *TestingLogger) Log(keyvals ...interface{}) error {
+	// Prepend log with timestamp.
+	keyvals = append([]interface{}{time.Now().String()}, keyvals...)
+
+	l.mtx.Lock()
+	l.t.Log(keyvals...)
+	l.mtx.Unlock()
+
+	return nil
+}
diff --git a/pkg/util/test/req.go b/pkg/util/test/req.go
index b33370581d1..fae87246a64 100644
--- a/pkg/util/test/req.go
+++ b/pkg/util/test/req.go
@@ -354,3 +354,18 @@ func MakeTraceWithTags(traceID []byte, service string, intValue int64) *tempopb.
 	})
 	return trace
 }
+
+func MakePushBytesRequest(t *testing.T, requests int, traceID []byte) *tempopb.PushBytesRequest {
+	trace := MakeTrace(requests, traceID)
+	b, err := proto.Marshal(trace)
+	require.NoError(t, err)
+
+	req := &tempopb.PushBytesRequest{
+		Traces: make([]tempopb.PreallocBytes, 0),
+		Ids:    make([]tempopb.PreallocBytes, 0),
+	}
+	req.Traces = append(req.Traces, tempopb.PreallocBytes{Slice: b})
+	req.Ids = append(req.Ids, tempopb.PreallocBytes{Slice: traceID})
+
+	return req
+}
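
For reference, a round-trip sketch showing how the pieces added in this diff fit together: the new test.MakePushBytesRequest helper builds a PushBytesRequest, ingest.Encode turns it into Kafka records, and the reworked decoder reads them back. The calls mirror what blockbuilder_test.go and encoding_test.go above already exercise; the test name is illustrative and the sketch has not been checked against the rest of the package (for example for import cycles).

    package ingest

    import (
        "testing"

        "github.com/stretchr/testify/require"

        "github.com/grafana/tempo/pkg/util/test"
    )

    func TestPushBytesRoundTrip(t *testing.T) {
        req := test.MakePushBytesRequest(t, 10, nil)

        // Encode for partition 0 with a generous max record size so nothing splits.
        records, err := Encode(0, "test-tenant", req, 1_000_000)
        require.NoError(t, err)
        require.NotEmpty(t, records)

        decoder := NewDecoder()
        decoded := 0
        for _, record := range records {
            decoder.Reset()
            out, err := decoder.Decode(record.Value)
            require.NoError(t, err)
            decoded += len(out.Traces)
        }

        // Every trace that went in should come back out.
        require.Equal(t, len(req.Traces), decoded)
    }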