diff --git a/quesma/ingest/processor.go b/quesma/ingest/processor.go index 0949bfc36..ee58f3448 100644 --- a/quesma/ingest/processor.go +++ b/quesma/ingest/processor.go @@ -548,11 +548,14 @@ func generateInsertJson(nonSchemaFields []NonSchemaField, onlySchemaFields types return fmt.Sprintf("{%s%s%s", nonSchemaStr, comma, schemaFieldsJson[1:]), err } -func generateSqlStatements(createTableCmd string, alterCmd []string, insert string) []string { +func generateSqlStatements(createTableCmd string, creatBufferCmd string, alterCmd []string, insert string) []string { var statements []string if createTableCmd != "" { statements = append(statements, createTableCmd) } + if creatBufferCmd != "" { + statements = append(statements, creatBufferCmd) + } statements = append(statements, alterCmd...) statements = append(statements, insert) return statements @@ -622,6 +625,9 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context, table := ip.FindTable(tableName) var tableConfig *chLib.ChTableConfig var createTableCmd string + bufferName := fmt.Sprintf("quesma_buffer_%s", tableName) + var createBufferCmd string + if table == nil { tableConfig = NewOnlySchemaFieldsCHConfig() columnsFromJson := JsonToColumns(transformedJsons[0], tableConfig) @@ -645,10 +651,16 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context, logger.ErrorWithCtx(ctx).Msgf("error createTableObjectAndAttributes, can't create table: %v", err) return nil, err } + + createBufferCmd = fmt.Sprintf(`CREATE TABLE %s AS %s ENGINE = Buffer(currentDatabase(), %s, 1, 10, 100, 10000, 1000000, 10000000, 100000000)`, bufferName, tableName, tableName) + // Set pointer to table after creating it table = ip.FindTable(tableName) } else if !table.Created { createTableCmd = table.CreateTableString() + + createBufferCmd = fmt.Sprintf(`CREATE TABLE %s AS %s ENGINE = Buffer(currentDatabase(), %s, 1, 10, 100, 10000, 1000000, 10000000, 100000000)`, bufferName, tableName, tableName) + } if table == nil { return nil, fmt.Errorf("table %s not found", tableName) @@ -681,9 +693,9 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context, } insertValues := strings.Join(jsonsReadyForInsertion, ", ") - insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", table.Name, insertValues) + insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", bufferName, insertValues) - return generateSqlStatements(createTableCmd, alterCmd, insert), nil + return generateSqlStatements(createTableCmd, createBufferCmd, alterCmd, insert), nil } func (lm *IngestProcessor) Ingest(ctx context.Context, indexName string, jsonData []types.JSON) error { diff --git a/quesma/ingest/processor2.go b/quesma/ingest/processor2.go index 8f6c6dc98..b115f1515 100644 --- a/quesma/ingest/processor2.go +++ b/quesma/ingest/processor2.go @@ -430,7 +430,7 @@ func (ip *IngestProcessor2) processInsertQuery(ctx context.Context, insertValues := strings.Join(jsonsReadyForInsertion, ", ") insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", table.Name, insertValues) - return generateSqlStatements(createTableCmd, alterCmd, insert), nil + return generateSqlStatements(createTableCmd, "", alterCmd, insert), nil } func (lm *IngestProcessor2) Ingest(ctx context.Context, tableName string, jsonData []types.JSON) error { diff --git a/quesma/quesma/config/config_v2.go b/quesma/quesma/config/config_v2.go index e25d90081..c5e908b09 100644 --- a/quesma/quesma/config/config_v2.go +++ b/quesma/quesma/config/config_v2.go @@ -779,7 +779,7 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { } conf.EnableIngest = true - conf.IngestStatistics = true + conf.IngestStatistics = c.IngestStatistics for indexName, indexConfig := range ingestProcessor.Config.IndexConfig { processedConfig, found := conf.IndexConfig[indexName]