diff --git a/quesma/clickhouse/clickhouse.go b/quesma/clickhouse/clickhouse.go index a231da1ba..00bc933ca 100644 --- a/quesma/clickhouse/clickhouse.go +++ b/quesma/clickhouse/clickhouse.go @@ -19,6 +19,7 @@ import ( "quesma/quesma/config" "quesma/quesma/recovery" "quesma/quesma/types" + "quesma/schema" "quesma/telemetry" "quesma/util" "slices" @@ -41,6 +42,7 @@ type ( schemaLoader TableDiscovery cfg config.QuesmaConfiguration phoneHomeAgent telemetry.PhoneHomeAgent + schemaRegistry schema.Registry } TableMap = concurrent.Map[string, *Table] SchemaMap = map[string]interface{} // TODO remove @@ -349,18 +351,26 @@ func (lm *LogManager) ProcessCreateTableQuery(ctx context.Context, query string, return lm.sendCreateTableQuery(ctx, addOurFieldsToCreateTableQuery(query, config, table)) } -func buildCreateTableQueryNoOurFields(ctx context.Context, tableName string, jsonData types.JSON, tableConfig *ChTableConfig, cfg config.QuesmaConfiguration) (string, error) { +func findSchemaPointer(schemaRegistry schema.Registry, tableName string) *schema.Schema { + if foundSchema, found := schemaRegistry.FindSchema(schema.TableName(tableName)); found { + return &foundSchema + } + return nil +} + +func buildCreateTableQueryNoOurFields(ctx context.Context, tableName string, jsonData types.JSON, tableConfig *ChTableConfig, cfg config.QuesmaConfiguration, schemaRegistry schema.Registry) (string, error) { nameFormatter, err := registry.TableColumNameFormatterFor(tableName, cfg) if err != nil { return "", err } - columns := FieldsMapToCreateTableString("", jsonData, 1, tableConfig, nameFormatter) + Indexes(jsonData) + columns := FieldsMapToCreateTableString(jsonData, tableConfig, nameFormatter, findSchemaPointer(schemaRegistry, tableName)) + Indexes(jsonData) createTableCmd := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS "%s" ( - %s + +%s ) %s COMMENT 'created by Quesma'`, @@ -387,7 +397,7 @@ func Indexes(m SchemaMap) string { func (lm *LogManager) CreateTableFromInsertQuery(ctx context.Context, name string, jsonData types.JSON, config *ChTableConfig) error { // TODO fix lm.AddTableIfDoesntExist(name, jsonData) - query, err := buildCreateTableQueryNoOurFields(ctx, name, jsonData, config, lm.cfg) + query, err := buildCreateTableQueryNoOurFields(ctx, name, jsonData, config, lm.cfg, lm.schemaRegistry) if err != nil { return err } @@ -597,9 +607,9 @@ func (lm *LogManager) Ping() error { return lm.chDb.Ping() } -func NewEmptyLogManager(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader TableDiscovery) *LogManager { +func NewEmptyLogManager(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader TableDiscovery, schemaRegistry schema.Registry) *LogManager { ctx, cancel := context.WithCancel(context.Background()) - return &LogManager{ctx: ctx, cancel: cancel, chDb: chDb, schemaLoader: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent} + return &LogManager{ctx: ctx, cancel: cancel, chDb: chDb, schemaLoader: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent, schemaRegistry: schemaRegistry} } func NewLogManager(tables *TableMap, cfg config.QuesmaConfiguration) *LogManager { diff --git a/quesma/clickhouse/clickhouse_test.go b/quesma/clickhouse/clickhouse_test.go index e0c8acc86..514db4a55 100644 --- a/quesma/clickhouse/clickhouse_test.go +++ b/quesma/clickhouse/clickhouse_test.go @@ -8,6 +8,7 @@ import ( "quesma/concurrent" "quesma/quesma/config" "quesma/quesma/types" + "quesma/schema" "strings" "sync/atomic" "testing" @@ -106,6 +107,26 @@ func TestInsertNonSchemaFields_2(t *testing.T) { } */ +type staticRegistry struct { + tables map[schema.TableName]schema.Schema +} + +func (e staticRegistry) AllSchemas() map[schema.TableName]schema.Schema { + if e.tables != nil { + return e.tables + } else { + return map[schema.TableName]schema.Schema{} + } +} + +func (e staticRegistry) FindSchema(name schema.TableName) (schema.Schema, bool) { + if e.tables == nil { + return schema.Schema{}, false + } + s, found := e.tables[name] + return s, found +} + func TestAddTimestamp(t *testing.T) { tableConfig := &ChTableConfig{ hasTimestamp: true, @@ -120,7 +141,7 @@ func TestAddTimestamp(t *testing.T) { castUnsupportedAttrValueTypesToString: false, preferCastingToOthers: false, } - query, err := buildCreateTableQueryNoOurFields(context.Background(), "tableName", types.MustJSON(`{"host.name":"hermes","message":"User password reset requested","service.name":"queue","severity":"info","source":"azure"}`), tableConfig, config.QuesmaConfiguration{}) + query, err := buildCreateTableQueryNoOurFields(context.Background(), "tableName", types.MustJSON(`{"host.name":"hermes","message":"User password reset requested","service.name":"queue","severity":"info","source":"azure"}`), tableConfig, config.QuesmaConfiguration{}, staticRegistry{}) assert.NoError(t, err) assert.True(t, strings.Contains(query, timestampFieldName)) } diff --git a/quesma/clickhouse/insert_test.go b/quesma/clickhouse/insert_test.go index ba5ef9d46..dc69fcf15 100644 --- a/quesma/clickhouse/insert_test.go +++ b/quesma/clickhouse/insert_test.go @@ -138,7 +138,9 @@ func logManagersNonEmpty(cfg *ChTableConfig) []logManagerHelper { } func logManagers(config *ChTableConfig) []logManagerHelper { - return append([]logManagerHelper{{NewLogManagerEmpty(), false}}, logManagersNonEmpty(config)...) + logManager := NewLogManagerEmpty() + logManager.schemaRegistry = staticRegistry{} + return append([]logManagerHelper{{logManager, false}}, logManagersNonEmpty(config)...) } func TestAutomaticTableCreationAtInsert(t *testing.T) { @@ -148,7 +150,7 @@ func TestAutomaticTableCreationAtInsert(t *testing.T) { for index3, lm := range logManagers(tableConfig) { t.Run("case insertTest["+strconv.Itoa(index1)+"], config["+strconv.Itoa(index2)+"], logManager["+strconv.Itoa(index3)+"]", func(t *testing.T) { - query, err := buildCreateTableQueryNoOurFields(context.Background(), tableName, types.MustJSON(tt.insertJson), tableConfig, cfg) + query, err := buildCreateTableQueryNoOurFields(context.Background(), tableName, types.MustJSON(tt.insertJson), tableConfig, cfg, staticRegistry{}) assert.NoError(t, err) table, err := NewTable(query, tableConfig) assert.NoError(t, err) diff --git a/quesma/clickhouse/parser.go b/quesma/clickhouse/parser.go index 41ea0ac57..b69286cf3 100644 --- a/quesma/clickhouse/parser.go +++ b/quesma/clickhouse/parser.go @@ -4,7 +4,9 @@ package clickhouse import ( "fmt" + "quesma/logger" "quesma/plugins" + "quesma/schema" "quesma/util" "slices" "strings" @@ -12,33 +14,65 @@ import ( const NestedSeparator = "::" +type CreateTableEntry struct { + ClickHouseColumnName string + ClickHouseType string +} + // m: unmarshalled json from HTTP request // Returns nicely formatted string for CREATE TABLE command -func FieldsMapToCreateTableString(namespace string, m SchemaMap, indentLvl int, config *ChTableConfig, nameFormatter plugins.TableColumNameFormatter) string { - +func FieldsMapToCreateTableString(m SchemaMap, config *ChTableConfig, nameFormatter plugins.TableColumNameFormatter, schemaMapping *schema.Schema) string { var result strings.Builder - i := 0 - for name, value := range m { - if namespace == "" { - result.WriteString("\n") + + columnsFromJson := JsonToColumns("", m, 1, config, nameFormatter) + columnsFromSchema := SchemaToColumns(schemaMapping, nameFormatter) + + first := true + for _, columnFromJson := range columnsFromJson { + if first { + first = false + } else { + result.WriteString(",\n") + } + result.WriteString(util.Indent(1)) + + if columnFromSchema, found := columnsFromSchema[schema.FieldName(columnFromJson.ClickHouseColumnName)]; found && !strings.Contains(columnFromJson.ClickHouseType, "Array") { + // Schema takes precedence over JSON (except for Arrays which are not currently handled) + result.WriteString(fmt.Sprintf("\"%s\" %s", columnFromSchema.ClickHouseColumnName, columnFromSchema.ClickHouseType)) + } else { + result.WriteString(fmt.Sprintf("\"%s\" %s", columnFromJson.ClickHouseColumnName, columnFromJson.ClickHouseType)) } + delete(columnsFromSchema, schema.FieldName(columnFromJson.ClickHouseColumnName)) + } + + // There might be some columns from schema which were not present in the JSON + for _, column := range columnsFromSchema { + if first { + first = false + } else { + result.WriteString(",\n") + } + result.WriteString(util.Indent(1)) + result.WriteString(fmt.Sprintf("\"%s\" %s", column.ClickHouseColumnName, column.ClickHouseType)) + } + + return result.String() +} + +func JsonToColumns(namespace string, m SchemaMap, indentLvl int, config *ChTableConfig, nameFormatter plugins.TableColumNameFormatter) []CreateTableEntry { + var resultColumns []CreateTableEntry + + for name, value := range m { listValue, isListValue := value.([]interface{}) if isListValue { value = listValue } nestedValue, ok := value.(SchemaMap) if (ok && nestedValue != nil && len(nestedValue) > 0) && !isListValue { - var nested []string - if namespace == "" { - nested = append(nested, FieldsMapToCreateTableString(name, nestedValue, indentLvl, config, nameFormatter)) - } else { - nested = append(nested, FieldsMapToCreateTableString(nameFormatter.Format(namespace, name), nestedValue, indentLvl, config, nameFormatter)) - } - - result.WriteString(strings.Join(nested, ",\n")) + nested := JsonToColumns(nameFormatter.Format(namespace, name), nestedValue, indentLvl, config, nameFormatter) + resultColumns = append(resultColumns, nested...) } else { - // value is a single field. Only String/Bool/DateTime64 supported for now. var fType string if value == nil { // HACK ALERT -> We're treating null values as strings for now, so that we don't completely discard documents with empty values fType = "Nullable(String)" @@ -52,24 +86,57 @@ func FieldsMapToCreateTableString(namespace string, m SchemaMap, indentLvl int, if indentLvl == 1 && name == timestampFieldName && config.timestampDefaultsNow { fType += " DEFAULT now64()" } - result.WriteString(util.Indent(indentLvl)) - if namespace == "" { - result.WriteString(fmt.Sprintf("\"%s\" %s", name, fType)) - } else { - result.WriteString(fmt.Sprintf("\"%s\" %s", nameFormatter.Format(namespace, name), fType)) - } - } - if i+1 < len(m) { - result.WriteString(",") + resultColumns = append(resultColumns, CreateTableEntry{ClickHouseColumnName: nameFormatter.Format(namespace, name), ClickHouseType: fType}) } + } + return resultColumns +} - if namespace != "" && i+1 < len(m) { - result.WriteString("\n") - } +func SchemaToColumns(schemaMapping *schema.Schema, nameFormatter plugins.TableColumNameFormatter) map[schema.FieldName]CreateTableEntry { + resultColumns := make(map[schema.FieldName]CreateTableEntry) - i++ + if schemaMapping == nil { + return resultColumns } - return result.String() + + for _, field := range schemaMapping.Fields { + var fType string + + // FIXME: shouldn't InternalPropertyName already have "::"? (it currently doesn't) + internalPropertyName := strings.Replace(field.InternalPropertyName.AsString(), ".", "::", -1) + + switch field.Type.Name { + default: + logger.Warn().Msgf("Unsupported field type '%s' for field '%s' when trying to create a table. Ignoring that field.", field.Type.Name, field.PropertyName.AsString()) + continue + case schema.TypePoint.Name: + lat := nameFormatter.Format(internalPropertyName, "lat") + lon := nameFormatter.Format(internalPropertyName, "lon") + resultColumns[schema.FieldName(lat)] = CreateTableEntry{ClickHouseColumnName: lat, ClickHouseType: "Nullable(String)"} + resultColumns[schema.FieldName(lon)] = CreateTableEntry{ClickHouseColumnName: lon, ClickHouseType: "Nullable(String)"} + continue + + // Simple types: + case schema.TypeText.Name: + fType = "Nullable(String)" + case schema.TypeKeyword.Name: + fType = "Nullable(String)" + case schema.TypeLong.Name: + fType = "Nullable(Int64)" + case schema.TypeUnsignedLong.Name: + fType = "Nullable(Uint64)" + case schema.TypeTimestamp.Name: + fType = "Nullable(DateTime64)" + case schema.TypeDate.Name: + fType = "Nullable(Date)" + case schema.TypeFloat.Name: + fType = "Nullable(Float64)" + case schema.TypeBoolean.Name: + fType = "Nullable(Bool)" + } + resultColumns[schema.FieldName(internalPropertyName)] = CreateTableEntry{ClickHouseColumnName: internalPropertyName, ClickHouseType: fType} + } + return resultColumns } // Returns map with fields that are in 'sm', but not in our table schema 't'. diff --git a/quesma/connectors/connector.go b/quesma/connectors/connector.go index e71f49b56..4e07640c8 100644 --- a/quesma/connectors/connector.go +++ b/quesma/connectors/connector.go @@ -9,6 +9,7 @@ import ( "quesma/licensing" "quesma/logger" "quesma/quesma/config" + "quesma/schema" "quesma/telemetry" ) @@ -36,27 +37,27 @@ func (c *ConnectorManager) GetConnector() *clickhouse.LogManager { return c.connectors[0].GetConnector() } -func NewConnectorManager(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader clickhouse.TableDiscovery) *ConnectorManager { +func NewConnectorManager(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader clickhouse.TableDiscovery, registry schema.Registry) *ConnectorManager { return &ConnectorManager{ - connectors: registerConnectors(cfg, chDb, phoneHomeAgent, loader), + connectors: registerConnectors(cfg, chDb, phoneHomeAgent, loader, registry), } } -func registerConnectors(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader clickhouse.TableDiscovery) (conns []Connector) { +func registerConnectors(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader clickhouse.TableDiscovery, registry schema.Registry) (conns []Connector) { for connName, conn := range cfg.Connectors { logger.Info().Msgf("Registering connector named [%s] of type [%s]", connName, conn.ConnectorType) switch conn.ConnectorType { case clickHouseConnectorTypeName: conns = append(conns, &ClickHouseConnector{ - Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader), + Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader, registry), }) case clickHouseOSConnectorTypeName: conns = append(conns, &ClickHouseOSConnector{ - Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader), + Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader, registry), }) case hydrolixConnectorTypeName: conns = append(conns, &HydrolixConnector{ - Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader), + Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader, registry), }) default: logger.Error().Msgf("Unknown connector type [%s]", conn.ConnectorType) diff --git a/quesma/main.go b/quesma/main.go index aeae7e05d..f89c65f24 100644 --- a/quesma/main.go +++ b/quesma/main.go @@ -75,7 +75,7 @@ func main() { schemaLoader := clickhouse.NewTableDiscovery(cfg, schemaManagement) schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: schemaLoader}, cfg, clickhouse.SchemaTypeAdapter{}) - connManager := connectors.NewConnectorManager(cfg, connectionPool, phoneHomeAgent, schemaLoader) + connManager := connectors.NewConnectorManager(cfg, connectionPool, phoneHomeAgent, schemaLoader, schemaRegistry) lm := connManager.GetConnector() im := elasticsearch.NewIndexManagement(cfg.Elasticsearch.Url.String()) diff --git a/quesma/plugins/elastic_clickhouse_fields/elastic_clickhouse_fields.go b/quesma/plugins/elastic_clickhouse_fields/elastic_clickhouse_fields.go index ea9ba87da..1d4499cac 100644 --- a/quesma/plugins/elastic_clickhouse_fields/elastic_clickhouse_fields.go +++ b/quesma/plugins/elastic_clickhouse_fields/elastic_clickhouse_fields.go @@ -111,6 +111,9 @@ type columNameFormatter struct { } func (t *columNameFormatter) Format(namespace, columnName string) string { + if namespace == "" { + return columnName + } return fmt.Sprintf("%s%s%s", namespace, t.separator, columnName) } diff --git a/quesma/queryparser/aggregation_parser_test.go b/quesma/queryparser/aggregation_parser_test.go index 5b930b4ac..c1cb5ccd7 100644 --- a/quesma/queryparser/aggregation_parser_test.go +++ b/quesma/queryparser/aggregation_parser_test.go @@ -42,6 +42,9 @@ func (e staticRegistry) AllSchemas() map[schema.TableName]schema.Schema { } func (e staticRegistry) FindSchema(name schema.TableName) (schema.Schema, bool) { + if e.tables == nil { + return schema.Schema{}, false + } s, found := e.tables[name] return s, found } diff --git a/quesma/queryparser/query_parser_test.go b/quesma/queryparser/query_parser_test.go index c0c5da2e9..d71b0cc17 100644 --- a/quesma/queryparser/query_parser_test.go +++ b/quesma/queryparser/query_parser_test.go @@ -47,7 +47,7 @@ func TestQueryParserStringAttrConfig(t *testing.T) { cfg.IndexConfig[indexConfig.Name] = indexConfig - lm := clickhouse.NewEmptyLogManager(cfg, nil, telemetry.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(config.QuesmaConfiguration{}, nil)) + lm := clickhouse.NewEmptyLogManager(cfg, nil, telemetry.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(config.QuesmaConfiguration{}, nil), staticRegistry{}) lm.AddTableIfDoesntExist(table) s := staticRegistry{ tables: map[schema.TableName]schema.Schema{ @@ -105,7 +105,7 @@ func TestQueryParserNoFullTextFields(t *testing.T) { }, Created: true, } - lm := clickhouse.NewEmptyLogManager(config.QuesmaConfiguration{}, nil, telemetry.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(config.QuesmaConfiguration{}, nil)) + lm := clickhouse.NewEmptyLogManager(config.QuesmaConfiguration{}, nil, telemetry.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(config.QuesmaConfiguration{}, nil), staticRegistry{}) lm.AddTableIfDoesntExist(&table) indexConfig := config.IndexConfiguration{ Name: "logs-generic-default", diff --git a/quesma/quesma/functionality/field_capabilities/field_caps_test.go b/quesma/quesma/functionality/field_capabilities/field_caps_test.go index a2e90e60c..81341fb79 100644 --- a/quesma/quesma/functionality/field_capabilities/field_caps_test.go +++ b/quesma/quesma/functionality/field_capabilities/field_caps_test.go @@ -426,6 +426,9 @@ func (e staticRegistry) AllSchemas() map[schema.TableName]schema.Schema { } func (e staticRegistry) FindSchema(name schema.TableName) (schema.Schema, bool) { + if e.tables == nil { + return schema.Schema{}, false + } s, found := e.tables[name] return s, found } diff --git a/quesma/quesma/functionality/terms_enum/terms_enum_test.go b/quesma/quesma/functionality/terms_enum/terms_enum_test.go index 01701b50a..2656ea934 100644 --- a/quesma/quesma/functionality/terms_enum/terms_enum_test.go +++ b/quesma/quesma/functionality/terms_enum/terms_enum_test.go @@ -70,6 +70,9 @@ func (e staticRegistry) AllSchemas() map[schema.TableName]schema.Schema { } func (e staticRegistry) FindSchema(name schema.TableName) (schema.Schema, bool) { + if e.tables == nil { + return schema.Schema{}, false + } s, found := e.tables[name] return s, found } diff --git a/quesma/quesma/highlight_test.go b/quesma/quesma/highlight_test.go index eca7ee811..a66e685ca 100644 --- a/quesma/quesma/highlight_test.go +++ b/quesma/quesma/highlight_test.go @@ -109,7 +109,7 @@ func TestParseHighLight(t *testing.T) { Config: clickhouse.NewDefaultCHConfig(), } - lm := clickhouse.NewEmptyLogManager(config.QuesmaConfiguration{}, nil, telemetry.NewPhoneHomeEmptyAgent(), nil) + lm := clickhouse.NewEmptyLogManager(config.QuesmaConfiguration{}, nil, telemetry.NewPhoneHomeEmptyAgent(), nil, staticRegistry{}) cw := queryparser.ClickhouseQueryTranslator{ ClickhouseLM: lm, diff --git a/quesma/quesma/static_registry_test.go b/quesma/quesma/static_registry_test.go index 80873a620..6d57ffc6d 100644 --- a/quesma/quesma/static_registry_test.go +++ b/quesma/quesma/static_registry_test.go @@ -18,6 +18,9 @@ func (e staticRegistry) AllSchemas() map[schema.TableName]schema.Schema { } func (e staticRegistry) FindSchema(name schema.TableName) (schema.Schema, bool) { + if e.tables == nil { + return schema.Schema{}, false + } s, found := e.tables[name] return s, found }