From 50792b18f0d5ec7744cb8431be844948a5ea350a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Hejman?= Date: Mon, 16 Sep 2024 14:46:50 +0200 Subject: [PATCH] Resolve table name with schema (#768) --- quesma/clickhouse/clickhouse.go | 1 + quesma/clickhouse/quesma_communicator.go | 6 ++++-- quesma/clickhouse/table_discovery.go | 5 +++-- quesma/model/expr.go | 9 ++++++++- quesma/model/expr_string_renderer.go | 14 ++++++++++++-- quesma/quesma/mappings_test.go | 2 +- quesma/quesma/schema_transformer.go | 3 ++- quesma/schema/hierarchical_schema_test.go | 2 +- quesma/schema/registry.go | 13 +++++++++---- quesma/schema/registry_test.go | 20 ++++++++++---------- quesma/schema/schema.go | 10 +++++++--- quesma/schema/schema_test.go | 16 ++++++++-------- 12 files changed, 66 insertions(+), 35 deletions(-) diff --git a/quesma/clickhouse/clickhouse.go b/quesma/clickhouse/clickhouse.go index 3f4a175c6..9b07d58fe 100644 --- a/quesma/clickhouse/clickhouse.go +++ b/quesma/clickhouse/clickhouse.go @@ -115,6 +115,7 @@ func (lm *LogManager) Stop() { type discoveredTable struct { name string + databaseName string columnTypes map[string]string config config.IndexConfiguration comment string diff --git a/quesma/clickhouse/quesma_communicator.go b/quesma/clickhouse/quesma_communicator.go index 96e43f616..219152e9a 100644 --- a/quesma/clickhouse/quesma_communicator.go +++ b/quesma/clickhouse/quesma_communicator.go @@ -160,7 +160,10 @@ func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, field // settings := make(clickhouse.Settings) - settings["readonly"] = "1" + // this "readonly" setting turned out to be causing problems with Hydrolix queries + // the queries looked pretty legit, but the key difference was the use of schema (`FROM "schema"."tableName"`) + // to be revisited in the future + // settings["readonly"] = "1" settings["allow_ddl"] = "0" if query.OptimizeHints != nil { @@ -185,7 +188,6 @@ func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, field performanceResult.Error = err return nil, performanceResult, end_user_errors.GuessClickhouseErrorType(err).InternalDetails("clickhouse: query failed. err: %v, query: %v", err, queryAsString) } - res, err = read(rows, fields, rowToScan) elapsed := span.End(nil) diff --git a/quesma/clickhouse/table_discovery.go b/quesma/clickhouse/table_discovery.go index 6c32c304d..6b8f04f46 100644 --- a/quesma/clickhouse/table_discovery.go +++ b/quesma/clickhouse/table_discovery.go @@ -78,6 +78,7 @@ func (t TableDiscoveryTableProviderAdapter) TableDefinitions() map[string]schema Type: column.Type.String(), } } + table.DatabaseName = value.DatabaseName tables[tableName] = table return true }) @@ -162,7 +163,7 @@ func (td *tableDiscovery) configureTables(tables map[string]map[string]string, d comment := td.tableComment(databaseName, table) createTableQuery := td.createTableQuery(databaseName, table) // we assume here that @timestamp field is always present in the table, or it's explicitly configured - configuredTables[table] = discoveredTable{table, columns, indexConfig, comment, createTableQuery, ""} + configuredTables[table] = discoveredTable{table, databaseName, columns, indexConfig, comment, createTableQuery, ""} } } else { notConfiguredTables = append(notConfiguredTables, table) @@ -191,7 +192,7 @@ func (td *tableDiscovery) autoConfigureTables(tables map[string]map[string]strin } else { maybeTimestampField = td.tableTimestampField(databaseName, table, ClickHouse) } - configuredTables[table] = discoveredTable{table, columns, config.IndexConfiguration{}, comment, createTableQuery, maybeTimestampField} + configuredTables[table] = discoveredTable{table, databaseName, columns, config.IndexConfiguration{}, comment, createTableQuery, maybeTimestampField} } for tableName, table := range configuredTables { diff --git a/quesma/model/expr.go b/quesma/model/expr.go index ce52d9ee6..83b57706d 100644 --- a/quesma/model/expr.go +++ b/quesma/model/expr.go @@ -127,7 +127,10 @@ func (s DistinctExpr) Accept(v ExprVisitor) interface{} { type TableRef struct { Name string // to be considered - alias (e.g. FROM tableName AS t) - // to be considered - database prefix (e.g. FROM databaseName.tableName) + + // DatabaseName is optional and represents what in database realm is called 'schema', e.g. 'FROM databaseName.tableName' + // ClickHouse calls this 'database' so we stick to that; FWIW - Hydrolix calls this a 'project'. + DatabaseName string } func NewTableRef(name string) TableRef { @@ -138,6 +141,10 @@ func (t TableRef) Accept(v ExprVisitor) interface{} { return v.VisitTableRef(t) } +func NewTableRefWithDatabaseName(name, databaseName string) TableRef { + return TableRef{Name: name, DatabaseName: databaseName} +} + type OrderByDirection int8 const ( diff --git a/quesma/model/expr_string_renderer.go b/quesma/model/expr_string_renderer.go index 37586def2..857015534 100644 --- a/quesma/model/expr_string_renderer.go +++ b/quesma/model/expr_string_renderer.go @@ -107,12 +107,22 @@ func (v *renderer) VisitDistinctExpr(e DistinctExpr) interface{} { } func (v *renderer) VisitTableRef(e TableRef) interface{} { + var result []string + if e.DatabaseName != "" { + if identifierRegexp.MatchString(e.DatabaseName) { + result = append(result, e.DatabaseName) + } else { + result = append(result, strconv.Quote(e.DatabaseName)) + } + } if identifierRegexp.MatchString(e.Name) { - return e.Name + result = append(result, e.Name) + } else { + result = append(result, strconv.Quote(e.Name)) } - return strconv.Quote(e.Name) + return strings.Join(result, ".") } func (v *renderer) VisitAliasedExpr(e AliasedExpr) interface{} { diff --git a/quesma/quesma/mappings_test.go b/quesma/quesma/mappings_test.go index 9a6c42366..1ccaf7994 100644 --- a/quesma/quesma/mappings_test.go +++ b/quesma/quesma/mappings_test.go @@ -104,7 +104,7 @@ func newSchemaFromColumns(fields map[string]schema.Column) schema.Schema { Type: parsedType, } } - return schema.NewSchema(schemaFields, true) + return schema.NewSchema(schemaFields, true, "") } func TestParseMappings_KibanaSampleFlights(t *testing.T) { diff --git a/quesma/quesma/schema_transformer.go b/quesma/quesma/schema_transformer.go index 676f0856b..9a1225f26 100644 --- a/quesma/quesma/schema_transformer.go +++ b/quesma/quesma/schema_transformer.go @@ -347,7 +347,7 @@ func (s *SchemaCheckPass) applyPhysicalFromExpression(currentSchema schema.Schem } // TODO compute physical from expression based on single table or union or whatever .... - physicalFromExpression := model.NewTableRef(query.TableName) + physicalFromExpression := model.NewTableRefWithDatabaseName(query.TableName, currentSchema.DatabaseName) visitor := model.NewBaseVisitor() @@ -362,6 +362,7 @@ func (s *SchemaCheckPass) applyPhysicalFromExpression(currentSchema schema.Schem if _, ok := expr.(*model.SelectCommand); ok { query.SelectCommand = *expr.(*model.SelectCommand) } + return query, nil } diff --git a/quesma/schema/hierarchical_schema_test.go b/quesma/schema/hierarchical_schema_test.go index 6993acd76..4be5ce7e1 100644 --- a/quesma/schema/hierarchical_schema_test.go +++ b/quesma/schema/hierarchical_schema_test.go @@ -15,7 +15,7 @@ func Test_SchemaToHierarchicalSchema(t *testing.T) { "product.product_id": {PropertyName: "product.product_id", InternalPropertyName: "product::product_id", Type: QuesmaTypeInteger}, "triple.nested.example1": {PropertyName: "triple.nested.example1", InternalPropertyName: "triple::nested::example1", Type: QuesmaTypeText}, "triple.nested.example2": {PropertyName: "triple.nested.example2", InternalPropertyName: "triple::nested::example2", Type: QuesmaTypeKeyword}, - }, map[FieldName]FieldName{}, true) + }, map[FieldName]FieldName{}, true, "") hs := SchemaToHierarchicalSchema(&s) assert.Equal(t, "", hs.Name) diff --git a/quesma/schema/registry.go b/quesma/schema/registry.go index 02d640639..9033b0743 100644 --- a/quesma/schema/registry.go +++ b/quesma/schema/registry.go @@ -29,7 +29,8 @@ type ( AutodiscoveryEnabled() bool } Table struct { - Columns map[string]Column + Columns map[string]Column + DatabaseName string } Column struct { Name string @@ -41,10 +42,10 @@ func (s *schemaRegistry) loadSchemas() (map[TableName]Schema, error) { definitions := s.dataSourceTableProvider.TableDefinitions() schemas := make(map[TableName]Schema) if s.dataSourceTableProvider.AutodiscoveryEnabled() { - for tableName := range definitions { + for tableName, table := range definitions { fields := make(map[FieldName]Field) existsInDataSource := s.populateSchemaFromTableDefinition(definitions, tableName, fields) - schemas[TableName(tableName)] = NewSchema(fields, existsInDataSource) + schemas[TableName(tableName)] = NewSchema(fields, existsInDataSource, table.DatabaseName) } return schemas, nil } @@ -59,7 +60,11 @@ func (s *schemaRegistry) loadSchemas() (map[TableName]Schema, error) { s.populateAliases(indexConfiguration, fields, aliases) s.removeIgnoredFields(indexConfiguration, fields, aliases) s.removeGeoPhysicalFields(fields) - schemas[TableName(indexName)] = NewSchemaWithAliases(fields, aliases, existsInDataSource) + if tableDefinition, ok := definitions[indexName]; ok { + schemas[TableName(indexName)] = NewSchemaWithAliases(fields, aliases, existsInDataSource, tableDefinition.DatabaseName) + } else { + schemas[TableName(indexName)] = NewSchemaWithAliases(fields, aliases, existsInDataSource, "") + } } return schemas, nil diff --git a/quesma/schema/registry_test.go b/quesma/schema/registry_test.go index 07011e67e..ea7cfaae7 100644 --- a/quesma/schema/registry_test.go +++ b/quesma/schema/registry_test.go @@ -48,7 +48,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) { "message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: "String"}, "event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"}, "count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}}, - true), + true, ""), found: true, }, { @@ -77,7 +77,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) { "event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"}, "count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}}, - true), + true, ""), found: true, }, { @@ -104,7 +104,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) { "message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: ""}, "event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"}, "count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}}, - true), + true, ""), found: true, }, { @@ -129,7 +129,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) { "message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword}, "event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"}, "count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}}, - true), + true, ""), found: true, }, { @@ -145,7 +145,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) { }, tableDiscovery: fixedTableProvider{tables: map[string]schema.Table{}}, tableName: "some_table", - want: schema.NewSchema(map[schema.FieldName]schema.Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword}}, false), + want: schema.NewSchema(map[schema.FieldName]schema.Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword}}, false, ""), found: true, }, { @@ -171,7 +171,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) { "message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword}, "event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"}, "count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}}, - true), + true, ""), found: true, }, { @@ -199,7 +199,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) { "event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"}, "count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}}, map[schema.FieldName]schema.FieldName{ "message_alias": "message", - }, true), + }, true, ""), found: true, }, { @@ -229,7 +229,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) { "event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"}, "count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}}, map[schema.FieldName]schema.FieldName{ "message_alias": "message", - }, true), + }, true, ""), found: true, }, { @@ -299,7 +299,7 @@ func Test_schemaRegistry_UpdateDynamicConfiguration(t *testing.T) { "message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: "String"}, "event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"}, "count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}}, - true) + true, "") resultSchema, resultFound := s.FindSchema(schema.TableName(tableName)) assert.True(t, resultFound, "schema not found") if !reflect.DeepEqual(resultSchema, expectedSchema) { @@ -320,7 +320,7 @@ func Test_schemaRegistry_UpdateDynamicConfiguration(t *testing.T) { "event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"}, "count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}, "new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText}}, - true) + true, "") resultSchema, resultFound = s.FindSchema(schema.TableName(tableName)) assert.True(t, resultFound, "schema not found") if !reflect.DeepEqual(resultSchema, expectedSchema) { diff --git a/quesma/schema/schema.go b/quesma/schema/schema.go index c13a59d05..f77d6ee1f 100644 --- a/quesma/schema/schema.go +++ b/quesma/schema/schema.go @@ -12,6 +12,9 @@ type ( Aliases map[FieldName]FieldName ExistsInDataSource bool internalNameToField map[FieldName]Field + // DatabaseName is the name of the database/schema in the data source, + // which in query prepends the physical table name e.g. 'FROM databaseName.tableName' + DatabaseName string } Field struct { // PropertyName is how users refer to the field @@ -25,7 +28,7 @@ type ( FieldName string ) -func NewSchemaWithAliases(fields map[FieldName]Field, aliases map[FieldName]FieldName, existsInDataSource bool) Schema { +func NewSchemaWithAliases(fields map[FieldName]Field, aliases map[FieldName]FieldName, existsInDataSource bool, databaseName string) Schema { internalNameToField := make(map[FieldName]Field) for _, field := range fields { internalNameToField[field.InternalPropertyName] = field @@ -35,11 +38,12 @@ func NewSchemaWithAliases(fields map[FieldName]Field, aliases map[FieldName]Fiel Aliases: aliases, ExistsInDataSource: existsInDataSource, internalNameToField: internalNameToField, + DatabaseName: databaseName, } } -func NewSchema(fields map[FieldName]Field, existsInDataSource bool) Schema { - return NewSchemaWithAliases(fields, map[FieldName]FieldName{}, existsInDataSource) +func NewSchema(fields map[FieldName]Field, existsInDataSource bool, databaseName string) Schema { + return NewSchemaWithAliases(fields, map[FieldName]FieldName{}, existsInDataSource, databaseName) } func (f FieldName) AsString() string { diff --git a/quesma/schema/schema_test.go b/quesma/schema/schema_test.go index c1bfbb643..afaf79ffa 100644 --- a/quesma/schema/schema_test.go +++ b/quesma/schema/schema_test.go @@ -24,28 +24,28 @@ func TestSchema_ResolveField(t *testing.T) { { name: "should resolve field", fieldName: "message", - schema: NewSchema(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, false), + schema: NewSchema(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, false, ""), resolvedField: Field{PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}, exists: true, }, { name: "should not resolve field", fieldName: "foo", - schema: NewSchema(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, false), + schema: NewSchema(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, false, ""), resolvedField: Field{}, exists: false, }, { name: "should resolve aliased field", fieldName: "message_alias", - schema: NewSchemaWithAliases(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, map[FieldName]FieldName{"message_alias": "message"}, false), + schema: NewSchemaWithAliases(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, map[FieldName]FieldName{"message_alias": "message"}, false, ""), resolvedField: Field{PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}, exists: true, }, { name: "should not resolve aliased field", fieldName: "message_alias", - schema: NewSchemaWithAliases(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, map[FieldName]FieldName{"message_alias": "foo"}, false), + schema: NewSchemaWithAliases(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, map[FieldName]FieldName{"message_alias": "foo"}, false, ""), resolvedField: Field{}, exists: false, }, @@ -73,21 +73,21 @@ func TestSchema_ResolveFieldByInternalName(t *testing.T) { }{ { testName: "empty schema", - schema: NewSchemaWithAliases(map[FieldName]Field{}, map[FieldName]FieldName{}, false), + schema: NewSchemaWithAliases(map[FieldName]Field{}, map[FieldName]FieldName{}, false, ""), fieldName: "message", want: Field{}, found: false, }, { testName: "schema with fields with internal separators, lookup by property name", - schema: NewSchema(map[FieldName]Field{"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo::bar", Type: QuesmaTypeText}}, false), + schema: NewSchema(map[FieldName]Field{"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo::bar", Type: QuesmaTypeText}}, false, ""), fieldName: "foo.bar", want: Field{}, found: false, }, { testName: "schema with fields with internal separators, lookup by internal name", - schema: NewSchema(map[FieldName]Field{"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo::bar", Type: QuesmaTypeText}}, false), + schema: NewSchema(map[FieldName]Field{"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo::bar", Type: QuesmaTypeText}}, false, ""), fieldName: "foo::bar", want: Field{PropertyName: "foo.bar", InternalPropertyName: "foo::bar", Type: QuesmaTypeText}, found: true, @@ -95,7 +95,7 @@ func TestSchema_ResolveFieldByInternalName(t *testing.T) { } for _, tt := range tests { t.Run(tt.testName, func(t *testing.T) { - s := NewSchemaWithAliases(tt.schema.Fields, tt.schema.Aliases, false) + s := NewSchemaWithAliases(tt.schema.Fields, tt.schema.Aliases, false, "") got, found := s.ResolveFieldByInternalName(tt.fieldName) if !reflect.DeepEqual(got, tt.want) { t.Errorf("ResolveFieldByInternalName() got = %v, want %v", got, tt.want)