diff --git a/quesma/clickhouse/schema_loader.go b/quesma/clickhouse/schema_loader.go index ee24af5cb..29daeabb7 100644 --- a/quesma/clickhouse/schema_loader.go +++ b/quesma/clickhouse/schema_loader.go @@ -5,6 +5,7 @@ package clickhouse import ( "context" "errors" + "fmt" "quesma/end_user_errors" "quesma/logger" "quesma/quesma/config" @@ -72,16 +73,18 @@ func (sl *tableDiscovery) TableDefinitionsFetchError() error { return sl.ReloadTablesError } +func (sl *tableDiscovery) TableAutodiscoveryEnabled() bool { + return sl.cfg.IndexConfig == nil +} + func (sl *tableDiscovery) ReloadTableDefinitions() { logger.Debug().Msg("reloading tables definitions") - configuredTables := make(map[string]discoveredTable) - var explicitlyDisabledTables, notConfiguredTables []string + var configuredTables map[string]discoveredTable databaseName := "default" if sl.cfg.ClickHouse.Database != "" { databaseName = sl.cfg.ClickHouse.Database } if tables, err := sl.SchemaManagement.readTables(databaseName); err != nil { - var endUserError *end_user_errors.EndUserError if errors.As(err, &endUserError) { logger.ErrorWithCtxAndReason(context.Background(), endUserError.Reason()).Msgf("could not describe tables: %v", err) @@ -92,23 +95,36 @@ func (sl *tableDiscovery) ReloadTableDefinitions() { sl.tableDefinitions.Store(NewTableMap()) return } else { - for table, columns := range tables { - if indexConfig, found := sl.cfg.IndexConfig[table]; found { - if indexConfig.Enabled { - for colName := range columns { - if _, exists := indexConfig.Aliases[colName]; exists { - logger.Error().Msgf("column [%s] clashes with an existing alias, table [%s]", colName, table) - } + if sl.TableAutodiscoveryEnabled() { + configuredTables = sl.autoConfigureTables(tables, databaseName) + } else { + configuredTables = sl.configureTables(tables, databaseName) + } + } + sl.ReloadTablesError = nil + sl.populateTableDefinitions(configuredTables, databaseName, sl.cfg) +} + +// configureTables confronts the tables discovered in the database with the configuration provided by the user, returning final list of tables managed by Quesma +func (sl *tableDiscovery) configureTables(tables map[string]map[string]string, databaseName string) (configuredTables map[string]discoveredTable) { + configuredTables = make(map[string]discoveredTable) + var explicitlyDisabledTables, notConfiguredTables []string + for table, columns := range tables { + if indexConfig, found := sl.cfg.IndexConfig[table]; found { + if indexConfig.Enabled { + for colName := range columns { + if _, exists := indexConfig.Aliases[colName]; exists { + logger.Error().Msgf("column [%s] clashes with an existing alias, table [%s]", colName, table) } - comment := sl.SchemaManagement.tableComment(databaseName, table) - createTableQuery := sl.SchemaManagement.createTableQuery(databaseName, table) - configuredTables[table] = discoveredTable{columns, indexConfig, comment, createTableQuery} - } else { - explicitlyDisabledTables = append(explicitlyDisabledTables, table) } + comment := sl.SchemaManagement.tableComment(databaseName, table) + createTableQuery := sl.SchemaManagement.createTableQuery(databaseName, table) + configuredTables[table] = discoveredTable{columns, indexConfig, comment, createTableQuery} } else { - notConfiguredTables = append(notConfiguredTables, table) + explicitlyDisabledTables = append(explicitlyDisabledTables, table) } + } else { + notConfiguredTables = append(notConfiguredTables, table) } } logger.Info().Msgf( @@ -117,8 +133,34 @@ func (sl *tableDiscovery) ReloadTableDefinitions() { strings.Join(notConfiguredTables, ","), strings.Join(explicitlyDisabledTables, ","), ) - sl.ReloadTablesError = nil - sl.populateTableDefinitions(configuredTables, databaseName, sl.cfg) + return +} + +// autoConfigureTables takes the list of discovered tables and automatically configures them, returning the final list of tables managed by Quesma +func (sl *tableDiscovery) autoConfigureTables(tables map[string]map[string]string, databaseName string) (configuredTables map[string]discoveredTable) { + configuredTables = make(map[string]discoveredTable) + var autoDiscoResults strings.Builder + logger.Info().Msg("Index configuration empty, running table auto-discovery") + for table, columns := range tables { + comment := sl.SchemaManagement.tableComment(databaseName, table) + createTableQuery := sl.SchemaManagement.createTableQuery(databaseName, table) + var maybeTimestampField string + if sl.cfg.Hydrolix.IsNonEmpty() { + maybeTimestampField = sl.SchemaManagement.tableTimestampField(databaseName, table, Hydrolix) + } else { + maybeTimestampField = sl.SchemaManagement.tableTimestampField(databaseName, table, ClickHouse) + } + if maybeTimestampField != "" { + configuredTables[table] = discoveredTable{columns, config.IndexConfiguration{TimestampField: &maybeTimestampField}, comment, createTableQuery} + } else { + configuredTables[table] = discoveredTable{columns, config.IndexConfiguration{}, comment, createTableQuery} + } + } + for tableName, conf := range configuredTables { + autoDiscoResults.WriteString(fmt.Sprintf("{table: %s, timestampField: %s}, ", tableName, conf.config.GetTimestampField())) + } + logger.Info().Msgf("Table auto-discovery results -> %d tables found: [%s]", len(configuredTables), strings.TrimSuffix(autoDiscoResults.String(), ", ")) + return } func (sl *tableDiscovery) populateTableDefinitions(configuredTables map[string]discoveredTable, databaseName string, cfg config.QuesmaConfiguration) { diff --git a/quesma/clickhouse/schema_management.go b/quesma/clickhouse/schema_management.go index ca1bb0622..b79625261 100644 --- a/quesma/clickhouse/schema_management.go +++ b/quesma/clickhouse/schema_management.go @@ -12,6 +12,19 @@ type SchemaManagement struct { chDb *sql.DB } +//enum for dbKind + +type DbKind int + +const ( + ClickHouse DbKind = iota //"clickhouse" + Hydrolix // = "hydrolix" +) + +func (d DbKind) String() string { + return [...]string{"clickhouse", "hydrolix"}[d] +} + func NewSchemaManagement(chDb *sql.DB) *SchemaManagement { return &SchemaManagement{chDb: chDb} } @@ -40,9 +53,38 @@ func (s *SchemaManagement) readTables(database string) (map[string]map[string]st return columnsPerTable, nil } +func (s *SchemaManagement) tableTimestampField(database, table string, dbKind DbKind) (primaryKey string) { + switch dbKind { + case Hydrolix: + return s.getTimestampFieldForHydrolix(database, table) + case ClickHouse: + return s.getTimestampFieldForClickHouse(database, table) + } + return +} + +func (s *SchemaManagement) getTimestampFieldForHydrolix(database, table string) (timestampField string) { + // In Hydrolix, there's always only one column in a table set as a primary timestamp + // Ref: https://docs.hydrolix.io/docs/transforms-and-write-schema#primary-timestamp + if err := s.chDb.QueryRow("SELECT primary_key FROM system.tables WHERE database = ? and table = ?", database, table).Scan(×tampField); err != nil { + logger.Debug().Msgf("failed fetching primary key for table %s: %v", table, err) + } + return timestampField +} + +func (s *SchemaManagement) getTimestampFieldForClickHouse(database, table string) (timestampField string) { + // In ClickHouse, there's no concept of a primary timestamp field, primary keys are often composite, + // hence we have to use following heuristic to determine the timestamp field (also just picking the first column if there are multiple) + if err := s.chDb.QueryRow("SELECT name FROM system.columns WHERE database = ? AND table = ? AND is_in_primary_key = 1 AND type iLIKE 'DateTime%'", database, table).Scan(×tampField); err != nil { + logger.Debug().Msgf("failed fetching primary key for table %s: %v", table, err) + return + } + return timestampField +} + func (s *SchemaManagement) tableComment(database, table string) (comment string) { - err := s.chDb.QueryRow("SELECT comment FROM system.tables WHERE database = ? and table = ? ", database, table).Scan(&comment) + err := s.chDb.QueryRow("SELECT comment FROM system.tables WHERE database = ? and table = ?", database, table).Scan(&comment) if err != nil { logger.Error().Msgf("could not get table comment: %v", err) diff --git a/quesma/quesma/config/index_config.go b/quesma/quesma/config/index_config.go index 920237840..cbc3b36c5 100644 --- a/quesma/quesma/config/index_config.go +++ b/quesma/quesma/config/index_config.go @@ -31,6 +31,13 @@ func (c IndexConfiguration) HasFullTextField(fieldName string) bool { return slices.Contains(c.FullTextFields, fieldName) } +func (c IndexConfiguration) GetTimestampField() (tsField string) { + if c.TimestampField != nil { + tsField = *c.TimestampField + } + return +} + func (c IndexConfiguration) String() string { var extraString string extraString = ""