Skip to content

Commit

Permalink
Fix incorrect field encodings for fields from mappings (but missing f…
Browse files Browse the repository at this point in the history
…rom JSON) (#1050)

In the `CREATE TABLE`, Quesma includes fields we got from `PUT
/:index/_mapping` (but are missing from the JSON). This code path
incorrectly computed field encodings - used the encoded field name in
the comment instead of the field name before encoding.

Fix the issue by using the `reverseMap` (field name before encoding) in
that code path.

Fixes #1045
  • Loading branch information
avelanarius authored Nov 29, 2024
1 parent b5628e2 commit 15f9343
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 7 deletions.
6 changes: 6 additions & 0 deletions ci/it/configs/quesma-ingest.yml.template
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ processors:
nested_test:
target:
- my-clickhouse-instance
encodings_test:
target:
- my-clickhouse-instance
"*":
target:
- my-minimal-elasticsearch
Expand Down Expand Up @@ -167,6 +170,9 @@ processors:
nested_test:
target:
- my-clickhouse-instance
encodings_test:
target:
- my-clickhouse-instance
"*":
target:
- my-minimal-elasticsearch
Expand Down
21 changes: 21 additions & 0 deletions ci/it/testcases/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,27 @@ func (tc *IntegrationTestcaseBase) FetchClickHouseColumns(ctx context.Context, t
return result, nil
}

func (tc *IntegrationTestcaseBase) FetchClickHouseComments(ctx context.Context, tableName string) (map[string]string, error) {
rows, err := tc.ExecuteClickHouseQuery(ctx, fmt.Sprintf("SELECT name, comment FROM system.columns WHERE table = '%s'", tableName))
if err != nil {
return nil, err
}
defer rows.Close()

result := make(map[string]string)
for rows.Next() {
var name, comment string
if err := rows.Scan(&name, &comment); err != nil {
return nil, err
}
result[name] = comment
}
if err := rows.Err(); err != nil {
return nil, err
}
return result, nil
}

func (tc *IntegrationTestcaseBase) RequestToQuesma(ctx context.Context, t *testing.T, method, uri string, requestBody []byte) (*http.Response, []byte) {
endpoint := tc.getQuesmaEndpoint()
resp, err := tc.doRequest(ctx, method, endpoint+uri, requestBody, nil)
Expand Down
40 changes: 40 additions & 0 deletions ci/it/testcases/test_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (a *IngestTestcase) RunTests(ctx context.Context, t *testing.T) error {
t.Run("test kibana_sample_data_ecommerce ingest to ClickHouse (with PUT mapping)", func(t *testing.T) { a.testKibanaSampleEcommerceIngestWithMappingToClickHouse(ctx, t) })
t.Run("test ignored fields", func(t *testing.T) { a.testIgnoredFields(ctx, t) })
t.Run("test nested fields", func(t *testing.T) { a.testNestedFields(ctx, t) })
t.Run("test field encodings (mappings bug)", func(t *testing.T) { a.testFieldEncodingsMappingsBug(ctx, t) })
return nil
}

Expand Down Expand Up @@ -608,3 +609,42 @@ func (it *IngestTestcase) testNestedFields(ctx context.Context, t *testing.T) {

assert.False(t, rows.Next())
}

// Reproducer for issue #1045
func (a *IngestTestcase) testFieldEncodingsMappingsBug(ctx context.Context, t *testing.T) {
resp, _ := a.RequestToQuesma(ctx, t, "PUT", "/encodings_test", []byte(`
{
"mappings": {
"properties": {
"Field1": {
"type": "text"
},
"Field2": {
"type": "text"
}
}
},
"settings": {
"index": {}
}
}`))
assert.Equal(t, http.StatusOK, resp.StatusCode)

resp, _ = a.RequestToQuesma(ctx, t, "POST", "/encodings_test/_doc", []byte(`
{
"Field1": "abc"
}`))
assert.Equal(t, http.StatusOK, resp.StatusCode)

resp, _ = a.RequestToQuesma(ctx, t, "POST", "/encodings_test/_doc", []byte(`
{
"Field2": "cde"
}`))
assert.Equal(t, http.StatusOK, resp.StatusCode)

comments, err := a.FetchClickHouseComments(ctx, "encodings_test")
assert.NoError(t, err, "error fetching clickhouse comments")

assert.Equal(t, "quesmaMetadataV1:fieldName=Field1", comments["field1"])
assert.Equal(t, "quesmaMetadataV1:fieldName=Field2", comments["field2"])
}
10 changes: 5 additions & 5 deletions quesma/ingest/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) {
{"new_field": "bar"},
},
expectedStatements: []string{
`CREATE TABLE IF NOT EXISTS "test_index" ( "@timestamp" DateTime64(3) DEFAULT now64(), "attributes_values" Map(String,String), "attributes_metadata" Map(String,String), "new_field" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=new_field', "schema_field" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=schema_field', ) ENGINE = MergeTree ORDER BY ("@timestamp") COMMENT 'created by Quesma'`,
`CREATE TABLE IF NOT EXISTS "test_index" ( "@timestamp" DateTime64(3) DEFAULT now64(), "attributes_values" Map(String,String), "attributes_metadata" Map(String,String), "new_field" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=new_field', "nested_field" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=nested.field', ) ENGINE = MergeTree ORDER BY ("@timestamp") COMMENT 'created by Quesma'`,
`INSERT INTO "test_index" FORMAT JSONEachRow {"new_field":"bar"}`,
},
},
Expand All @@ -402,9 +402,9 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) {
indexSchema := schema.Schema{
ExistsInDataSource: false,
Fields: map[schema.FieldName]schema.Field{
"schema_field": {
PropertyName: "schema_field",
InternalPropertyName: "schema_field",
"nested.field": {
PropertyName: "nested.field",
InternalPropertyName: "nested_field",
InternalPropertyType: "String",
Type: schema.QuesmaTypeKeyword},
},
Expand All @@ -431,7 +431,7 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) {
resolver.Decisions["test_index"] = decision

schemaRegistry.FieldEncodings = make(map[schema.FieldEncodingKey]schema.EncodedFieldName)
schemaRegistry.FieldEncodings[schema.FieldEncodingKey{TableName: indexName, FieldName: "schema_field"}] = "schema_field"
schemaRegistry.FieldEncodings[schema.FieldEncodingKey{TableName: indexName, FieldName: "nested.field"}] = "nested_field"

ingest := newIngestProcessorWithEmptyTableMap(tables, quesmaConfig)
ingest.chDb = db
Expand Down
6 changes: 4 additions & 2 deletions quesma/ingest/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,17 @@ func columnsToString(columnsFromJson []CreateTableEntry,
}

// There might be some columns from schema which were not present in the JSON
for propertyName, column := range columnsFromSchema {
for _, column := range columnsFromSchema {
if first {
first = false
} else {
result.WriteString(",\n")
}

propertyName := reverseMap[schema.EncodedFieldName(column.ClickHouseColumnName)].FieldName

columnMetadata := comment_metadata.NewCommentMetadata()
columnMetadata.Values[comment_metadata.ElasticFieldName] = string(propertyName)
columnMetadata.Values[comment_metadata.ElasticFieldName] = propertyName
comment := columnMetadata.Marshall()

result.WriteString(util.Indent(1))
Expand Down

0 comments on commit 15f9343

Please sign in to comment.