Skip to content

Commit

Permalink
Revert "Generalizing sql query processing 1 (#68)"
Browse files Browse the repository at this point in the history
This reverts commit f82655a.
  • Loading branch information
jakozaur committed May 10, 2024
1 parent 0946901 commit d55f5ae
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 41 deletions.
29 changes: 21 additions & 8 deletions quesma/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,17 @@ func (lm *LogManager) Query(ctx context.Context, query string) (*sql.Rows, error
return rows, err
}

// ProcessQuery - only WHERE clause
// ProcessSimpleSelectQuery - only WHERE clause
// TODO query param should be type safe Query representing all parts of
// sql statement that were already parsed and not string from which
// we have to extract again different parts like where clause and columns to build a proper result
func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
func (lm *LogManager) ProcessSelectQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
colNames, err := table.extractColumns(query, false)
rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields))
if err != nil {
return nil, err
}

resultColumns, err := table.extractColumns(query, true)
if err != nil {
return nil, err
}
rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), resultColumns, rowToScan)
rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), append(colNames, query.NonSchemaFields...), rowToScan)
if err == nil {
for _, row := range rows {
row.Index = table.Name
Expand Down Expand Up @@ -119,6 +114,24 @@ func executeQuery(ctx context.Context, lm *LogManager, tableName string, queryAs
return res, err
}

func (lm *LogManager) ProcessAutocompleteSuggestionsQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
colNames, err := table.extractColumns(query, false)
if err != nil {
return nil, err
}
rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields))
return executeQuery(ctx, lm, table.Name, query.String(), query.Fields, rowToScan)
}

func (lm *LogManager) ProcessGeneralAggregationQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) {
colNames, err := table.extractColumns(query, true)
if err != nil {
return nil, err
}
rowToScan := make([]interface{}, len(colNames))
return executeQuery(ctx, lm, table.Name, query.String(), colNames, rowToScan)
}

// 'selectFields' are all values that we return from the query, both columns and non-schema fields,
// like e.g. count(), or toInt8(boolField)
func read(tableName string, rows *sql.Rows, selectFields []string, rowToScan []interface{}) ([]model.QueryResultRow, error) {
Expand Down
28 changes: 3 additions & 25 deletions quesma/model/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,11 @@ func (q *Query) String() string {
func (q *Query) StringFromColumns(colNames []string) string {
var sb strings.Builder
sb.WriteString("SELECT ")
if q.IsDistinct {
sb.WriteString("DISTINCT ")
}
for i, field := range colNames {
if field == "*" || field == EmptyFieldSelection {
sb.WriteString(field)
} else {
if field != EmptyFieldSelection {
sb.WriteString(strconv.Quote(field))
} else {
sb.WriteString(field)
}
if i < len(colNames)-1 || len(q.NonSchemaFields) > 0 {
sb.WriteString(", ")
Expand All @@ -115,25 +112,6 @@ func (q *Query) StringFromColumns(colNames []string) string {
where = ""
}
sb.WriteString(" FROM " + q.FromClause + where + q.WhereClause + " " + strings.Join(q.SuffixClauses, " "))
if len(q.GroupByFields) > 0 {
sb.WriteString(" GROUP BY (")
for i, field := range q.GroupByFields {
sb.WriteString(field)
if i < len(q.GroupByFields)-1 {
sb.WriteString(", ")
}
}
sb.WriteString(")")

sb.WriteString(" ORDER BY (")
for i, field := range q.GroupByFields {
sb.WriteString(field)
if i < len(q.GroupByFields)-1 {
sb.WriteString(", ")
}
}
sb.WriteString(")")
}
return sb.String()
}

Expand Down
14 changes: 7 additions & 7 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,14 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
fieldName = "*"
}
listQuery := queryTranslator.BuildNRowsQuery(fieldName, simpleQuery, queryInfo.Size)
hitsFallback, err = q.logManager.ProcessQuery(ctx, table, listQuery)
hitsFallback, err = q.logManager.ProcessSelectQuery(ctx, table, listQuery)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error processing fallback query. Err: %v, query: %+v", err, listQuery)
pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
return responseBody, err
}
countQuery := queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt)
countResult, err := q.logManager.ProcessQuery(ctx, table, countQuery)
countResult, err := q.logManager.ProcessSelectQuery(ctx, table, countQuery)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error processing count query. Err: %v, query: %+v", err, countQuery)
pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
Expand Down Expand Up @@ -461,7 +461,7 @@ func (q *QueryRunner) searchWorkerCommon(ctx context.Context, queryTranslator IQ
switch queryInfo.Typ {
case model.CountAsync:
fullQuery = queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery)

case model.Facets, model.FacetsNumeric:
// queryInfo = (Facets, fieldName, Limit results, Limit last rows to look into)
Expand All @@ -471,16 +471,16 @@ func (q *QueryRunner) searchWorkerCommon(ctx context.Context, queryTranslator IQ
case model.ListByField:
// queryInfo = (ListByField, fieldName, 0, LIMIT)
fullQuery = queryTranslator.BuildNRowsQuery(queryInfo.FieldName, simpleQuery, queryInfo.I2)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery)

case model.ListAllFields:
// queryInfo = (ListAllFields, "*", 0, LIMIT)
fullQuery = queryTranslator.BuildNRowsQuery("*", simpleQuery, queryInfo.I2)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery)

case model.Normal:
fullQuery = queryTranslator.BuildSimpleSelectQuery(simpleQuery.Sql.Stmt, queryInfo.I2)
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery)
hits, err = q.logManager.ProcessSelectQuery(dbQueryCtx, table, fullQuery)

default:
logger.ErrorWithCtx(ctx).Msgf("unknown query type: %v, query body: %v", queryInfo.Typ, body)
Expand Down Expand Up @@ -543,7 +543,7 @@ func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggrega
for _, agg := range aggregations {
logger.InfoWithCtx(ctx).Msg(agg.String()) // I'd keep for now until aggregations work fully
sqls += agg.Query.String() + "\n"
rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg.Query)
rows, err := q.logManager.ProcessGeneralAggregationQuery(dbQueryCtx, table, &agg.Query)
if err != nil {
logger.ErrorWithCtx(ctx).Msg(err.Error())
continue
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/termsenum/terms_enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func handleTermsEnumRequest(ctx context.Context, reqBody []byte, qt *queryparser
dbQueryCtx, cancel := context.WithCancel(ctx)
// TODO this will be used to cancel goroutine that is executing the query
_ = cancel
if rows, err2 := qt.ClickhouseLM.ProcessQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil {
if rows, err2 := qt.ClickhouseLM.ProcessAutocompleteSuggestionsQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil {
logger.Error().Msgf("terms enum failed - error processing SQL query [%s]", err2)
result, err = json.Marshal(emptyTermsEnumResponse())
} else {
Expand Down

0 comments on commit d55f5ae

Please sign in to comment.