Skip to content

Commit

Permalink
Resolve table name with schema (#768)
Browse files Browse the repository at this point in the history
  • Loading branch information
mieciu authored Sep 16, 2024
1 parent 4352fca commit 50792b1
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 35 deletions.
1 change: 1 addition & 0 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (lm *LogManager) Stop() {

type discoveredTable struct {
name string
databaseName string
columnTypes map[string]string
config config.IndexConfiguration
comment string
Expand Down
6 changes: 4 additions & 2 deletions quesma/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, field
//

settings := make(clickhouse.Settings)
settings["readonly"] = "1"
// this "readonly" setting turned out to be causing problems with Hydrolix queries
// the queries looked pretty legit, but the key difference was the use of schema (`FROM "schema"."tableName"`)
// to be revisited in the future
// settings["readonly"] = "1"
settings["allow_ddl"] = "0"

if query.OptimizeHints != nil {
Expand All @@ -185,7 +188,6 @@ func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, field
performanceResult.Error = err
return nil, performanceResult, end_user_errors.GuessClickhouseErrorType(err).InternalDetails("clickhouse: query failed. err: %v, query: %v", err, queryAsString)
}

res, err = read(rows, fields, rowToScan)

elapsed := span.End(nil)
Expand Down
5 changes: 3 additions & 2 deletions quesma/clickhouse/table_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (t TableDiscoveryTableProviderAdapter) TableDefinitions() map[string]schema
Type: column.Type.String(),
}
}
table.DatabaseName = value.DatabaseName
tables[tableName] = table
return true
})
Expand Down Expand Up @@ -162,7 +163,7 @@ func (td *tableDiscovery) configureTables(tables map[string]map[string]string, d
comment := td.tableComment(databaseName, table)
createTableQuery := td.createTableQuery(databaseName, table)
// we assume here that @timestamp field is always present in the table, or it's explicitly configured
configuredTables[table] = discoveredTable{table, columns, indexConfig, comment, createTableQuery, ""}
configuredTables[table] = discoveredTable{table, databaseName, columns, indexConfig, comment, createTableQuery, ""}
}
} else {
notConfiguredTables = append(notConfiguredTables, table)
Expand Down Expand Up @@ -191,7 +192,7 @@ func (td *tableDiscovery) autoConfigureTables(tables map[string]map[string]strin
} else {
maybeTimestampField = td.tableTimestampField(databaseName, table, ClickHouse)
}
configuredTables[table] = discoveredTable{table, columns, config.IndexConfiguration{}, comment, createTableQuery, maybeTimestampField}
configuredTables[table] = discoveredTable{table, databaseName, columns, config.IndexConfiguration{}, comment, createTableQuery, maybeTimestampField}

}
for tableName, table := range configuredTables {
Expand Down
9 changes: 8 additions & 1 deletion quesma/model/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ func (s DistinctExpr) Accept(v ExprVisitor) interface{} {
type TableRef struct {
Name string
// to be considered - alias (e.g. FROM tableName AS t)
// to be considered - database prefix (e.g. FROM databaseName.tableName)

// DatabaseName is optional and represents what in database realm is called 'schema', e.g. 'FROM databaseName.tableName'
// ClickHouse calls this 'database' so we stick to that; FWIW - Hydrolix calls this a 'project'.
DatabaseName string
}

func NewTableRef(name string) TableRef {
Expand All @@ -138,6 +141,10 @@ func (t TableRef) Accept(v ExprVisitor) interface{} {
return v.VisitTableRef(t)
}

func NewTableRefWithDatabaseName(name, databaseName string) TableRef {
return TableRef{Name: name, DatabaseName: databaseName}
}

type OrderByDirection int8

const (
Expand Down
14 changes: 12 additions & 2 deletions quesma/model/expr_string_renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,22 @@ func (v *renderer) VisitDistinctExpr(e DistinctExpr) interface{} {
}

func (v *renderer) VisitTableRef(e TableRef) interface{} {
var result []string

if e.DatabaseName != "" {
if identifierRegexp.MatchString(e.DatabaseName) {
result = append(result, e.DatabaseName)
} else {
result = append(result, strconv.Quote(e.DatabaseName))
}
}
if identifierRegexp.MatchString(e.Name) {
return e.Name
result = append(result, e.Name)
} else {
result = append(result, strconv.Quote(e.Name))
}

return strconv.Quote(e.Name)
return strings.Join(result, ".")
}

func (v *renderer) VisitAliasedExpr(e AliasedExpr) interface{} {
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/mappings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newSchemaFromColumns(fields map[string]schema.Column) schema.Schema {
Type: parsedType,
}
}
return schema.NewSchema(schemaFields, true)
return schema.NewSchema(schemaFields, true, "")
}

func TestParseMappings_KibanaSampleFlights(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion quesma/quesma/schema_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (s *SchemaCheckPass) applyPhysicalFromExpression(currentSchema schema.Schem
}

// TODO compute physical from expression based on single table or union or whatever ....
physicalFromExpression := model.NewTableRef(query.TableName)
physicalFromExpression := model.NewTableRefWithDatabaseName(query.TableName, currentSchema.DatabaseName)

visitor := model.NewBaseVisitor()

Expand All @@ -362,6 +362,7 @@ func (s *SchemaCheckPass) applyPhysicalFromExpression(currentSchema schema.Schem
if _, ok := expr.(*model.SelectCommand); ok {
query.SelectCommand = *expr.(*model.SelectCommand)
}

return query, nil
}

Expand Down
2 changes: 1 addition & 1 deletion quesma/schema/hierarchical_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func Test_SchemaToHierarchicalSchema(t *testing.T) {
"product.product_id": {PropertyName: "product.product_id", InternalPropertyName: "product::product_id", Type: QuesmaTypeInteger},
"triple.nested.example1": {PropertyName: "triple.nested.example1", InternalPropertyName: "triple::nested::example1", Type: QuesmaTypeText},
"triple.nested.example2": {PropertyName: "triple.nested.example2", InternalPropertyName: "triple::nested::example2", Type: QuesmaTypeKeyword},
}, map[FieldName]FieldName{}, true)
}, map[FieldName]FieldName{}, true, "")

hs := SchemaToHierarchicalSchema(&s)
assert.Equal(t, "", hs.Name)
Expand Down
13 changes: 9 additions & 4 deletions quesma/schema/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type (
AutodiscoveryEnabled() bool
}
Table struct {
Columns map[string]Column
Columns map[string]Column
DatabaseName string
}
Column struct {
Name string
Expand All @@ -41,10 +42,10 @@ func (s *schemaRegistry) loadSchemas() (map[TableName]Schema, error) {
definitions := s.dataSourceTableProvider.TableDefinitions()
schemas := make(map[TableName]Schema)
if s.dataSourceTableProvider.AutodiscoveryEnabled() {
for tableName := range definitions {
for tableName, table := range definitions {
fields := make(map[FieldName]Field)
existsInDataSource := s.populateSchemaFromTableDefinition(definitions, tableName, fields)
schemas[TableName(tableName)] = NewSchema(fields, existsInDataSource)
schemas[TableName(tableName)] = NewSchema(fields, existsInDataSource, table.DatabaseName)
}
return schemas, nil
}
Expand All @@ -59,7 +60,11 @@ func (s *schemaRegistry) loadSchemas() (map[TableName]Schema, error) {
s.populateAliases(indexConfiguration, fields, aliases)
s.removeIgnoredFields(indexConfiguration, fields, aliases)
s.removeGeoPhysicalFields(fields)
schemas[TableName(indexName)] = NewSchemaWithAliases(fields, aliases, existsInDataSource)
if tableDefinition, ok := definitions[indexName]; ok {
schemas[TableName(indexName)] = NewSchemaWithAliases(fields, aliases, existsInDataSource, tableDefinition.DatabaseName)
} else {
schemas[TableName(indexName)] = NewSchemaWithAliases(fields, aliases, existsInDataSource, "")
}
}

return schemas, nil
Expand Down
20 changes: 10 additions & 10 deletions quesma/schema/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) {
"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: "String"},
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}},
true),
true, ""),
found: true,
},
{
Expand Down Expand Up @@ -77,7 +77,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) {
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}},

true),
true, ""),
found: true,
},
{
Expand All @@ -104,7 +104,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) {
"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: ""},
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}},
true),
true, ""),
found: true,
},
{
Expand All @@ -129,7 +129,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) {
"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword},
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}},
true),
true, ""),
found: true,
},
{
Expand All @@ -145,7 +145,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) {
},
tableDiscovery: fixedTableProvider{tables: map[string]schema.Table{}},
tableName: "some_table",
want: schema.NewSchema(map[schema.FieldName]schema.Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword}}, false),
want: schema.NewSchema(map[schema.FieldName]schema.Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword}}, false, ""),
found: true,
},
{
Expand All @@ -171,7 +171,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) {
"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword},
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}},
true),
true, ""),
found: true,
},
{
Expand Down Expand Up @@ -199,7 +199,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) {
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}}, map[schema.FieldName]schema.FieldName{
"message_alias": "message",
}, true),
}, true, ""),
found: true,
},
{
Expand Down Expand Up @@ -229,7 +229,7 @@ func Test_schemaRegistry_FindSchema(t *testing.T) {
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}}, map[schema.FieldName]schema.FieldName{
"message_alias": "message",
}, true),
}, true, ""),
found: true,
},
{
Expand Down Expand Up @@ -299,7 +299,7 @@ func Test_schemaRegistry_UpdateDynamicConfiguration(t *testing.T) {
"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: "String"},
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"}},
true)
true, "")
resultSchema, resultFound := s.FindSchema(schema.TableName(tableName))
assert.True(t, resultFound, "schema not found")
if !reflect.DeepEqual(resultSchema, expectedSchema) {
Expand All @@ -320,7 +320,7 @@ func Test_schemaRegistry_UpdateDynamicConfiguration(t *testing.T) {
"event_date": {PropertyName: "event_date", InternalPropertyName: "event_date", Type: schema.QuesmaTypeTimestamp, InternalPropertyType: "DateTime64"},
"count": {PropertyName: "count", InternalPropertyName: "count", Type: schema.QuesmaTypeLong, InternalPropertyType: "Int64"},
"new_column": {PropertyName: "new_column", InternalPropertyName: "new_column", Type: schema.QuesmaTypeText}},
true)
true, "")
resultSchema, resultFound = s.FindSchema(schema.TableName(tableName))
assert.True(t, resultFound, "schema not found")
if !reflect.DeepEqual(resultSchema, expectedSchema) {
Expand Down
10 changes: 7 additions & 3 deletions quesma/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ type (
Aliases map[FieldName]FieldName
ExistsInDataSource bool
internalNameToField map[FieldName]Field
// DatabaseName is the name of the database/schema in the data source,
// which in query prepends the physical table name e.g. 'FROM databaseName.tableName'
DatabaseName string
}
Field struct {
// PropertyName is how users refer to the field
Expand All @@ -25,7 +28,7 @@ type (
FieldName string
)

func NewSchemaWithAliases(fields map[FieldName]Field, aliases map[FieldName]FieldName, existsInDataSource bool) Schema {
func NewSchemaWithAliases(fields map[FieldName]Field, aliases map[FieldName]FieldName, existsInDataSource bool, databaseName string) Schema {
internalNameToField := make(map[FieldName]Field)
for _, field := range fields {
internalNameToField[field.InternalPropertyName] = field
Expand All @@ -35,11 +38,12 @@ func NewSchemaWithAliases(fields map[FieldName]Field, aliases map[FieldName]Fiel
Aliases: aliases,
ExistsInDataSource: existsInDataSource,
internalNameToField: internalNameToField,
DatabaseName: databaseName,
}
}

func NewSchema(fields map[FieldName]Field, existsInDataSource bool) Schema {
return NewSchemaWithAliases(fields, map[FieldName]FieldName{}, existsInDataSource)
func NewSchema(fields map[FieldName]Field, existsInDataSource bool, databaseName string) Schema {
return NewSchemaWithAliases(fields, map[FieldName]FieldName{}, existsInDataSource, databaseName)
}

func (f FieldName) AsString() string {
Expand Down
16 changes: 8 additions & 8 deletions quesma/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,28 @@ func TestSchema_ResolveField(t *testing.T) {
{
name: "should resolve field",
fieldName: "message",
schema: NewSchema(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, false),
schema: NewSchema(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, false, ""),
resolvedField: Field{PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText},
exists: true,
},
{
name: "should not resolve field",
fieldName: "foo",
schema: NewSchema(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, false),
schema: NewSchema(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, false, ""),
resolvedField: Field{},
exists: false,
},
{
name: "should resolve aliased field",
fieldName: "message_alias",
schema: NewSchemaWithAliases(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, map[FieldName]FieldName{"message_alias": "message"}, false),
schema: NewSchemaWithAliases(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, map[FieldName]FieldName{"message_alias": "message"}, false, ""),
resolvedField: Field{PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText},
exists: true,
},
{
name: "should not resolve aliased field",
fieldName: "message_alias",
schema: NewSchemaWithAliases(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, map[FieldName]FieldName{"message_alias": "foo"}, false),
schema: NewSchemaWithAliases(map[FieldName]Field{"message": {PropertyName: "message", InternalPropertyName: "message", Type: QuesmaTypeText}}, map[FieldName]FieldName{"message_alias": "foo"}, false, ""),
resolvedField: Field{},
exists: false,
},
Expand Down Expand Up @@ -73,29 +73,29 @@ func TestSchema_ResolveFieldByInternalName(t *testing.T) {
}{
{
testName: "empty schema",
schema: NewSchemaWithAliases(map[FieldName]Field{}, map[FieldName]FieldName{}, false),
schema: NewSchemaWithAliases(map[FieldName]Field{}, map[FieldName]FieldName{}, false, ""),
fieldName: "message",
want: Field{},
found: false,
},
{
testName: "schema with fields with internal separators, lookup by property name",
schema: NewSchema(map[FieldName]Field{"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo::bar", Type: QuesmaTypeText}}, false),
schema: NewSchema(map[FieldName]Field{"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo::bar", Type: QuesmaTypeText}}, false, ""),
fieldName: "foo.bar",
want: Field{},
found: false,
},
{
testName: "schema with fields with internal separators, lookup by internal name",
schema: NewSchema(map[FieldName]Field{"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo::bar", Type: QuesmaTypeText}}, false),
schema: NewSchema(map[FieldName]Field{"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo::bar", Type: QuesmaTypeText}}, false, ""),
fieldName: "foo::bar",
want: Field{PropertyName: "foo.bar", InternalPropertyName: "foo::bar", Type: QuesmaTypeText},
found: true,
},
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
s := NewSchemaWithAliases(tt.schema.Fields, tt.schema.Aliases, false)
s := NewSchemaWithAliases(tt.schema.Fields, tt.schema.Aliases, false, "")
got, found := s.ResolveFieldByInternalName(tt.fieldName)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ResolveFieldByInternalName() got = %v, want %v", got, tt.want)
Expand Down

0 comments on commit 50792b1

Please sign in to comment.