Skip to content

Commit

Permalink
Use schema registry information in CREATE TABLE (#536)
Browse files Browse the repository at this point in the history
When ingesting data into ClickHouse via Quesma, the first insert can
automatically create a corresponding table in ClickHouse (if a user
didn't do it beforehand).

That CREATE TABLE statement was previously based solely on the contents
of the first inserted data (the first JSON). However, the information in
that first JSON is often not sufficient to create a good ClickHouse
schema. Prior to this change, the only way for the user to affect the
CREATE TABLE was to perform it manually beforehand.

Extend the CREATE TABLE generation code to also take into account the
information in the schema registry. Therefore the user can have some
control over the CREATE TABLE statement via the "mapping" configuration
in the config file. In practice this allows for:

1. Overriding the type of column (for example the initial JSON might
have int values, but now the user can specify they should be stored as
floats instead)
2. Adding columns that are missing in the initial JSON

One shortcoming of this solution is missing support for arrays. For
example, `products.product_name: "text"` defined in
"kibana_sample_data_ecommerce" is actually an array of text, but the
configuration is not expressive enough to encode this fact yet. In such
a case the JSON type takes precedence over the user-specified
configuration.
  • Loading branch information
avelanarius authored Jul 17, 2024
1 parent ebeaa89 commit 7b84d19
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 48 deletions.
22 changes: 16 additions & 6 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"quesma/quesma/config"
"quesma/quesma/recovery"
"quesma/quesma/types"
"quesma/schema"
"quesma/telemetry"
"quesma/util"
"slices"
Expand All @@ -41,6 +42,7 @@ type (
schemaLoader TableDiscovery
cfg config.QuesmaConfiguration
phoneHomeAgent telemetry.PhoneHomeAgent
schemaRegistry schema.Registry
}
TableMap = concurrent.Map[string, *Table]
SchemaMap = map[string]interface{} // TODO remove
Expand Down Expand Up @@ -349,18 +351,26 @@ func (lm *LogManager) ProcessCreateTableQuery(ctx context.Context, query string,
return lm.sendCreateTableQuery(ctx, addOurFieldsToCreateTableQuery(query, config, table))
}

func buildCreateTableQueryNoOurFields(ctx context.Context, tableName string, jsonData types.JSON, tableConfig *ChTableConfig, cfg config.QuesmaConfiguration) (string, error) {
// findSchemaPointer looks up tableName in schemaRegistry and returns a pointer
// to a copy of the schema found there, or nil when there is nothing to use.
//
// nil is returned both when the registry has no schema for tableName and when
// schemaRegistry itself is a nil interface (for example a LogManager whose
// schemaRegistry field was never set). Treating "no registry" like "no schema"
// avoids a nil-interface panic on the first insert.
func findSchemaPointer(schemaRegistry schema.Registry, tableName string) *schema.Schema {
	if schemaRegistry == nil {
		return nil
	}
	foundSchema, found := schemaRegistry.FindSchema(schema.TableName(tableName))
	if !found {
		return nil
	}
	return &foundSchema
}

func buildCreateTableQueryNoOurFields(ctx context.Context, tableName string, jsonData types.JSON, tableConfig *ChTableConfig, cfg config.QuesmaConfiguration, schemaRegistry schema.Registry) (string, error) {

nameFormatter, err := registry.TableColumNameFormatterFor(tableName, cfg)
if err != nil {
return "", err
}

columns := FieldsMapToCreateTableString("", jsonData, 1, tableConfig, nameFormatter) + Indexes(jsonData)
columns := FieldsMapToCreateTableString(jsonData, tableConfig, nameFormatter, findSchemaPointer(schemaRegistry, tableName)) + Indexes(jsonData)

createTableCmd := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS "%s"
(
%s
%s
)
%s
COMMENT 'created by Quesma'`,
Expand All @@ -387,7 +397,7 @@ func Indexes(m SchemaMap) string {
func (lm *LogManager) CreateTableFromInsertQuery(ctx context.Context, name string, jsonData types.JSON, config *ChTableConfig) error {
// TODO fix lm.AddTableIfDoesntExist(name, jsonData)

query, err := buildCreateTableQueryNoOurFields(ctx, name, jsonData, config, lm.cfg)
query, err := buildCreateTableQueryNoOurFields(ctx, name, jsonData, config, lm.cfg, lm.schemaRegistry)
if err != nil {
return err
}
Expand Down Expand Up @@ -597,9 +607,9 @@ func (lm *LogManager) Ping() error {
return lm.chDb.Ping()
}

func NewEmptyLogManager(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader TableDiscovery) *LogManager {
// NewEmptyLogManager wires the given dependencies into a new LogManager.
// The manager gets its own cancellable context derived from
// context.Background(); the matching cancel function is stored on the manager.
func NewEmptyLogManager(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader TableDiscovery, schemaRegistry schema.Registry) *LogManager {
	managerCtx, cancelManager := context.WithCancel(context.Background())

	manager := &LogManager{
		ctx:            managerCtx,
		cancel:         cancelManager,
		chDb:           chDb,
		schemaLoader:   loader,
		cfg:            cfg,
		phoneHomeAgent: phoneHomeAgent,
		schemaRegistry: schemaRegistry,
	}
	return manager
}

func NewLogManager(tables *TableMap, cfg config.QuesmaConfiguration) *LogManager {
Expand Down
23 changes: 22 additions & 1 deletion quesma/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"quesma/concurrent"
"quesma/quesma/config"
"quesma/quesma/types"
"quesma/schema"
"strings"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -106,6 +107,26 @@ func TestInsertNonSchemaFields_2(t *testing.T) {
}
*/

// staticRegistry is a test double passed wherever a schema.Registry is
// expected. It serves schemas from a fixed in-memory map; the zero value
// (nil map) behaves like an empty registry.
type staticRegistry struct {
	tables map[schema.TableName]schema.Schema
}

// AllSchemas returns the backing map, or a fresh empty map when no tables
// were configured, so callers never receive a nil map.
func (e staticRegistry) AllSchemas() map[schema.TableName]schema.Schema {
	if e.tables == nil {
		return map[schema.TableName]schema.Schema{}
	}
	return e.tables
}

// FindSchema returns the schema stored under name and true, or the zero
// schema.Schema and false when the name is unknown or no tables were
// configured.
func (e staticRegistry) FindSchema(name schema.TableName) (schema.Schema, bool) {
	if e.tables != nil {
		if stored, ok := e.tables[name]; ok {
			return stored, true
		}
	}
	return schema.Schema{}, false
}

func TestAddTimestamp(t *testing.T) {
tableConfig := &ChTableConfig{
hasTimestamp: true,
Expand All @@ -120,7 +141,7 @@ func TestAddTimestamp(t *testing.T) {
castUnsupportedAttrValueTypesToString: false,
preferCastingToOthers: false,
}
query, err := buildCreateTableQueryNoOurFields(context.Background(), "tableName", types.MustJSON(`{"host.name":"hermes","message":"User password reset requested","service.name":"queue","severity":"info","source":"azure"}`), tableConfig, config.QuesmaConfiguration{})
query, err := buildCreateTableQueryNoOurFields(context.Background(), "tableName", types.MustJSON(`{"host.name":"hermes","message":"User password reset requested","service.name":"queue","severity":"info","source":"azure"}`), tableConfig, config.QuesmaConfiguration{}, staticRegistry{})
assert.NoError(t, err)
assert.True(t, strings.Contains(query, timestampFieldName))
}
Expand Down
6 changes: 4 additions & 2 deletions quesma/clickhouse/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ func logManagersNonEmpty(cfg *ChTableConfig) []logManagerHelper {
}

// logManagers returns the log managers the tests iterate over: first an empty
// manager (flagged false) that is given an empty staticRegistry as its schema
// registry, followed by everything logManagersNonEmpty(config) yields.
func logManagers(config *ChTableConfig) []logManagerHelper {
	emptyManager := NewLogManagerEmpty()
	emptyManager.schemaRegistry = staticRegistry{}

	helpers := []logManagerHelper{{emptyManager, false}}
	return append(helpers, logManagersNonEmpty(config)...)
}

func TestAutomaticTableCreationAtInsert(t *testing.T) {
Expand All @@ -148,7 +150,7 @@ func TestAutomaticTableCreationAtInsert(t *testing.T) {
for index3, lm := range logManagers(tableConfig) {
t.Run("case insertTest["+strconv.Itoa(index1)+"], config["+strconv.Itoa(index2)+"], logManager["+strconv.Itoa(index3)+"]", func(t *testing.T) {

query, err := buildCreateTableQueryNoOurFields(context.Background(), tableName, types.MustJSON(tt.insertJson), tableConfig, cfg)
query, err := buildCreateTableQueryNoOurFields(context.Background(), tableName, types.MustJSON(tt.insertJson), tableConfig, cfg, staticRegistry{})
assert.NoError(t, err)
table, err := NewTable(query, tableConfig)
assert.NoError(t, err)
Expand Down
125 changes: 96 additions & 29 deletions quesma/clickhouse/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,75 @@ package clickhouse

import (
"fmt"
"quesma/logger"
"quesma/plugins"
"quesma/schema"
"quesma/util"
"slices"
"strings"
)

const NestedSeparator = "::"

type CreateTableEntry struct {
ClickHouseColumnName string
ClickHouseType string
}

// m: unmarshalled json from HTTP request
// Returns nicely formatted string for CREATE TABLE command
func FieldsMapToCreateTableString(namespace string, m SchemaMap, indentLvl int, config *ChTableConfig, nameFormatter plugins.TableColumNameFormatter) string {

func FieldsMapToCreateTableString(m SchemaMap, config *ChTableConfig, nameFormatter plugins.TableColumNameFormatter, schemaMapping *schema.Schema) string {
var result strings.Builder
i := 0
for name, value := range m {
if namespace == "" {
result.WriteString("\n")

columnsFromJson := JsonToColumns("", m, 1, config, nameFormatter)
columnsFromSchema := SchemaToColumns(schemaMapping, nameFormatter)

first := true
for _, columnFromJson := range columnsFromJson {
if first {
first = false
} else {
result.WriteString(",\n")
}
result.WriteString(util.Indent(1))

if columnFromSchema, found := columnsFromSchema[schema.FieldName(columnFromJson.ClickHouseColumnName)]; found && !strings.Contains(columnFromJson.ClickHouseType, "Array") {
// Schema takes precedence over JSON (except for Arrays which are not currently handled)
result.WriteString(fmt.Sprintf("\"%s\" %s", columnFromSchema.ClickHouseColumnName, columnFromSchema.ClickHouseType))
} else {
result.WriteString(fmt.Sprintf("\"%s\" %s", columnFromJson.ClickHouseColumnName, columnFromJson.ClickHouseType))
}

delete(columnsFromSchema, schema.FieldName(columnFromJson.ClickHouseColumnName))
}

// There might be some columns from schema which were not present in the JSON
for _, column := range columnsFromSchema {
if first {
first = false
} else {
result.WriteString(",\n")
}
result.WriteString(util.Indent(1))
result.WriteString(fmt.Sprintf("\"%s\" %s", column.ClickHouseColumnName, column.ClickHouseType))
}

return result.String()
}

func JsonToColumns(namespace string, m SchemaMap, indentLvl int, config *ChTableConfig, nameFormatter plugins.TableColumNameFormatter) []CreateTableEntry {
var resultColumns []CreateTableEntry

for name, value := range m {
listValue, isListValue := value.([]interface{})
if isListValue {
value = listValue
}
nestedValue, ok := value.(SchemaMap)
if (ok && nestedValue != nil && len(nestedValue) > 0) && !isListValue {
var nested []string
if namespace == "" {
nested = append(nested, FieldsMapToCreateTableString(name, nestedValue, indentLvl, config, nameFormatter))
} else {
nested = append(nested, FieldsMapToCreateTableString(nameFormatter.Format(namespace, name), nestedValue, indentLvl, config, nameFormatter))
}

result.WriteString(strings.Join(nested, ",\n"))
nested := JsonToColumns(nameFormatter.Format(namespace, name), nestedValue, indentLvl, config, nameFormatter)
resultColumns = append(resultColumns, nested...)
} else {
// value is a single field. Only String/Bool/DateTime64 supported for now.
var fType string
if value == nil { // HACK ALERT -> We're treating null values as strings for now, so that we don't completely discard documents with empty values
fType = "Nullable(String)"
Expand All @@ -52,24 +86,57 @@ func FieldsMapToCreateTableString(namespace string, m SchemaMap, indentLvl int,
if indentLvl == 1 && name == timestampFieldName && config.timestampDefaultsNow {
fType += " DEFAULT now64()"
}
result.WriteString(util.Indent(indentLvl))
if namespace == "" {
result.WriteString(fmt.Sprintf("\"%s\" %s", name, fType))
} else {
result.WriteString(fmt.Sprintf("\"%s\" %s", nameFormatter.Format(namespace, name), fType))
}
}
if i+1 < len(m) {
result.WriteString(",")
resultColumns = append(resultColumns, CreateTableEntry{ClickHouseColumnName: nameFormatter.Format(namespace, name), ClickHouseType: fType})
}
}
return resultColumns
}

if namespace != "" && i+1 < len(m) {
result.WriteString("\n")
}
// SchemaToColumns converts the fields of schemaMapping into CREATE TABLE
// column entries, keyed by ClickHouse column name.
//
// A nil schemaMapping yields an empty (non-nil) map. Dots in a field's internal
// property name are replaced with "::" to form the column name. A point field
// expands into two Nullable(String) columns, "lat" and "lon", named through
// nameFormatter. Fields of any type not listed below are skipped with a
// warning.
func SchemaToColumns(schemaMapping *schema.Schema, nameFormatter plugins.TableColumNameFormatter) map[schema.FieldName]CreateTableEntry {
	resultColumns := make(map[schema.FieldName]CreateTableEntry)

	if schemaMapping == nil {
		return resultColumns
	}

	for _, field := range schemaMapping.Fields {
		var fType string

		// FIXME: shouldn't InternalPropertyName already have "::"? (it currently doesn't)
		internalPropertyName := strings.ReplaceAll(field.InternalPropertyName.AsString(), ".", "::")

		switch field.Type.Name {
		case schema.TypePoint.Name:
			lat := nameFormatter.Format(internalPropertyName, "lat")
			lon := nameFormatter.Format(internalPropertyName, "lon")
			resultColumns[schema.FieldName(lat)] = CreateTableEntry{ClickHouseColumnName: lat, ClickHouseType: "Nullable(String)"}
			resultColumns[schema.FieldName(lon)] = CreateTableEntry{ClickHouseColumnName: lon, ClickHouseType: "Nullable(String)"}
			continue

		// Simple types:
		case schema.TypeText.Name:
			fType = "Nullable(String)"
		case schema.TypeKeyword.Name:
			fType = "Nullable(String)"
		case schema.TypeLong.Name:
			fType = "Nullable(Int64)"
		case schema.TypeUnsignedLong.Name:
			// ClickHouse type names are case-sensitive: the unsigned 64-bit
			// integer type is spelled "UInt64", not "Uint64".
			fType = "Nullable(UInt64)"
		case schema.TypeTimestamp.Name:
			fType = "Nullable(DateTime64)"
		case schema.TypeDate.Name:
			fType = "Nullable(Date)"
		case schema.TypeFloat.Name:
			fType = "Nullable(Float64)"
		case schema.TypeBoolean.Name:
			fType = "Nullable(Bool)"

		default:
			logger.Warn().Msgf("Unsupported field type '%s' for field '%s' when trying to create a table. Ignoring that field.", field.Type.Name, field.PropertyName.AsString())
			continue
		}
		resultColumns[schema.FieldName(internalPropertyName)] = CreateTableEntry{ClickHouseColumnName: internalPropertyName, ClickHouseType: fType}
	}
	return resultColumns
}

// Returns map with fields that are in 'sm', but not in our table schema 't'.
Expand Down
13 changes: 7 additions & 6 deletions quesma/connectors/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"quesma/licensing"
"quesma/logger"
"quesma/quesma/config"
"quesma/schema"
"quesma/telemetry"
)

Expand Down Expand Up @@ -36,27 +37,27 @@ func (c *ConnectorManager) GetConnector() *clickhouse.LogManager {
return c.connectors[0].GetConnector()
}

func NewConnectorManager(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader clickhouse.TableDiscovery) *ConnectorManager {
// NewConnectorManager builds a ConnectorManager whose connectors are produced
// by registerConnectors from the given configuration and dependencies,
// including the schema registry.
func NewConnectorManager(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader clickhouse.TableDiscovery, registry schema.Registry) *ConnectorManager {
	registered := registerConnectors(cfg, chDb, phoneHomeAgent, loader, registry)
	return &ConnectorManager{connectors: registered}
}

func registerConnectors(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader clickhouse.TableDiscovery) (conns []Connector) {
func registerConnectors(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader clickhouse.TableDiscovery, registry schema.Registry) (conns []Connector) {
for connName, conn := range cfg.Connectors {
logger.Info().Msgf("Registering connector named [%s] of type [%s]", connName, conn.ConnectorType)
switch conn.ConnectorType {
case clickHouseConnectorTypeName:
conns = append(conns, &ClickHouseConnector{
Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader),
Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader, registry),
})
case clickHouseOSConnectorTypeName:
conns = append(conns, &ClickHouseOSConnector{
Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader),
Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader, registry),
})
case hydrolixConnectorTypeName:
conns = append(conns, &HydrolixConnector{
Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader),
Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader, registry),
})
default:
logger.Error().Msgf("Unknown connector type [%s]", conn.ConnectorType)
Expand Down
2 changes: 1 addition & 1 deletion quesma/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func main() {
schemaLoader := clickhouse.NewTableDiscovery(cfg, schemaManagement)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: schemaLoader}, cfg, clickhouse.SchemaTypeAdapter{})

connManager := connectors.NewConnectorManager(cfg, connectionPool, phoneHomeAgent, schemaLoader)
connManager := connectors.NewConnectorManager(cfg, connectionPool, phoneHomeAgent, schemaLoader, schemaRegistry)
lm := connManager.GetConnector()

im := elasticsearch.NewIndexManagement(cfg.Elasticsearch.Url.String())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ type columNameFormatter struct {
}

// Format joins namespace and columnName with the formatter's separator.
// An empty namespace yields columnName unchanged, so top-level columns get no
// leading separator.
func (t *columNameFormatter) Format(namespace, columnName string) string {
	if namespace != "" {
		return fmt.Sprintf("%s%s%s", namespace, t.separator, columnName)
	}
	return columnName
}

Expand Down
3 changes: 3 additions & 0 deletions quesma/queryparser/aggregation_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func (e staticRegistry) AllSchemas() map[schema.TableName]schema.Schema {
}

// FindSchema returns the schema stored under name and true, or the zero
// schema.Schema and false when the name is unknown or e.tables is nil.
func (e staticRegistry) FindSchema(name schema.TableName) (schema.Schema, bool) {
	if e.tables != nil {
		if stored, ok := e.tables[name]; ok {
			return stored, true
		}
	}
	return schema.Schema{}, false
}
Expand Down
4 changes: 2 additions & 2 deletions quesma/queryparser/query_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestQueryParserStringAttrConfig(t *testing.T) {

cfg.IndexConfig[indexConfig.Name] = indexConfig

lm := clickhouse.NewEmptyLogManager(cfg, nil, telemetry.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(config.QuesmaConfiguration{}, nil))
lm := clickhouse.NewEmptyLogManager(cfg, nil, telemetry.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(config.QuesmaConfiguration{}, nil), staticRegistry{})
lm.AddTableIfDoesntExist(table)
s := staticRegistry{
tables: map[schema.TableName]schema.Schema{
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestQueryParserNoFullTextFields(t *testing.T) {
},
Created: true,
}
lm := clickhouse.NewEmptyLogManager(config.QuesmaConfiguration{}, nil, telemetry.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(config.QuesmaConfiguration{}, nil))
lm := clickhouse.NewEmptyLogManager(config.QuesmaConfiguration{}, nil, telemetry.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(config.QuesmaConfiguration{}, nil), staticRegistry{})
lm.AddTableIfDoesntExist(&table)
indexConfig := config.IndexConfiguration{
Name: "logs-generic-default",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,9 @@ func (e staticRegistry) AllSchemas() map[schema.TableName]schema.Schema {
}

// FindSchema returns the schema stored under name and true, or the zero
// schema.Schema and false when the name is unknown or e.tables is nil.
func (e staticRegistry) FindSchema(name schema.TableName) (schema.Schema, bool) {
	if e.tables != nil {
		if stored, ok := e.tables[name]; ok {
			return stored, true
		}
	}
	return schema.Schema{}, false
}
Loading

0 comments on commit 7b84d19

Please sign in to comment.