Skip to content

Commit

Permalink
Add integration tests for new "*" configurations (#913)
Browse files Browse the repository at this point in the history
Add `test_wildcard_clickhouse.go` test which checks the `"*": target:
[clickhouse]` scenario (everything goes to ClickHouse) and
`test_wildcard_disabled.go` test which checks the `"*": target: []`
scenario (all query/ingest is disabled except for explicitly configured
indexes).

Additionally Two small bug fixes related to `"*"` configuration
introduced in b520aca: `target: []` is
now allowed for query processor indexes and `IsClosed` of the decision
is now properly filled in for `makeDefaultWildcard` rule.
  • Loading branch information
avelanarius authored Oct 24, 2024
1 parent 1bdac85 commit 77ac9ec
Show file tree
Hide file tree
Showing 9 changed files with 333 additions and 8 deletions.
44 changes: 44 additions & 0 deletions ci/it/configs/quesma-wildcard-clickhouse.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
user: elastic
password: quesmaquesma
- name: my-clickhouse-instance
type: clickhouse-os
config:
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
ingestStatistics: true
processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
"*":
target: [ my-clickhouse-instance ]
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
"*":
target: [ my-clickhouse-instance ]
pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ my-query-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ my-ingest-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ]

56 changes: 56 additions & 0 deletions ci/it/configs/quesma-wildcard-disabled.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
user: elastic
password: quesmaquesma
- name: my-clickhouse-instance
type: clickhouse-os
config:
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
ingestStatistics: true
processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
"explicitly_disabled1":
target: [ ]
"explicitly_disabled3":
target: [ ]
"query_enabled":
target: [ my-clickhouse-instance ]
"*":
target: [ ]
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
"explicitly_disabled2":
target: [ ]
"explicitly_disabled3":
target: [ ]
"ingest_enabled":
target: [ my-clickhouse-instance ]
"*":
target: [ ]
pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ my-query-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ my-ingest-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-instance ]

10 changes: 10 additions & 0 deletions ci/it/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,13 @@ func TestDualWriteAndCommonTableTestcase(t *testing.T) {
testCase := testcases.NewDualWriteAndCommonTableTestcase()
runIntegrationTest(t, testCase)
}

func TestWildcardDisabledTestcase(t *testing.T) {
testCase := testcases.NewWildcardDisabledTestcase()
runIntegrationTest(t, testCase)
}

func TestWildcardClickhouseTestcase(t *testing.T) {
testCase := testcases.NewWildcardClickhouseTestcase()
runIntegrationTest(t, testCase)
}
16 changes: 14 additions & 2 deletions ci/it/testcases/test_reading_clickhouse_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package testcases
import (
"context"
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"net/http"
"testing"
Expand Down Expand Up @@ -36,6 +37,7 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) RunTests(ctx context.Contex
t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) })
t.Run("test random thing", func(t *testing.T) { a.testRandomThing(ctx, t) })
t.Run("test wildcard goes to elastic", func(t *testing.T) { a.testWildcardGoesToElastic(ctx, t) })
t.Run("test ingest is disabled", func(t *testing.T) { a.testIngestIsDisabled(ctx, t) })
return nil
}

Expand Down Expand Up @@ -91,5 +93,15 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) testWildcardGoesToElastic(c
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}

// At this moment this configuration does not disable ingest (ingest req's will get routed to ES and handled normally)
// Future test idea -> ensure ingest req gets rejected.
func (a *ReadingClickHouseTablesIntegrationTestcase) testIngestIsDisabled(ctx context.Context, t *testing.T) {
// There is no ingest pipeline, so Quesma should reject all ingest requests
for _, tt := range []string{"test_table", "extra_index"} {
t.Run(tt, func(t *testing.T) {
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_doc", tt), []byte(`{"name": "Piotr", "age": 11111}`))
assert.Contains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
})
}
}
7 changes: 3 additions & 4 deletions ci/it/testcases/test_two_pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,13 @@ func (a *QueryAndIngestPipelineTestcase) testBasicRequest(ctx context.Context, t

func (a *QueryAndIngestPipelineTestcase) testWildcardGoesToElastic(ctx context.Context, t *testing.T) {
// Given an index in Elasticsearch which falls under `*` in the configuration
var err error
if _, err = a.RequestToElasticsearch(ctx, "PUT", "/unmentioned_index", nil); err != nil {
if _, err := a.RequestToElasticsearch(ctx, "PUT", "/unmentioned_index", nil); err != nil {
t.Fatalf("Failed to create index: %s", err)
}
if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil {
if _, err := a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_doc/1", []byte(`{"name": "Alice"}`)); err != nil {
t.Fatalf("Failed to insert document: %s", err)
}
if _, err = a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_refresh", nil); err != nil {
if _, err := a.RequestToElasticsearch(ctx, "POST", "/unmentioned_index/_refresh", nil); err != nil {
t.Fatalf("Failed to refresh index: %s", err)
}
// When Quesma searches for that document
Expand Down
99 changes: 99 additions & 0 deletions ci/it/testcases/test_wildcard_clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package testcases

import (
"context"
"github.com/stretchr/testify/assert"
"io"
"net/http"
"testing"
)

type WildcardClickhouseTestcase struct {
IntegrationTestcaseBase
}

func NewWildcardClickhouseTestcase() *WildcardClickhouseTestcase {
return &WildcardClickhouseTestcase{
IntegrationTestcaseBase: IntegrationTestcaseBase{
ConfigTemplate: "quesma-wildcard-clickhouse.yml.template",
},
}
}

func (a *WildcardClickhouseTestcase) SetupContainers(ctx context.Context) error {
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
if err != nil {
return err
}
a.Containers = containers
return nil
}

func (a *WildcardClickhouseTestcase) RunTests(ctx context.Context, t *testing.T) error {
t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) })
t.Run("test ingest+query works", func(t *testing.T) { a.testIngestQueryWorks(ctx, t) })
t.Run("test clickhouse table autodiscovery", func(t *testing.T) { a.testClickHouseTableAutodiscovery(ctx, t) })
return nil
}

func (a *WildcardClickhouseTestcase) testBasicRequest(ctx context.Context, t *testing.T) {
resp, _ := a.RequestToQuesma(ctx, t, "GET", "/", nil)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}

func (a *WildcardClickhouseTestcase) testIngestQueryWorks(ctx context.Context, t *testing.T) {
// First ingest...
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/test_index/_doc", []byte(`{"name": "Piotr", "age": 22222}`))
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))

// ...then query inserted data
resp, bodyBytes = a.RequestToQuesma(ctx, t, "POST", "/test_index/_search", []byte(`{"query": {"match_all": {}}}`))
assert.Contains(t, string(bodyBytes), "Piotr")
assert.Contains(t, string(bodyBytes), "22222")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))

// Also make sure no such index got created in Elasticsearch
resp, err := a.RequestToElasticsearch(ctx, "GET", "/test_index/_refresh", nil)
if err != nil {
t.Fatalf("Failed to make GET request: %s", err)
}
defer resp.Body.Close()
bodyBytes, err = io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Failed to read response body: %s", err)
}

assert.Equal(t, http.StatusNotFound, resp.StatusCode)
assert.Contains(t, string(bodyBytes), "no such index [test_index]")
}

func (a *WildcardClickhouseTestcase) testClickHouseTableAutodiscovery(ctx context.Context, t *testing.T) {
// Create test table in ClickHouse
createTableQuery := "CREATE TABLE IF NOT EXISTS existing_clickhouse_table (id UInt32, name String) ENGINE = Memory"
if _, err := a.ExecuteClickHouseStatement(ctx, createTableQuery); err != nil {
t.Fatalf("Failed to create table: %s", err)
}
insertRowsQuery := "INSERT INTO existing_clickhouse_table (id, name) VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')"
if _, err := a.ExecuteClickHouseStatement(ctx, insertRowsQuery); err != nil {
t.Fatalf("Failed to insert rows: %s", err)
}

resp, _ := a.RequestToQuesma(ctx, t, "POST", "/existing_clickhouse_table/_search", []byte(`{"query": {"match_all": {}}}`))
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))

// This returns 500 Internal Server Error, but will be tackled in separate PR.
// (The table has not yet been discovered by Quesma )
//
// assert.Equal(t, http.StatusOK, resp.StatusCode)
// assert.Contains(t, string(bodyBytes), "Alice")
// assert.Contains(t, string(bodyBytes), "Bob")
// assert.Contains(t, string(bodyBytes), "Charlie")
// assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}
104 changes: 104 additions & 0 deletions ci/it/testcases/test_wildcard_disabled.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package testcases

import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"net/http"
"testing"
)

type WildcardDisabledTestcase struct {
IntegrationTestcaseBase
}

func NewWildcardDisabledTestcase() *WildcardDisabledTestcase {
return &WildcardDisabledTestcase{
IntegrationTestcaseBase: IntegrationTestcaseBase{
ConfigTemplate: "quesma-wildcard-disabled.yml.template",
},
}
}

func (a *WildcardDisabledTestcase) SetupContainers(ctx context.Context) error {
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
if err != nil {
return err
}
a.Containers = containers
return nil
}

func (a *WildcardDisabledTestcase) RunTests(ctx context.Context, t *testing.T) error {
t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) })
t.Run("test query is disabled", func(t *testing.T) { a.testQueryIsDisabled(ctx, t) })
t.Run("test ingest is disabled", func(t *testing.T) { a.testIngestIsDisabled(ctx, t) })
t.Run("test explicit index query enabled", func(t *testing.T) { a.testExplicitIndexQueryIsEnabled(ctx, t) })
t.Run("test explicit index ingest enabled", func(t *testing.T) { a.testExplicitIndexIngestIsEnabled(ctx, t) })
return nil
}

func (a *WildcardDisabledTestcase) testBasicRequest(ctx context.Context, t *testing.T) {
resp, _ := a.RequestToQuesma(ctx, t, "GET", "/", nil)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}

func (a *WildcardDisabledTestcase) testQueryIsDisabled(ctx context.Context, t *testing.T) {
if _, err := a.RequestToElasticsearch(ctx, "PUT", "/elastic_index1", nil); err != nil {
t.Fatalf("Failed to create index: %s", err)
}
if _, err := a.RequestToElasticsearch(ctx, "POST", "/elastic_index1/_refresh", nil); err != nil {
t.Fatalf("Failed to refresh index: %s", err)
}

// Quesma should reject all queries
for _, tt := range []string{"test_table", "extra_index", "explicitly_disabled1", "explicitly_disabled2", "explicitly_disabled3", "ingest_enabled", "elastic_index1"} {
t.Run(tt, func(t *testing.T) {
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_search", tt), []byte(`{"query": {"match_all": {}}}`))
assert.Contains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
})
}
}

func (a *WildcardDisabledTestcase) testIngestIsDisabled(ctx context.Context, t *testing.T) {
if _, err := a.RequestToElasticsearch(ctx, "PUT", "/elastic_index2", nil); err != nil {
t.Fatalf("Failed to create index: %s", err)
}
if _, err := a.RequestToElasticsearch(ctx, "POST", "/elastic_index2/_refresh", nil); err != nil {
t.Fatalf("Failed to refresh index: %s", err)
}

// Quesma should reject all ingest requests
for _, tt := range []string{"test_table", "extra_index", "explicitly_disabled1", "explicitly_disabled2", "explicitly_disabled3", "query_enabled", "elastic_index2"} {
t.Run(tt, func(t *testing.T) {
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_doc", tt), []byte(`{"name": "Piotr", "age": 22222}`))
assert.Contains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
})
}
}

func (a *WildcardDisabledTestcase) testExplicitIndexQueryIsEnabled(ctx context.Context, t *testing.T) {
// query_enabled is the only index with query enabled
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/query_enabled/_search", []byte(`{"query": {"match_all": {}}}`))
assert.NotContains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
// TODO: the actual request currently fails since there's no such table in ClickHouse
}

func (a *WildcardDisabledTestcase) testExplicitIndexIngestIsEnabled(ctx context.Context, t *testing.T) {
// ingest_enabled is the only index with ingest enabled
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/ingest_enabled/_doc", []byte(`{"name": "Piotr", "age": 22222}`))
assert.NotContains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}
4 changes: 2 additions & 2 deletions quesma/quesma/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,8 @@ func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {
return fmt.Errorf("index name '%s' in processor configuration is an index pattern, not allowed", indexName)
}
if p.Type == QuesmaV1ProcessorQuery {
if len(indexConfig.Target) != 1 && len(indexConfig.Target) != 2 {
return fmt.Errorf("configuration of index %s must have one or two targets (query processor)", indexName)
if len(indexConfig.Target) > 2 {
return fmt.Errorf("configuration of index %s must have at most two targets (query processor)", indexName)
}
} else {
if len(indexConfig.Target) > 2 {
Expand Down
1 change: 1 addition & 0 deletions quesma/table_resolver/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func makeDefaultWildcard(quesmaConf config.QuesmaConfiguration, pipeline string)

return &Decision{
UseConnectors: useConnectors,
IsClosed: len(useConnectors) == 0,
Reason: fmt.Sprintf("Using default wildcard ('%s') configuration for %s processor", config.DefaultWildcardIndexName, pipeline),
}
}
Expand Down

0 comments on commit 77ac9ec

Please sign in to comment.