diff --git a/quesma/ingest/processor.go b/quesma/ingest/processor.go index 1ad74e7cc..b44b9b30d 100644 --- a/quesma/ingest/processor.go +++ b/quesma/ingest/processor.go @@ -641,6 +641,15 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context, ignoredFields := ip.getIgnoredFields(tableName) columnsFromJson := JsonToColumns("", jsonData[0], 1, tableConfig, tableFormatter, ignoredFields) + + fieldOrigins := make(map[schema.FieldName]schema.FieldSource) + + for _, column := range columnsFromJson { + fieldOrigins[schema.FieldName(column.ClickHouseColumnName)] = schema.FieldSourceIngest + } + + ip.schemaRegistry.UpdateFieldsOrigins(schema.TableName(tableName), fieldOrigins) + // This comes externally from (configuration) // So we need to convert that separately columnsFromSchema := SchemaToColumns(findSchemaPointer(ip.schemaRegistry, tableName), tableFormatter, tableName, ip.schemaRegistry.GetFieldEncodings()) diff --git a/quesma/quesma/schema_transformer.go b/quesma/quesma/schema_transformer.go index bdba9b7d6..db7dc02a2 100644 --- a/quesma/quesma/schema_transformer.go +++ b/quesma/quesma/schema_transformer.go @@ -463,7 +463,10 @@ func (s *SchemaCheckPass) applyWildcardExpansion(indexSchema schema.Schema, quer cols := make([]string, 0, len(indexSchema.Fields)) for _, col := range indexSchema.Fields { - cols = append(cols, col.InternalPropertyName.AsString()) + // Take only fields that are ingested + if col.Origin == schema.FieldSourceIngest { + cols = append(cols, col.InternalPropertyName.AsString()) + } } sort.Strings(cols) @@ -491,7 +494,10 @@ func (s *SchemaCheckPass) applyFullTextField(indexSchema schema.Schema, query *m for _, field := range indexSchema.Fields { if field.Type.IsFullText() { - fullTextFields = append(fullTextFields, field.InternalPropertyName.AsString()) + // Take only fields that are ingested + if field.Origin == schema.FieldSourceIngest { + fullTextFields = append(fullTextFields, field.InternalPropertyName.AsString()) + } } } diff --git a/quesma/schema/registry.go b/quesma/schema/registry.go index a61b01c2f..9f7fb9c89 100644 --- a/quesma/schema/registry.go +++ b/quesma/schema/registry.go @@ -16,6 +16,7 @@ type ( Registry interface { AllSchemas() map[TableName]Schema FindSchema(name TableName) (Schema, bool) + UpdateFieldsOrigins(name TableName, fields map[FieldName]FieldSource) UpdateDynamicConfiguration(name TableName, table Table) UpdateFieldEncodings(encodings map[FieldEncodingKey]EncodedFieldName) GetFieldEncodings() map[FieldEncodingKey]EncodedFieldName @@ -34,6 +35,8 @@ type ( dynamicConfiguration map[string]Table fieldEncodingsLock sync.RWMutex fieldEncodings map[FieldEncodingKey]EncodedFieldName + fieldOriginsLock sync.RWMutex + fieldOrigins map[TableName]map[FieldName]FieldSource } typeAdapter interface { Convert(string) (QuesmaType, bool) @@ -95,6 +98,7 @@ func (s *schemaRegistry) loadSchemas() (map[TableName]Schema, error) { s.populateAliases(indexConfiguration, fields, aliases) s.removeIgnoredFields(indexConfiguration, fields, aliases) s.removeGeoPhysicalFields(fields) + s.populateFieldsOrigins(indexName, fields) if tableDefinition, ok := definitions[indexName]; ok { schemas[TableName(indexName)] = NewSchemaWithAliases(fields, aliases, existsInDataSource, tableDefinition.DatabaseName) } else { @@ -117,7 +121,7 @@ func (s *schemaRegistry) populateSchemaFromDynamicConfiguration(indexName string continue } - fields[FieldName(column.Name)] = Field{PropertyName: FieldName(column.Name), InternalPropertyName: FieldName(column.Name), Type: columnType} + fields[FieldName(column.Name)] = Field{PropertyName: FieldName(column.Name), InternalPropertyName: FieldName(column.Name), Type: columnType, Origin: FieldSourceMapping} } } @@ -246,7 +250,7 @@ func (s *schemaRegistry) populateSchemaFromTableDefinition(definitions map[strin fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: QuesmaTypeKeyword} } } else { - fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: existing.Type} + fields[propertyName] = Field{PropertyName: propertyName, InternalPropertyName: FieldName(column.Name), InternalPropertyType: column.Type, Type: existing.Type, Origin: existing.Origin} } } } @@ -275,3 +279,25 @@ func (s *schemaRegistry) removeGeoPhysicalFields(fields map[FieldName]Field) { } } } + +func (s *schemaRegistry) populateFieldsOrigins(indexName string, fields map[FieldName]Field) { + s.fieldOriginsLock.RLock() + if fieldOrigins, ok := s.fieldOrigins[TableName(indexName)]; ok { + for fieldName, field := range fields { + if origin, ok := fieldOrigins[field.InternalPropertyName]; ok { + field.Origin = origin + fields[fieldName] = field + } + } + } + s.fieldOriginsLock.RUnlock() +} + +func (s *schemaRegistry) UpdateFieldsOrigins(name TableName, fields map[FieldName]FieldSource) { + s.fieldOriginsLock.Lock() + defer s.fieldOriginsLock.Unlock() + if s.fieldOrigins == nil { + s.fieldOrigins = make(map[TableName]map[FieldName]FieldSource) + } + s.fieldOrigins[name] = fields +} diff --git a/quesma/schema/registry_test.go b/quesma/schema/registry_test.go index 5b3d2dfcc..94c9e8831 100644 --- a/quesma/schema/registry_test.go +++ b/quesma/schema/registry_test.go @@ -319,7 +319,7 @@ func Test_schemaRegistry_UpdateDynamicConfiguration(t *testing.T) { "message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: "String"}, "event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"}, "count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}, - "new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText}}, + "new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText, Origin: schema.FieldSourceMapping}}, true, "") resultSchema, resultFound = s.FindSchema(schema.TableName(tableName)) assert.True(t, resultFound, "schema not found") diff --git a/quesma/schema/schema.go b/quesma/schema/schema.go index 87c4ad160..a9ad15e33 100644 --- a/quesma/schema/schema.go +++ b/quesma/schema/schema.go @@ -6,6 +6,14 @@ import ( "strings" ) +// FieldSource is an enum that represents the source of a field in the schema +type FieldSource int + +const ( + FieldSourceIngest FieldSource = iota + FieldSourceMapping +) + type ( Schema struct { Fields map[FieldName]Field @@ -23,6 +31,7 @@ type ( InternalPropertyName FieldName InternalPropertyType string Type QuesmaType + Origin FieldSource } TableName string FieldName string diff --git a/quesma/schema/static_registry.go b/quesma/schema/static_registry.go index a4f77c05e..6c39edfb4 100644 --- a/quesma/schema/static_registry.go +++ b/quesma/schema/static_registry.go @@ -46,3 +46,7 @@ func (e *StaticRegistry) GetFieldEncodings() map[FieldEncodingKey]EncodedFieldNa } return e.FieldEncodings } + +func (e *StaticRegistry) UpdateFieldsOrigins(name TableName, fields map[FieldName]FieldSource) { + +}