diff --git a/quesma/clickhouse/alter_table_test.go b/quesma/clickhouse/alter_table_test.go index c1eaf0a10..0142a2532 100644 --- a/quesma/clickhouse/alter_table_test.go +++ b/quesma/clickhouse/alter_table_test.go @@ -46,7 +46,7 @@ func TestAlterTable(t *testing.T) { lm := NewLogManager(fieldsMap, config.QuesmaConfiguration{}) for i := range rowsToInsert { - insert, alter, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowsToInsert[i]), nil, chConfig, true) + insert, alter, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowsToInsert[i]), nil, chConfig) assert.Equal(t, expectedInsert[i], insert) assert.Equal(t, alters[i], alter[0]) // Table will grow with each iteration @@ -97,16 +97,16 @@ func TestAlterTableHeuristic(t *testing.T) { rowsToInsert = append(rowsToInsert, `{`+currentRow+`}`) previousRow = currentRow } - + attrsMap := make(map[string][]interface{}) for i := range rowsToInsert { - shouldAlterColumns := lm.shouldAlterColumns(table) + shouldAlterColumns := lm.shouldAlterColumns(table, attrsMap) if i < maxColumns { assert.True(t, shouldAlterColumns) } else { assert.False(t, shouldAlterColumns) } - _, _, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowsToInsert[i]), nil, chConfig, true) + _, _, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowsToInsert[i]), nil, chConfig) assert.NoError(t, err) } diff --git a/quesma/clickhouse/clickhouse.go b/quesma/clickhouse/clickhouse.go index 308bcdfae..bab19ab1b 100644 --- a/quesma/clickhouse/clickhouse.go +++ b/quesma/clickhouse/clickhouse.go @@ -47,13 +47,14 @@ type ( type ( // LogManager should be renamed to Connector -> TODO !!! LogManager struct { - ctx context.Context - cancel context.CancelFunc - chDb *sql.DB - tableDiscovery TableDiscovery - cfg config.QuesmaConfiguration - phoneHomeAgent telemetry.PhoneHomeAgent - schemaRegistry schema.Registry + ctx context.Context + cancel context.CancelFunc + chDb *sql.DB + tableDiscovery TableDiscovery + cfg config.QuesmaConfiguration + phoneHomeAgent telemetry.PhoneHomeAgent + schemaRegistry schema.Registry + ingestFieldStatistics IngestFieldStatistics } TableMap = concurrent.Map[string, *Table] SchemaMap = map[string]interface{} // TODO remove @@ -495,12 +496,19 @@ func generateNonSchemaFieldsString(attrsMap map[string][]interface{}) (string, e } // This function implements heuristic for deciding if we should add new columns -func (lm *LogManager) shouldAlterColumns(table *Table) bool { +func (lm *LogManager) shouldAlterColumns(table *Table, attrsMap map[string][]interface{}) bool { + _ = attrsMap + if lm.ingestFieldStatistics == nil { + lm.ingestFieldStatistics = make(IngestFieldStatistics) + } + + lm.ingestFieldStatistics[IngestFieldBucketKey{indexName: table.Name, field: "test", insertBucket: 0}]++ + return len(table.Cols) < maxColumns } func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON, inValidJson types.JSON, - config *ChTableConfig, generateNewColumns bool) (string, []string, error) { + config *ChTableConfig) (string, []string, error) { jsonData, err := json.Marshal(data) @@ -555,7 +563,7 @@ func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON // we only want to add fields that are not part of the schema e.g we don't // have columns for them var alterCmd []string - if generateNewColumns { + if lm.shouldAlterColumns(table, attrsMap) { alterCmd = lm.generateNewColumns(attrsMap, table) } // If there are some invalid fields, we need to add them to the attributes map @@ -677,10 +685,6 @@ func (lm *LogManager) executeStatements(ctx context.Context, queries []string) e func (lm *LogManager) GenerateSqlStatements(ctx context.Context, tableName string, jsons []types.JSON, config *ChTableConfig, transformer jsonprocessor.IngestTransformer) ([]string, error) { - // Below const tells if we should generate new columns for the table - // or add them to the attributes map - generateNewColumns := lm.shouldAlterColumns(lm.FindTable(tableName)) - var jsonsReadyForInsertion []string var alterCmd []string for _, jsonValue := range jsons { @@ -699,7 +703,7 @@ func (lm *LogManager) GenerateSqlStatements(ctx context.Context, tableName strin inValidJson, NestedSeparator) // Remove invalid fields from the input JSON preprocessedJson = subtractInputJson(preprocessedJson, inValidJson) - insertJson, alter, err := lm.BuildIngestSQLStatements(tableName, preprocessedJson, inValidJson, config, generateNewColumns) + insertJson, alter, err := lm.BuildIngestSQLStatements(tableName, preprocessedJson, inValidJson, config) alterCmd = append(alterCmd, alter...) if err != nil { return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", tableName, PrettyJson(insertJson), err) @@ -766,18 +770,22 @@ func NewEmptyLogManager(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeA func NewLogManager(tables *TableMap, cfg config.QuesmaConfiguration) *LogManager { var tableDefinitions = atomic.Pointer[TableMap]{} tableDefinitions.Store(tables) - return &LogManager{chDb: nil, tableDiscovery: newTableDiscoveryWith(cfg, nil, *tables), cfg: cfg, phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent()} + return &LogManager{chDb: nil, tableDiscovery: newTableDiscoveryWith(cfg, nil, *tables), + cfg: cfg, phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(), + ingestFieldStatistics: make(IngestFieldStatistics)} } // right now only for tests purposes func NewLogManagerWithConnection(db *sql.DB, tables *TableMap) *LogManager { - return &LogManager{chDb: db, tableDiscovery: newTableDiscoveryWith(config.QuesmaConfiguration{}, db, *tables), phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent()} + return &LogManager{chDb: db, tableDiscovery: newTableDiscoveryWith(config.QuesmaConfiguration{}, db, *tables), + phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(), ingestFieldStatistics: make(IngestFieldStatistics)} } func NewLogManagerEmpty() *LogManager { var tableDefinitions = atomic.Pointer[TableMap]{} tableDefinitions.Store(NewTableMap()) - return &LogManager{tableDiscovery: NewTableDiscovery(config.QuesmaConfiguration{}, nil), phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent()} + return &LogManager{tableDiscovery: NewTableDiscovery(config.QuesmaConfiguration{}, nil), + phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(), ingestFieldStatistics: make(IngestFieldStatistics)} } func NewOnlySchemaFieldsCHConfig() *ChTableConfig { diff --git a/quesma/clickhouse/clickhouse_test.go b/quesma/clickhouse/clickhouse_test.go index eba16ddeb..c7cac8adf 100644 --- a/quesma/clickhouse/clickhouse_test.go +++ b/quesma/clickhouse/clickhouse_test.go @@ -48,7 +48,7 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) { f := func(t1, t2 TableMap) { lm := NewLogManager(fieldsMap, config.QuesmaConfiguration{}) - j, alter, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowToInsert), nil, hasOthersConfig, true) + j, alter, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowToInsert), nil, hasOthersConfig) assert.NoError(t, err) assert.Equal(t, 0, len(alter)) m := make(SchemaMap)