diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2ce5d0a00d4..cd4b6ac0e36 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -116,11 +116,38 @@ jobs: - name: Test run: make test-e2e + integration-tests-poller: + name: Test integration e2e suite - poller + runs-on: ubuntu-latest + steps: + - name: Set up Go 1.22 + uses: actions/setup-go@v5 + with: + go-version: '>=1.22.4' + + - name: Check out code + uses: actions/checkout@v4 + - name: Poller run: make test-integration-poller + integration-tests-deployment-modes: + name: Test integration e2e suite - deployment modes + runs-on: ubuntu-latest + steps: + - name: Set up Go 1.22 + uses: actions/setup-go@v5 + with: + go-version: '>=1.22.4' + + - name: Check out code + uses: actions/checkout@v4 + + - name: Deployment modes + run: make test-e2e-deployments + integration-tests-serverless: - name: Test serverless integration e2e suite + name: Test integration e2e suite - serverless runs-on: ubuntu-latest steps: - name: Set up Go 1.22 diff --git a/Makefile b/Makefile index 00b1959d756..0d409e43ed3 100644 --- a/Makefile +++ b/Makefile @@ -135,13 +135,19 @@ test-e2e: tools docker-tempo docker-tempo-query ## Run end to end tests test-e2e-serverless: tools docker-tempo docker-serverless ## Run serverless end to end tests $(GOTEST) -v $(GOTEST_OPT) ./integration/e2e/serverless +# runs only deployment modes e2e tests +.PHONY: test-e2e-deployments +test-e2e-deployments: tools docker-tempo docker-tempo-query ## Run end to end tests for deployments + $(GOTEST) -v $(GOTEST_OPT) ./integration/e2e/deployments + +# runs only poller integration tests .PHONY: test-integration-poller test-integration-poller: tools ## Run poller integration tests $(GOTEST) -v $(GOTEST_OPT) ./integration/poller # test-all/bench use a docker image so build it first to make sure we're up to date .PHONY: test-all ## Run all tests -test-all: test-with-cover test-e2e test-e2e-serverless test-integration-poller +test-all: test-with-cover test-e2e test-e2e-serverless test-e2e-deployments test-integration-poller .PHONY: test-bench test-bench: tools docker-tempo ## Run all benchmarks diff --git a/integration/bench/load_test.go b/integration/bench/load_test.go index f501208c3c4..a89d60a5fcd 100644 --- a/integration/bench/load_test.go +++ b/integration/bench/load_test.go @@ -5,7 +5,7 @@ import ( "path/filepath" "testing" - util "github.com/grafana/tempo/integration" + "github.com/grafana/tempo/integration/util" "github.com/grafana/e2e" e2e_db "github.com/grafana/e2e/db" diff --git a/integration/e2e/api_test.go b/integration/e2e/api_test.go index dff44e2c2cb..3d6a18cfe7e 100644 --- a/integration/e2e/api_test.go +++ b/integration/e2e/api_test.go @@ -15,7 +15,7 @@ import ( "time" "github.com/grafana/e2e" - util "github.com/grafana/tempo/integration" + "github.com/grafana/tempo/integration/util" "github.com/grafana/tempo/pkg/tempopb" "github.com/stretchr/testify/require" ) @@ -48,10 +48,10 @@ func TestSearchTagsV2(t *testing.T) { firstBatch := batchTmpl{spanCount: 2, name: "foo", resourceAttVal: "bar", spanAttVal: "bar", resourceAttr: "firstRes", SpanAttr: "firstSpan"} secondBatch := batchTmpl{spanCount: 2, name: "baz", resourceAttVal: "qux", spanAttVal: "qux", resourceAttr: "secondRes", SpanAttr: "secondSpan"} - batch := makeThriftBatchWithSpanCountAttributeAndName(firstBatch.spanCount, firstBatch.name, firstBatch.resourceAttVal, firstBatch.spanAttVal, firstBatch.resourceAttr, firstBatch.SpanAttr) + batch := 
util.MakeThriftBatchWithSpanCountAttributeAndName(firstBatch.spanCount, firstBatch.name, firstBatch.resourceAttVal, firstBatch.spanAttVal, firstBatch.resourceAttr, firstBatch.SpanAttr) require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch)) - batch = makeThriftBatchWithSpanCountAttributeAndName(secondBatch.spanCount, secondBatch.name, secondBatch.resourceAttVal, secondBatch.spanAttVal, secondBatch.resourceAttr, secondBatch.SpanAttr) + batch = util.MakeThriftBatchWithSpanCountAttributeAndName(secondBatch.spanCount, secondBatch.name, secondBatch.resourceAttVal, secondBatch.spanAttVal, secondBatch.resourceAttr, secondBatch.SpanAttr) require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch)) // Wait for the traces to be written to the WAL @@ -237,9 +237,9 @@ func TestSearchTagsV2(t *testing.T) { // Wait to block flushed to backend, 20 seconds is the complete_block_timeout configuration on all in one, we add // 2s for security. - callFlush(t, tempo) + util.CallFlush(t, tempo) time.Sleep(time.Second * 22) - callFlush(t, tempo) + util.CallFlush(t, tempo) // test metrics require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) @@ -290,10 +290,10 @@ func TestSearchTagValuesV2(t *testing.T) { firstBatch := batchTmpl{spanCount: 2, name: "foo", resourceAttVal: "bar", spanAttVal: "bar"} secondBatch := batchTmpl{spanCount: 2, name: "baz", resourceAttVal: "qux", spanAttVal: "qux"} - batch := makeThriftBatchWithSpanCountAttributeAndName(firstBatch.spanCount, firstBatch.name, firstBatch.resourceAttVal, firstBatch.spanAttVal, "xx", "x") + batch := util.MakeThriftBatchWithSpanCountAttributeAndName(firstBatch.spanCount, firstBatch.name, firstBatch.resourceAttVal, firstBatch.spanAttVal, "xx", "x") require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch)) - batch = makeThriftBatchWithSpanCountAttributeAndName(secondBatch.spanCount, secondBatch.name, secondBatch.resourceAttVal, secondBatch.spanAttVal, "xx", "x") + batch = util.MakeThriftBatchWithSpanCountAttributeAndName(secondBatch.spanCount, secondBatch.name, secondBatch.resourceAttVal, secondBatch.spanAttVal, "xx", "x") require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch)) // Wait for the traces to be written to the WAL @@ -410,9 +410,9 @@ func TestSearchTagValuesV2(t *testing.T) { // Wait to block flushed to backend, 20 seconds is the complete_block_timeout configuration on all in one, we add // 2s for security. 
- callFlush(t, tempo) + util.CallFlush(t, tempo) time.Sleep(time.Second * 22) - callFlush(t, tempo) + util.CallFlush(t, tempo) // test metrics require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) @@ -455,16 +455,16 @@ func TestSearchTags(t *testing.T) { require.NoError(t, err) require.NotNil(t, jaegerClient) - batch := makeThriftBatch() + batch := util.MakeThriftBatch() require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch)) // Wait for the traces to be written to the WAL time.Sleep(time.Second * 3) callSearchTagsAndAssert(t, tempo, searchTagsResponse{TagNames: []string{"service.name", "x", "xx"}}, 0, 0) - callFlush(t, tempo) + util.CallFlush(t, tempo) time.Sleep(time.Second * 30) - callFlush(t, tempo) + util.CallFlush(t, tempo) // test metrics require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) @@ -496,16 +496,16 @@ func TestSearchTagValues(t *testing.T) { require.NoError(t, err) require.NotNil(t, jaegerClient) - batch := makeThriftBatch() + batch := util.MakeThriftBatch() require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch)) // Wait for the traces to be written to the WAL time.Sleep(time.Second * 3) callSearchTagValuesAndAssert(t, tempo, "service.name", searchTagValuesResponse{TagValues: []string{"my-service"}}, 0, 0) - callFlush(t, tempo) + util.CallFlush(t, tempo) time.Sleep(time.Second * 22) - callFlush(t, tempo) + util.CallFlush(t, tempo) require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics)) diff --git a/integration/e2e/backend/backend.go b/integration/e2e/backend/backend.go index 9c542eb455e..30b233b8311 100644 --- a/integration/e2e/backend/backend.go +++ b/integration/e2e/backend/backend.go @@ -8,9 +8,9 @@ import ( "github.com/grafana/e2e" e2e_db "github.com/grafana/e2e/db" + "github.com/grafana/tempo/integration/util" "github.com/grafana/tempo/cmd/tempo/app" - util "github.com/grafana/tempo/integration" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/azure" ) diff --git a/integration/e2e/compression_test.go b/integration/e2e/compression_test.go index 15591dece66..8388846d8d1 100644 --- a/integration/e2e/compression_test.go +++ b/integration/e2e/compression_test.go @@ -8,16 +8,16 @@ import ( "time" "github.com/grafana/e2e" + "github.com/grafana/tempo/integration/util" "github.com/stretchr/testify/require" - util "github.com/grafana/tempo/integration" "github.com/grafana/tempo/pkg/httpclient" "github.com/grafana/tempo/pkg/tempopb" tempoUtil "github.com/grafana/tempo/pkg/util" ) const ( - configCompression = "config-all-in-one-local.yaml" + configCompression = "deployments/config-all-in-one-local.yaml" ) func TestCompression(t *testing.T) { @@ -41,7 +41,7 @@ func TestCompression(t *testing.T) { apiClientWithCompression := httpclient.NewWithCompression("http://"+tempo.Endpoint(3200), "") - queryAndAssertTrace(t, apiClient, info) + util.QueryAndAssertTrace(t, apiClient, info) queryAndAssertTraceCompression(t, apiClientWithCompression, info) } @@ -53,7 +53,7 @@ func queryAndAssertTraceCompression(t *testing.T, client *httpclient.Client, inf expected, err := info.ConstructTraceFromEpoch() require.NoError(t, err) - assertEqualTrace(t, result, expected) + util.AssertEqualTrace(t, result, expected) // Go's http.Client transparently requests gzip compression and 
automatically decompresses the // response, to disable this behaviour you have to explicitly set the Accept-Encoding header. @@ -79,5 +79,5 @@ func queryAndAssertTraceCompression(t *testing.T, client *httpclient.Client, inf err = tempopb.UnmarshalFromJSONV1(bodyBytes, m) require.NoError(t, err) - assertEqualTrace(t, expected, m) + util.AssertEqualTrace(t, expected, m) } diff --git a/integration/e2e/cross_cluster_reads_test.go b/integration/e2e/cross_cluster_reads_test.go index e2d37c76bb6..8edf940f305 100644 --- a/integration/e2e/cross_cluster_reads_test.go +++ b/integration/e2e/cross_cluster_reads_test.go @@ -6,10 +6,10 @@ import ( "github.com/grafana/e2e" e2edb "github.com/grafana/e2e/db" + "github.com/grafana/tempo/integration/util" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" - util "github.com/grafana/tempo/integration" "github.com/grafana/tempo/pkg/httpclient" tempoUtil "github.com/grafana/tempo/pkg/util" ) @@ -44,13 +44,13 @@ func TestCrossClusterReads(t *testing.T) { require.NoError(t, err) // test metrics - require.NoError(t, tempoDistributorA.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total")) + require.NoError(t, tempoDistributorA.WaitSumMetrics(e2e.Equals(util.SpanCount(expected)), "tempo_distributor_spans_received_total")) // read from cluster B apiClient := httpclient.New("http://"+tempoQueryFrontendB.Endpoint(3200), "") // query an in-memory trace - queryAndAssertTrace(t, apiClient, info) + util.QueryAndAssertTrace(t, apiClient, info) } func createCluster(t *testing.T, s *e2e.Scenario, postfix string) (*e2e.HTTPService, *e2e.HTTPService) { diff --git a/integration/e2e/config-all-in-one-azurite.yaml b/integration/e2e/deployments/config-all-in-one-azurite.yaml similarity index 100% rename from integration/e2e/config-all-in-one-azurite.yaml rename to integration/e2e/deployments/config-all-in-one-azurite.yaml diff --git a/integration/e2e/config-all-in-one-gcs.yaml b/integration/e2e/deployments/config-all-in-one-gcs.yaml similarity index 100% rename from integration/e2e/config-all-in-one-gcs.yaml rename to integration/e2e/deployments/config-all-in-one-gcs.yaml diff --git a/integration/e2e/config-all-in-one-local.yaml b/integration/e2e/deployments/config-all-in-one-local.yaml similarity index 100% rename from integration/e2e/config-all-in-one-local.yaml rename to integration/e2e/deployments/config-all-in-one-local.yaml diff --git a/integration/e2e/config-all-in-one-s3.yaml b/integration/e2e/deployments/config-all-in-one-s3.yaml similarity index 100% rename from integration/e2e/config-all-in-one-s3.yaml rename to integration/e2e/deployments/config-all-in-one-s3.yaml diff --git a/integration/e2e/config-microservices.tmpl.yaml b/integration/e2e/deployments/config-microservices.tmpl.yaml similarity index 100% rename from integration/e2e/config-microservices.tmpl.yaml rename to integration/e2e/deployments/config-microservices.tmpl.yaml diff --git a/integration/e2e/config-scalable-single-binary.yaml b/integration/e2e/deployments/config-scalable-single-binary.yaml similarity index 100% rename from integration/e2e/config-scalable-single-binary.yaml rename to integration/e2e/deployments/config-scalable-single-binary.yaml diff --git a/integration/e2e/deployments/microservices_test.go b/integration/e2e/deployments/microservices_test.go new file mode 100644 index 00000000000..c7eecedb5fd --- /dev/null +++ b/integration/e2e/deployments/microservices_test.go @@ -0,0 +1,191 @@ +package deployments + +import ( + "fmt" 
+ "testing" + "time" + + "github.com/grafana/e2e" + e2edb "github.com/grafana/e2e/db" + "github.com/grafana/tempo/integration/util" + "github.com/grafana/tempo/pkg/httpclient" + tempoUtil "github.com/grafana/tempo/pkg/util" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +const configMicroservices = "config-microservices.tmpl.yaml" + +func TestMicroservicesWithKVStores(t *testing.T) { + testKVStores := []struct { + name string + kvconfig func(hostname string, port int) string + }{ + { + name: "memberlist", + kvconfig: func(string, int) string { + return ` + store: memberlist` + }, + }, + { + name: "etcd", + kvconfig: func(hostname string, port int) string { + return fmt.Sprintf(` + store: etcd + etcd: + endpoints: + - http://%s:%d`, hostname, port) + }, + }, + { + name: "consul", + kvconfig: func(hostname string, port int) string { + return fmt.Sprintf(` + store: consul + consul: + host: http://%s:%d`, hostname, port) + }, + }, + } + + for _, tc := range testKVStores { + t.Run(tc.name, func(t *testing.T) { + s, err := e2e.NewScenario("tempo_e2e") + require.NoError(t, err) + defer s.Close() + + // Set up KVStore + var kvstore *e2e.HTTPService + switch tc.name { + case "etcd": + kvstore = e2edb.NewETCD() + require.NoError(t, s.StartAndWaitReady(kvstore)) + case "consul": + kvstore = e2edb.NewConsul() + require.NoError(t, s.StartAndWaitReady(kvstore)) + case "memberlist": + default: + t.Errorf("unknown KVStore %s", tc.name) + } + + KVStoreConfig := tc.kvconfig("", 0) + if kvstore != nil { + KVStoreConfig = tc.kvconfig(kvstore.Name(), kvstore.HTTPPort()) + } + + // copy config template to shared directory and expand template variables + tmplConfig := map[string]any{"KVStore": KVStoreConfig} + _, err = util.CopyTemplateToSharedDir(s, configMicroservices, "config.yaml", tmplConfig) + require.NoError(t, err) + + minio := e2edb.NewMinio(9000, "tempo") + require.NotNil(t, minio) + require.NoError(t, s.StartAndWaitReady(minio)) + + tempoIngester1 := util.NewTempoIngester(1) + tempoIngester2 := util.NewTempoIngester(2) + tempoIngester3 := util.NewTempoIngester(3) + + tempoDistributor := util.NewTempoDistributor() + tempoQueryFrontend := util.NewTempoQueryFrontend() + tempoQuerier := util.NewTempoQuerier() + require.NoError(t, s.StartAndWaitReady(tempoIngester1, tempoIngester2, tempoIngester3, tempoDistributor, tempoQueryFrontend, tempoQuerier)) + + // wait for active ingesters + time.Sleep(1 * time.Second) + matchers := []*labels.Matcher{ + { + Type: labels.MatchEqual, + Name: "name", + Value: "ingester", + }, + { + Type: labels.MatchEqual, + Name: "state", + Value: "ACTIVE", + }, + } + require.NoError(t, tempoDistributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{`tempo_ring_members`}, e2e.WithLabelMatchers(matchers...), e2e.WaitMissingMetrics)) + + // Get port for the Jaeger gRPC receiver endpoint + c, err := util.NewJaegerGRPCClient(tempoDistributor.Endpoint(14250)) + require.NoError(t, err) + require.NotNil(t, c) + + info := tempoUtil.NewTraceInfo(time.Now(), "") + require.NoError(t, info.EmitAllBatches(c)) + + expected, err := info.ConstructTraceFromEpoch() + require.NoError(t, err) + + // test metrics + require.NoError(t, tempoDistributor.WaitSumMetrics(e2e.Equals(util.SpanCount(expected)), "tempo_distributor_spans_received_total")) + + // test echo + util.AssertEcho(t, "http://"+tempoQueryFrontend.Endpoint(3200)+"/api/echo") + + apiClient := httpclient.New("http://"+tempoQueryFrontend.Endpoint(3200), "") + + // query an in-memory trace + 
util.QueryAndAssertTrace(t, apiClient, info) + + // wait trace_idle_time and ensure trace is created in ingester + require.NoError(t, tempoIngester1.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) + require.NoError(t, tempoIngester2.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) + require.NoError(t, tempoIngester3.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) + + // flush trace to backend + util.CallFlush(t, tempoIngester1) + util.CallFlush(t, tempoIngester2) + util.CallFlush(t, tempoIngester3) + + // search for trace + util.SearchAndAssertTrace(t, apiClient, info) + util.SearchTraceQLAndAssertTrace(t, apiClient, info) + + // sleep for one maintenance cycle + time.Sleep(5 * time.Second) + + // test metrics + for _, i := range []*e2e.HTTPService{tempoIngester1, tempoIngester2, tempoIngester3} { + require.NoError(t, i.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) + } + require.NoError(t, tempoQuerier.WaitSumMetrics(e2e.Equals(3), "tempodb_blocklist_length")) + require.NoError(t, tempoQueryFrontend.WaitSumMetrics(e2e.Equals(3), "tempo_query_frontend_queries_total")) + + // query trace - should fetch from backend + util.QueryAndAssertTrace(t, apiClient, info) + + // stop an ingester and confirm we can still write and query + err = tempoIngester2.Kill() + require.NoError(t, err) + + // sleep for heartbeat timeout + time.Sleep(1 * time.Second) + + info = tempoUtil.NewTraceInfo(time.Now(), "") + require.NoError(t, info.EmitAllBatches(c)) + + // query by id + util.QueryAndAssertTrace(t, apiClient, info) + + // wait trace_idle_time and ensure trace is created in ingester + require.NoError(t, tempoIngester1.WaitSumMetricsWithOptions(e2e.Less(4), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) + require.NoError(t, tempoIngester3.WaitSumMetricsWithOptions(e2e.Less(4), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) + + // flush trace to backend + util.CallFlush(t, tempoIngester1) + util.CallFlush(t, tempoIngester3) + + // search for trace + util.SearchAndAssertTrace(t, apiClient, info) + + // stop another ingester and confirm things fail + err = tempoIngester1.Kill() + require.NoError(t, err) + + require.Error(t, info.EmitBatches(c)) + }) + } +} diff --git a/integration/e2e/deployments/single_binary_test.go b/integration/e2e/deployments/single_binary_test.go new file mode 100644 index 00000000000..7966b71bca0 --- /dev/null +++ b/integration/e2e/deployments/single_binary_test.go @@ -0,0 +1,214 @@ +package deployments + +import ( + "context" + "fmt" + "net/http" + "os" + "sync" + "testing" + "time" + + "github.com/grafana/e2e" + "github.com/grafana/tempo/cmd/tempo/app" + "github.com/grafana/tempo/integration/e2e/backend" + "github.com/grafana/tempo/integration/util" + "github.com/grafana/tempo/pkg/httpclient" + tempoUtil "github.com/grafana/tempo/pkg/util" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +const ( + configAllInOneS3 = "config-all-in-one-s3.yaml" + configAllInOneAzurite = "config-all-in-one-azurite.yaml" + configAllInOneGCS = "config-all-in-one-gcs.yaml" +) + +func TestAllInOne(t *testing.T) { + testBackends := []struct { + name string + configFile string + }{ + { + name: "s3", + configFile: configAllInOneS3, + }, + { + name: "azure", + configFile: 
configAllInOneAzurite, + }, + { + name: "gcs", + configFile: configAllInOneGCS, + }, + } + + storageBackendTestPermutations := []struct { + name string + prefix string + }{ + { + name: "no-prefix", + }, + { + name: "prefix", + prefix: "a/b/c/", + }, + } + + for _, tc := range testBackends { + for _, pc := range storageBackendTestPermutations { + t.Run(tc.name+"-"+pc.name, func(t *testing.T) { + s, err := e2e.NewScenario("tempo_e2e") + require.NoError(t, err) + defer s.Close() + + // copy config template to shared directory and expand template variables + tmplConfig := map[string]any{"Prefix": pc.prefix} + configFile, err := util.CopyTemplateToSharedDir(s, tc.configFile, "config.yaml", tmplConfig) + require.NoError(t, err) + + // set up the backend + cfg := app.Config{} + buff, err := os.ReadFile(configFile) + require.NoError(t, err) + err = yaml.UnmarshalStrict(buff, &cfg) + require.NoError(t, err) + _, err = backend.New(s, cfg) + require.NoError(t, err) + + tempo := util.NewTempoAllInOne() + require.NoError(t, s.StartAndWaitReady(tempo)) + + // Get port for the Jaeger gRPC receiver endpoint + c, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250)) + require.NoError(t, err) + require.NotNil(t, c) + + info := tempoUtil.NewTraceInfo(time.Now(), "") + require.NoError(t, info.EmitAllBatches(c)) + + expected, err := info.ConstructTraceFromEpoch() + require.NoError(t, err) + + // test metrics + require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(util.SpanCount(expected)), "tempo_distributor_spans_received_total")) + + // test echo + // nolint:goconst + util.AssertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo") + + apiClient := httpclient.New("http://"+tempo.Endpoint(3200), "") + + // query an in-memory trace + util.QueryAndAssertTrace(t, apiClient, info) + + // wait trace_idle_time and ensure trace is created in ingester + require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) + + // flush trace to backend + util.CallFlush(t, tempo) + + // search for trace in backend + util.SearchAndAssertTrace(t, apiClient, info) + util.SearchTraceQLAndAssertTrace(t, apiClient, info) + + // sleep + time.Sleep(10 * time.Second) + + // force clear completed block + util.CallFlush(t, tempo) + + fmt.Println(tempo.Endpoint(3200)) + // test metrics + require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) + require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics)) + require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(3), "tempo_query_frontend_queries_total")) + + matchers := []*labels.Matcher{ + { + Type: labels.MatchEqual, + Name: "receiver", + Value: "tempo/jaeger_receiver", + }, + { + Type: labels.MatchEqual, + Name: "transport", + Value: "grpc", + }, + } + + require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Greater(1), []string{"tempo_receiver_accepted_spans"}, e2e.WithLabelMatchers(matchers...))) + require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"tempo_receiver_refused_spans"}, e2e.WithLabelMatchers(matchers...))) + + // query trace - should fetch from backend + util.QueryAndAssertTrace(t, apiClient, info) + + // search the backend. 
this works b/c we're passing a start/end AND setting query ingesters within min/max to 0 + now := time.Now() + util.SearchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix()) + + util.SearchAndAsserTagsBackend(t, apiClient, now.Add(-20*time.Minute).Unix(), now.Unix()) + + // find the trace with streaming. using the http server b/c that's what Grafana will do + grpcClient, err := util.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200)) + require.NoError(t, err) + + util.SearchStreamAndAssertTrace(t, context.Background(), grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix()) + }) + } + } +} + +func TestShutdownDelay(t *testing.T) { + s, err := e2e.NewScenario("tempo_e2e") + require.NoError(t, err) + defer s.Close() + + // copy config template to shared directory and expand template variables + tmplConfig := map[string]any{"Prefix": ""} + configFile, err := util.CopyTemplateToSharedDir(s, configAllInOneS3, "config.yaml", tmplConfig) + require.NoError(t, err) + + // set up the backend + cfg := app.Config{} + buff, err := os.ReadFile(configFile) + require.NoError(t, err) + err = yaml.UnmarshalStrict(buff, &cfg) + require.NoError(t, err) + _, err = backend.New(s, cfg) + require.NoError(t, err) + + tempo := util.NewTempoAllInOne("-shutdown-delay=5s") + + // this line tests confirms that the readiness flag is up + require.NoError(t, s.StartAndWaitReady(tempo)) + + // if we're here the readiness flag is up. now call kill and check the readiness flag is down + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < 10; i++ { + res, err := e2e.DoGet("http://" + tempo.Endpoint(3200) + "/ready") + require.NoError(t, err) + res.Body.Close() + + if res.StatusCode == http.StatusServiceUnavailable { + // found it! 
+ return + } + time.Sleep(time.Second) + } + + require.Fail(t, "readiness flag never went down") + }() + + // call stop and allow the code above to test for a unavailable readiness flag + _ = tempo.Stop() + + wg.Wait() +} diff --git a/integration/e2e/deployments/ssb_test.go b/integration/e2e/deployments/ssb_test.go new file mode 100644 index 00000000000..ef03a5579d0 --- /dev/null +++ b/integration/e2e/deployments/ssb_test.go @@ -0,0 +1,123 @@ +package deployments + +import ( + "sync" + "testing" + "time" + + "github.com/grafana/e2e" + e2edb "github.com/grafana/e2e/db" + "github.com/grafana/tempo/integration/util" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/tempo/pkg/httpclient" + tempoUtil "github.com/grafana/tempo/pkg/util" +) + +const ( + configHA = "config-scalable-single-binary.yaml" +) + +func TestScalableSingleBinary(t *testing.T) { + s, err := e2e.NewScenario("tempo_e2e") + require.NoError(t, err) + defer s.Close() + + minio := e2edb.NewMinio(9000, "tempo") + require.NotNil(t, minio) + require.NoError(t, s.StartAndWaitReady(minio)) + + // copy configuration file over to shared dir + require.NoError(t, util.CopyFileToSharedDir(s, configHA, "config.yaml")) + + // start three scalable single binary tempos in parallel + var wg sync.WaitGroup + var tempo1, tempo2, tempo3 *e2e.HTTPService + wg.Add(3) + go func() { + tempo1 = util.NewTempoScalableSingleBinary(1) + wg.Done() + }() + go func() { + tempo2 = util.NewTempoScalableSingleBinary(2) + wg.Done() + }() + go func() { + tempo3 = util.NewTempoScalableSingleBinary(3) + wg.Done() + }() + wg.Wait() + require.NoError(t, s.StartAndWaitReady(tempo1, tempo2, tempo3)) + + // wait for 2 active ingesters + time.Sleep(1 * time.Second) + matchers := []*labels.Matcher{ + { + Type: labels.MatchEqual, + Name: "name", + Value: "ingester", + }, + { + Type: labels.MatchEqual, + Name: "state", + Value: "ACTIVE", + }, + } + + t.Logf("tempo1.Endpoint(): %+v", tempo1.Endpoint(3200)) + + require.NoError(t, tempo1.WaitSumMetricsWithOptions(e2e.Equals(3), []string{`tempo_ring_members`}, e2e.WithLabelMatchers(matchers...), e2e.WaitMissingMetrics)) + require.NoError(t, tempo2.WaitSumMetricsWithOptions(e2e.Equals(3), []string{`tempo_ring_members`}, e2e.WithLabelMatchers(matchers...), e2e.WaitMissingMetrics)) + require.NoError(t, tempo3.WaitSumMetricsWithOptions(e2e.Equals(3), []string{`tempo_ring_members`}, e2e.WithLabelMatchers(matchers...), e2e.WaitMissingMetrics)) + + c1, err := util.NewJaegerGRPCClient(tempo1.Endpoint(14250)) + require.NoError(t, err) + require.NotNil(t, c1) + + c2, err := util.NewJaegerGRPCClient(tempo2.Endpoint(14250)) + require.NoError(t, err) + require.NotNil(t, c2) + + c3, err := util.NewJaegerGRPCClient(tempo3.Endpoint(14250)) + require.NoError(t, err) + require.NotNil(t, c3) + + info := tempoUtil.NewTraceInfo(time.Unix(1632169410, 0), "") + require.NoError(t, info.EmitBatches(c1)) + + expected, err := info.ConstructTraceFromEpoch() + require.NoError(t, err) + + // test metrics + require.NoError(t, tempo1.WaitSumMetrics(e2e.Equals(util.SpanCount(expected)), "tempo_distributor_spans_received_total")) + + // wait trace_idle_time and ensure trace is created in ingester + time.Sleep(1 * time.Second) + require.NoError(t, tempo1.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) + + for _, i := range []*e2e.HTTPService{tempo1, tempo2, tempo3} { + util.CallFlush(t, i) + require.NoError(t, 
i.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) + util.CallIngesterRing(t, i) + util.CallCompactorRing(t, i) + util.CallStatus(t, i) + util.CallBuildinfo(t, i) + } + + apiClient1 := httpclient.New("http://"+tempo1.Endpoint(3200), "") + + util.QueryAndAssertTrace(t, apiClient1, info) + + err = tempo1.Kill() + require.NoError(t, err) + + // Push to one of the instances that are still running. + require.NoError(t, info.EmitBatches(c2)) + + err = tempo2.Kill() + require.NoError(t, err) + + err = tempo3.Kill() + require.NoError(t, err) +} diff --git a/integration/e2e/e2e_test.go b/integration/e2e/e2e_test.go deleted file mode 100644 index 635ac8c9d77..00000000000 --- a/integration/e2e/e2e_test.go +++ /dev/null @@ -1,641 +0,0 @@ -package e2e - -import ( - "context" - "encoding/json" - "fmt" - "io" - "math/rand" - "net/http" - "os" - "sync" - "testing" - "time" - - thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" - "github.com/prometheus/prometheus/model/labels" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gopkg.in/yaml.v2" - - "github.com/grafana/e2e" - e2edb "github.com/grafana/e2e/db" - - "github.com/grafana/tempo/cmd/tempo/app" - util "github.com/grafana/tempo/integration" - "github.com/grafana/tempo/integration/e2e/backend" - "github.com/grafana/tempo/pkg/httpclient" - "github.com/grafana/tempo/pkg/model/trace" - "github.com/grafana/tempo/pkg/tempopb" - tempoUtil "github.com/grafana/tempo/pkg/util" -) - -const ( - configMicroservices = "config-microservices.tmpl.yaml" - configHA = "config-scalable-single-binary.yaml" - - configAllInOneS3 = "config-all-in-one-s3.yaml" - configAllInOneAzurite = "config-all-in-one-azurite.yaml" - configAllInOneGCS = "config-all-in-one-gcs.yaml" -) - -func TestAllInOne(t *testing.T) { - testBackends := []struct { - name string - configFile string - }{ - { - name: "s3", - configFile: configAllInOneS3, - }, - { - name: "azure", - configFile: configAllInOneAzurite, - }, - { - name: "gcs", - configFile: configAllInOneGCS, - }, - } - - storageBackendTestPermutations := []struct { - name string - prefix string - }{ - { - name: "no-prefix", - }, - { - name: "prefix", - prefix: "a/b/c/", - }, - } - - for _, tc := range testBackends { - for _, pc := range storageBackendTestPermutations { - t.Run(tc.name+"-"+pc.name, func(t *testing.T) { - s, err := e2e.NewScenario("tempo_e2e") - require.NoError(t, err) - defer s.Close() - - // copy config template to shared directory and expand template variables - tmplConfig := map[string]any{"Prefix": pc.prefix} - configFile, err := util.CopyTemplateToSharedDir(s, tc.configFile, "config.yaml", tmplConfig) - require.NoError(t, err) - - // set up the backend - cfg := app.Config{} - buff, err := os.ReadFile(configFile) - require.NoError(t, err) - err = yaml.UnmarshalStrict(buff, &cfg) - require.NoError(t, err) - _, err = backend.New(s, cfg) - require.NoError(t, err) - - tempo := util.NewTempoAllInOne() - require.NoError(t, s.StartAndWaitReady(tempo)) - - // Get port for the Jaeger gRPC receiver endpoint - c, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250)) - require.NoError(t, err) - require.NotNil(t, c) - - info := tempoUtil.NewTraceInfo(time.Now(), "") - require.NoError(t, info.EmitAllBatches(c)) - - expected, err := info.ConstructTraceFromEpoch() - require.NoError(t, err) - - // test metrics - require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total")) - - // test echo - // nolint:goconst - assertEcho(t, 
"http://"+tempo.Endpoint(3200)+"/api/echo") - - apiClient := httpclient.New("http://"+tempo.Endpoint(3200), "") - - // query an in-memory trace - queryAndAssertTrace(t, apiClient, info) - - // wait trace_idle_time and ensure trace is created in ingester - require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) - - // flush trace to backend - callFlush(t, tempo) - - // search for trace in backend - util.SearchAndAssertTrace(t, apiClient, info) - util.SearchTraceQLAndAssertTrace(t, apiClient, info) - - // sleep - time.Sleep(10 * time.Second) - - // force clear completed block - callFlush(t, tempo) - - fmt.Println(tempo.Endpoint(3200)) - // test metrics - require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) - require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics)) - require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(3), "tempo_query_frontend_queries_total")) - - matchers := []*labels.Matcher{ - { - Type: labels.MatchEqual, - Name: "receiver", - Value: "tempo/jaeger_receiver", - }, - { - Type: labels.MatchEqual, - Name: "transport", - Value: "grpc", - }, - } - - require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Greater(1), []string{"tempo_receiver_accepted_spans"}, e2e.WithLabelMatchers(matchers...))) - require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"tempo_receiver_refused_spans"}, e2e.WithLabelMatchers(matchers...))) - - // query trace - should fetch from backend - queryAndAssertTrace(t, apiClient, info) - - // search the backend. this works b/c we're passing a start/end AND setting query ingesters within min/max to 0 - now := time.Now() - util.SearchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix()) - - util.SearchAndAsserTagsBackend(t, apiClient, now.Add(-20*time.Minute).Unix(), now.Unix()) - - // find the trace with streaming. 
using the http server b/c that's what Grafana will do - grpcClient, err := util.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200)) - require.NoError(t, err) - - util.SearchStreamAndAssertTrace(t, context.Background(), grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix()) - }) - } - } -} - -func TestMicroservicesWithKVStores(t *testing.T) { - testKVStores := []struct { - name string - kvconfig func(hostname string, port int) string - }{ - { - name: "memberlist", - kvconfig: func(string, int) string { - return ` - store: memberlist` - }, - }, - { - name: "etcd", - kvconfig: func(hostname string, port int) string { - return fmt.Sprintf(` - store: etcd - etcd: - endpoints: - - http://%s:%d`, hostname, port) - }, - }, - { - name: "consul", - kvconfig: func(hostname string, port int) string { - return fmt.Sprintf(` - store: consul - consul: - host: http://%s:%d`, hostname, port) - }, - }, - } - - for _, tc := range testKVStores { - t.Run(tc.name, func(t *testing.T) { - s, err := e2e.NewScenario("tempo_e2e") - require.NoError(t, err) - defer s.Close() - - // Set up KVStore - var kvstore *e2e.HTTPService - switch tc.name { - case "etcd": - kvstore = e2edb.NewETCD() - require.NoError(t, s.StartAndWaitReady(kvstore)) - case "consul": - kvstore = e2edb.NewConsul() - require.NoError(t, s.StartAndWaitReady(kvstore)) - case "memberlist": - default: - t.Errorf("unknown KVStore %s", tc.name) - } - - KVStoreConfig := tc.kvconfig("", 0) - if kvstore != nil { - KVStoreConfig = tc.kvconfig(kvstore.Name(), kvstore.HTTPPort()) - } - - // copy config template to shared directory and expand template variables - tmplConfig := map[string]any{"KVStore": KVStoreConfig} - _, err = util.CopyTemplateToSharedDir(s, configMicroservices, "config.yaml", tmplConfig) - require.NoError(t, err) - - minio := e2edb.NewMinio(9000, "tempo") - require.NotNil(t, minio) - require.NoError(t, s.StartAndWaitReady(minio)) - - tempoIngester1 := util.NewTempoIngester(1) - tempoIngester2 := util.NewTempoIngester(2) - tempoIngester3 := util.NewTempoIngester(3) - - tempoDistributor := util.NewTempoDistributor() - tempoQueryFrontend := util.NewTempoQueryFrontend() - tempoQuerier := util.NewTempoQuerier() - require.NoError(t, s.StartAndWaitReady(tempoIngester1, tempoIngester2, tempoIngester3, tempoDistributor, tempoQueryFrontend, tempoQuerier)) - - // wait for active ingesters - time.Sleep(1 * time.Second) - matchers := []*labels.Matcher{ - { - Type: labels.MatchEqual, - Name: "name", - Value: "ingester", - }, - { - Type: labels.MatchEqual, - Name: "state", - Value: "ACTIVE", - }, - } - require.NoError(t, tempoDistributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{`tempo_ring_members`}, e2e.WithLabelMatchers(matchers...), e2e.WaitMissingMetrics)) - - // Get port for the Jaeger gRPC receiver endpoint - c, err := util.NewJaegerGRPCClient(tempoDistributor.Endpoint(14250)) - require.NoError(t, err) - require.NotNil(t, c) - - info := tempoUtil.NewTraceInfo(time.Now(), "") - require.NoError(t, info.EmitAllBatches(c)) - - expected, err := info.ConstructTraceFromEpoch() - require.NoError(t, err) - - // test metrics - require.NoError(t, tempoDistributor.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total")) - - // test echo - assertEcho(t, "http://"+tempoQueryFrontend.Endpoint(3200)+"/api/echo") - - apiClient := httpclient.New("http://"+tempoQueryFrontend.Endpoint(3200), "") - - // query an in-memory trace - queryAndAssertTrace(t, apiClient, info) - - // wait trace_idle_time and ensure 
trace is created in ingester - require.NoError(t, tempoIngester1.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) - require.NoError(t, tempoIngester2.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) - require.NoError(t, tempoIngester3.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) - - // flush trace to backend - callFlush(t, tempoIngester1) - callFlush(t, tempoIngester2) - callFlush(t, tempoIngester3) - - // search for trace - util.SearchAndAssertTrace(t, apiClient, info) - util.SearchTraceQLAndAssertTrace(t, apiClient, info) - - // sleep for one maintenance cycle - time.Sleep(5 * time.Second) - - // test metrics - for _, i := range []*e2e.HTTPService{tempoIngester1, tempoIngester2, tempoIngester3} { - require.NoError(t, i.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) - } - require.NoError(t, tempoQuerier.WaitSumMetrics(e2e.Equals(3), "tempodb_blocklist_length")) - require.NoError(t, tempoQueryFrontend.WaitSumMetrics(e2e.Equals(3), "tempo_query_frontend_queries_total")) - - // query trace - should fetch from backend - queryAndAssertTrace(t, apiClient, info) - - // stop an ingester and confirm we can still write and query - err = tempoIngester2.Kill() - require.NoError(t, err) - - // sleep for heartbeat timeout - time.Sleep(1 * time.Second) - - info = tempoUtil.NewTraceInfo(time.Now(), "") - require.NoError(t, info.EmitAllBatches(c)) - - // query by id - queryAndAssertTrace(t, apiClient, info) - - // wait trace_idle_time and ensure trace is created in ingester - require.NoError(t, tempoIngester1.WaitSumMetricsWithOptions(e2e.Less(4), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) - require.NoError(t, tempoIngester3.WaitSumMetricsWithOptions(e2e.Less(4), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) - - // flush trace to backend - callFlush(t, tempoIngester1) - callFlush(t, tempoIngester3) - - // search for trace - util.SearchAndAssertTrace(t, apiClient, info) - - // stop another ingester and confirm things fail - err = tempoIngester1.Kill() - require.NoError(t, err) - - require.Error(t, info.EmitBatches(c)) - }) - } -} - -func TestShutdownDelay(t *testing.T) { - s, err := e2e.NewScenario("tempo_e2e") - require.NoError(t, err) - defer s.Close() - - // copy config template to shared directory and expand template variables - tmplConfig := map[string]any{"Prefix": ""} - configFile, err := util.CopyTemplateToSharedDir(s, configAllInOneS3, "config.yaml", tmplConfig) - require.NoError(t, err) - - // set up the backend - cfg := app.Config{} - buff, err := os.ReadFile(configFile) - require.NoError(t, err) - err = yaml.UnmarshalStrict(buff, &cfg) - require.NoError(t, err) - _, err = backend.New(s, cfg) - require.NoError(t, err) - - tempo := util.NewTempoAllInOne("-shutdown-delay=5s") - - // this line tests confirms that the readiness flag is up - require.NoError(t, s.StartAndWaitReady(tempo)) - - // if we're here the readiness flag is up. now call kill and check the readiness flag is down - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - - for i := 0; i < 10; i++ { - res, err := e2e.DoGet("http://" + tempo.Endpoint(3200) + "/ready") - require.NoError(t, err) - res.Body.Close() - - if res.StatusCode == http.StatusServiceUnavailable { - // found it! 
- return - } - time.Sleep(time.Second) - } - - require.Fail(t, "readiness flag never went down") - }() - - // call stop and allow the code above to test for a unavailable readiness flag - _ = tempo.Stop() - - wg.Wait() -} - -func TestScalableSingleBinary(t *testing.T) { - s, err := e2e.NewScenario("tempo_e2e") - require.NoError(t, err) - defer s.Close() - - minio := e2edb.NewMinio(9000, "tempo") - require.NotNil(t, minio) - require.NoError(t, s.StartAndWaitReady(minio)) - - // copy configuration file over to shared dir - require.NoError(t, util.CopyFileToSharedDir(s, configHA, "config.yaml")) - - // start three scalable single binary tempos in parallel - var wg sync.WaitGroup - var tempo1, tempo2, tempo3 *e2e.HTTPService - wg.Add(3) - go func() { - tempo1 = util.NewTempoScalableSingleBinary(1) - wg.Done() - }() - go func() { - tempo2 = util.NewTempoScalableSingleBinary(2) - wg.Done() - }() - go func() { - tempo3 = util.NewTempoScalableSingleBinary(3) - wg.Done() - }() - wg.Wait() - require.NoError(t, s.StartAndWaitReady(tempo1, tempo2, tempo3)) - - // wait for 2 active ingesters - time.Sleep(1 * time.Second) - matchers := []*labels.Matcher{ - { - Type: labels.MatchEqual, - Name: "name", - Value: "ingester", - }, - { - Type: labels.MatchEqual, - Name: "state", - Value: "ACTIVE", - }, - } - - t.Logf("tempo1.Endpoint(): %+v", tempo1.Endpoint(3200)) - - require.NoError(t, tempo1.WaitSumMetricsWithOptions(e2e.Equals(3), []string{`tempo_ring_members`}, e2e.WithLabelMatchers(matchers...), e2e.WaitMissingMetrics)) - require.NoError(t, tempo2.WaitSumMetricsWithOptions(e2e.Equals(3), []string{`tempo_ring_members`}, e2e.WithLabelMatchers(matchers...), e2e.WaitMissingMetrics)) - require.NoError(t, tempo3.WaitSumMetricsWithOptions(e2e.Equals(3), []string{`tempo_ring_members`}, e2e.WithLabelMatchers(matchers...), e2e.WaitMissingMetrics)) - - c1, err := util.NewJaegerGRPCClient(tempo1.Endpoint(14250)) - require.NoError(t, err) - require.NotNil(t, c1) - - c2, err := util.NewJaegerGRPCClient(tempo2.Endpoint(14250)) - require.NoError(t, err) - require.NotNil(t, c2) - - c3, err := util.NewJaegerGRPCClient(tempo3.Endpoint(14250)) - require.NoError(t, err) - require.NotNil(t, c3) - - info := tempoUtil.NewTraceInfo(time.Unix(1632169410, 0), "") - require.NoError(t, info.EmitBatches(c1)) - - expected, err := info.ConstructTraceFromEpoch() - require.NoError(t, err) - - // test metrics - require.NoError(t, tempo1.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total")) - - // wait trace_idle_time and ensure trace is created in ingester - time.Sleep(1 * time.Second) - require.NoError(t, tempo1.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) - - for _, i := range []*e2e.HTTPService{tempo1, tempo2, tempo3} { - callFlush(t, i) - require.NoError(t, i.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) - callIngesterRing(t, i) - callCompactorRing(t, i) - callStatus(t, i) - callBuildinfo(t, i) - } - - apiClient1 := httpclient.New("http://"+tempo1.Endpoint(3200), "") - - queryAndAssertTrace(t, apiClient1, info) - - err = tempo1.Kill() - require.NoError(t, err) - - // Push to one of the instances that are still running. 
- require.NoError(t, info.EmitBatches(c2)) - - err = tempo2.Kill() - require.NoError(t, err) - - err = tempo3.Kill() - require.NoError(t, err) -} - -func makeThriftBatch() *thrift.Batch { - return makeThriftBatchWithSpanCount(1) -} - -func makeThriftBatchWithSpanCount(n int) *thrift.Batch { - return makeThriftBatchWithSpanCountAttributeAndName(n, "my operation", "", "y", "xx", "x") -} - -func makeThriftBatchWithSpanCountAttributeAndName(n int, name, resourceValue, spanValue, resourceTag, spanTag string) *thrift.Batch { - var spans []*thrift.Span - - traceIDLow := rand.Int63() - traceIDHigh := rand.Int63() - for i := 0; i < n; i++ { - spans = append(spans, &thrift.Span{ - TraceIdLow: traceIDLow, - TraceIdHigh: traceIDHigh, - SpanId: rand.Int63(), - ParentSpanId: 0, - OperationName: name, - References: nil, - Flags: 0, - StartTime: time.Now().UnixNano() / 1000, // microsecconds - Duration: 1, - Tags: []*thrift.Tag{ - { - Key: spanTag, - VStr: &spanValue, - }, - }, - Logs: nil, - }) - } - - return &thrift.Batch{ - Process: &thrift.Process{ - ServiceName: "my-service", - Tags: []*thrift.Tag{ - { - Key: resourceTag, - VType: thrift.TagType_STRING, - VStr: &resourceValue, - }, - }, - }, - Spans: spans, - } -} - -func callFlush(t *testing.T, ingester *e2e.HTTPService) { - fmt.Printf("Calling /flush on %s\n", ingester.Name()) - res, err := e2e.DoGet("http://" + ingester.Endpoint(3200) + "/flush") - require.NoError(t, err) - require.Equal(t, http.StatusNoContent, res.StatusCode) -} - -func callIngesterRing(t *testing.T, svc *e2e.HTTPService) { - endpoint := "/ingester/ring" - fmt.Printf("Calling %s on %s\n", endpoint, svc.Name()) - res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint) - require.NoError(t, err) - require.Equal(t, http.StatusOK, res.StatusCode) -} - -func callCompactorRing(t *testing.T, svc *e2e.HTTPService) { - endpoint := "/compactor/ring" - fmt.Printf("Calling %s on %s\n", endpoint, svc.Name()) - res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint) - require.NoError(t, err) - require.Equal(t, http.StatusOK, res.StatusCode) -} - -func callStatus(t *testing.T, svc *e2e.HTTPService) { - endpoint := "/status/endpoints" - fmt.Printf("Calling %s on %s\n", endpoint, svc.Name()) - res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint) - require.NoError(t, err) - require.Equal(t, http.StatusOK, res.StatusCode) -} - -func callBuildinfo(t *testing.T, svc *e2e.HTTPService) { - endpoint := "/api/status/buildinfo" - fmt.Printf("Calling %s on %s\n", endpoint, svc.Name()) - res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint) - require.NoError(t, err) - require.Equal(t, http.StatusOK, res.StatusCode) - - // Check that the actual JSON response contains all the expected keys (we disregard the values) - var jsonResponse map[string]any - keys := []string{"version", "revision", "branch", "buildDate", "buildUser", "goVersion"} - body, err := io.ReadAll(res.Body) - require.NoError(t, err) - err = json.Unmarshal(body, &jsonResponse) - require.NoError(t, err) - for _, key := range keys { - _, ok := jsonResponse[key] - require.True(t, ok) - } - defer res.Body.Close() -} - -func assertEcho(t *testing.T, url string) { - res, err := e2e.DoGet(url) - require.NoError(t, err) - require.Equal(t, http.StatusOK, res.StatusCode) - defer res.Body.Close() -} - -func queryAndAssertTrace(t *testing.T, client *httpclient.Client, info *tempoUtil.TraceInfo) { - resp, err := client.QueryTrace(info.HexID()) - require.NoError(t, err) - - expected, err := 
info.ConstructTraceFromEpoch() - require.NoError(t, err) - - assertEqualTrace(t, resp, expected) -} - -func assertEqualTrace(t *testing.T, a, b *tempopb.Trace) { - t.Helper() - trace.SortTraceAndAttributes(a) - trace.SortTraceAndAttributes(b) - - assert.Equal(t, a, b) -} - -func spanCount(a *tempopb.Trace) float64 { - count := 0 - for _, batch := range a.ResourceSpans { - for _, spans := range batch.ScopeSpans { - count += len(spans.Spans) - } - } - - return float64(count) -} diff --git a/integration/e2e/encodings_test.go b/integration/e2e/encodings_test.go index 96b12e1abee..4cbdbd96c3e 100644 --- a/integration/e2e/encodings_test.go +++ b/integration/e2e/encodings_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + util2 "github.com/grafana/tempo/integration/util" v2 "github.com/grafana/tempo/tempodb/encoding/v2" "github.com/grafana/e2e" @@ -13,10 +14,10 @@ import ( "gopkg.in/yaml.v2" "github.com/grafana/tempo/cmd/tempo/app" - "github.com/grafana/tempo/integration" "github.com/grafana/tempo/integration/e2e/backend" + "github.com/grafana/tempo/integration/util" "github.com/grafana/tempo/pkg/httpclient" - "github.com/grafana/tempo/pkg/util" + tempoUtil "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/tempodb/encoding" ) @@ -35,7 +36,7 @@ func TestEncodings(t *testing.T) { // copy config template to shared directory and expand template variables tmplConfig := map[string]any{"Version": enc.Version()} - config, err := integration.CopyTemplateToSharedDir(s, configAllEncodings, "config.yaml", tmplConfig) + config, err := util2.CopyTemplateToSharedDir(s, configAllEncodings, "config.yaml", tmplConfig) require.NoError(t, err) // load final config @@ -49,44 +50,44 @@ func TestEncodings(t *testing.T) { _, err = backend.New(s, cfg) require.NoError(t, err) - tempo := integration.NewTempoAllInOne() + tempo := util2.NewTempoAllInOne() require.NoError(t, s.StartAndWaitReady(tempo)) // Get port for the Jaeger gRPC receiver endpoint - c, err := integration.NewJaegerGRPCClient(tempo.Endpoint(14250)) + c, err := util2.NewJaegerGRPCClient(tempo.Endpoint(14250)) require.NoError(t, err) require.NotNil(t, c) - info := util.NewTraceInfo(time.Now(), "") + info := tempoUtil.NewTraceInfo(time.Now(), "") require.NoError(t, info.EmitAllBatches(c)) expected, err := info.ConstructTraceFromEpoch() require.NoError(t, err) // test metrics - require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total")) + require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(util.SpanCount(expected)), "tempo_distributor_spans_received_total")) // test echo - assertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo") + util.AssertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo") apiClient := httpclient.New("http://"+tempo.Endpoint(3200), "") // query an in-memory trace - queryAndAssertTrace(t, apiClient, info) + util.QueryAndAssertTrace(t, apiClient, info) // wait trace_idle_time and ensure trace is created in ingester require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) // flush trace to backend - callFlush(t, tempo) + util.CallFlush(t, tempo) // v2 does not support querying and must be skipped if enc.Version() != v2.VersionString { // search for trace in backend multiple times with different attributes to make sure // we search with different scopes and with attributes from dedicated columns for i := 0; i < repeatedSearchCount; i++ { - integration.SearchAndAssertTrace(t, apiClient, info) - 
integration.SearchTraceQLAndAssertTrace(t, apiClient, info) + util2.SearchAndAssertTrace(t, apiClient, info) + util2.SearchTraceQLAndAssertTrace(t, apiClient, info) } } @@ -94,7 +95,7 @@ func TestEncodings(t *testing.T) { time.Sleep(10 * time.Second) // force clear completed block - callFlush(t, tempo) + util.CallFlush(t, tempo) // test metrics require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) @@ -104,10 +105,10 @@ func TestEncodings(t *testing.T) { } // query trace - should fetch from backend - queryAndAssertTrace(t, apiClient, info) + util.QueryAndAssertTrace(t, apiClient, info) // create grpc client used for streaming - grpcClient, err := integration.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200)) + grpcClient, err := util2.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200)) require.NoError(t, err) if enc.Version() == v2.VersionString { @@ -119,9 +120,9 @@ func TestEncodings(t *testing.T) { now := time.Now() for i := 0; i < repeatedSearchCount; i++ { // search the backend. this works b/c we're passing a start/end AND setting query ingesters within min/max to 0 - integration.SearchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix()) + util2.SearchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix()) // find the trace with streaming. using the http server b/c that's what Grafana will do - integration.SearchStreamAndAssertTrace(t, context.Background(), grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix()) + util2.SearchStreamAndAssertTrace(t, context.Background(), grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix()) } }) } diff --git a/integration/e2e/https_test.go b/integration/e2e/https_test.go index 60ad5316c2c..cdee96dc049 100644 --- a/integration/e2e/https_test.go +++ b/integration/e2e/https_test.go @@ -10,9 +10,9 @@ import ( "github.com/grafana/e2e" "github.com/grafana/tempo/cmd/tempo/app" - util "github.com/grafana/tempo/integration" "github.com/grafana/tempo/integration/e2e/backend" e2e_ca "github.com/grafana/tempo/integration/e2e/ca" + "github.com/grafana/tempo/integration/util" "github.com/grafana/tempo/pkg/httpclient" tempoUtil "github.com/grafana/tempo/pkg/util" "github.com/stretchr/testify/require" @@ -73,7 +73,7 @@ func TestHTTPS(t *testing.T) { require.Equal(t, http.StatusOK, resp.StatusCode) // query an in-memory trace - queryAndAssertTrace(t, apiClient, info) + util.QueryAndAssertTrace(t, apiClient, info) util.SearchAndAssertTrace(t, apiClient, info) util.SearchTraceQLAndAssertTrace(t, apiClient, info) diff --git a/integration/e2e/limits_test.go b/integration/e2e/limits_test.go index f1392328bb5..0110dd32cef 100644 --- a/integration/e2e/limits_test.go +++ b/integration/e2e/limits_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/grafana/dskit/user" + util2 "github.com/grafana/tempo/integration/util" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -23,7 +24,7 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "google.golang.org/genproto/googleapis/rpc/errdetails" - util "github.com/grafana/tempo/integration" + "github.com/grafana/tempo/integration/util" "github.com/grafana/tempo/pkg/httpclient" "github.com/grafana/tempo/pkg/tempopb" tempoUtil "github.com/grafana/tempo/pkg/util" @@ -45,28 +46,28 @@ func TestLimits(t *testing.T) { require.NoError(t, err) defer s.Close() - require.NoError(t, 
util.CopyFileToSharedDir(s, configLimits, "config.yaml")) - tempo := util.NewTempoAllInOne() + require.NoError(t, util2.CopyFileToSharedDir(s, configLimits, "config.yaml")) + tempo := util2.NewTempoAllInOne() require.NoError(t, s.StartAndWaitReady(tempo)) // Get port for the otlp receiver endpoint - c, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250)) + c, err := util2.NewJaegerGRPCClient(tempo.Endpoint(14250)) require.NoError(t, err) require.NotNil(t, c) // should fail b/c the trace is too large. each batch should be ~70 bytes - batch := makeThriftBatchWithSpanCount(2) + batch := util.MakeThriftBatchWithSpanCount(2) require.NoError(t, c.EmitBatch(context.Background(), batch), "max trace size") // push a trace - require.NoError(t, c.EmitBatch(context.Background(), makeThriftBatchWithSpanCount(1))) + require.NoError(t, c.EmitBatch(context.Background(), util.MakeThriftBatchWithSpanCount(1))) // should fail b/c this will be too many traces - batch = makeThriftBatch() + batch = util.MakeThriftBatch() require.NoError(t, c.EmitBatch(context.Background(), batch), "too many traces") // should fail b/c due to ingestion rate limit - batch = makeThriftBatchWithSpanCount(10) + batch = util.MakeThriftBatchWithSpanCount(10) err = c.EmitBatch(context.Background(), batch) require.Error(t, err) @@ -105,8 +106,8 @@ func TestOTLPLimits(t *testing.T) { require.NoError(t, err) defer s.Close() - require.NoError(t, util.CopyFileToSharedDir(s, configLimits, "config.yaml")) - tempo := util.NewTempoAllInOne() + require.NoError(t, util2.CopyFileToSharedDir(s, configLimits, "config.yaml")) + tempo := util2.NewTempoAllInOne() require.NoError(t, s.StartAndWaitReady(tempo)) protoSpans := test.MakeProtoSpans(100) @@ -141,8 +142,8 @@ func TestOTLPLimitsVanillaClient(t *testing.T) { require.NoError(t, err) defer s.Close() - require.NoError(t, util.CopyFileToSharedDir(s, configLimits, "config.yaml")) - tempo := util.NewTempoAllInOne() + require.NoError(t, util2.CopyFileToSharedDir(s, configLimits, "config.yaml")) + tempo := util2.NewTempoAllInOne() require.NoError(t, s.StartAndWaitReady(tempo)) trace := test.MakeTrace(10, []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) @@ -203,22 +204,22 @@ func TestQueryLimits(t *testing.T) { require.NoError(t, err) defer s.Close() - require.NoError(t, util.CopyFileToSharedDir(s, configLimitsQuery, "config.yaml")) - tempo := util.NewTempoAllInOne() + require.NoError(t, util2.CopyFileToSharedDir(s, configLimitsQuery, "config.yaml")) + tempo := util2.NewTempoAllInOne() require.NoError(t, s.StartAndWaitReady(tempo)) // Get port for the otlp receiver endpoint - c, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250)) + c, err := util2.NewJaegerGRPCClient(tempo.Endpoint(14250)) require.NoError(t, err) require.NotNil(t, c) // make a trace with 10 spans and push them one at a time, flush in between each one to force different blocks - batch := makeThriftBatchWithSpanCount(5) + batch := util.MakeThriftBatchWithSpanCount(5) allSpans := batch.Spans for i := range batch.Spans { batch.Spans = allSpans[i : i+1] require.NoError(t, c.EmitBatch(context.Background(), batch)) - callFlush(t, tempo) + util.CallFlush(t, tempo) time.Sleep(2 * time.Second) // trace idle and flush time are both 1ms } @@ -254,12 +255,12 @@ func TestLimitsPartialSuccess(t *testing.T) { s, err := e2e.NewScenario("tempo_e2e") require.NoError(t, err) defer s.Close() - require.NoError(t, util.CopyFileToSharedDir(s, configLimitsPartialError, "config.yaml")) - tempo := util.NewTempoAllInOne() + require.NoError(t, 
util2.CopyFileToSharedDir(s, configLimitsPartialError, "config.yaml")) + tempo := util2.NewTempoAllInOne() require.NoError(t, s.StartAndWaitReady(tempo)) // otel grpc exporter - exporter, err := util.NewOtelGRPCExporter(tempo.Endpoint(4317)) + exporter, err := util2.NewOtelGRPCExporter(tempo.Endpoint(4317)) require.NoError(t, err) err = exporter.Start(context.Background(), componenttest.NewNopHost()) @@ -329,22 +330,22 @@ func TestQueryRateLimits(t *testing.T) { require.NoError(t, err) defer s.Close() - require.NoError(t, util.CopyFileToSharedDir(s, configLimits429, "config.yaml")) - tempo := util.NewTempoAllInOne() + require.NoError(t, util2.CopyFileToSharedDir(s, configLimits429, "config.yaml")) + tempo := util2.NewTempoAllInOne() require.NoError(t, s.StartAndWaitReady(tempo)) // Get port for the otlp receiver endpoint - c, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250)) + c, err := util2.NewJaegerGRPCClient(tempo.Endpoint(14250)) require.NoError(t, err) require.NotNil(t, c) // make a trace with 10 spans and push them one at a time, flush in between each one to force different blocks - batch := makeThriftBatchWithSpanCount(5) + batch := util.MakeThriftBatchWithSpanCount(5) allSpans := batch.Spans for i := range batch.Spans { batch.Spans = allSpans[i : i+1] require.NoError(t, c.EmitBatch(context.Background(), batch)) - callFlush(t, tempo) + util.CallFlush(t, tempo) time.Sleep(2 * time.Second) // trace idle and flush time are both 1ms } // now try to query it back. this should fail b/c the frontend queue doesn't have room @@ -365,7 +366,7 @@ func TestQueryRateLimits(t *testing.T) { require.ErrorContains(t, err, "failed with response: 429") // 429 GRPC Search - grpcClient, err := util.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200)) + grpcClient, err := util2.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200)) require.NoError(t, err) resp, err := grpcClient.Search(context.Background(), &tempopb.SearchRequest{ diff --git a/integration/e2e/metrics_generator_test.go b/integration/e2e/metrics_generator_test.go index f7278c865f3..c660cf5a773 100644 --- a/integration/e2e/metrics_generator_test.go +++ b/integration/e2e/metrics_generator_test.go @@ -12,14 +12,13 @@ import ( "time" "github.com/grafana/e2e" + "github.com/grafana/tempo/integration/util" thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" io_prometheus_client "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - util "github.com/grafana/tempo/integration" ) const ( diff --git a/integration/e2e/multi_tenant_test.go b/integration/e2e/multi_tenant_test.go index cb9e4013324..4a959e54bff 100644 --- a/integration/e2e/multi_tenant_test.go +++ b/integration/e2e/multi_tenant_test.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/dskit/user" "github.com/grafana/e2e" + "github.com/grafana/tempo/integration/util" "github.com/grafana/tempo/pkg/collector" "github.com/grafana/tempo/pkg/httpclient" "github.com/grafana/tempo/pkg/tempopb" @@ -20,7 +21,6 @@ import ( "gopkg.in/yaml.v2" "github.com/grafana/tempo/cmd/tempo/app" - util "github.com/grafana/tempo/integration" "github.com/grafana/tempo/integration/e2e/backend" ) @@ -89,7 +89,7 @@ func testSearch(t *testing.T, tenant string, tenantSize int) { traceMap = getAttrsAndSpanNames(trace) // store it to assert tests require.NoError(t, err) - expectedSpans = expectedSpans + spanCount(trace) + expectedSpans = 
expectedSpans + util.SpanCount(trace) } // assert that we have one trace and each tenant and correct number of spans received @@ -100,7 +100,7 @@ func testSearch(t *testing.T, tenant string, tenantSize int) { time.Sleep(time.Second * 3) // test echo - assertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo") + util.AssertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo") // client will have testcase tenant id apiClient := httpclient.New("http://"+tempo.Endpoint(3200), tenant) @@ -115,14 +115,14 @@ func testSearch(t *testing.T, tenant string, tenantSize int) { assert.ElementsMatch(t, traceMap.spanNames, respTm.spanNames) // flush trace to backend - callFlush(t, tempo) + util.CallFlush(t, tempo) // search and traceql search, note: SearchAndAssertTrace also calls SearchTagValues util.SearchAndAssertTrace(t, apiClient, info) util.SearchTraceQLAndAssertTrace(t, apiClient, info) // force clear completed block - callFlush(t, tempo) + util.CallFlush(t, tempo) // wait for flush to complete for all tenants, each tenant will have one block require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(float64(tenantSize)), "tempo_ingester_blocks_flushed_total")) diff --git a/integration/e2e/overrides_test.go b/integration/e2e/overrides_test.go index 2e49dbbf476..90d278d2e82 100644 --- a/integration/e2e/overrides_test.go +++ b/integration/e2e/overrides_test.go @@ -12,12 +12,18 @@ import ( "gopkg.in/yaml.v2" "github.com/grafana/tempo/cmd/tempo/app" - util "github.com/grafana/tempo/integration" "github.com/grafana/tempo/integration/e2e/backend" + "github.com/grafana/tempo/integration/util" "github.com/grafana/tempo/modules/overrides/userconfigurable/client" "github.com/grafana/tempo/pkg/httpclient" ) +const ( + configAllInOneS3 = "deployments/config-all-in-one-s3.yaml" + configAllInOneAzurite = "deployments/config-all-in-one-azurite.yaml" + configAllInOneGCS = "deployments/config-all-in-one-gcs.yaml" +) + func TestOverrides(t *testing.T) { testBackends := []struct { name string diff --git a/integration/e2e/query_plugin_test.go b/integration/e2e/query_plugin_test.go index 2f96986d5b0..900afa6cb6d 100644 --- a/integration/e2e/query_plugin_test.go +++ b/integration/e2e/query_plugin_test.go @@ -12,7 +12,7 @@ import ( "time" "github.com/grafana/e2e" - util "github.com/grafana/tempo/integration" + "github.com/grafana/tempo/integration/util" thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -100,9 +100,9 @@ func TestSearchUsingBackendTagsService(t *testing.T) { // Wait for the traces to be written to the WAL time.Sleep(time.Second * 3) - callFlush(t, tempo) + util.CallFlush(t, tempo) time.Sleep(time.Second * 1) - callFlush(t, tempo) + util.CallFlush(t, tempo) callJaegerQuerySearchServicesAssert(t, tempoQuery, servicesOrOpJaegerQueryResponse{ Data: []string{ diff --git a/integration/e2e/receivers_test.go b/integration/e2e/receivers_test.go index 8ad27f0a4ab..5822dc39190 100644 --- a/integration/e2e/receivers_test.go +++ b/integration/e2e/receivers_test.go @@ -7,6 +7,7 @@ import ( "github.com/grafana/dskit/user" "github.com/grafana/e2e" + "github.com/grafana/tempo/integration/util" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/zipkinexporter" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -22,14 +23,13 @@ import ( tracenoop "go.opentelemetry.io/otel/trace/noop" "go.uber.org/zap" - util "github.com/grafana/tempo/integration" "github.com/grafana/tempo/pkg/httpclient" tempoUtil 
"github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/test" ) const ( - configAllInOneLocal = "config-all-in-one-local.yaml" + configAllInOneLocal = "deployments/config-all-in-one-local.yaml" ) func TestReceivers(t *testing.T) { @@ -146,7 +146,7 @@ func TestReceivers(t *testing.T) { require.NoError(t, err) // just compare spanCount because otel flattens all ILS into one - assert.Equal(t, spanCount(req), spanCount(trace)) + assert.Equal(t, util.SpanCount(req), util.SpanCount(trace)) }) } } diff --git a/integration/e2e/serverless/serverless_test.go b/integration/e2e/serverless/serverless_test.go index 69b7e3d101a..c9c8e880d47 100644 --- a/integration/e2e/serverless/serverless_test.go +++ b/integration/e2e/serverless/serverless_test.go @@ -4,13 +4,13 @@ import ( "testing" "time" + "github.com/grafana/tempo/integration/util" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/e2e" e2e_db "github.com/grafana/e2e/db" - util "github.com/grafana/tempo/integration" "github.com/grafana/tempo/pkg/httpclient" tempoUtil "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/tempodb/backend" diff --git a/integration/poller/poller_test.go b/integration/poller/poller_test.go index 35c6ebe15d7..798f1105c30 100644 --- a/integration/poller/poller_test.go +++ b/integration/poller/poller_test.go @@ -12,12 +12,12 @@ import ( "github.com/go-kit/log" "github.com/google/uuid" "github.com/grafana/e2e" + "github.com/grafana/tempo/integration/util" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" "github.com/grafana/tempo/cmd/tempo/app" - util "github.com/grafana/tempo/integration" e2eBackend "github.com/grafana/tempo/integration/e2e/backend" "github.com/grafana/tempo/pkg/blockboundary" "github.com/grafana/tempo/tempodb/backend" diff --git a/integration/util.go b/integration/util/util.go similarity index 76% rename from integration/util.go rename to integration/util/util.go index bb820a11873..53791b23274 100644 --- a/integration/util.go +++ b/integration/util/util.go @@ -1,14 +1,17 @@ -package integration +package util // Collection of utilities to share between our various load tests import ( "bytes" "context" + "encoding/json" "errors" "fmt" "html/template" "io" + "math/rand" + "net/http" "os" "path/filepath" "strconv" @@ -18,7 +21,10 @@ import ( "github.com/grafana/dskit/backoff" "github.com/grafana/e2e" + "github.com/grafana/tempo/pkg/model/trace" jaeger_grpc "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" + thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" @@ -470,3 +476,140 @@ func traceIDInResults(t *testing.T, hexID string, resp *tempopb.SearchResponse) return false } + +func MakeThriftBatch() *thrift.Batch { + return MakeThriftBatchWithSpanCount(1) +} + +func MakeThriftBatchWithSpanCount(n int) *thrift.Batch { + return MakeThriftBatchWithSpanCountAttributeAndName(n, "my operation", "", "y", "xx", "x") +} + +func MakeThriftBatchWithSpanCountAttributeAndName(n int, name, resourceValue, spanValue, resourceTag, spanTag string) *thrift.Batch { + var spans []*thrift.Span + + traceIDLow := rand.Int63() + traceIDHigh := rand.Int63() + for i := 0; i < n; i++ { + spans = append(spans, &thrift.Span{ + TraceIdLow: traceIDLow, + TraceIdHigh: traceIDHigh, + SpanId: rand.Int63(), + ParentSpanId: 0, + 
OperationName: name,
+			References: nil,
+			Flags: 0,
+			StartTime: time.Now().UnixNano() / 1000, // microseconds
+			Duration: 1,
+			Tags: []*thrift.Tag{
+				{
+					Key: spanTag,
+					VStr: &spanValue,
+				},
+			},
+			Logs: nil,
+		})
+	}
+
+	return &thrift.Batch{
+		Process: &thrift.Process{
+			ServiceName: "my-service",
+			Tags: []*thrift.Tag{
+				{
+					Key: resourceTag,
+					VType: thrift.TagType_STRING,
+					VStr: &resourceValue,
+				},
+			},
+		},
+		Spans: spans,
+	}
+}
+
+func CallFlush(t *testing.T, ingester *e2e.HTTPService) {
+	fmt.Printf("Calling /flush on %s\n", ingester.Name())
+	res, err := e2e.DoGet("http://" + ingester.Endpoint(3200) + "/flush")
+	require.NoError(t, err)
+	require.Equal(t, http.StatusNoContent, res.StatusCode)
+}
+
+func CallIngesterRing(t *testing.T, svc *e2e.HTTPService) {
+	endpoint := "/ingester/ring"
+	fmt.Printf("Calling %s on %s\n", endpoint, svc.Name())
+	res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, res.StatusCode)
+}
+
+func CallCompactorRing(t *testing.T, svc *e2e.HTTPService) {
+	endpoint := "/compactor/ring"
+	fmt.Printf("Calling %s on %s\n", endpoint, svc.Name())
+	res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, res.StatusCode)
+}
+
+func CallStatus(t *testing.T, svc *e2e.HTTPService) {
+	endpoint := "/status/endpoints"
+	fmt.Printf("Calling %s on %s\n", endpoint, svc.Name())
+	res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, res.StatusCode)
+}
+
+func CallBuildinfo(t *testing.T, svc *e2e.HTTPService) {
+	endpoint := "/api/status/buildinfo"
+	fmt.Printf("Calling %s on %s\n", endpoint, svc.Name())
+	res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, res.StatusCode)
+
+	// Check that the actual JSON response contains all the expected keys (we disregard the values)
+	var jsonResponse map[string]any
+	keys := []string{"version", "revision", "branch", "buildDate", "buildUser", "goVersion"}
+	body, err := io.ReadAll(res.Body)
+	require.NoError(t, err)
+	err = json.Unmarshal(body, &jsonResponse)
+	require.NoError(t, err)
+	for _, key := range keys {
+		_, ok := jsonResponse[key]
+		require.True(t, ok)
+	}
+	defer res.Body.Close()
+}
+
+func AssertEcho(t *testing.T, url string) {
+	res, err := e2e.DoGet(url)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, res.StatusCode)
+	defer res.Body.Close()
+}
+
+func QueryAndAssertTrace(t *testing.T, client *httpclient.Client, info *tempoUtil.TraceInfo) {
+	resp, err := client.QueryTrace(info.HexID())
+	require.NoError(t, err)
+
+	expected, err := info.ConstructTraceFromEpoch()
+	require.NoError(t, err)
+
+	AssertEqualTrace(t, resp, expected)
+}
+
+func AssertEqualTrace(t *testing.T, a, b *tempopb.Trace) {
+	t.Helper()
+	trace.SortTraceAndAttributes(a)
+	trace.SortTraceAndAttributes(b)
+
+	assert.Equal(t, a, b)
+}
+
+func SpanCount(a *tempopb.Trace) float64 {
+	count := 0
+	for _, batch := range a.ResourceSpans {
+		for _, spans := range batch.ScopeSpans {
+			count += len(spans.Spans)
+		}
+	}
+
+	return float64(count)
+}
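
For reviewers, a minimal sketch of how a test consumes the relocated helpers after this change. The import path github.com/grafana/tempo/integration/util and every helper call below (CopyFileToSharedDir, NewTempoAllInOne, NewJaegerGRPCClient, MakeThriftBatchWithSpanCount, CallFlush) are taken from the diff above; the test name, scenario label, and config-file choice are illustrative assumptions, not part of the patch.

package e2e

import (
	"context"
	"testing"

	"github.com/grafana/e2e"
	"github.com/grafana/tempo/integration/util"
	"github.com/stretchr/testify/require"
)

// Illustrative sketch only: shows the new util import replacing the old
// package-local helpers (makeThriftBatchWithSpanCount, callFlush, ...).
func TestUtilHelpersSketch(t *testing.T) {
	s, err := e2e.NewScenario("tempo_e2e")
	require.NoError(t, err)
	defer s.Close()

	// config path mirrors the configAllInOneLocal constant updated in this diff
	require.NoError(t, util.CopyFileToSharedDir(s, "deployments/config-all-in-one-local.yaml", "config.yaml"))
	tempo := util.NewTempoAllInOne()
	require.NoError(t, s.StartAndWaitReady(tempo))

	// push a small batch over the Jaeger gRPC receiver, then force a flush
	c, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250))
	require.NoError(t, err)
	require.NoError(t, c.EmitBatch(context.Background(), util.MakeThriftBatchWithSpanCount(3)))

	util.CallFlush(t, tempo)
}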