From 8cbb01b2f5593faae237fb3cc75d370f2ddbc1ee Mon Sep 17 00:00:00 2001 From: Przemek Delewski Date: Thu, 9 May 2024 10:37:57 +0200 Subject: [PATCH] Generalizing sql query processing --- quesma/clickhouse/quesma_communicator.go | 26 +++++++--------------- quesma/model/query.go | 28 +++++++++++++++++++++--- quesma/quesma/search.go | 2 +- quesma/quesma/termsenum/terms_enum.go | 2 +- 4 files changed, 35 insertions(+), 23 deletions(-) diff --git a/quesma/clickhouse/quesma_communicator.go b/quesma/clickhouse/quesma_communicator.go index 0a6e01e25..668d77d83 100644 --- a/quesma/clickhouse/quesma_communicator.go +++ b/quesma/clickhouse/quesma_communicator.go @@ -36,13 +36,11 @@ func (lm *LogManager) ProcessSelectQuery(ctx context.Context, table *Table, quer if err != nil { return nil, err } - rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), append(colNames, query.NonSchemaFields...), rowToScan) - if err == nil { - for _, row := range rows { - row.Index = table.Name - } + resultColumns, err := table.extractColumns(query, true) + if err != nil { + return nil, err } - return rows, err + return executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), resultColumns, rowToScan) } // TODO add support for autocomplete for attributes, if we'll find it needed @@ -114,22 +112,14 @@ func executeQuery(ctx context.Context, lm *LogManager, tableName string, queryAs return res, err } -func (lm *LogManager) ProcessAutocompleteSuggestionsQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) { +func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) { colNames, err := table.extractColumns(query, false) if err != nil { return nil, err } - rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields)) - return executeQuery(ctx, lm, table.Name, query.String(), query.Fields, rowToScan) -} - -func (lm *LogManager) ProcessGeneralAggregationQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) { - colNames, err := table.extractColumns(query, true) - if err != nil { - return nil, err - } - rowToScan := make([]interface{}, len(colNames)) - return executeQuery(ctx, lm, table.Name, query.String(), colNames, rowToScan) + rowToScanLen := len(colNames) + len(query.NonSchemaFields) + rowToScan := make([]interface{}, rowToScanLen) + return executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), colNames, rowToScan) } // 'selectFields' are all values that we return from the query, both columns and non-schema fields, diff --git a/quesma/model/query.go b/quesma/model/query.go index 9797da988..0021114f6 100644 --- a/quesma/model/query.go +++ b/quesma/model/query.go @@ -91,11 +91,14 @@ func (q *Query) String() string { func (q *Query) StringFromColumns(colNames []string) string { var sb strings.Builder sb.WriteString("SELECT ") + if q.IsDistinct { + sb.WriteString("DISTINCT ") + } for i, field := range colNames { - if field != EmptyFieldSelection { - sb.WriteString(strconv.Quote(field)) - } else { + if field == "*" || field == EmptyFieldSelection { sb.WriteString(field) + } else { + sb.WriteString(strconv.Quote(field)) } if i < len(colNames)-1 || len(q.NonSchemaFields) > 0 { sb.WriteString(", ") @@ -112,6 +115,25 @@ func (q *Query) StringFromColumns(colNames []string) string { where = "" } sb.WriteString(" FROM " + q.FromClause + where + q.WhereClause + " " + strings.Join(q.SuffixClauses, " ")) + if len(q.GroupByFields) > 0 { + sb.WriteString(" GROUP BY (") + for i, field := range q.GroupByFields { + sb.WriteString(field) + if i < len(q.GroupByFields)-1 { + sb.WriteString(", ") + } + } + sb.WriteString(")") + + sb.WriteString(" ORDER BY (") + for i, field := range q.GroupByFields { + sb.WriteString(field) + if i < len(q.GroupByFields)-1 { + sb.WriteString(", ") + } + } + sb.WriteString(")") + } return sb.String() } diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index 7dd3767c5..6c95ce4a5 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -543,7 +543,7 @@ func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggrega for _, agg := range aggregations { logger.InfoWithCtx(ctx).Msg(agg.String()) // I'd keep for now until aggregations work fully sqls += agg.Query.String() + "\n" - rows, err := q.logManager.ProcessGeneralAggregationQuery(dbQueryCtx, table, &agg.Query) + rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg.Query) if err != nil { logger.ErrorWithCtx(ctx).Msg(err.Error()) continue diff --git a/quesma/quesma/termsenum/terms_enum.go b/quesma/quesma/termsenum/terms_enum.go index 32753d279..97f236830 100644 --- a/quesma/quesma/termsenum/terms_enum.go +++ b/quesma/quesma/termsenum/terms_enum.go @@ -37,7 +37,7 @@ func handleTermsEnumRequest(ctx context.Context, reqBody []byte, qt *queryparser dbQueryCtx, cancel := context.WithCancel(ctx) // TODO this will be used to cancel goroutine that is executing the query _ = cancel - if rows, err2 := qt.ClickhouseLM.ProcessAutocompleteSuggestionsQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil { + if rows, err2 := qt.ClickhouseLM.ProcessQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil { logger.Error().Msgf("terms enum failed - error processing SQL query [%s]", err2) result, err = json.Marshal(emptyTermsEnumResponse()) } else {