From 77ac9ec3d2f3297b36ef18f25d8e5eadfcc35650 Mon Sep 17 00:00:00 2001 From: Piotr Grabowski Date: Thu, 24 Oct 2024 14:58:46 +0200 Subject: [PATCH] Add integration tests for new "*" configurations (#913) Add `test_wildcard_clickhouse.go` test which checks the `"*": target: [clickhouse]` scenario (everything goes to ClickHouse) and `test_wildcard_disabled.go` test which checks the `"*": target: []` scenario (all query/ingest is disabled except for explicitly configured indexes). Additionally Two small bug fixes related to `"*"` configuration introduced in b520acafb2e2a91ee91b51d4cfe058a4d081c54f: `target: []` is now allowed for query processor indexes and `IsClosed` of the decision is now properly filled in for `makeDefaultWildcard` rule. --- .../quesma-wildcard-clickhouse.yml.template | 44 ++++++++ .../quesma-wildcard-disabled.yml.template | 56 ++++++++++ ci/it/integration_test.go | 10 ++ .../test_reading_clickhouse_tables.go | 16 ++- ci/it/testcases/test_two_pipelines.go | 7 +- ci/it/testcases/test_wildcard_clickhouse.go | 99 +++++++++++++++++ ci/it/testcases/test_wildcard_disabled.go | 104 ++++++++++++++++++ quesma/quesma/config/config_v2.go | 4 +- quesma/table_resolver/rules.go | 1 + 9 files changed, 333 insertions(+), 8 deletions(-) create mode 100644 ci/it/configs/quesma-wildcard-clickhouse.yml.template create mode 100644 ci/it/configs/quesma-wildcard-disabled.yml.template create mode 100644 ci/it/testcases/test_wildcard_clickhouse.go create mode 100644 ci/it/testcases/test_wildcard_disabled.go diff --git a/ci/it/configs/quesma-wildcard-clickhouse.yml.template b/ci/it/configs/quesma-wildcard-clickhouse.yml.template new file mode 100644 index 000000000..514c93b34 --- /dev/null +++ b/ci/it/configs/quesma-wildcard-clickhouse.yml.template @@ -0,0 +1,44 @@ +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: my-minimal-elasticsearch + type: elasticsearch + config: + url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}" + user: elastic + password: quesmaquesma + - name: my-clickhouse-instance + type: clickhouse-os + config: + url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }} +ingestStatistics: true +processors: + - name: my-query-processor + type: quesma-v1-processor-query + config: + indexes: + "*": + target: [ my-clickhouse-instance ] + - name: my-ingest-processor + type: quesma-v1-processor-ingest + config: + indexes: + "*": + target: [ my-clickhouse-instance ] +pipelines: + - name: my-elasticsearch-proxy-read + frontendConnectors: [ elastic-query ] + processors: [ my-query-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ] + - name: my-elasticsearch-proxy-write + frontendConnectors: [ elastic-ingest ] + processors: [ my-ingest-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ] + diff --git a/ci/it/configs/quesma-wildcard-disabled.yml.template b/ci/it/configs/quesma-wildcard-disabled.yml.template new file mode 100644 index 000000000..3587e2721 --- /dev/null +++ b/ci/it/configs/quesma-wildcard-disabled.yml.template @@ -0,0 +1,56 @@ +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: my-minimal-elasticsearch + type: elasticsearch + config: + url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}" + user: elastic + password: quesmaquesma + - name: my-clickhouse-instance + type: clickhouse-os + config: + url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }} +ingestStatistics: true +processors: + - name: my-query-processor + type: quesma-v1-processor-query + config: + indexes: + "explicitly_disabled1": + target: [ ] + "explicitly_disabled3": + target: [ ] + "query_enabled": + target: [ my-clickhouse-instance ] + "*": + target: [ ] + - name: my-ingest-processor + type: quesma-v1-processor-ingest + config: + indexes: + "explicitly_disabled2": + target: [ ] + "explicitly_disabled3": + target: [ ] + "ingest_enabled": + target: [ my-clickhouse-instance ] + "*": + target: [ ] +pipelines: + - name: my-elasticsearch-proxy-read + frontendConnectors: [ elastic-query ] + processors: [ my-query-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ] + - name: my-elasticsearch-proxy-write + frontendConnectors: [ elastic-ingest ] + processors: [ my-ingest-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ] + diff --git a/ci/it/integration_test.go b/ci/it/integration_test.go index 8e304996e..5aa51b573 100644 --- a/ci/it/integration_test.go +++ b/ci/it/integration_test.go @@ -39,3 +39,13 @@ func TestDualWriteAndCommonTableTestcase(t *testing.T) { testCase := testcases.NewDualWriteAndCommonTableTestcase() runIntegrationTest(t, testCase) } + +func TestWildcardDisabledTestcase(t *testing.T) { + testCase := testcases.NewWildcardDisabledTestcase() + runIntegrationTest(t, testCase) +} + +func TestWildcardClickhouseTestcase(t *testing.T) { + testCase := testcases.NewWildcardClickhouseTestcase() + runIntegrationTest(t, testCase) +} diff --git a/ci/it/testcases/test_reading_clickhouse_tables.go b/ci/it/testcases/test_reading_clickhouse_tables.go index 978cba625..b1dbc751a 100644 --- a/ci/it/testcases/test_reading_clickhouse_tables.go +++ b/ci/it/testcases/test_reading_clickhouse_tables.go @@ -6,6 +6,7 @@ package testcases import ( "context" "encoding/json" + "fmt" "github.com/stretchr/testify/assert" "net/http" "testing" @@ -36,6 +37,7 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) RunTests(ctx context.Contex t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) }) t.Run("test random thing", func(t *testing.T) { a.testRandomThing(ctx, t) }) t.Run("test wildcard goes to elastic", func(t *testing.T) { a.testWildcardGoesToElastic(ctx, t) }) + t.Run("test ingest is disabled", func(t *testing.T) { a.testIngestIsDisabled(ctx, t) }) return nil } @@ -91,5 +93,15 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) testWildcardGoesToElastic(c assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) } -// At this moment this configuration does not disable ingest (ingest req's will get routed to ES and handled normally) -// Future test idea -> ensure ingest req gets rejected. +func (a *ReadingClickHouseTablesIntegrationTestcase) testIngestIsDisabled(ctx context.Context, t *testing.T) { + // There is no ingest pipeline, so Quesma should reject all ingest requests + for _, tt := range []string{"test_table", "extra_index"} { + t.Run(tt, func(t *testing.T) { + resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_doc", tt), []byte(`{"name": "Piotr", "age": 11111}`)) + assert.Contains(t, string(bodyBytes), "index_closed_exception") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) + }) + } +} diff --git a/ci/it/testcases/test_two_pipelines.go b/ci/it/testcases/test_two_pipelines.go index 53310e4eb..66a8850ff 100644 --- a/ci/it/testcases/test_two_pipelines.go +++ b/ci/it/testcases/test_two_pipelines.go @@ -47,14 +47,13 @@ func (a *QueryAndIngestPipelineTestcase) testBasicRequest(ctx context.Context, t func (a *QueryAndIngestPipelineTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) { // Given an index in Elasticsearch which falls under `*` in the configuration - var err error - if _, err = a.RequestToElasticsearch(ctx, "PUT", "/unmentioned_index", nil); err != nil { + if _, err := a.RequestToElasticsearch(ctx, "PUT", "/unmentioned_index", nil); err != nil { t.Fatalf("Failed to create index: %s", err) } - if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil { + if _, err := a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil { t.Fatalf("Failed to insert document: %s", err) } - if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_refresh", nil); err != nil { + if _, err := a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_refresh", nil); err != nil { t.Fatalf("Failed to refresh index: %s", err) } // When Quesma searches for that document diff --git a/ci/it/testcases/test_wildcard_clickhouse.go b/ci/it/testcases/test_wildcard_clickhouse.go new file mode 100644 index 000000000..8aa914183 --- /dev/null +++ b/ci/it/testcases/test_wildcard_clickhouse.go @@ -0,0 +1,99 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package testcases + +import ( + "context" + "github.com/stretchr/testify/assert" + "io" + "net/http" + "testing" +) + +type WildcardClickhouseTestcase struct { + IntegrationTestcaseBase +} + +func NewWildcardClickhouseTestcase() *WildcardClickhouseTestcase { + return &WildcardClickhouseTestcase{ + IntegrationTestcaseBase: IntegrationTestcaseBase{ + ConfigTemplate: "quesma-wildcard-clickhouse.yml.template", + }, + } +} + +func (a *WildcardClickhouseTestcase) SetupContainers(ctx context.Context) error { + containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate) + if err != nil { + return err + } + a.Containers = containers + return nil +} + +func (a *WildcardClickhouseTestcase) RunTests(ctx context.Context, t *testing.T) error { + t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) }) + t.Run("test ingest+query works", func(t *testing.T) { a.testIngestQueryWorks(ctx, t) }) + t.Run("test clickhouse table autodiscovery", func(t *testing.T) { a.testClickHouseTableAutodiscovery(ctx, t) }) + return nil +} + +func (a *WildcardClickhouseTestcase) testBasicRequest(ctx context.Context, t *testing.T) { + resp, _ := a.RequestToQuesma(ctx, t, "GET", "/", nil) + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +func (a *WildcardClickhouseTestcase) testIngestQueryWorks(ctx context.Context, t *testing.T) { + // First ingest... + resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/test_index/_doc", []byte(`{"name": "Piotr", "age": 22222}`)) + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) + + // ...then query inserted data + resp, bodyBytes = a.RequestToQuesma(ctx, t, "POST", "/test_index/_search", []byte(`{"query": {"match_all": {}}}`)) + assert.Contains(t, string(bodyBytes), "Piotr") + assert.Contains(t, string(bodyBytes), "22222") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) + + // Also make sure no such index got created in Elasticsearch + resp, err := a.RequestToElasticsearch(ctx, "GET", "/test_index/_refresh", nil) + if err != nil { + t.Fatalf("Failed to make GET request: %s", err) + } + defer resp.Body.Close() + bodyBytes, err = io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read response body: %s", err) + } + + assert.Equal(t, http.StatusNotFound, resp.StatusCode) + assert.Contains(t, string(bodyBytes), "no such index [test_index]") +} + +func (a *WildcardClickhouseTestcase) testClickHouseTableAutodiscovery(ctx context.Context, t *testing.T) { + // Create test table in ClickHouse + createTableQuery := "CREATE TABLE IF NOT EXISTS existing_clickhouse_table (id UInt32, name String) ENGINE = Memory" + if _, err := a.ExecuteClickHouseStatement(ctx, createTableQuery); err != nil { + t.Fatalf("Failed to create table: %s", err) + } + insertRowsQuery := "INSERT INTO existing_clickhouse_table (id, name) VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')" + if _, err := a.ExecuteClickHouseStatement(ctx, insertRowsQuery); err != nil { + t.Fatalf("Failed to insert rows: %s", err) + } + + resp, _ := a.RequestToQuesma(ctx, t, "POST", "/existing_clickhouse_table/_search", []byte(`{"query": {"match_all": {}}}`)) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + + // This returns 500 Internal Server Error, but will be tackled in separate PR. + // (The table has not yet been discovered by Quesma ) + // + // assert.Equal(t, http.StatusOK, resp.StatusCode) + // assert.Contains(t, string(bodyBytes), "Alice") + // assert.Contains(t, string(bodyBytes), "Bob") + // assert.Contains(t, string(bodyBytes), "Charlie") + // assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} diff --git a/ci/it/testcases/test_wildcard_disabled.go b/ci/it/testcases/test_wildcard_disabled.go new file mode 100644 index 000000000..bc186ea19 --- /dev/null +++ b/ci/it/testcases/test_wildcard_disabled.go @@ -0,0 +1,104 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package testcases + +import ( + "context" + "fmt" + "github.com/stretchr/testify/assert" + "net/http" + "testing" +) + +type WildcardDisabledTestcase struct { + IntegrationTestcaseBase +} + +func NewWildcardDisabledTestcase() *WildcardDisabledTestcase { + return &WildcardDisabledTestcase{ + IntegrationTestcaseBase: IntegrationTestcaseBase{ + ConfigTemplate: "quesma-wildcard-disabled.yml.template", + }, + } +} + +func (a *WildcardDisabledTestcase) SetupContainers(ctx context.Context) error { + containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate) + if err != nil { + return err + } + a.Containers = containers + return nil +} + +func (a *WildcardDisabledTestcase) RunTests(ctx context.Context, t *testing.T) error { + t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) }) + t.Run("test query is disabled", func(t *testing.T) { a.testQueryIsDisabled(ctx, t) }) + t.Run("test ingest is disabled", func(t *testing.T) { a.testIngestIsDisabled(ctx, t) }) + t.Run("test explicit index query enabled", func(t *testing.T) { a.testExplicitIndexQueryIsEnabled(ctx, t) }) + t.Run("test explicit index ingest enabled", func(t *testing.T) { a.testExplicitIndexIngestIsEnabled(ctx, t) }) + return nil +} + +func (a *WildcardDisabledTestcase) testBasicRequest(ctx context.Context, t *testing.T) { + resp, _ := a.RequestToQuesma(ctx, t, "GET", "/", nil) + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +func (a *WildcardDisabledTestcase) testQueryIsDisabled(ctx context.Context, t *testing.T) { + if _, err := a.RequestToElasticsearch(ctx, "PUT", "/elastic_index1", nil); err != nil { + t.Fatalf("Failed to create index: %s", err) + } + if _, err := a.RequestToElasticsearch(ctx, "POST", "/elastic_index1/_refresh", nil); err != nil { + t.Fatalf("Failed to refresh index: %s", err) + } + + // Quesma should reject all queries + for _, tt := range []string{"test_table", "extra_index", "explicitly_disabled1", "explicitly_disabled2", "explicitly_disabled3", "ingest_enabled", "elastic_index1"} { + t.Run(tt, func(t *testing.T) { + resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_search", tt), []byte(`{"query": {"match_all": {}}}`)) + assert.Contains(t, string(bodyBytes), "index_closed_exception") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) + }) + } +} + +func (a *WildcardDisabledTestcase) testIngestIsDisabled(ctx context.Context, t *testing.T) { + if _, err := a.RequestToElasticsearch(ctx, "PUT", "/elastic_index2", nil); err != nil { + t.Fatalf("Failed to create index: %s", err) + } + if _, err := a.RequestToElasticsearch(ctx, "POST", "/elastic_index2/_refresh", nil); err != nil { + t.Fatalf("Failed to refresh index: %s", err) + } + + // Quesma should reject all ingest requests + for _, tt := range []string{"test_table", "extra_index", "explicitly_disabled1", "explicitly_disabled2", "explicitly_disabled3", "query_enabled", "elastic_index2"} { + t.Run(tt, func(t *testing.T) { + resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_doc", tt), []byte(`{"name": "Piotr", "age": 22222}`)) + assert.Contains(t, string(bodyBytes), "index_closed_exception") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) + }) + } +} + +func (a *WildcardDisabledTestcase) testExplicitIndexQueryIsEnabled(ctx context.Context, t *testing.T) { + // query_enabled is the only index with query enabled + resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/query_enabled/_search", []byte(`{"query": {"match_all": {}}}`)) + assert.NotContains(t, string(bodyBytes), "index_closed_exception") + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + // TODO: the actual request currently fails since there's no such table in ClickHouse +} + +func (a *WildcardDisabledTestcase) testExplicitIndexIngestIsEnabled(ctx context.Context, t *testing.T) { + // ingest_enabled is the only index with ingest enabled + resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/ingest_enabled/_doc", []byte(`{"name": "Piotr", "age": 22222}`)) + assert.NotContains(t, string(bodyBytes), "index_closed_exception") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) +} diff --git a/quesma/quesma/config/config_v2.go b/quesma/quesma/config/config_v2.go index c74c79793..9e10e73c3 100644 --- a/quesma/quesma/config/config_v2.go +++ b/quesma/quesma/config/config_v2.go @@ -381,8 +381,8 @@ func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error { return fmt.Errorf("index name '%s' in processor configuration is an index pattern, not allowed", indexName) } if p.Type == QuesmaV1ProcessorQuery { - if len(indexConfig.Target) != 1 && len(indexConfig.Target) != 2 { - return fmt.Errorf("configuration of index %s must have one or two targets (query processor)", indexName) + if len(indexConfig.Target) > 2 { + return fmt.Errorf("configuration of index %s must have at most two targets (query processor)", indexName) } } else { if len(indexConfig.Target) > 2 { diff --git a/quesma/table_resolver/rules.go b/quesma/table_resolver/rules.go index 8ec27fa8d..3650198d9 100644 --- a/quesma/table_resolver/rules.go +++ b/quesma/table_resolver/rules.go @@ -94,6 +94,7 @@ func makeDefaultWildcard(quesmaConf config.QuesmaConfiguration, pipeline string) return &Decision{ UseConnectors: useConnectors, + IsClosed: len(useConnectors) == 0, Reason: fmt.Sprintf("Using default wildcard ('%s') configuration for %s processor", config.DefaultWildcardIndexName, pipeline), } }