Skip to content

Commit

Permalink
Table autodiscovery (#497)
Browse files Browse the repository at this point in the history
Adds table auto-discovery logic. It's pretty basic for now, but should
improve the customer experience when they first set up Quesma.

### How it works
Auto-discovery kicks in whenever the `indexes` section of the configuration
is left empty. In addition to discovering tables, we try to detect a
"timestamp field" for each of them via simple heuristics.
  • Loading branch information
mieciu authored Jul 17, 2024
1 parent 11f1ab3 commit ebeaa89
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 19 deletions.
78 changes: 60 additions & 18 deletions quesma/clickhouse/schema_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package clickhouse
import (
"context"
"errors"
"fmt"
"quesma/end_user_errors"
"quesma/logger"
"quesma/quesma/config"
Expand Down Expand Up @@ -72,16 +73,18 @@ func (sl *tableDiscovery) TableDefinitionsFetchError() error {
return sl.ReloadTablesError
}

// TableAutodiscoveryEnabled reports whether table auto-discovery should be
// used instead of the user-provided index configuration.
//
// Auto-discovery is enabled when the index configuration holds no entries.
// The check uses len rather than a nil comparison so that an explicitly empty
// (non-nil) map is treated the same as a missing one; len of a nil map is 0.
func (sl *tableDiscovery) TableAutodiscoveryEnabled() bool {
	return len(sl.cfg.IndexConfig) == 0
}

func (sl *tableDiscovery) ReloadTableDefinitions() {
logger.Debug().Msg("reloading tables definitions")
configuredTables := make(map[string]discoveredTable)
var explicitlyDisabledTables, notConfiguredTables []string
var configuredTables map[string]discoveredTable
databaseName := "default"
if sl.cfg.ClickHouse.Database != "" {
databaseName = sl.cfg.ClickHouse.Database
}
if tables, err := sl.SchemaManagement.readTables(databaseName); err != nil {

var endUserError *end_user_errors.EndUserError
if errors.As(err, &endUserError) {
logger.ErrorWithCtxAndReason(context.Background(), endUserError.Reason()).Msgf("could not describe tables: %v", err)
Expand All @@ -92,23 +95,36 @@ func (sl *tableDiscovery) ReloadTableDefinitions() {
sl.tableDefinitions.Store(NewTableMap())
return
} else {
for table, columns := range tables {
if indexConfig, found := sl.cfg.IndexConfig[table]; found {
if indexConfig.Enabled {
for colName := range columns {
if _, exists := indexConfig.Aliases[colName]; exists {
logger.Error().Msgf("column [%s] clashes with an existing alias, table [%s]", colName, table)
}
if sl.TableAutodiscoveryEnabled() {
configuredTables = sl.autoConfigureTables(tables, databaseName)
} else {
configuredTables = sl.configureTables(tables, databaseName)
}
}
sl.ReloadTablesError = nil
sl.populateTableDefinitions(configuredTables, databaseName, sl.cfg)
}

// configureTables confronts the tables discovered in the database with the configuration provided by the user, returning final list of tables managed by Quesma
func (sl *tableDiscovery) configureTables(tables map[string]map[string]string, databaseName string) (configuredTables map[string]discoveredTable) {
configuredTables = make(map[string]discoveredTable)
var explicitlyDisabledTables, notConfiguredTables []string
for table, columns := range tables {
if indexConfig, found := sl.cfg.IndexConfig[table]; found {
if indexConfig.Enabled {
for colName := range columns {
if _, exists := indexConfig.Aliases[colName]; exists {
logger.Error().Msgf("column [%s] clashes with an existing alias, table [%s]", colName, table)
}
comment := sl.SchemaManagement.tableComment(databaseName, table)
createTableQuery := sl.SchemaManagement.createTableQuery(databaseName, table)
configuredTables[table] = discoveredTable{columns, indexConfig, comment, createTableQuery}
} else {
explicitlyDisabledTables = append(explicitlyDisabledTables, table)
}
comment := sl.SchemaManagement.tableComment(databaseName, table)
createTableQuery := sl.SchemaManagement.createTableQuery(databaseName, table)
configuredTables[table] = discoveredTable{columns, indexConfig, comment, createTableQuery}
} else {
notConfiguredTables = append(notConfiguredTables, table)
explicitlyDisabledTables = append(explicitlyDisabledTables, table)
}
} else {
notConfiguredTables = append(notConfiguredTables, table)
}
}
logger.Info().Msgf(
Expand All @@ -117,8 +133,34 @@ func (sl *tableDiscovery) ReloadTableDefinitions() {
strings.Join(notConfiguredTables, ","),
strings.Join(explicitlyDisabledTables, ","),
)
sl.ReloadTablesError = nil
sl.populateTableDefinitions(configuredTables, databaseName, sl.cfg)
return
}

// autoConfigureTables takes the list of discovered tables and automatically
// configures them, returning the final list of tables managed by Quesma.
//
// tables maps each discovered table name to its columns; databaseName is the
// database the tables were read from. For every table the comment, the
// CREATE TABLE query and (when one can be detected) the timestamp field are
// fetched through SchemaManagement. Tables without a detected timestamp field
// are still returned, with an otherwise empty index configuration.
func (sl *tableDiscovery) autoConfigureTables(tables map[string]map[string]string, databaseName string) (configuredTables map[string]discoveredTable) {
	logger.Info().Msg("Index configuration empty, running table auto-discovery")

	// The backend kind is a property of the configuration, not of a single
	// table, so it is resolved once instead of on every iteration.
	dbKind := ClickHouse
	if sl.cfg.Hydrolix.IsNonEmpty() {
		dbKind = Hydrolix
	}

	configuredTables = make(map[string]discoveredTable, len(tables))
	results := make([]string, 0, len(tables))
	for table, columns := range tables {
		comment := sl.SchemaManagement.tableComment(databaseName, table)
		createTableQuery := sl.SchemaManagement.createTableQuery(databaseName, table)

		var indexConfig config.IndexConfiguration
		// timestampField is scoped to this iteration, so taking its address
		// never aliases the value stored for another table.
		if timestampField := sl.SchemaManagement.tableTimestampField(databaseName, table, dbKind); timestampField != "" {
			indexConfig.TimestampField = &timestampField
		}

		configuredTables[table] = discoveredTable{columns, indexConfig, comment, createTableQuery}
		results = append(results, fmt.Sprintf("{table: %s, timestampField: %s}", table, indexConfig.GetTimestampField()))
	}

	logger.Info().Msgf("Table auto-discovery results -> %d tables found: [%s]", len(configuredTables), strings.Join(results, ", "))
	return configuredTables
}

func (sl *tableDiscovery) populateTableDefinitions(configuredTables map[string]discoveredTable, databaseName string, cfg config.QuesmaConfiguration) {
Expand Down
44 changes: 43 additions & 1 deletion quesma/clickhouse/schema_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@ type SchemaManagement struct {
chDb *sql.DB
}

// DbKind enumerates the database backends for which a timestamp field can be
// looked up.
type DbKind int

const (
	ClickHouse DbKind = iota // rendered as "clickhouse"
	Hydrolix                 // rendered as "hydrolix"
)

// String returns the lowercase name of the backend kind.
//
// Values outside the declared constants yield "unknown" instead of panicking
// on an out-of-range index, so formatting or logging an unexpected DbKind
// cannot crash the caller.
func (d DbKind) String() string {
	switch d {
	case ClickHouse:
		return "clickhouse"
	case Hydrolix:
		return "hydrolix"
	default:
		return "unknown"
	}
}

// NewSchemaManagement returns a SchemaManagement that issues its queries
// through the given database handle.
func NewSchemaManagement(chDb *sql.DB) *SchemaManagement {
	management := SchemaManagement{chDb: chDb}
	return &management
}
Expand Down Expand Up @@ -40,9 +53,38 @@ func (s *SchemaManagement) readTables(database string) (map[string]map[string]st
return columnsPerTable, nil
}

// tableTimestampField dispatches the timestamp-field lookup for the given
// table to the backend-specific implementation selected by dbKind.
// It returns an empty string when dbKind is neither Hydrolix nor ClickHouse.
func (s *SchemaManagement) tableTimestampField(database, table string, dbKind DbKind) (primaryKey string) {
	if dbKind == Hydrolix {
		return s.getTimestampFieldForHydrolix(database, table)
	}
	if dbKind == ClickHouse {
		return s.getTimestampFieldForClickHouse(database, table)
	}
	return ""
}

// getTimestampFieldForHydrolix reads the primary_key recorded in system.tables
// for the given table and returns it as the timestamp field. If the lookup
// fails, the failure is logged at debug level and whatever was scanned so far
// (normally the empty string) is returned.
func (s *SchemaManagement) getTimestampFieldForHydrolix(database, table string) (timestampField string) {
	// In Hydrolix, there's always only one column in a table set as a primary timestamp
	// Ref: https://docs.hydrolix.io/docs/transforms-and-write-schema#primary-timestamp
	const query = "SELECT primary_key FROM system.tables WHERE database = ? and table = ?"

	row := s.chDb.QueryRow(query, database, table)
	scanErr := row.Scan(&timestampField)
	if scanErr != nil {
		logger.Debug().Msgf("failed fetching primary key for table %s: %v", table, scanErr)
	}
	return timestampField
}

// getTimestampFieldForClickHouse guesses the timestamp field of a ClickHouse
// table: the name of a column that is part of the primary key and whose type
// starts with "DateTime" (case-insensitively). If no such column exists or the
// query fails, the failure is logged at debug level and an empty string (or
// whatever was scanned so far) is returned.
func (s *SchemaManagement) getTimestampFieldForClickHouse(database, table string) (timestampField string) {
	// In ClickHouse, there's no concept of a primary timestamp field, primary keys are often composite,
	// hence we have to use the following heuristic to determine the timestamp field.
	// When several columns match, ORDER BY position LIMIT 1 makes the choice
	// deterministic: the matching column declared earliest in the table wins.
	// Without an explicit ordering the row picked by QueryRow is unspecified.
	const query = "SELECT name FROM system.columns WHERE database = ? AND table = ? AND is_in_primary_key = 1 AND type iLIKE 'DateTime%' ORDER BY position LIMIT 1"

	if err := s.chDb.QueryRow(query, database, table).Scan(&timestampField); err != nil {
		logger.Debug().Msgf("failed fetching primary key for table %s: %v", table, err)
	}
	return timestampField
}

func (s *SchemaManagement) tableComment(database, table string) (comment string) {

err := s.chDb.QueryRow("SELECT comment FROM system.tables WHERE database = ? and table = ? ", database, table).Scan(&comment)
err := s.chDb.QueryRow("SELECT comment FROM system.tables WHERE database = ? and table = ?", database, table).Scan(&comment)

if err != nil {
logger.Error().Msgf("could not get table comment: %v", err)
Expand Down
7 changes: 7 additions & 0 deletions quesma/quesma/config/index_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ func (c IndexConfiguration) HasFullTextField(fieldName string) bool {
return slices.Contains(c.FullTextFields, fieldName)
}

// GetTimestampField returns the configured timestamp field name, or an empty
// string when no timestamp field is set (TimestampField is nil).
func (c IndexConfiguration) GetTimestampField() (tsField string) {
	if c.TimestampField == nil {
		return ""
	}
	return *c.TimestampField
}

func (c IndexConfiguration) String() string {
var extraString string
extraString = ""
Expand Down

0 comments on commit ebeaa89

Please sign in to comment.