diff --git a/quesma/ingest/alter_table_test.go b/quesma/ingest/alter_table_test.go
index 658eaaaa6..fee88f80c 100644
--- a/quesma/ingest/alter_table_test.go
+++ b/quesma/ingest/alter_table_test.go
@@ -51,7 +51,7 @@ func TestAlterTable(t *testing.T) {
 	ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
 
 	for i := range rowsToInsert {
-		alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
+		alter, _, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
 		assert.NoError(t, err)
 		insert, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
 		assert.Equal(t, expectedInsert[i], insert)
@@ -130,7 +130,7 @@ func TestAlterTableHeuristic(t *testing.T) {
 	assert.Equal(t, int64(0), ip.ingestCounter)
 
 	for i := range rowsToInsert {
-		_, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
+		_, _, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
 		assert.NoError(t, err)
 	}
 	assert.Equal(t, tc.expected, len(table.Cols))
diff --git a/quesma/ingest/processor.go b/quesma/ingest/processor.go
index 1ad74e7cc..a5abdaa2b 100644
--- a/quesma/ingest/processor.go
+++ b/quesma/ingest/processor.go
@@ -286,6 +286,13 @@ func getAttributesByArrayName(arrayName string,
 	return attributes
 }
 
+type AlterDDL struct {
+	tableName  string
+	columnName string
+	columnType string
+	comment    string
+}
+
 // This function generates ALTER TABLE commands for adding new columns
 // to the table based on the attributesMap and the table name
 // AttributesMap contains the attributes that are not part of the schema
@@ -295,12 +302,12 @@ func (ip *IngestProcessor) generateNewColumns(
 	attrsMap map[string][]interface{},
 	table *chLib.Table,
 	alteredAttributesIndexes []int,
-	encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) []string {
+	encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, map[string]AlterDDL) {
 	var alterCmd []string
+	alterDDLMap := make(map[string]AlterDDL)
 	attrKeys := getAttributesByArrayName(chLib.DeprecatedAttributesKeyColumn, attrsMap)
 	attrTypes := getAttributesByArrayName(chLib.DeprecatedAttributesValueType, attrsMap)
 	var deleteIndexes []int
-
 	reverseMap := reverseFieldEncoding(encodings, table.Name)
 
 	// HACK Alert:
@@ -339,6 +346,7 @@ func (ip *IngestProcessor) generateNewColumns(
 		alterCmd = append(alterCmd, alterTable)
 
 		alterColumn := fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", table.Name, attrKeys[i], comment)
+		alterDDLMap[attrKeys[i]] = AlterDDL{tableName: table.Name, columnName: attrKeys[i], columnType: columnType, comment: comment}
 		alterCmd = append(alterCmd, alterColumn)
 
 		deleteIndexes = append(deleteIndexes, i)
@@ -358,7 +366,7 @@ func (ip *IngestProcessor) generateNewColumns(
 		attrsMap[chLib.DeprecatedAttributesValueType] = append(attrsMap[chLib.DeprecatedAttributesValueType][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueType][deleteIndexes[i]+1:]...)
 		attrsMap[chLib.DeprecatedAttributesValueColumn] = append(attrsMap[chLib.DeprecatedAttributesValueColumn][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueColumn][deleteIndexes[i]+1:]...)
 	}
-	return alterCmd
+	return alterCmd, alterDDLMap
 }
 
 // This struct contains the information about the columns that aren't part of the schema
@@ -496,39 +504,39 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
 	data types.JSON,
 	inValidJson types.JSON,
 	config *chLib.ChTableConfig,
-	encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, types.JSON, []NonSchemaField, error) {
+	encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, map[string]AlterDDL, types.JSON, []NonSchemaField, error) {
 
 	jsonAsBytesSlice, err := json.Marshal(data)
 	if err != nil {
-		return nil, nil, nil, err
+		return nil, nil, nil, nil, err
 	}
 
 	// we find all non-schema fields
 	jsonMap, err := types.ParseJSON(string(jsonAsBytesSlice))
 	if err != nil {
-		return nil, nil, nil, err
+		return nil, nil, nil, nil, err
 	}
 
 	if len(config.Attributes) == 0 {
-		return nil, jsonMap, nil, nil
+		return nil, nil, jsonMap, nil, nil
 	}
 
 	schemaFieldsJson, err := json.Marshal(jsonMap)
 	if err != nil {
-		return nil, jsonMap, nil, err
+		return nil, nil, jsonMap, nil, err
 	}
 
 	mDiff := DifferenceMap(jsonMap, table) // TODO change to DifferenceMap(m, t)
 
 	if len(mDiff) == 0 && string(schemaFieldsJson) == string(jsonAsBytesSlice) && len(inValidJson) == 0 { // no need to modify, just insert 'js'
-		return nil, jsonMap, nil, nil
+		return nil, nil, jsonMap, nil, nil
 	}
 
 	// check attributes precondition
 	if len(config.Attributes) <= 0 {
-		return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
+		return nil, nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
 	}
 
 	attrsMap, _ := BuildAttrsMap(mDiff, config)
@@ -538,9 +546,10 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
 	// we only want to add fields that are not part of the schema e.g we don't
 	// have columns for them
 	var alterCmd []string
+	alterDDLMap := make(map[string]AlterDDL)
 	atomic.AddInt64(&ip.ingestCounter, 1)
 	if ok, alteredAttributesIndexes := ip.shouldAlterColumns(table, attrsMap); ok {
-		alterCmd = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
+		alterCmd, alterDDLMap = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
 	}
 	// If there are some invalid fields, we need to add them to the attributes map
 	// to not lose them and be able to store them later by
@@ -551,12 +560,12 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
 
 	nonSchemaFields, err := generateNonSchemaFields(attrsMapWithInvalidFields)
 	if err != nil {
-		return nil, nil, nil, err
+		return nil, nil, nil, nil, err
 	}
 
 	onlySchemaFields := RemoveNonSchemaFields(jsonMap, table)
 
-	return alterCmd, onlySchemaFields, nonSchemaFields, nil
+	return alterCmd, alterDDLMap, onlySchemaFields, nonSchemaFields, nil
 }
 
 func generateInsertJson(nonSchemaFields []NonSchemaField, onlySchemaFields types.JSON) (string, error) {
@@ -598,7 +607,7 @@ func populateFieldEncodings(jsonData []types.JSON, tableName string) map[schema.
 
 func (ip *IngestProcessor) processInsertQuery(ctx context.Context, tableName string,
 	jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
-	tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, error) {
+	tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, map[string]AlterDDL, error) {
 	// this is pre ingest transformer
 	// here we transform the data before it's structure evaluation and insertion
 	//
@@ -607,7 +616,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
 	for _, jsonValue := range jsonData {
 		result, err := preIngestTransformer.Transform(jsonValue)
 		if err != nil {
-			return nil, fmt.Errorf("error while rewriting json: %v", err)
+			return nil, nil, fmt.Errorf("error while rewriting json: %v", err)
 		}
 		processed = append(processed, result)
 	}
@@ -652,7 +661,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
 		createTableCmd, err = ip.createTableObjectAndAttributes(ctx, createTableCmd, tableConfig, tableName, tableDefinitionChangeOnly)
 		if err != nil {
 			logger.ErrorWithCtx(ctx).Msgf("error createTableObjectAndAttributes, can't create table: %v", err)
-			return nil, err
+			return nil, nil, err
 		}
 		// Set pointer to table after creating it
 		table = ip.FindTable(tableName)
@@ -662,26 +671,30 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
 	tableConfig = table.Config
 	var jsonsReadyForInsertion []string
 	var alterCmd []string
+	alterDDLMapGlobal := make(map[string]AlterDDL)
 	var preprocessedJsons []types.JSON
 	var invalidJsons []types.JSON
 	preprocessedJsons, invalidJsons, err := ip.preprocessJsons(ctx, table.Name, jsonData, transformer)
 	if err != nil {
-		return nil, fmt.Errorf("error preprocessJsons: %v", err)
+		return nil, nil, fmt.Errorf("error preprocessJsons: %v", err)
 	}
 
 	for i, preprocessedJson := range preprocessedJsons {
-		alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
+		alter, alterDDLMap, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
 			invalidJsons[i], tableConfig, encodings)
 		if err != nil {
-			return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
+			return nil, nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
 		}
 		insertJson, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
 		if err != nil {
-			return nil, fmt.Errorf("error generatateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
+			return nil, nil, fmt.Errorf("error generatateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
 		}
 		alterCmd = append(alterCmd, alter...)
+		for key, value := range alterDDLMap {
+			alterDDLMapGlobal[key] = value
+		}
 		if err != nil {
-			return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
+			return nil, nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
 		}
 		jsonsReadyForInsertion = append(jsonsReadyForInsertion, insertJson)
 	}
@@ -689,7 +702,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
 	insertValues := strings.Join(jsonsReadyForInsertion, ", ")
 	insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", table.Name, insertValues)
 
-	return generateSqlStatements(createTableCmd, alterCmd, insert), nil
+	return generateSqlStatements(createTableCmd, alterCmd, insert), alterDDLMapGlobal, nil
 }
 
 func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName string,
@@ -708,18 +721,19 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
 			clonedJsonData = append(clonedJsonData, jsonValue.Clone())
 		}
 
-		err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true)
+		err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true, nil, tableName)
 		if err != nil {
 			// we ignore an error here, because we want to process the data and don't lose it
 			logger.ErrorWithCtx(ctx).Msgf("error processing insert query - virtual table schema update: %v", err)
 		}
-
+		sourceIndexSchema := findSchemaPointer(lm.schemaRegistry, tableName)
+		sourceIndex := tableName
 		pipeline := jsonprocessor.IngestTransformerPipeline{}
 		pipeline = append(pipeline, &common_table.IngestAddIndexNameTransformer{IndexName: tableName})
 		pipeline = append(pipeline, transformer)
 		tableName = common_table.TableName
 
-		err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false)
+		err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false, sourceIndexSchema, sourceIndex)
 		if err != nil {
 			return fmt.Errorf("error processing insert query to a common table: %w", err)
 		}
@@ -727,20 +741,41 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
 		return nil
 	}
 
-	return lm.processInsertQueryInternal(ctx, tableName, jsonData, transformer, tableFormatter, false)
+	return lm.processInsertQueryInternal(ctx, tableName, jsonData, transformer, tableFormatter, false, nil, tableName)
 }
 
 func (ip *IngestProcessor) processInsertQueryInternal(ctx context.Context, tableName string,
 	jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
-	tableFormatter TableColumNameFormatter, isVirtualTable bool) error {
-	statements, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
+	tableFormatter TableColumNameFormatter, isVirtualTable bool, sourceIndexSchema *schema.Schema, sourceIndex string) error {
+
+	statements, alterDDLMap, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
 	if err != nil {
 		return err
 	}
 
 	var logVirtualTableDDL bool // maybe this should be a part of the config or sth
 
+	// TODO that's a hack, we add columns to quesma-common-table that
+	// came from mappings instead of ingest
+	if sourceIndexSchema != nil {
+		if ip.cfg.IndexConfig[sourceIndex].UseCommonTable && len(alterDDLMap) > 0 {
+			var columnsFromDynamicMapping []string
+			for _, field := range sourceIndexSchema.Fields {
+				if _, ok := alterDDLMap[field.InternalPropertyName.AsString()]; !ok {
+					if field.Origin == schema.FieldSourceMapping {
+						columnsFromDynamicMapping = append(columnsFromDynamicMapping, fmt.Sprintf("ALTER TABLE \"%s\" ADD COLUMN IF NOT EXISTS \"%s\" %s", tableName, field.InternalPropertyName, field.InternalPropertyType))
+						metadata := comment_metadata.NewCommentMetadata()
+						metadata.Values[comment_metadata.ElasticFieldName] = field.PropertyName.AsString()
+						comment := metadata.Marshall()
+						columnsFromDynamicMapping = append(columnsFromDynamicMapping, fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", tableName, field.InternalPropertyName, comment))
+					}
+				}
+			}
+			statements = append(columnsFromDynamicMapping, statements...)
+		}
+	}
+
 	if isVirtualTable && logVirtualTableDDL {
 		for _, statement := range statements {
 			if strings.HasPrefix(statement, "ALTER") || strings.HasPrefix(statement, "CREATE") {
diff --git a/quesma/ingest/processor_test.go b/quesma/ingest/processor_test.go
index 6f69ce8cc..b29796f1a 100644
--- a/quesma/ingest/processor_test.go
+++ b/quesma/ingest/processor_test.go
@@ -72,7 +72,7 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) {
 	assert.True(t, exists)
 	f := func(t1, t2 TableMap) {
 		ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
-		alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings)
+		alter, _, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings)
 		assert.NoError(t, err)
 		j, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
 		assert.NoError(t, err)
diff --git a/quesma/schema/registry.go b/quesma/schema/registry.go
index a61b01c2f..1567dd293 100644
--- a/quesma/schema/registry.go
+++ b/quesma/schema/registry.go
@@ -117,7 +117,7 @@ func (s *schemaRegistry) populateSchemaFromDynamicConfiguration(indexName string
 			continue
 		}
 
-		fields[FieldName(column.Name)] = Field{PropertyName: FieldName(column.Name), InternalPropertyName: FieldName(column.Name), Type: columnType}
+		fields[FieldName(column.Name)] = Field{PropertyName: FieldName(column.Name), InternalPropertyName: FieldName(column.Name), Type: columnType, Origin: FieldSourceMapping}
 	}
 }
 
@@ -246,7 +246,7 @@ func (s *schemaRegistry) populateSchemaFromTableDefinition(definitions map[strin
 				fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: QuesmaTypeKeyword}
 			}
 		} else {
-			fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: existing.Type}
+			fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: existing.Type, Origin: existing.Origin}
 		}
 	}
 }
diff --git a/quesma/schema/registry_test.go b/quesma/schema/registry_test.go
index 5b3d2dfcc..94c9e8831 100644
--- a/quesma/schema/registry_test.go
+++ b/quesma/schema/registry_test.go
@@ -319,7 +319,7 @@ func Test_schemaRegistry_UpdateDynamicConfiguration(t *testing.T) {
 		"message":    {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: "String"},
 		"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
 		"count":      {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"},
-		"new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText}},
+		"new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText, Origin: schema.FieldSourceMapping}},
 		true, "")
 	resultSchema, resultFound = s.FindSchema(schema.TableName(tableName))
 	assert.True(t, resultFound, "schema not found")
diff --git a/quesma/schema/schema.go b/quesma/schema/schema.go
index 87c4ad160..749df3f66 100644
--- a/quesma/schema/schema.go
+++ b/quesma/schema/schema.go
@@ -6,6 +6,16 @@ import (
 	"strings"
 )
 
+// FieldSource is an enum that represents the source of a field in the schema
+type FieldSource int
+
+const (
+	FieldSourceIngest FieldSource = iota
+	FieldSourceMapping
+	FieldSourceAutoDiscovery
+	FieldSourceStaticConfiguration
+)
+
 type (
 	Schema struct {
 		Fields map[FieldName]Field
@@ -23,6 +33,7 @@ type (
 		InternalPropertyName FieldName
 		InternalPropertyType string
 		Type                 QuesmaType
+		Origin               FieldSource
 	}
 	TableName string
 	FieldName string