Skip to content

Commit

Permalink
Move decision about alter column into body of lm.BuildIngestSQLStatem…
Browse files Browse the repository at this point in the history
…ents
  • Loading branch information
pdelewski committed Aug 26, 2024
1 parent 97ed3f1 commit 9736da7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 23 deletions.
8 changes: 4 additions & 4 deletions quesma/clickhouse/alter_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestAlterTable(t *testing.T) {

lm := NewLogManager(fieldsMap, config.QuesmaConfiguration{})
for i := range rowsToInsert {
insert, alter, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowsToInsert[i]), nil, chConfig, true)
insert, alter, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowsToInsert[i]), nil, chConfig)
assert.Equal(t, expectedInsert[i], insert)
assert.Equal(t, alters[i], alter[0])
// Table will grow with each iteration
Expand Down Expand Up @@ -97,16 +97,16 @@ func TestAlterTableHeuristic(t *testing.T) {
rowsToInsert = append(rowsToInsert, `{`+currentRow+`}`)
previousRow = currentRow
}

attrsMap := make(map[string][]interface{})
for i := range rowsToInsert {
shouldAlterColumns := lm.shouldAlterColumns(table)
shouldAlterColumns := lm.shouldAlterColumns(table, attrsMap)
if i < maxColumns {
assert.True(t, shouldAlterColumns)
} else {
assert.False(t, shouldAlterColumns)

}
_, _, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowsToInsert[i]), nil, chConfig, true)
_, _, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowsToInsert[i]), nil, chConfig)
assert.NoError(t, err)
}

Expand Down
44 changes: 26 additions & 18 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ type (
type (
// LogManager should be renamed to Connector -> TODO !!!
LogManager struct {
ctx context.Context
cancel context.CancelFunc
chDb *sql.DB
tableDiscovery TableDiscovery
cfg config.QuesmaConfiguration
phoneHomeAgent telemetry.PhoneHomeAgent
schemaRegistry schema.Registry
ctx context.Context
cancel context.CancelFunc
chDb *sql.DB
tableDiscovery TableDiscovery
cfg config.QuesmaConfiguration
phoneHomeAgent telemetry.PhoneHomeAgent
schemaRegistry schema.Registry
ingestFieldStatistics IngestFieldStatistics
}
TableMap = concurrent.Map[string, *Table]
SchemaMap = map[string]interface{} // TODO remove
Expand Down Expand Up @@ -495,12 +496,19 @@ func generateNonSchemaFieldsString(attrsMap map[string][]interface{}) (string, e
}

// This function implements heuristic for deciding if we should add new columns
func (lm *LogManager) shouldAlterColumns(table *Table) bool {
func (lm *LogManager) shouldAlterColumns(table *Table, attrsMap map[string][]interface{}) bool {
_ = attrsMap
if lm.ingestFieldStatistics == nil {
lm.ingestFieldStatistics = make(IngestFieldStatistics)
}

lm.ingestFieldStatistics[IngestFieldBucketKey{indexName: table.Name, field: "test", insertBucket: 0}]++

return len(table.Cols) < maxColumns
}

func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON, inValidJson types.JSON,
config *ChTableConfig, generateNewColumns bool) (string, []string, error) {
config *ChTableConfig) (string, []string, error) {

jsonData, err := json.Marshal(data)

Expand Down Expand Up @@ -555,7 +563,7 @@ func (lm *LogManager) BuildIngestSQLStatements(tableName string, data types.JSON
// we only want to add fields that are not part of the schema e.g we don't
// have columns for them
var alterCmd []string
if generateNewColumns {
if lm.shouldAlterColumns(table, attrsMap) {
alterCmd = lm.generateNewColumns(attrsMap, table)
}
// If there are some invalid fields, we need to add them to the attributes map
Expand Down Expand Up @@ -677,10 +685,6 @@ func (lm *LogManager) executeStatements(ctx context.Context, queries []string) e
func (lm *LogManager) GenerateSqlStatements(ctx context.Context, tableName string, jsons []types.JSON,
config *ChTableConfig, transformer jsonprocessor.IngestTransformer) ([]string, error) {

// Below const tells if we should generate new columns for the table
// or add them to the attributes map
generateNewColumns := lm.shouldAlterColumns(lm.FindTable(tableName))

var jsonsReadyForInsertion []string
var alterCmd []string
for _, jsonValue := range jsons {
Expand All @@ -699,7 +703,7 @@ func (lm *LogManager) GenerateSqlStatements(ctx context.Context, tableName strin
inValidJson, NestedSeparator)
// Remove invalid fields from the input JSON
preprocessedJson = subtractInputJson(preprocessedJson, inValidJson)
insertJson, alter, err := lm.BuildIngestSQLStatements(tableName, preprocessedJson, inValidJson, config, generateNewColumns)
insertJson, alter, err := lm.BuildIngestSQLStatements(tableName, preprocessedJson, inValidJson, config)
alterCmd = append(alterCmd, alter...)
if err != nil {
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", tableName, PrettyJson(insertJson), err)
Expand Down Expand Up @@ -766,18 +770,22 @@ func NewEmptyLogManager(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeA
func NewLogManager(tables *TableMap, cfg config.QuesmaConfiguration) *LogManager {
var tableDefinitions = atomic.Pointer[TableMap]{}
tableDefinitions.Store(tables)
return &LogManager{chDb: nil, tableDiscovery: newTableDiscoveryWith(cfg, nil, *tables), cfg: cfg, phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent()}
return &LogManager{chDb: nil, tableDiscovery: newTableDiscoveryWith(cfg, nil, *tables),
cfg: cfg, phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(),
ingestFieldStatistics: make(IngestFieldStatistics)}
}

// right now only for tests purposes
func NewLogManagerWithConnection(db *sql.DB, tables *TableMap) *LogManager {
return &LogManager{chDb: db, tableDiscovery: newTableDiscoveryWith(config.QuesmaConfiguration{}, db, *tables), phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent()}
return &LogManager{chDb: db, tableDiscovery: newTableDiscoveryWith(config.QuesmaConfiguration{}, db, *tables),
phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(), ingestFieldStatistics: make(IngestFieldStatistics)}
}

func NewLogManagerEmpty() *LogManager {
var tableDefinitions = atomic.Pointer[TableMap]{}
tableDefinitions.Store(NewTableMap())
return &LogManager{tableDiscovery: NewTableDiscovery(config.QuesmaConfiguration{}, nil), phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent()}
return &LogManager{tableDiscovery: NewTableDiscovery(config.QuesmaConfiguration{}, nil),
phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(), ingestFieldStatistics: make(IngestFieldStatistics)}
}

func NewOnlySchemaFieldsCHConfig() *ChTableConfig {
Expand Down
2 changes: 1 addition & 1 deletion quesma/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) {

f := func(t1, t2 TableMap) {
lm := NewLogManager(fieldsMap, config.QuesmaConfiguration{})
j, alter, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowToInsert), nil, hasOthersConfig, true)
j, alter, err := lm.BuildIngestSQLStatements("tableName", types.MustJSON(rowToInsert), nil, hasOthersConfig)
assert.NoError(t, err)
assert.Equal(t, 0, len(alter))
m := make(SchemaMap)
Expand Down

0 comments on commit 9736da7

Please sign in to comment.