diff --git a/quesma/ingest/alter_table_test.go b/quesma/ingest/alter_table_test.go index dbd592375..658eaaaa6 100644 --- a/quesma/ingest/alter_table_test.go +++ b/quesma/ingest/alter_table_test.go @@ -49,7 +49,7 @@ func TestAlterTable(t *testing.T) { encodings := make(map[schema.FieldEncodingKey]schema.EncodedFieldName) - ip := NewIngestProcessor(fieldsMap, &config.QuesmaConfiguration{}) + ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{}) for i := range rowsToInsert { alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings) assert.NoError(t, err) @@ -109,7 +109,7 @@ func TestAlterTableHeuristic(t *testing.T) { Cols: map[string]*clickhouse.Column{}, } fieldsMap := concurrent.NewMapWith(tableName, table) - ip := NewIngestProcessor(fieldsMap, &config.QuesmaConfiguration{}) + ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{}) rowsToInsert := make([]string, 0) previousRow := `` diff --git a/quesma/ingest/common_table_test.go b/quesma/ingest/common_table_test.go new file mode 100644 index 000000000..69027a7f8 --- /dev/null +++ b/quesma/ingest/common_table_test.go @@ -0,0 +1,255 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package ingest + +import ( + "context" + "encoding/json" + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + "quesma/clickhouse" + "quesma/common_table" + "quesma/jsonprocessor" + "quesma/persistence" + "quesma/quesma/config" + "quesma/quesma/types" + "quesma/schema" + "testing" +) + +func TestIngestToCommonTable(t *testing.T) { + + tests := []struct { + name string + alreadyExistingColumns []*clickhouse.Column // list of columns that exists in the common table and virtual table + documents []types.JSON + expectedStatements []string + virtualTableColumns []string + }{ + { + name: "simple single insert", + documents: []types.JSON{ + {"foo": "bar"}, + }, + expectedStatements: []string{ + `ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "foo" Nullable(String)`, + `ALTER TABLE "quesma_common_table" COMMENT COLUMN "foo" 'quesmaMetadataV1:fieldName=foo'`, + `INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","foo":"bar"}`, + }, + virtualTableColumns: []string{"@timestamp", "foo"}, + }, + { + name: "simple inserts", + documents: []types.JSON{ + {"foo": "bar"}, + {"foo": "baz"}, + }, + expectedStatements: []string{ + `ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "foo" Nullable(String)`, + `ALTER TABLE "quesma_common_table" COMMENT COLUMN "foo" 'quesmaMetadataV1:fieldName=foo'`, + `INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","foo":"bar"}, {"__quesma_index_name":"test_index","foo":"baz"}`, + }, + virtualTableColumns: []string{"@timestamp", "foo"}, + }, + { + name: "simple inserts and new column", + documents: []types.JSON{ + {"foo": "bar"}, + {"foo": "baz"}, + {"foo": "1", "baz": "qux"}, + }, + expectedStatements: []string{ + `ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "foo" Nullable(String)`, + `ALTER TABLE "quesma_common_table" COMMENT COLUMN "foo" 'quesmaMetadataV1:fieldName=foo'`, + `ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "baz" Nullable(String)`, + `ALTER TABLE "quesma_common_table" COMMENT COLUMN "baz" 'quesmaMetadataV1:fieldName=baz'`, + + `INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","foo":"bar"}, {"__quesma_index_name":"test_index","foo":"baz"}, {"__quesma_index_name":"test_index","baz":"qux","foo":"1"} `, + }, + virtualTableColumns: []string{"@timestamp", "baz", "foo"}, + }, + { + name: "simple inserts, column exists, but not ingested", + alreadyExistingColumns: []*clickhouse.Column{ + {Name: "a", Type: clickhouse.BaseType{Name: "String"}}, + }, + documents: []types.JSON{ + {"foo": "bar"}, + {"foo": "baz"}, + {"foo": "1", "baz": "qux"}, + }, + expectedStatements: []string{ + `ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "foo" Nullable(String)`, + `ALTER TABLE "quesma_common_table" COMMENT COLUMN "foo" 'quesmaMetadataV1:fieldName=foo'`, + `ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "baz" Nullable(String)`, + `ALTER TABLE "quesma_common_table" COMMENT COLUMN "baz" 'quesmaMetadataV1:fieldName=baz'`, + + `INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","foo":"bar"}, {"__quesma_index_name":"test_index","foo":"baz"}, {"__quesma_index_name":"test_index","baz":"qux","foo":"1"} `, + }, + virtualTableColumns: []string{"@timestamp", "a", "baz", "foo"}, + }, + { + name: "ingest to existing column", + alreadyExistingColumns: []*clickhouse.Column{ + {Name: "a", Type: clickhouse.BaseType{Name: "String"}}, + }, + documents: []types.JSON{ + {"a": "bar"}, + }, + expectedStatements: []string{ + `INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","a":"bar"}`, + }, + virtualTableColumns: []string{"@timestamp", "a"}, + }, + { + name: "ingest to existing column and new column", + alreadyExistingColumns: []*clickhouse.Column{ + {Name: "a", Type: clickhouse.BaseType{Name: "String"}}, + }, + documents: []types.JSON{ + {"a": "bar", "b": "baz"}, + }, + expectedStatements: []string{ + `ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "b" Nullable(String)`, + `ALTER TABLE "quesma_common_table" COMMENT COLUMN "b" 'quesmaMetadataV1:fieldName=b'`, + + `INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","a":"bar","b":"baz"}`, + }, + virtualTableColumns: []string{"@timestamp", "a", "b"}, + }, + { + name: "ingest to name with a dot", + alreadyExistingColumns: []*clickhouse.Column{}, + documents: []types.JSON{ + {"a.b": "c"}, + }, + expectedStatements: []string{ + `ALTER TABLE "quesma_common_table" ADD COLUMN IF NOT EXISTS "a_b" Nullable(String)`, + `ALTER TABLE "quesma_common_table" COMMENT COLUMN "a_b" 'quesmaMetadataV1:fieldName=a.b'`, + + `INSERT INTO "quesma_common_table" FORMAT JSONEachRow {"__quesma_index_name":"test_index","a_b":"c"}`, + }, + virtualTableColumns: []string{"@timestamp", "a_b"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + indexName := "test_index" + + quesmaConfig := &config.QuesmaConfiguration{ + IndexConfig: map[string]config.IndexConfiguration{ + indexName: { + UseCommonTable: true, + }, + }, + } + + tables := NewTableMap() + + quesmaCommonTable := &clickhouse.Table{ + Name: common_table.TableName, + Cols: map[string]*clickhouse.Column{ + "@timestmap": { + Name: "@timestamp", + Type: clickhouse.BaseType{Name: "DateTime64"}, + }, + common_table.IndexNameColumn: { + Name: common_table.IndexNameColumn, + Type: clickhouse.BaseType{Name: "String"}, + }, + clickhouse.AttributesValuesColumn: { + Name: clickhouse.AttributesValuesColumn, + Type: clickhouse.BaseType{Name: "Map(String, String)"}, + }, + clickhouse.AttributesMetadataColumn: { + Name: clickhouse.AttributesMetadataColumn, + Type: clickhouse.BaseType{Name: "Map(String, String)"}, + }, + }, + Config: NewDefaultCHConfig(), + Created: true, + } + + for _, col := range tt.alreadyExistingColumns { + quesmaCommonTable.Cols[col.Name] = col + } + + tables.Store(common_table.TableName, quesmaCommonTable) + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + + virtualTableStorage := persistence.NewStaticJSONDatabase() + + tableDisco := clickhouse.NewTableDiscovery(quesmaConfig, db, virtualTableStorage) + schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, quesmaConfig, clickhouse.SchemaTypeAdapter{}) + + ingest := newIngestProcessorWithEmptyTableMap(tables, quesmaConfig) + ingest.chDb = db + ingest.virtualTableStorage = virtualTableStorage + ingest.schemaRegistry = schemaRegistry + + if len(tt.alreadyExistingColumns) > 0 { + + testTable := &clickhouse.Table{ + Name: indexName, + Cols: map[string]*clickhouse.Column{}, + Config: NewDefaultCHConfig(), + Created: true, + VirtualTable: true, + } + + for _, col := range tt.alreadyExistingColumns { + testTable.Cols[col.Name] = col + } + + tables.Store(indexName, testTable) + err = ingest.storeVirtualTable(testTable) + if err != nil { + t.Fatalf("error storing virtual table: %v", err) + } + } + + ctx := context.Background() + formatter := clickhouse.DefaultColumnNameFormatter() + + transformer := jsonprocessor.IngestTransformerFor(indexName, quesmaConfig) + + for _, stm := range tt.expectedStatements { + mock.ExpectExec(stm).WillReturnResult(sqlmock.NewResult(1, 1)) + } + + err = ingest.ProcessInsertQuery(ctx, indexName, tt.documents, transformer, formatter) + + if err != nil { + t.Fatalf("error processing insert query: %v", err) + } + + vTableAsJson, ok, err := virtualTableStorage.Get(indexName) + if err != nil { + t.Fatalf("error getting virtual table: %v", err) + } + if !ok { + t.Fatalf("virtual table not found") + } + + var vTable common_table.VirtualTable + + err = json.Unmarshal([]byte(vTableAsJson), &vTable) + if err != nil { + t.Fatalf("error unmarshalling virtual table: %v", err) + } + + var virtualTableColumn []string + for _, col := range vTable.Columns { + virtualTableColumn = append(virtualTableColumn, col.Name) + } + + assert.Equal(t, tt.virtualTableColumns, virtualTableColumn) + }) + } +} diff --git a/quesma/ingest/ingest_validator_test.go b/quesma/ingest/ingest_validator_test.go index f26885aa1..ccd2650fb 100644 --- a/quesma/ingest/ingest_validator_test.go +++ b/quesma/ingest/ingest_validator_test.go @@ -166,7 +166,7 @@ func TestIngestValidation(t *testing.T) { }) for i := range inputJson { db, mock := util.InitSqlMockWithPrettyPrint(t, true) - ip := NewIngestProcessorEmpty() + ip := newIngestProcessorEmpty() ip.chDb = db ip.tableDiscovery = clickhouse.NewTableDiscoveryWith(&config.QuesmaConfiguration{}, nil, *tableMap) diff --git a/quesma/ingest/insert_test.go b/quesma/ingest/insert_test.go index f4117ff6b..bde445ee5 100644 --- a/quesma/ingest/insert_test.go +++ b/quesma/ingest/insert_test.go @@ -152,13 +152,13 @@ func ingestProcessorsNonEmpty(cfg *clickhouse.ChTableConfig) []ingestProcessorHe }, Created: created, }) - lms = append(lms, ingestProcessorHelper{NewIngestProcessor(full, &config.QuesmaConfiguration{}), created}) + lms = append(lms, ingestProcessorHelper{newIngestProcessorWithEmptyTableMap(full, &config.QuesmaConfiguration{}), created}) } return lms } func ingestProcessors(config *clickhouse.ChTableConfig) []ingestProcessorHelper { - ingestProcessor := NewIngestProcessorEmpty() + ingestProcessor := newIngestProcessorEmpty() ingestProcessor.schemaRegistry = schema.StaticRegistry{} return append([]ingestProcessorHelper{{ingestProcessor, false}}, ingestProcessorsNonEmpty(config)...) } @@ -282,7 +282,7 @@ func TestInsertVeryBigIntegers(t *testing.T) { for i, bigInt := range bigInts { t.Run("big integer schema field: "+bigInt, func(t *testing.T) { db, mock := util.InitSqlMockWithPrettyPrint(t, true) - lm := NewIngestProcessorEmpty() + lm := newIngestProcessorEmpty() lm.chDb = db defer db.Close() @@ -308,7 +308,7 @@ func TestInsertVeryBigIntegers(t *testing.T) { for i, bigInt := range bigInts { t.Run("big integer attribute field: "+bigInt, func(t *testing.T) { db, mock := util.InitSqlMockWithPrettyPrint(t, true) - lm := NewIngestProcessorEmpty() + lm := newIngestProcessorEmpty() lm.chDb = db lm.tableDiscovery = clickhouse.NewTableDiscoveryWith(&config.QuesmaConfiguration{}, nil, *tableMapNoSchemaFields) defer db.Close() @@ -414,7 +414,7 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) { } schemaRegistry.Tables[schema.TableName(indexName)] = indexSchema - ingest := NewIngestProcessor(tables, quesmaConfig) + ingest := newIngestProcessorWithEmptyTableMap(tables, quesmaConfig) ingest.chDb = db ingest.virtualTableStorage = virtualTableStorage ingest.schemaRegistry = schemaRegistry diff --git a/quesma/ingest/processor.go b/quesma/ingest/processor.go index 7063ab744..9625f6f8b 100644 --- a/quesma/ingest/processor.go +++ b/quesma/ingest/processor.go @@ -16,6 +16,7 @@ import ( "quesma/index" "quesma/jsonprocessor" "quesma/logger" + "quesma/model" "quesma/persistence" "quesma/quesma/config" "quesma/quesma/recovery" @@ -24,6 +25,8 @@ import ( "quesma/stats" "quesma/telemetry" "quesma/util" + "slices" + "sort" "strings" "sync" "sync/atomic" @@ -698,7 +701,13 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str indexConf, ok := lm.cfg.IndexConfig[tableName] if ok && indexConf.UseCommonTable { - err := lm.processInsertQueryInternal(ctx, tableName, jsonData, transformer, tableFormatter, true) + // we have clone the data, because we want to process it twice + var clonedJsonData []types.JSON + for _, jsonValue := range jsonData { + clonedJsonData = append(clonedJsonData, jsonValue.Clone()) + } + + err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true) if err != nil { // we ignore an error here, because we want to process the data and don't lose it logger.ErrorWithCtx(ctx).Msgf("error processing insert query - virtual table schema update: %v", err) @@ -834,11 +843,28 @@ func (ip *IngestProcessor) storeVirtualTable(table *chLib.Table) error { table.Comment = "Virtual table. Version: " + now.Format(time.RFC3339) + var columnsToStore []string + for _, col := range table.Cols { + // We don't want to store attributes columns in the virtual table + if col.Name == chLib.AttributesValuesColumn || col.Name == chLib.AttributesMetadataColumn { + continue + } + columnsToStore = append(columnsToStore, col.Name) + } + + // We always want to store timestamp in the virtual table + // if it's not already there + if !slices.Contains(columnsToStore, model.TimestampFieldName) { + columnsToStore = append(columnsToStore, model.TimestampFieldName) + } + + sort.Strings(columnsToStore) + var columns []common_table.VirtualTableColumn - for _, col := range table.Cols { + for _, col := range columnsToStore { columns = append(columns, common_table.VirtualTableColumn{ - Name: col.Name, + Name: col, }) } @@ -882,29 +908,11 @@ func (ip *IngestProcessor) Ping() error { return ip.chDb.Ping() } -func NewEmptyIngestProcessor(cfg *config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase) *IngestProcessor { +func NewIngestProcessor(cfg *config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase) *IngestProcessor { ctx, cancel := context.WithCancel(context.Background()) return &IngestProcessor{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage} } -func NewIngestProcessor(tables *TableMap, cfg *config.QuesmaConfiguration) *IngestProcessor { - var tableDefinitions = atomic.Pointer[TableMap]{} - tableDefinitions.Store(tables) - return &IngestProcessor{chDb: nil, tableDiscovery: chLib.NewTableDiscoveryWith(cfg, nil, *tables), - cfg: cfg, phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(), - ingestFieldStatistics: make(IngestFieldStatistics), - virtualTableStorage: persistence.NewStaticJSONDatabase(), - } -} - -func NewIngestProcessorEmpty() *IngestProcessor { - var tableDefinitions = atomic.Pointer[TableMap]{} - tableDefinitions.Store(NewTableMap()) - cfg := &config.QuesmaConfiguration{} - return &IngestProcessor{tableDiscovery: chLib.NewTableDiscovery(cfg, nil, persistence.NewStaticJSONDatabase()), cfg: cfg, - phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(), ingestFieldStatistics: make(IngestFieldStatistics)} -} - func NewOnlySchemaFieldsCHConfig() *chLib.ChTableConfig { return &chLib.ChTableConfig{ HasTimestamp: true, diff --git a/quesma/ingest/processor_test.go b/quesma/ingest/processor_test.go index bccf649e2..3c16c4f61 100644 --- a/quesma/ingest/processor_test.go +++ b/quesma/ingest/processor_test.go @@ -7,9 +7,11 @@ import ( "encoding/json" "quesma/clickhouse" "quesma/concurrent" + "quesma/persistence" "quesma/quesma/config" "quesma/quesma/types" "quesma/schema" + "quesma/telemetry" "strings" "sync/atomic" "testing" @@ -17,6 +19,24 @@ import ( "github.com/stretchr/testify/assert" ) +func newIngestProcessorWithEmptyTableMap(tables *TableMap, cfg *config.QuesmaConfiguration) *IngestProcessor { + var tableDefinitions = atomic.Pointer[TableMap]{} + tableDefinitions.Store(tables) + return &IngestProcessor{chDb: nil, tableDiscovery: clickhouse.NewTableDiscoveryWith(cfg, nil, *tables), + cfg: cfg, phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(), + ingestFieldStatistics: make(IngestFieldStatistics), + virtualTableStorage: persistence.NewStaticJSONDatabase(), + } +} + +func newIngestProcessorEmpty() *IngestProcessor { + var tableDefinitions = atomic.Pointer[TableMap]{} + tableDefinitions.Store(NewTableMap()) + cfg := &config.QuesmaConfiguration{} + return &IngestProcessor{tableDiscovery: clickhouse.NewTableDiscovery(cfg, nil, persistence.NewStaticJSONDatabase()), cfg: cfg, + phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(), ingestFieldStatistics: make(IngestFieldStatistics)} +} + var hasOthersConfig = &clickhouse.ChTableConfig{ HasTimestamp: false, TimestampDefaultsNow: false, @@ -52,7 +72,7 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) { tableName, exists := fieldsMap.Load("tableName") assert.True(t, exists) f := func(t1, t2 TableMap) { - ip := NewIngestProcessor(fieldsMap, &config.QuesmaConfiguration{}) + ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{}) alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings) assert.NoError(t, err) j, err := generateInsertJson(nonSchemaFields, onlySchemaFields) @@ -121,7 +141,7 @@ func TestAddTimestamp(t *testing.T) { PreferCastingToOthers: false, } nameFormatter := clickhouse.DefaultColumnNameFormatter() - ip := NewIngestProcessorEmpty() + ip := newIngestProcessorEmpty() ip.schemaRegistry = schema.StaticRegistry{} jsonData := types.MustJSON(`{"host.name":"hermes","message":"User password reset requested","service.name":"queue","severity":"info","source":"azure"}`) columnsFromJson, columnsFromSchema := ip.buildCreateTableQueryNoOurFields(context.Background(), "tableName", jsonData, tableConfig, nameFormatter) @@ -814,7 +834,7 @@ func TestLogManager_GetTable(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var tableDefinitions = atomic.Pointer[TableMap]{} tableDefinitions.Store(&tt.predefinedTables) - ip := NewIngestProcessor(&tt.predefinedTables, &config.QuesmaConfiguration{}) + ip := newIngestProcessorWithEmptyTableMap(&tt.predefinedTables, &config.QuesmaConfiguration{}) assert.Equalf(t, tt.found, ip.FindTable(tt.tableNamePattern) != nil, "GetTable(%v)", tt.tableNamePattern) }) } diff --git a/quesma/main.go b/quesma/main.go index 4ae9cd724..8eaab1fee 100644 --- a/quesma/main.go +++ b/quesma/main.go @@ -99,7 +99,7 @@ func main() { common_table.EnsureCommonTableExists(connectionPool) } - ingestProcessor = ingest.NewEmptyIngestProcessor(&cfg, connectionPool, phoneHomeAgent, tableDisco, schemaRegistry, virtualTableStorage) + ingestProcessor = ingest.NewIngestProcessor(&cfg, connectionPool, phoneHomeAgent, tableDisco, schemaRegistry, virtualTableStorage) } else { logger.Info().Msg("Ingest processor is disabled.") }