diff --git a/quesma/clickhouse/quesma_communicator.go b/quesma/clickhouse/quesma_communicator.go
index 0a6e01e25..87bc3a426 100644
--- a/quesma/clickhouse/quesma_communicator.go
+++ b/quesma/clickhouse/quesma_communicator.go
@@ -7,6 +7,7 @@ import (
 	"math/rand"
 	"mitmproxy/quesma/logger"
 	"mitmproxy/quesma/model"
+	"sort"
 	"strings"
 	"time"
 )
@@ -26,17 +27,34 @@ func (lm *LogManager) Query(ctx context.Context, query string) (*sql.Rows, error
 	return rows, err
 }
 
-// ProcessSimpleSelectQuery - only WHERE clause
+// GetAllColumns returns the columns extracted from the query for the given table,
+// including non-schema fields. If extraction fails, the error is logged and nil is returned.
+func (lm *LogManager) GetAllColumns(table *Table, query *model.Query) []string {
+	columns, err := table.extractColumns(query, true)
+	if err != nil {
+		logger.Error().Msgf("Failed to extract columns from query: %v", err)
+		return nil
+	}
+	return columns
+}
+
+// ProcessQuery runs the query against the given table and returns the result rows.
+// `columns` is handed to executeQuery as the names of the values returned by the query;
+// note that it is sorted in place, so the caller's slice is reordered.
 // TODO query param should be type safe Query representing all parts of
 // sql statement that were already parsed and not string from which
 // we have to extract again different parts like where clause and columns to build a proper result
-func (lm *LogManager) ProcessSelectQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
+func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query, columns []string) ([]model.QueryResultRow, error) {
 	colNames, err := table.extractColumns(query, false)
 	rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields))
 	if err != nil {
 		return nil, err
 	}
-	rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), append(colNames, query.NonSchemaFields...), rowToScan)
+	// Sort only after the error check so that a failed extraction does not reorder the caller's `columns` slice.
+	sort.Strings(colNames)
+	sort.Strings(columns)
+
+	rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), columns, rowToScan)
 	if err == nil {
 		for _, row := range rows {
 			row.Index = table.Name
@@ -45,16 +63,6 @@ func (lm *LogManager) ProcessSelectQuery(ctx context.Context, table *Table, quer
 	return rows, err
 }
 
-// TODO add support for autocomplete for attributes, if we'll find it needed
-func (lm *LogManager) ProcessFacetsQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
-	colNames, err := table.extractColumns(query, false)
-	if err != nil {
-		return nil, err
-	}
-	rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields))
-	return executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), []string{"key", "doc_count"}, rowToScan)
-}
-
 var random = rand.New(rand.NewSource(time.Now().UnixNano()))
 
 const slowQueryThreshold = 30 * time.Second
@@ -114,24 +122,6 @@ func executeQuery(ctx context.Context, lm *LogManager, tableName string, queryAs
 	return res, err
 }
 
-func (lm *LogManager) ProcessAutocompleteSuggestionsQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
-	colNames, err := table.extractColumns(query, false)
-	if err != nil {
-		return nil, err
-	}
-	rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields))
-	return executeQuery(ctx, lm, table.Name, query.String(), query.Fields, rowToScan)
-}
-
-func (lm *LogManager) ProcessGeneralAggregationQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
-	colNames, err := table.extractColumns(query, true)
-	if err != nil {
-		return nil, err
-	}
-	rowToScan := make([]interface{}, len(colNames))
-	return executeQuery(ctx, lm, table.Name, query.String(), colNames, rowToScan)
-}
-
 // 'selectFields' are all values that we return from the query, both columns and non-schema fields,
 // like e.g. count(), or toInt8(boolField)
 func read(tableName string, rows *sql.Rows, selectFields []string, rowToScan []interface{}) ([]model.QueryResultRow, error) {
diff --git a/quesma/model/query.go b/quesma/model/query.go
index 9797da988..0021114f6 100644
--- a/quesma/model/query.go
+++ b/quesma/model/query.go
@@ -91,11 +91,14 @@ func (q *Query) String() string {
 func (q *Query) StringFromColumns(colNames []string) string {
 	var sb strings.Builder
 	sb.WriteString("SELECT ")
+	if q.IsDistinct {
+		sb.WriteString("DISTINCT ")
+	}
 	for i, field := range colNames {
-		if field != EmptyFieldSelection {
-			sb.WriteString(strconv.Quote(field))
-		} else {
+		if field == "*" || field == EmptyFieldSelection {
 			sb.WriteString(field)
+		} else {
+			sb.WriteString(strconv.Quote(field))
 		}
 		if i < len(colNames)-1 || len(q.NonSchemaFields) > 0 {
 			sb.WriteString(", ")
@@ -112,6 +115,12 @@ func (q *Query) StringFromColumns(colNames []string) string {
 		where = ""
 	}
 	sb.WriteString(" FROM " + q.FromClause + where + q.WhereClause + " " + strings.Join(q.SuffixClauses, " "))
+	if len(q.GroupByFields) > 0 {
+		// Grouped results are ordered by the same fields they are grouped by.
+		groupBy := strings.Join(q.GroupByFields, ", ")
+		sb.WriteString(" GROUP BY (" + groupBy + ")")
+		sb.WriteString(" ORDER BY (" + groupBy + ")")
+	}
 	return sb.String()
 }
 
diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go
index 7dd3767c5..cc0dfd060 100644
--- a/quesma/quesma/search.go
+++ b/quesma/quesma/search.go
@@ -241,14 +241,14 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
 			fieldName = "*"
 		}
 		listQuery := queryTranslator.BuildNRowsQuery(fieldName, simpleQuery, queryInfo.Size)
-		hitsFallback, err = q.logManager.ProcessSelectQuery(ctx, table, listQuery)
+		hitsFallback, err = q.logManager.ProcessQuery(ctx, table, listQuery, q.logManager.GetAllColumns(table, listQuery))
 		if err != nil {
 			logger.ErrorWithCtx(ctx).Msgf("error processing fallback query. Err: %v, query: %+v", err, listQuery)
 			pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
 			return responseBody, err
 		}
 		countQuery := queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt)
-		countResult, err := q.logManager.ProcessSelectQuery(ctx, table, countQuery)
+		countResult, err := q.logManager.ProcessQuery(ctx, table, countQuery, q.logManager.GetAllColumns(table, countQuery))
 		if err != nil {
 			logger.ErrorWithCtx(ctx).Msgf("error processing count query. Err: %v, query: %+v", err, countQuery)
 			pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
@@ -461,26 +461,26 @@ func (q *QueryRunner) searchWorkerCommon(ctx context.Context, queryTranslator IQ
 	switch queryInfo.Typ {
 	case model.CountAsync:
 		fullQuery = queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt)
-		hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery)
+		hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery))
 
 	case model.Facets, model.FacetsNumeric:
 		// queryInfo = (Facets, fieldName, Limit results, Limit last rows to look into)
 		fullQuery = queryTranslator.BuildFacetsQuery(queryInfo.FieldName, simpleQuery, queryInfo.I2)
-		hits, err = q.logManager.ProcessFacetsQuery(dbQueryCtx, table, fullQuery)
+		hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, []string{"key", "doc_count"})
 
 	case model.ListByField:
 		// queryInfo = (ListByField, fieldName, 0, LIMIT)
 		fullQuery = queryTranslator.BuildNRowsQuery(queryInfo.FieldName, simpleQuery, queryInfo.I2)
-		hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery)
+		hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery))
 
 	case model.ListAllFields:
 		// queryInfo = (ListAllFields, "*", 0, LIMIT)
 		fullQuery = queryTranslator.BuildNRowsQuery("*", simpleQuery, queryInfo.I2)
-		hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery)
+		hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery))
 
 	case model.Normal:
 		fullQuery = queryTranslator.BuildSimpleSelectQuery(simpleQuery.Sql.Stmt, queryInfo.I2)
-		hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery)
+		hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery))
 
 	default:
 		logger.ErrorWithCtx(ctx).Msgf("unknown query type: %v, query body: %v", queryInfo.Typ, body)
@@ -543,7 +543,7 @@ func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggrega
 	for _, agg := range aggregations {
 		logger.InfoWithCtx(ctx).Msg(agg.String()) // I'd keep for now until aggregations work fully
 		sqls += agg.Query.String() + "\n"
-		rows, err := q.logManager.ProcessGeneralAggregationQuery(dbQueryCtx, table, &agg.Query)
+		rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg.Query, q.logManager.GetAllColumns(table, &agg.Query))
 		if err != nil {
 			logger.ErrorWithCtx(ctx).Msg(err.Error())
 			continue
diff --git a/quesma/quesma/termsenum/terms_enum.go b/quesma/quesma/termsenum/terms_enum.go
index 32753d279..5dd115baa 100644
--- a/quesma/quesma/termsenum/terms_enum.go
+++ b/quesma/quesma/termsenum/terms_enum.go
@@ -37,7 +37,7 @@ func handleTermsEnumRequest(ctx context.Context, reqBody []byte, qt *queryparser
 	dbQueryCtx, cancel := context.WithCancel(ctx)
 	// TODO this will be used to cancel goroutine that is executing the query
 	_ = cancel
-	if rows, err2 := qt.ClickhouseLM.ProcessAutocompleteSuggestionsQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil {
+	if rows, err2 := qt.ClickhouseLM.ProcessQuery(dbQueryCtx, qt.Table, selectQuery, qt.ClickhouseLM.GetAllColumns(qt.Table, selectQuery)); err2 != nil {
 		logger.Error().Msgf("terms enum failed - error processing SQL query [%s]", err2)
 		result, err = json.Marshal(emptyTermsEnumResponse())
 	} else {