Skip to content

Commit

Permalink
Fixing quesma common table dynamic mapping case
Browse files Browse the repository at this point in the history
  • Loading branch information
pdelewski committed Oct 11, 2024
1 parent 654b6c7 commit 1a8a78b
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 34 deletions.
4 changes: 2 additions & 2 deletions quesma/ingest/alter_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestAlterTable(t *testing.T) {

ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
for i := range rowsToInsert {
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
alter, _, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
assert.NoError(t, err)
insert, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
assert.Equal(t, expectedInsert[i], insert)
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestAlterTableHeuristic(t *testing.T) {

assert.Equal(t, int64(0), ip.ingestCounter)
for i := range rowsToInsert {
_, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
_, _, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
assert.NoError(t, err)
}
assert.Equal(t, tc.expected, len(table.Cols))
Expand Down
91 changes: 63 additions & 28 deletions quesma/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,13 @@ func getAttributesByArrayName(arrayName string,
return attributes
}

// AlterDDL records the pieces of the ALTER TABLE statements generated for a
// single new column: the table being altered, the column's name and type, and
// the comment attached to the column. All four fields are plain strings.
type AlterDDL struct {
	tableName, columnName, columnType, comment string
}

// This function generates ALTER TABLE commands for adding new columns
// to the table based on the attributesMap and the table name
// AttributesMap contains the attributes that are not part of the schema
Expand All @@ -295,12 +302,12 @@ func (ip *IngestProcessor) generateNewColumns(
attrsMap map[string][]interface{},
table *chLib.Table,
alteredAttributesIndexes []int,
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) []string {
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, map[string]AlterDDL) {
var alterCmd []string
alterDDLMap := make(map[string]AlterDDL)
attrKeys := getAttributesByArrayName(chLib.DeprecatedAttributesKeyColumn, attrsMap)
attrTypes := getAttributesByArrayName(chLib.DeprecatedAttributesValueType, attrsMap)
var deleteIndexes []int

reverseMap := reverseFieldEncoding(encodings, table.Name)

// HACK Alert:
Expand Down Expand Up @@ -339,6 +346,7 @@ func (ip *IngestProcessor) generateNewColumns(
alterCmd = append(alterCmd, alterTable)

alterColumn := fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", table.Name, attrKeys[i], comment)
alterDDLMap[attrKeys[i]] = AlterDDL{tableName: table.Name, columnName: attrKeys[i], columnType: columnType, comment: comment}
alterCmd = append(alterCmd, alterColumn)

deleteIndexes = append(deleteIndexes, i)
Expand All @@ -358,7 +366,7 @@ func (ip *IngestProcessor) generateNewColumns(
attrsMap[chLib.DeprecatedAttributesValueType] = append(attrsMap[chLib.DeprecatedAttributesValueType][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueType][deleteIndexes[i]+1:]...)
attrsMap[chLib.DeprecatedAttributesValueColumn] = append(attrsMap[chLib.DeprecatedAttributesValueColumn][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueColumn][deleteIndexes[i]+1:]...)
}
return alterCmd
return alterCmd, alterDDLMap
}

// This struct contains the information about the columns that aren't part of the schema
Expand Down Expand Up @@ -496,39 +504,39 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
data types.JSON,
inValidJson types.JSON,
config *chLib.ChTableConfig,
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, types.JSON, []NonSchemaField, error) {
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, map[string]AlterDDL, types.JSON, []NonSchemaField, error) {

jsonAsBytesSlice, err := json.Marshal(data)

if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

// we find all non-schema fields
jsonMap, err := types.ParseJSON(string(jsonAsBytesSlice))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

if len(config.Attributes) == 0 {
return nil, jsonMap, nil, nil
return nil, nil, jsonMap, nil, nil
}

schemaFieldsJson, err := json.Marshal(jsonMap)

if err != nil {
return nil, jsonMap, nil, err
return nil, nil, jsonMap, nil, err
}

mDiff := DifferenceMap(jsonMap, table) // TODO change to DifferenceMap(m, t)

if len(mDiff) == 0 && string(schemaFieldsJson) == string(jsonAsBytesSlice) && len(inValidJson) == 0 { // no need to modify, just insert 'js'
return nil, jsonMap, nil, nil
return nil, nil, jsonMap, nil, nil
}

// check attributes precondition
if len(config.Attributes) <= 0 {
return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
return nil, nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
}
attrsMap, _ := BuildAttrsMap(mDiff, config)

Expand All @@ -538,9 +546,10 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
// we only want to add fields that are not part of the schema, i.e. we don't
// have columns for them
var alterCmd []string
alterDDLMap := make(map[string]AlterDDL)
atomic.AddInt64(&ip.ingestCounter, 1)
if ok, alteredAttributesIndexes := ip.shouldAlterColumns(table, attrsMap); ok {
alterCmd = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
alterCmd, alterDDLMap = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
}
// If there are some invalid fields, we need to add them to the attributes map
// to not lose them and be able to store them later by
Expand All @@ -551,12 +560,12 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
nonSchemaFields, err := generateNonSchemaFields(attrsMapWithInvalidFields)

if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

onlySchemaFields := RemoveNonSchemaFields(jsonMap, table)

return alterCmd, onlySchemaFields, nonSchemaFields, nil
return alterCmd, alterDDLMap, onlySchemaFields, nonSchemaFields, nil
}

func generateInsertJson(nonSchemaFields []NonSchemaField, onlySchemaFields types.JSON) (string, error) {
Expand Down Expand Up @@ -598,7 +607,7 @@ func populateFieldEncodings(jsonData []types.JSON, tableName string) map[schema.
func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
tableName string,
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, error) {
tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, map[string]AlterDDL, error) {
// this is the pre-ingest transformer
// here we transform the data before its structure evaluation and insertion
//
Expand All @@ -607,7 +616,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
for _, jsonValue := range jsonData {
result, err := preIngestTransformer.Transform(jsonValue)
if err != nil {
return nil, fmt.Errorf("error while rewriting json: %v", err)
return nil, nil, fmt.Errorf("error while rewriting json: %v", err)
}
processed = append(processed, result)
}
Expand Down Expand Up @@ -652,7 +661,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
createTableCmd, err = ip.createTableObjectAndAttributes(ctx, createTableCmd, tableConfig, tableName, tableDefinitionChangeOnly)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error createTableObjectAndAttributes, can't create table: %v", err)
return nil, err
return nil, nil, err
}
// Set pointer to table after creating it
table = ip.FindTable(tableName)
Expand All @@ -662,34 +671,38 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
tableConfig = table.Config
var jsonsReadyForInsertion []string
var alterCmd []string
alterDDLMapGlobal := make(map[string]AlterDDL)
var preprocessedJsons []types.JSON
var invalidJsons []types.JSON
preprocessedJsons, invalidJsons, err := ip.preprocessJsons(ctx, table.Name, jsonData, transformer)
if err != nil {
return nil, fmt.Errorf("error preprocessJsons: %v", err)
return nil, nil, fmt.Errorf("error preprocessJsons: %v", err)
}
for i, preprocessedJson := range preprocessedJsons {
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
alter, alterDDLMap, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
invalidJsons[i], tableConfig, encodings)

if err != nil {
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
return nil, nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
}
insertJson, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
if err != nil {
return nil, fmt.Errorf("error generatateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
return nil, nil, fmt.Errorf("error generatateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
}
alterCmd = append(alterCmd, alter...)
for key, value := range alterDDLMap {
alterDDLMapGlobal[key] = value
}
if err != nil {
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
return nil, nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
}
jsonsReadyForInsertion = append(jsonsReadyForInsertion, insertJson)
}

insertValues := strings.Join(jsonsReadyForInsertion, ", ")
insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", table.Name, insertValues)

return generateSqlStatements(createTableCmd, alterCmd, insert), nil
return generateSqlStatements(createTableCmd, alterCmd, insert), alterDDLMapGlobal, nil
}

func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName string,
Expand All @@ -708,39 +721,61 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
clonedJsonData = append(clonedJsonData, jsonValue.Clone())
}

err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true)
err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true, nil, tableName)
if err != nil {
// we ignore the error here, because we want to process the data and not lose it
logger.ErrorWithCtx(ctx).Msgf("error processing insert query - virtual table schema update: %v", err)
}

sourceIndexSchema := findSchemaPointer(lm.schemaRegistry, tableName)
sourceIndex := tableName
pipeline := jsonprocessor.IngestTransformerPipeline{}
pipeline = append(pipeline, &common_table.IngestAddIndexNameTransformer{IndexName: tableName})
pipeline = append(pipeline, transformer)
tableName = common_table.TableName

err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false)
err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false, sourceIndexSchema, sourceIndex)
if err != nil {
return fmt.Errorf("error processing insert query to a common table: %w", err)
}

return nil
}

return lm.processInsertQueryInternal(ctx, tableName, jsonData, transformer, tableFormatter, false)
return lm.processInsertQueryInternal(ctx, tableName, jsonData, transformer, tableFormatter, false, nil, tableName)

}

func (ip *IngestProcessor) processInsertQueryInternal(ctx context.Context, tableName string,
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
tableFormatter TableColumNameFormatter, isVirtualTable bool) error {
statements, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
tableFormatter TableColumNameFormatter, isVirtualTable bool, sourceIndexSchema *schema.Schema, sourceIndex string) error {

statements, alterDDLMap, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
if err != nil {
return err
}

var logVirtualTableDDL bool // maybe this should be a part of the config or sth

// TODO: this is a hack - we add columns to quesma-common-table that
// came from mappings instead of ingest
if sourceIndexSchema != nil {
if ip.cfg.IndexConfig[sourceIndex].UseCommonTable && len(alterDDLMap) > 0 {
var columnsFromDynamicMapping []string
for _, field := range sourceIndexSchema.Fields {
if _, ok := alterDDLMap[field.InternalPropertyName.AsString()]; !ok {
if field.Origin == schema.FieldSourceMapping {
columnsFromDynamicMapping = append(columnsFromDynamicMapping, fmt.Sprintf("ALTER TABLE \"%s\" ADD COLUMN IF NOT EXISTS \"%s\" %s", tableName, field.InternalPropertyName, field.InternalPropertyType))
metadata := comment_metadata.NewCommentMetadata()
metadata.Values[comment_metadata.ElasticFieldName] = field.PropertyName.AsString()
comment := metadata.Marshall()
columnsFromDynamicMapping = append(columnsFromDynamicMapping, fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", tableName, field.InternalPropertyName, comment))
}
}
}
statements = append(columnsFromDynamicMapping, statements...)
}
}

if isVirtualTable && logVirtualTableDDL {
for _, statement := range statements {
if strings.HasPrefix(statement, "ALTER") || strings.HasPrefix(statement, "CREATE") {
Expand Down
2 changes: 1 addition & 1 deletion quesma/ingest/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) {
assert.True(t, exists)
f := func(t1, t2 TableMap) {
ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings)
alter, _, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings)
assert.NoError(t, err)
j, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions quesma/schema/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *schemaRegistry) populateSchemaFromDynamicConfiguration(indexName string
continue
}

fields[FieldName(column.Name)] = Field{PropertyName: FieldName(column.Name), InternalPropertyName: FieldName(column.Name), Type: columnType}
fields[FieldName(column.Name)] = Field{PropertyName: FieldName(column.Name), InternalPropertyName: FieldName(column.Name), Type: columnType, Origin: FieldSourceMapping}
}
}

Expand Down Expand Up @@ -246,7 +246,7 @@ func (s *schemaRegistry) populateSchemaFromTableDefinition(definitions map[strin
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: QuesmaTypeKeyword}
}
} else {
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: existing.Type}
fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: existing.Type, Origin: existing.Origin}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion quesma/schema/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func Test_schemaRegistry_UpdateDynamicConfiguration(t *testing.T) {
"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: "String"},
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"},
"new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText}},
"new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText, Origin: schema.FieldSourceMapping}},
true, "")
resultSchema, resultFound = s.FindSchema(schema.TableName(tableName))
assert.True(t, resultFound, "schema not found")
Expand Down
11 changes: 11 additions & 0 deletions quesma/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@ import (
"strings"
)

// FieldSource is an enum that represents the source of a field in the schema,
// i.e. where the definition of the field originated.
//
// The values are assigned with iota, so their numeric values depend on the
// declaration order below; inserting or reordering constants changes them.
type FieldSource int

const (
// FieldSourceIngest is the first constant (iota == 0) and therefore the
// zero value of FieldSource: a Field whose Origin is never set explicitly
// reports this source.
// NOTE(review): presumably marks fields discovered from ingested data - confirm.
FieldSourceIngest FieldSource = iota
// FieldSourceMapping marks fields that the schema registry populated from
// the dynamic configuration (mappings).
FieldSourceMapping
// FieldSourceAutoDiscovery - NOTE(review): no use is visible here;
// presumably marks fields found by table auto-discovery - confirm.
FieldSourceAutoDiscovery
// FieldSourceStaticConfiguration - NOTE(review): no use is visible here;
// presumably marks fields declared in static configuration - confirm.
FieldSourceStaticConfiguration
)

type (
Schema struct {
Fields map[FieldName]Field
Expand All @@ -23,6 +33,7 @@ type (
InternalPropertyName FieldName
InternalPropertyType string
Type QuesmaType
Origin FieldSource
}
TableName string
FieldName string
Expand Down

0 comments on commit 1a8a78b

Please sign in to comment.