Skip to content

Commit

Permalink
Remove leftovers
Browse files Browse the repository at this point in the history
  • Loading branch information
pdelewski committed Oct 14, 2024
1 parent 6df737d commit 1759962
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 41 deletions.
4 changes: 2 additions & 2 deletions quesma/ingest/alter_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestAlterTable(t *testing.T) {

ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
for i := range rowsToInsert {
alter, _, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
assert.NoError(t, err)
insert, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
assert.Equal(t, expectedInsert[i], insert)
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestAlterTableHeuristic(t *testing.T) {

assert.Equal(t, int64(0), ip.ingestCounter)
for i := range rowsToInsert {
_, _, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
_, _, _, err := ip.GenerateIngestContent(table, types.MustJSON(rowsToInsert[i]), nil, chConfig, encodings)
assert.NoError(t, err)
}
assert.Equal(t, tc.expected, len(table.Cols))
Expand Down
60 changes: 22 additions & 38 deletions quesma/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,6 @@ func getAttributesByArrayName(arrayName string,
return attributes
}

type AlterDDL struct {
tableName string
columnName string
columnType string
comment string
}

// This function generates ALTER TABLE commands for adding new columns
// to the table based on the attributesMap and the table name
// AttributesMap contains the attributes that are not part of the schema
Expand All @@ -302,14 +295,12 @@ func (ip *IngestProcessor) generateNewColumns(
attrsMap map[string][]interface{},
table *chLib.Table,
alteredAttributesIndexes []int,
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, map[string]AlterDDL) {
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) []string {
var alterCmd []string
alterDDLMap := make(map[string]AlterDDL)
attrKeys := getAttributesByArrayName(chLib.DeprecatedAttributesKeyColumn, attrsMap)
attrTypes := getAttributesByArrayName(chLib.DeprecatedAttributesValueType, attrsMap)
var deleteIndexes []int
reverseMap := reverseFieldEncoding(encodings, table.Name)

// HACK Alert:
// We must avoid altering the table.Cols map and reading at the same time.
// This should be protected by a lock or a copy of the table should be used.
Expand Down Expand Up @@ -346,7 +337,6 @@ func (ip *IngestProcessor) generateNewColumns(
alterCmd = append(alterCmd, alterTable)

alterColumn := fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", table.Name, attrKeys[i], comment)
alterDDLMap[attrKeys[i]] = AlterDDL{tableName: table.Name, columnName: attrKeys[i], columnType: columnType, comment: comment}
alterCmd = append(alterCmd, alterColumn)

deleteIndexes = append(deleteIndexes, i)
Expand All @@ -366,7 +356,7 @@ func (ip *IngestProcessor) generateNewColumns(
attrsMap[chLib.DeprecatedAttributesValueType] = append(attrsMap[chLib.DeprecatedAttributesValueType][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueType][deleteIndexes[i]+1:]...)
attrsMap[chLib.DeprecatedAttributesValueColumn] = append(attrsMap[chLib.DeprecatedAttributesValueColumn][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueColumn][deleteIndexes[i]+1:]...)
}
return alterCmd, alterDDLMap
return alterCmd
}

// This struct contains the information about the columns that aren't part of the schema
Expand Down Expand Up @@ -504,39 +494,39 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
data types.JSON,
inValidJson types.JSON,
config *chLib.ChTableConfig,
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, map[string]AlterDDL, types.JSON, []NonSchemaField, error) {
encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, types.JSON, []NonSchemaField, error) {

jsonAsBytesSlice, err := json.Marshal(data)

if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, err
}

// we find all non-schema fields
jsonMap, err := types.ParseJSON(string(jsonAsBytesSlice))
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, err
}

if len(config.Attributes) == 0 {
return nil, nil, jsonMap, nil, nil
return nil, jsonMap, nil, nil
}

schemaFieldsJson, err := json.Marshal(jsonMap)

if err != nil {
return nil, nil, jsonMap, nil, err
return nil, jsonMap, nil, err
}

mDiff := DifferenceMap(jsonMap, table) // TODO change to DifferenceMap(m, t)

if len(mDiff) == 0 && string(schemaFieldsJson) == string(jsonAsBytesSlice) && len(inValidJson) == 0 { // no need to modify, just insert 'js'
return nil, nil, jsonMap, nil, nil
return nil, jsonMap, nil, nil
}

// check attributes precondition
if len(config.Attributes) <= 0 {
return nil, nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff)
}
attrsMap, _ := BuildAttrsMap(mDiff, config)

Expand All @@ -546,10 +536,9 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
// we only want to add fields that are not part of the schema e.g we don't
// have columns for them
var alterCmd []string
alterDDLMap := make(map[string]AlterDDL)
atomic.AddInt64(&ip.ingestCounter, 1)
if ok, alteredAttributesIndexes := ip.shouldAlterColumns(table, attrsMap); ok {
alterCmd, alterDDLMap = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
alterCmd = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings)
}
// If there are some invalid fields, we need to add them to the attributes map
// to not lose them and be able to store them later by
Expand All @@ -560,12 +549,12 @@ func (ip *IngestProcessor) GenerateIngestContent(table *chLib.Table,
nonSchemaFields, err := generateNonSchemaFields(attrsMapWithInvalidFields)

if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, err
}

onlySchemaFields := RemoveNonSchemaFields(jsonMap, table)

return alterCmd, alterDDLMap, onlySchemaFields, nonSchemaFields, nil
return alterCmd, onlySchemaFields, nonSchemaFields, nil
}

func generateInsertJson(nonSchemaFields []NonSchemaField, onlySchemaFields types.JSON) (string, error) {
Expand Down Expand Up @@ -607,7 +596,7 @@ func populateFieldEncodings(jsonData []types.JSON, tableName string) map[schema.
func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
tableName string,
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, map[string]AlterDDL, error) {
tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, error) {
// this is pre ingest transformer
// here we transform the data before it's structure evaluation and insertion
//
Expand All @@ -616,7 +605,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
for _, jsonValue := range jsonData {
result, err := preIngestTransformer.Transform(jsonValue)
if err != nil {
return nil, nil, fmt.Errorf("error while rewriting json: %v", err)
return nil, fmt.Errorf("error while rewriting json: %v", err)
}
processed = append(processed, result)
}
Expand Down Expand Up @@ -670,7 +659,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
createTableCmd, err = ip.createTableObjectAndAttributes(ctx, createTableCmd, tableConfig, tableName, tableDefinitionChangeOnly)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error createTableObjectAndAttributes, can't create table: %v", err)
return nil, nil, err
return nil, err
}
// Set pointer to table after creating it
table = ip.FindTable(tableName)
Expand All @@ -680,38 +669,34 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
tableConfig = table.Config
var jsonsReadyForInsertion []string
var alterCmd []string
alterDDLMapGlobal := make(map[string]AlterDDL)
var preprocessedJsons []types.JSON
var invalidJsons []types.JSON
preprocessedJsons, invalidJsons, err := ip.preprocessJsons(ctx, table.Name, jsonData, transformer)
if err != nil {
return nil, nil, fmt.Errorf("error preprocessJsons: %v", err)
return nil, fmt.Errorf("error preprocessJsons: %v", err)
}
for i, preprocessedJson := range preprocessedJsons {
alter, alterDDLMap, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
invalidJsons[i], tableConfig, encodings)

if err != nil {
return nil, nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' : %v", table.Name, err)
}
insertJson, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
if err != nil {
return nil, nil, fmt.Errorf("error generatateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
return nil, fmt.Errorf("error generatateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
}
alterCmd = append(alterCmd, alter...)
for key, value := range alterDDLMap {
alterDDLMapGlobal[key] = value
}
if err != nil {
return nil, nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
return nil, fmt.Errorf("error BuildInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
}
jsonsReadyForInsertion = append(jsonsReadyForInsertion, insertJson)
}

insertValues := strings.Join(jsonsReadyForInsertion, ", ")
insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", table.Name, insertValues)

return generateSqlStatements(createTableCmd, alterCmd, insert), alterDDLMapGlobal, nil
return generateSqlStatements(createTableCmd, alterCmd, insert), nil
}

func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName string,
Expand Down Expand Up @@ -755,8 +740,7 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
func (ip *IngestProcessor) processInsertQueryInternal(ctx context.Context, tableName string,
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
tableFormatter TableColumNameFormatter, isVirtualTable bool) error {

statements, _, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
statements, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion quesma/ingest/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) {
assert.True(t, exists)
f := func(t1, t2 TableMap) {
ip := newIngestProcessorWithEmptyTableMap(fieldsMap, &config.QuesmaConfiguration{})
alter, _, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings)
alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(tableName, types.MustJSON(rowToInsert), nil, hasOthersConfig, encodings)
assert.NoError(t, err)
j, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
assert.NoError(t, err)
Expand Down

0 comments on commit 1759962

Please sign in to comment.