Skip to content

Commit

Permalink
Common ProcessQuery, fixing regression (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdelewski authored May 11, 2024
1 parent 4cad0aa commit 05a852f
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 43 deletions.
48 changes: 17 additions & 31 deletions quesma/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"sort"
"strings"
"time"
)
Expand All @@ -26,17 +27,30 @@ func (lm *LogManager) Query(ctx context.Context, query string) (*sql.Rows, error
return rows, err
}

// ProcessSimpleSelectQuery - only WHERE clause
// GetAllColumns - returns all columns for a given table including non-schema fields
func (lm *LogManager) GetAllColumns(table *Table, query *model.Query) []string {
columns, err := table.extractColumns(query, true)
if err != nil {
logger.Error().Msgf("Failed to extract columns from query: %v", err)
return nil
}
return columns
}

// ProcessQuery - only WHERE clause
// TODO query param should be type safe Query representing all parts of
// sql statement that were already parsed and not string from which
// we have to extract again different parts like where clause and columns to build a proper result
func (lm *LogManager) ProcessSelectQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query, columns []string) ([]model.QueryResultRow, error) {
colNames, err := table.extractColumns(query, false)
sort.Strings(colNames)
sort.Strings(columns)
rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields))
if err != nil {
return nil, err
}
rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), append(colNames, query.NonSchemaFields...), rowToScan)

rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), columns, rowToScan)
if err == nil {
for _, row := range rows {
row.Index = table.Name
Expand All @@ -45,16 +59,6 @@ func (lm *LogManager) ProcessSelectQuery(ctx context.Context, table *Table, quer
return rows, err
}

// TODO add support for autocomplete for attributes, if we'll find it needed
func (lm *LogManager) ProcessFacetsQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
colNames, err := table.extractColumns(query, false)
if err != nil {
return nil, err
}
rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields))
return executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), []string{"key", "doc_count"}, rowToScan)
}

var random = rand.New(rand.NewSource(time.Now().UnixNano()))

const slowQueryThreshold = 30 * time.Second
Expand Down Expand Up @@ -114,24 +118,6 @@ func executeQuery(ctx context.Context, lm *LogManager, tableName string, queryAs
return res, err
}

func (lm *LogManager) ProcessAutocompleteSuggestionsQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
colNames, err := table.extractColumns(query, false)
if err != nil {
return nil, err
}
rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields))
return executeQuery(ctx, lm, table.Name, query.String(), query.Fields, rowToScan)
}

func (lm *LogManager) ProcessGeneralAggregationQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
colNames, err := table.extractColumns(query, true)
if err != nil {
return nil, err
}
rowToScan := make([]interface{}, len(colNames))
return executeQuery(ctx, lm, table.Name, query.String(), colNames, rowToScan)
}

// 'selectFields' are all values that we return from the query, both columns and non-schema fields,
// like e.g. count(), or toInt8(boolField)
func read(tableName string, rows *sql.Rows, selectFields []string, rowToScan []interface{}) ([]model.QueryResultRow, error) {
Expand Down
28 changes: 25 additions & 3 deletions quesma/model/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,14 @@ func (q *Query) String() string {
func (q *Query) StringFromColumns(colNames []string) string {
var sb strings.Builder
sb.WriteString("SELECT ")
if q.IsDistinct {
sb.WriteString("DISTINCT ")
}
for i, field := range colNames {
if field != EmptyFieldSelection {
sb.WriteString(strconv.Quote(field))
} else {
if field == "*" || field == EmptyFieldSelection {
sb.WriteString(field)
} else {
sb.WriteString(strconv.Quote(field))
}
if i < len(colNames)-1 || len(q.NonSchemaFields) > 0 {
sb.WriteString(", ")
Expand All @@ -112,6 +115,25 @@ func (q *Query) StringFromColumns(colNames []string) string {
where = ""
}
sb.WriteString(" FROM " + q.FromClause + where + q.WhereClause + " " + strings.Join(q.SuffixClauses, " "))
if len(q.GroupByFields) > 0 {
sb.WriteString(" GROUP BY (")
for i, field := range q.GroupByFields {
sb.WriteString(field)
if i < len(q.GroupByFields)-1 {
sb.WriteString(", ")
}
}
sb.WriteString(")")

sb.WriteString(" ORDER BY (")
for i, field := range q.GroupByFields {
sb.WriteString(field)
if i < len(q.GroupByFields)-1 {
sb.WriteString(", ")
}
}
sb.WriteString(")")
}
return sb.String()
}

Expand Down
16 changes: 8 additions & 8 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,14 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
fieldName = "*"
}
listQuery := queryTranslator.BuildNRowsQuery(fieldName, simpleQuery, queryInfo.Size)
hitsFallback, err = q.logManager.ProcessSelectQuery(ctx, table, listQuery)
hitsFallback, err = q.logManager.ProcessQuery(ctx, table, listQuery, q.logManager.GetAllColumns(table, listQuery))
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error processing fallback query. Err: %v, query: %+v", err, listQuery)
pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
return responseBody, err
}
countQuery := queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt)
countResult, err := q.logManager.ProcessSelectQuery(ctx, table, countQuery)
countResult, err := q.logManager.ProcessQuery(ctx, table, countQuery, q.logManager.GetAllColumns(table, listQuery))
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error processing count query. Err: %v, query: %+v", err, countQuery)
pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
Expand Down Expand Up @@ -461,26 +461,26 @@ func (q *QueryRunner) searchWorkerCommon(ctx context.Context, queryTranslator IQ
switch queryInfo.Typ {
case model.CountAsync:
fullQuery = queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt)
hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery))

case model.Facets, model.FacetsNumeric:
// queryInfo = (Facets, fieldName, Limit results, Limit last rows to look into)
fullQuery = queryTranslator.BuildFacetsQuery(queryInfo.FieldName, simpleQuery, queryInfo.I2)
hits, err = q.logManager.ProcessFacetsQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, []string{"key", "doc_count"})

case model.ListByField:
// queryInfo = (ListByField, fieldName, 0, LIMIT)
fullQuery = queryTranslator.BuildNRowsQuery(queryInfo.FieldName, simpleQuery, queryInfo.I2)
hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery))

case model.ListAllFields:
// queryInfo = (ListAllFields, "*", 0, LIMIT)
fullQuery = queryTranslator.BuildNRowsQuery("*", simpleQuery, queryInfo.I2)
hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery))

case model.Normal:
fullQuery = queryTranslator.BuildSimpleSelectQuery(simpleQuery.Sql.Stmt, queryInfo.I2)
hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery))

default:
logger.ErrorWithCtx(ctx).Msgf("unknown query type: %v, query body: %v", queryInfo.Typ, body)
Expand Down Expand Up @@ -543,7 +543,7 @@ func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggrega
for _, agg := range aggregations {
logger.InfoWithCtx(ctx).Msg(agg.String()) // I'd keep for now until aggregations work fully
sqls += agg.Query.String() + "\n"
rows, err := q.logManager.ProcessGeneralAggregationQuery(dbQueryCtx, table, &agg.Query)
rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg.Query, q.logManager.GetAllColumns(table, &agg.Query))
if err != nil {
logger.ErrorWithCtx(ctx).Msg(err.Error())
continue
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/termsenum/terms_enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func handleTermsEnumRequest(ctx context.Context, reqBody []byte, qt *queryparser
dbQueryCtx, cancel := context.WithCancel(ctx)
// TODO this will be used to cancel goroutine that is executing the query
_ = cancel
if rows, err2 := qt.ClickhouseLM.ProcessAutocompleteSuggestionsQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil {
if rows, err2 := qt.ClickhouseLM.ProcessQuery(dbQueryCtx, qt.Table, selectQuery, qt.ClickhouseLM.GetAllColumns(qt.Table, selectQuery)); err2 != nil {
logger.Error().Msgf("terms enum failed - error processing SQL query [%s]", err2)
result, err = json.Marshal(emptyTermsEnumResponse())
} else {
Expand Down

0 comments on commit 05a852f

Please sign in to comment.