Skip to content

Commit

Permalink
Common table - missing test - part#1 ingest (#805)
Browse files Browse the repository at this point in the history
This PR 
1. Add tests.
2. Fix bugs found by these tests.

---------

Signed-off-by: Rafał Strzaliński <[email protected]>
Co-authored-by: Przemysław Hejman <[email protected]>
  • Loading branch information
nablaone and mieciu authored Oct 7, 2024
1 parent 10f8aa0 commit d594e43
Show file tree
Hide file tree
Showing 7 changed files with 317 additions and 34 deletions.
4 changes: 2 additions & 2 deletions quesma/ingest/alter_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestAlterTable(t *testing.T) {

encodings := make(map[schema.FieldEncodingKey]schema.EncodedFieldName)

ip := NewIngestProcessor(fieldsMap, &config.QuesmaConfiguration{})
ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
for i := range rowsToInsert {
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
assert.NoError(t, err)
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestAlterTableHeuristic(t *testing.T) {
Cols: map[string]*clickhouse.Column{},
}
fieldsMap := concurrent.NewMapWith(tableName, table)
ip := NewIngestProcessor(fieldsMap, &config.QuesmaConfiguration{})
ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})

rowsToInsert := make([]string, 0)
previousRow := ``
Expand Down
255 changes: 255 additions & 0 deletions quesma/ingest/common_table_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package ingest

import (
"context"
"encoding/json"
"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/assert"
"quesma/clickhouse"
"quesma/common_table"
"quesma/jsonprocessor"
"quesma/persistence"
"quesma/quesma/config"
"quesma/quesma/types"
"quesma/schema"
"testing"
)

func TestIngestToCommonTable(t *testing.T) {

tests := []struct {
name string
alreadyExistingColumns []*clickhouse.Column // list of columns that exists in the common table and virtual table
documents []types.JSON
expectedStatements []string
virtualTableColumns []string
}{
{
name: "simple single insert",
documents: []types.JSON{
{"foo": "bar"},
},
expectedStatements: []string{
`ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "foo" Nullable(String)`,
`ALTER TABLE "quesma_common_table" COMMENT COLUMN "foo" 'quesmaMetadataV1:fieldName=foo'`,
`INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","foo":"bar"}`,
},
virtualTableColumns: []string{"@timestamp", "foo"},
},
{
name: "simple inserts",
documents: []types.JSON{
{"foo": "bar"},
{"foo": "baz"},
},
expectedStatements: []string{
`ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "foo" Nullable(String)`,
`ALTER TABLE "quesma_common_table" COMMENT COLUMN "foo" 'quesmaMetadataV1:fieldName=foo'`,
`INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","foo":"bar"}, {"__quesma_index_name":"test_index","foo":"baz"}`,
},
virtualTableColumns: []string{"@timestamp", "foo"},
},
{
name: "simple inserts and new column",
documents: []types.JSON{
{"foo": "bar"},
{"foo": "baz"},
{"foo": "1", "baz": "qux"},
},
expectedStatements: []string{
`ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "foo" Nullable(String)`,
`ALTER TABLE "quesma_common_table" COMMENT COLUMN "foo" 'quesmaMetadataV1:fieldName=foo'`,
`ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "baz" Nullable(String)`,
`ALTER TABLE "quesma_common_table" COMMENT COLUMN "baz" 'quesmaMetadataV1:fieldName=baz'`,

`INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","foo":"bar"}, {"__quesma_index_name":"test_index","foo":"baz"}, {"__quesma_index_name":"test_index","baz":"qux","foo":"1"} `,
},
virtualTableColumns: []string{"@timestamp", "baz", "foo"},
},
{
name: "simple inserts, column exists, but not ingested",
alreadyExistingColumns: []*clickhouse.Column{
{Name: "a", Type: clickhouse.BaseType{Name: "String"}},
},
documents: []types.JSON{
{"foo": "bar"},
{"foo": "baz"},
{"foo": "1", "baz": "qux"},
},
expectedStatements: []string{
`ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "foo" Nullable(String)`,
`ALTER TABLE "quesma_common_table" COMMENT COLUMN "foo" 'quesmaMetadataV1:fieldName=foo'`,
`ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "baz" Nullable(String)`,
`ALTER TABLE "quesma_common_table" COMMENT COLUMN "baz" 'quesmaMetadataV1:fieldName=baz'`,

`INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","foo":"bar"}, {"__quesma_index_name":"test_index","foo":"baz"}, {"__quesma_index_name":"test_index","baz":"qux","foo":"1"} `,
},
virtualTableColumns: []string{"@timestamp", "a", "baz", "foo"},
},
{
name: "ingest to existing column",
alreadyExistingColumns: []*clickhouse.Column{
{Name: "a", Type: clickhouse.BaseType{Name: "String"}},
},
documents: []types.JSON{
{"a": "bar"},
},
expectedStatements: []string{
`INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","a":"bar"}`,
},
virtualTableColumns: []string{"@timestamp", "a"},
},
{
name: "ingest to existing column and new column",
alreadyExistingColumns: []*clickhouse.Column{
{Name: "a", Type: clickhouse.BaseType{Name: "String"}},
},
documents: []types.JSON{
{"a": "bar", "b": "baz"},
},
expectedStatements: []string{
`ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "b" Nullable(String)`,
`ALTER TABLE "quesma_common_table" COMMENT COLUMN "b" 'quesmaMetadataV1:fieldName=b'`,

`INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","a":"bar","b":"baz"}`,
},
virtualTableColumns: []string{"@timestamp", "a", "b"},
},
{
name: "ingest to name with a dot",
alreadyExistingColumns: []*clickhouse.Column{},
documents: []types.JSON{
{"a.b": "c"},
},
expectedStatements: []string{
`ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "a_b" Nullable(String)`,
`ALTER TABLE "quesma_common_table" COMMENT COLUMN "a_b" 'quesmaMetadataV1:fieldName=a.b'`,

`INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","a_b":"c"}`,
},
virtualTableColumns: []string{"@timestamp", "a_b"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

indexName := "test_index"

quesmaConfig := &config.QuesmaConfiguration{
IndexConfig: map[string]config.IndexConfiguration{
indexName: {
UseCommonTable: true,
},
},
}

tables := NewTableMap()

quesmaCommonTable := &clickhouse.Table{
Name: common_table.TableName,
Cols: map[string]*clickhouse.Column{
"@timestmap": {
Name: "@timestamp",
Type: clickhouse.BaseType{Name: "DateTime64"},
},
common_table.IndexNameColumn: {
Name: common_table.IndexNameColumn,
Type: clickhouse.BaseType{Name: "String"},
},
clickhouse.AttributesValuesColumn: {
Name: clickhouse.AttributesValuesColumn,
Type: clickhouse.BaseType{Name: "Map(String, String)"},
},
clickhouse.AttributesMetadataColumn: {
Name: clickhouse.AttributesMetadataColumn,
Type: clickhouse.BaseType{Name: "Map(String, String)"},
},
},
Config: NewDefaultCHConfig(),
Created: true,
}

for _, col := range tt.alreadyExistingColumns {
quesmaCommonTable.Cols[col.Name] = col
}

tables.Store(common_table.TableName, quesmaCommonTable)

db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("an error '%s' was not expected when opening a stub database connection", err)
}

virtualTableStorage := persistence.NewStaticJSONDatabase()

tableDisco := clickhouse.NewTableDiscovery(quesmaConfig, db, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, quesmaConfig, clickhouse.SchemaTypeAdapter{})

ingest := newIngestProcessorWithEmptyTableMap(tables, quesmaConfig)
ingest.chDb = db
ingest.virtualTableStorage = virtualTableStorage
ingest.schemaRegistry = schemaRegistry

if len(tt.alreadyExistingColumns) > 0 {

testTable := &clickhouse.Table{
Name: indexName,
Cols: map[string]*clickhouse.Column{},
Config: NewDefaultCHConfig(),
Created: true,
VirtualTable: true,
}

for _, col := range tt.alreadyExistingColumns {
testTable.Cols[col.Name] = col
}

tables.Store(indexName, testTable)
err = ingest.storeVirtualTable(testTable)
if err != nil {
t.Fatalf("error storing virtual table: %v", err)
}
}

ctx := context.Background()
formatter := clickhouse.DefaultColumnNameFormatter()

transformer := jsonprocessor.IngestTransformerFor(indexName, quesmaConfig)

for _, stm := range tt.expectedStatements {
mock.ExpectExec(stm).WillReturnResult(sqlmock.NewResult(1, 1))
}

err = ingest.ProcessInsertQuery(ctx, indexName, tt.documents, transformer, formatter)

if err != nil {
t.Fatalf("error processing insert query: %v", err)
}

vTableAsJson, ok, err := virtualTableStorage.Get(indexName)
if err != nil {
t.Fatalf("error getting virtual table: %v", err)
}
if !ok {
t.Fatalf("virtual table not found")
}

var vTable common_table.VirtualTable

err = json.Unmarshal([]byte(vTableAsJson), &vTable)
if err != nil {
t.Fatalf("error unmarshalling virtual table: %v", err)
}

var virtualTableColumn []string
for _, col := range vTable.Columns {
virtualTableColumn = append(virtualTableColumn, col.Name)
}

assert.Equal(t, tt.virtualTableColumns, virtualTableColumn)
})
}
}
2 changes: 1 addition & 1 deletion quesma/ingest/ingest_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestIngestValidation(t *testing.T) {
})
for i := range inputJson {
db, mock := util.InitSqlMockWithPrettyPrint(t, true)
ip := NewIngestProcessorEmpty()
ip := newIngestProcessorEmpty()
ip.chDb = db
ip.tableDiscovery = clickhouse.NewTableDiscoveryWith(&config.QuesmaConfiguration{}, nil, *tableMap)

Expand Down
10 changes: 5 additions & 5 deletions quesma/ingest/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ func ingestProcessorsNonEmpty(cfg *clickhouse.ChTableConfig) []ingestProcessorHe
},
Created: created,
})
lms = append(lms, ingestProcessorHelper{NewIngestProcessor(full, &config.QuesmaConfiguration{}), created})
lms = append(lms, ingestProcessorHelper{newIngestProcessorWithEmptyTableMap(full, &config.QuesmaConfiguration{}), created})
}
return lms
}

func ingestProcessors(config *clickhouse.ChTableConfig) []ingestProcessorHelper {
ingestProcessor := NewIngestProcessorEmpty()
ingestProcessor := newIngestProcessorEmpty()
ingestProcessor.schemaRegistry = schema.StaticRegistry{}
return append([]ingestProcessorHelper{{ingestProcessor, false}}, ingestProcessorsNonEmpty(config)...)
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestInsertVeryBigIntegers(t *testing.T) {
for i, bigInt := range bigInts {
t.Run("big integer schema field: "+bigInt, func(t *testing.T) {
db, mock := util.InitSqlMockWithPrettyPrint(t, true)
lm := NewIngestProcessorEmpty()
lm := newIngestProcessorEmpty()
lm.chDb = db
defer db.Close()

Expand All @@ -308,7 +308,7 @@ func TestInsertVeryBigIntegers(t *testing.T) {
for i, bigInt := range bigInts {
t.Run("big integer attribute field: "+bigInt, func(t *testing.T) {
db, mock := util.InitSqlMockWithPrettyPrint(t, true)
lm := NewIngestProcessorEmpty()
lm := newIngestProcessorEmpty()
lm.chDb = db
lm.tableDiscovery = clickhouse.NewTableDiscoveryWith(&config.QuesmaConfiguration{}, nil, *tableMapNoSchemaFields)
defer db.Close()
Expand Down Expand Up @@ -414,7 +414,7 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) {
}
schemaRegistry.Tables[schema.TableName(indexName)] = indexSchema

ingest := NewIngestProcessor(tables, quesmaConfig)
ingest := newIngestProcessorWithEmptyTableMap(tables, quesmaConfig)
ingest.chDb = db
ingest.virtualTableStorage = virtualTableStorage
ingest.schemaRegistry = schemaRegistry
Expand Down
Loading

0 comments on commit d594e43

Please sign in to comment.