Skip to content

Commit

Permalink
Don't remove Elasticsearch indices/datastreams from _resolve endpoint (
Browse files Browse the repository at this point in the history
#989)

The old filtering behavior existed for historical reasons which are no longer
accurate.
After this patch, you can create data stream with ES index without
issues:
<img width="600" alt="image"
src="https://github.com/user-attachments/assets/d6b48cdd-77fb-47ee-82d3-eedb5b0c5031">

At this moment, we simply return all(*) the sources. After all, created
data view may outlive Quesma and related routing configuration.

(*) all the Elasticsearch indices/datastreams/aliases matching desired
pattern plus our ClickHouse tables presented as Data Streams.

Closes: #996
  • Loading branch information
mieciu authored Nov 20, 2024
1 parent 0b40e2c commit d59a9f2
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ processors:
target: [ c ]
logs-3:
target: [ c, e ]
logs-dual-query:
## WARNING: `logs-dual-query` (with two dashes) falls under the default index pattern for logs in Elasticsearch, which results in the creation of a data stream instead of an index
logs-dual_query:
target: [ c, e ]
logs-4:
target:
Expand All @@ -52,7 +53,7 @@ processors:
target: [ c ]
logs-3:
target: [ c, e ]
logs-dual-query:
logs-dual_query:
target: [ c, e ]
logs-4:
target:
Expand Down
64 changes: 60 additions & 4 deletions ci/it/testcases/test_dual_write_and_common_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (a *DualWriteAndCommonTableTestcase) RunTests(ctx context.Context, t *testi
t.Run("test dual query returns data from clickhouse", func(t *testing.T) { a.testDualQueryReturnsDataFromClickHouse(ctx, t) })
t.Run("test dual writes work", func(t *testing.T) { a.testDualWritesWork(ctx, t) })
t.Run("test wildcard goes to elastic", func(t *testing.T) { a.testWildcardGoesToElastic(ctx, t) })
t.Run("test _resolve/index/* works properly", func(t *testing.T) { a.testResolveEndpointInQuesma(ctx, t) })
return nil
}

Expand Down Expand Up @@ -100,10 +101,10 @@ func (a *DualWriteAndCommonTableTestcase) testIngestToCommonTableWorks(ctx conte
}

func (a *DualWriteAndCommonTableTestcase) testDualQueryReturnsDataFromClickHouse(ctx context.Context, t *testing.T) {
resp, _ := a.RequestToQuesma(ctx, t, "POST", "/logs-dual-query/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`))
resp, _ := a.RequestToQuesma(ctx, t, "POST", "/logs-dual_query/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`))
assert.Equal(t, http.StatusOK, resp.StatusCode)

chQuery := "SELECT * FROM 'logs-dual-query'"
chQuery := "SELECT * FROM 'logs-dual_query'"
rows, err := a.ExecuteClickHouseQuery(ctx, chQuery)
if err != nil {
t.Fatalf("Failed to execute query: %s", err)
Expand Down Expand Up @@ -140,12 +141,12 @@ func (a *DualWriteAndCommonTableTestcase) testDualQueryReturnsDataFromClickHouse
assert.Equal(t, 31337, age)

// In the meantime let's delete the index from Elasticsearch
_, _ = a.RequestToElasticsearch(ctx, "DELETE", "/logs-dual-query", nil)
resp, _ = a.RequestToElasticsearch(ctx, "DELETE", "/logs-dual_query", nil)
if err != nil {
t.Fatalf("Failed to make DELETE request: %s", err)
}
// FINAL TEST - WHETHER QUESMA RETURNS DATA FROM CLICKHOUSE
resp, bodyBytes := a.RequestToQuesma(ctx, t, "GET", "/logs-dual-query/_search", []byte(`{"query": {"match_all": {}}}`))
resp, bodyBytes := a.RequestToQuesma(ctx, t, "GET", "/logs-dual_query/_search", []byte(`{"query": {"match_all": {}}}`))
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Contains(t, string(bodyBytes), "Przemyslaw")
assert.Contains(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
Expand Down Expand Up @@ -291,3 +292,58 @@ func (a *DualWriteAndCommonTableTestcase) testWildcardGoesToElastic(ctx context.
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}

// testResolveEndpointInQuesma verifies that Quesma's `_resolve/index/*` endpoint
// returns both the Elasticsearch-resident sources (indices) and the ClickHouse
// tables, the latter presented as data streams.
func (a *DualWriteAndCommonTableTestcase) testResolveEndpointInQuesma(ctx context.Context, t *testing.T) {
	resp, bodyBytes := a.RequestToQuesma(ctx, t, "GET", "/_resolve/index/*", nil)

	var actual map[string]interface{}
	if err := json.Unmarshal(bodyBytes, &actual); err != nil {
		t.Fatalf("Failed to unmarshal response body: %s", err)
	}

	assert.Equal(t, http.StatusOK, resp.StatusCode)
	assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))

	// Small builders keep the expected payload readable.
	openIndex := func(name string) map[string]interface{} {
		return map[string]interface{}{
			"name":       name,
			"attributes": []interface{}{"open"},
		}
	}
	dataStream := func(name string) map[string]interface{} {
		return map[string]interface{}{
			"name":            name,
			"backing_indices": []interface{}{name},
			"timestamp_field": "@timestamp",
		}
	}

	expected := map[string]interface{}{
		"indices": []interface{}{
			openIndex("logs-3"),
			openIndex("quesma_virtual_tables"),
			openIndex("unmentioned_index"),
		},
		"aliases": []interface{}{},
		"data_streams": []interface{}{
			dataStream("logs-2"),
			dataStream("logs-3"),
			dataStream("logs-4"),
			dataStream("logs-dual_query"),
		},
	}

	assert.ElementsMatchf(t, expected["indices"], actual["indices"], "indices do not match")
	assert.ElementsMatchf(t, expected["aliases"], actual["aliases"], "aliases do not match")
	assert.ElementsMatchf(t, expected["data_streams"], actual["data_streams"], "data_streams do not match")
}
77 changes: 26 additions & 51 deletions quesma/quesma/functionality/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,73 +4,48 @@ package resolve

import (
"quesma/elasticsearch"
"quesma/logger"
"quesma/quesma/config"
"quesma/schema"
"slices"
)

// HandleResolve combines results from both schema.Registry (ClickHouse) and Elasticsearch,
// This endpoint is used in Kibana/OSD when creating Data Views/Index Patterns.
func HandleResolve(pattern string, sr schema.Registry, cfg *config.QuesmaConfiguration) (elasticsearch.Sources, error) {
// In the _resolve endpoint we want to combine the results from both schema.Registry and Elasticsearch
sourcesToShow := &elasticsearch.Sources{}

normalizedPattern := elasticsearch.NormalizePattern(pattern)
normalizedPattern := elasticsearch.NormalizePattern(pattern) // changes `_all` to `*`

// Optimization: if it's not a pattern, let's try avoiding querying Elasticsearch - let's first try
// finding that index in schema.Registry:
if !elasticsearch.IsIndexPattern(normalizedPattern) {
if foundSchema, found := sr.FindSchema(schema.TableName(normalizedPattern)); found {
if !foundSchema.ExistsInDataSource {
// index configured by the user, but not present in the data source
return elasticsearch.Sources{}, nil
}

return elasticsearch.Sources{
Indices: []elasticsearch.Index{},
Aliases: []elasticsearch.Alias{},
DataStreams: []elasticsearch.DataStream{
{
Name: normalizedPattern,
BackingIndices: []string{normalizedPattern},
TimestampField: `@timestamp`,
},
},
}, nil
}

// ...index not found in schema.Registry (meaning the user did not configure it), but it could exist in Elastic
}

// Combine results from both schema.Registry and Elasticsearch:

// todo avoid creating new instances all the time
sourcesFromElastic, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch).Resolve(normalizedPattern)
sourcesFromElasticsearch, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch).Resolve(normalizedPattern)
if err != nil {
return elasticsearch.Sources{}, err
logger.Warn().Msgf("Failed fetching resolving sources matching `%s`: %v", pattern, err)
} else {
sourcesToShow = &sourcesFromElasticsearch
}

combineSourcesFromElasticWithRegistry(&sourcesFromElastic, sr.AllSchemas(), normalizedPattern)
return sourcesFromElastic, nil
}
tablesFromClickHouse := getMatchingClickHouseTables(sr.AllSchemas(), normalizedPattern)

func combineSourcesFromElasticWithRegistry(sourcesFromElastic *elasticsearch.Sources, schemas map[schema.TableName]schema.Schema, normalizedPattern string) {
sourcesFromElastic.Indices =
slices.DeleteFunc(sourcesFromElastic.Indices, func(i elasticsearch.Index) bool {
_, exists := schemas[schema.TableName(i.Name)]
return exists
})
sourcesFromElastic.DataStreams = slices.DeleteFunc(sourcesFromElastic.DataStreams, func(i elasticsearch.DataStream) bool {
_, exists := schemas[schema.TableName(i.Name)]
return exists
})
addClickHouseTablesToSourcesFromElastic(sourcesToShow, tablesFromClickHouse)
return *sourcesToShow, nil
}

// getMatchingClickHouseTables returns the names of all schemas (ClickHouse
// tables) that match normalizedPattern and actually exist in the data source.
func getMatchingClickHouseTables(schemas map[schema.TableName]schema.Schema, normalizedPattern string) (tables []string) {
	for name, currentSchema := range schemas {
		indexName := name.AsString()

		// Skip indices that are configured by the user but are not (yet)
		// present in the data source.
		if config.MatchName(normalizedPattern, indexName) && currentSchema.ExistsInDataSource {
			tables = append(tables, indexName)
		}
	}
	return tables
}

// addClickHouseTablesToSourcesFromElastic appends the given ClickHouse table
// names to sourcesFromElastic. Quesma presents each ClickHouse table as an
// Elasticsearch Data Stream backed by a single index of the same name, with
// `@timestamp` as its timestamp field.
func addClickHouseTablesToSourcesFromElastic(sourcesFromElastic *elasticsearch.Sources, chTableNames []string) {
	for i := range chTableNames {
		tableName := chTableNames[i]
		dataStream := elasticsearch.DataStream{
			Name:           tableName,
			BackingIndices: []string{tableName},
			TimestampField: `@timestamp`,
		}
		sourcesFromElastic.DataStreams = append(sourcesFromElastic.DataStreams, dataStream)
	}
}
7 changes: 4 additions & 3 deletions quesma/quesma/functionality/resolve/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func Test_combineSourcesFromElasticWithRegistry(t *testing.T) {
},
normalizedPattern: "index*",
expectedResult: elasticsearch.Sources{
Indices: []elasticsearch.Index{{Name: "index3"}},
Indices: []elasticsearch.Index{{Name: "index1"}, {Name: "index3"}},
Aliases: []elasticsearch.Alias{},
DataStreams: []elasticsearch.DataStream{},
},
Expand All @@ -81,9 +81,10 @@ func Test_combineSourcesFromElasticWithRegistry(t *testing.T) {
},
normalizedPattern: "index*",
expectedResult: elasticsearch.Sources{
Indices: []elasticsearch.Index{{Name: "index4"}},
Indices: []elasticsearch.Index{{Name: "index1"}, {Name: "index4"}},
Aliases: []elasticsearch.Alias{},
DataStreams: []elasticsearch.DataStream{
{Name: "index3"},
{Name: "index5"},
{Name: "index1", BackingIndices: []string{"index1"}, TimestampField: `@timestamp`},
{Name: "index2", BackingIndices: []string{"index2"}, TimestampField: `@timestamp`},
Expand All @@ -95,7 +96,7 @@ func Test_combineSourcesFromElasticWithRegistry(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
combineSourcesFromElasticWithRegistry(&tt.sourcesFromElastic, tt.schemas, tt.normalizedPattern)
addClickHouseTablesToSourcesFromElastic(&tt.sourcesFromElastic, getMatchingClickHouseTables(tt.schemas, tt.normalizedPattern))
assert.ElementsMatchf(t, tt.sourcesFromElastic.Aliases, tt.expectedResult.Aliases, "Aliases don't match")
assert.ElementsMatchf(t, tt.sourcesFromElastic.Indices, tt.expectedResult.Indices, "Indices don't match")
assert.ElementsMatchf(t, tt.sourcesFromElastic.DataStreams, tt.expectedResult.DataStreams, "DataStreams don't match")
Expand Down

0 comments on commit d59a9f2

Please sign in to comment.